mirror of
https://github.com/zitadel/zitadel.git
synced 2025-03-01 00:27:24 +00:00
* push with timeout * test: config for eventstore (cherry picked from commit b9156da76d0f03075589b50eafbf9f48160a0301) Co-authored-by: Silvan <silvan.reusser@gmail.com>
This commit is contained in:
parent
3e52beaf89
commit
d21bb902f1
@ -320,6 +320,9 @@ Actions:
|
|||||||
- localhost
|
- localhost
|
||||||
- "127.0.0.1"
|
- "127.0.0.1"
|
||||||
|
|
||||||
|
Eventstore:
|
||||||
|
PushTimeout: 15s
|
||||||
|
|
||||||
DefaultInstance:
|
DefaultInstance:
|
||||||
InstanceName:
|
InstanceName:
|
||||||
DefaultLanguage: en
|
DefaultLanguage: en
|
||||||
|
@ -62,7 +62,7 @@ func Setup(config *Config, steps *Steps, masterKey string) {
|
|||||||
dbClient, err := database.Connect(config.Database, false)
|
dbClient, err := database.Connect(config.Database, false)
|
||||||
logging.OnError(err).Fatal("unable to connect to database")
|
logging.OnError(err).Fatal("unable to connect to database")
|
||||||
|
|
||||||
eventstoreClient, err := eventstore.Start(dbClient)
|
eventstoreClient, err := eventstore.Start(&eventstore.Config{Client: dbClient})
|
||||||
logging.OnError(err).Fatal("unable to start eventstore")
|
logging.OnError(err).Fatal("unable to start eventstore")
|
||||||
migration.RegisterMappers(eventstoreClient)
|
migration.RegisterMappers(eventstoreClient)
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/zitadel/zitadel/internal/config/systemdefaults"
|
"github.com/zitadel/zitadel/internal/config/systemdefaults"
|
||||||
"github.com/zitadel/zitadel/internal/crypto"
|
"github.com/zitadel/zitadel/internal/crypto"
|
||||||
"github.com/zitadel/zitadel/internal/database"
|
"github.com/zitadel/zitadel/internal/database"
|
||||||
|
"github.com/zitadel/zitadel/internal/eventstore"
|
||||||
"github.com/zitadel/zitadel/internal/id"
|
"github.com/zitadel/zitadel/internal/id"
|
||||||
"github.com/zitadel/zitadel/internal/query/projection"
|
"github.com/zitadel/zitadel/internal/query/projection"
|
||||||
static_config "github.com/zitadel/zitadel/internal/static/config"
|
static_config "github.com/zitadel/zitadel/internal/static/config"
|
||||||
@ -60,6 +61,7 @@ type Config struct {
|
|||||||
CustomerPortal string
|
CustomerPortal string
|
||||||
Machine *id.Config
|
Machine *id.Config
|
||||||
Actions *actions.Config
|
Actions *actions.Config
|
||||||
|
Eventstore *eventstore.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
func MustNewConfig(v *viper.Viper) *Config {
|
func MustNewConfig(v *viper.Viper) *Config {
|
||||||
|
@ -99,7 +99,8 @@ func startZitadel(config *Config, masterKey string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
eventstoreClient, err := eventstore.Start(dbClient)
|
config.Eventstore.Client = dbClient
|
||||||
|
eventstoreClient, err := eventstore.Start(config.Eventstore)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot start eventstore for queries: %w", err)
|
return fmt.Errorf("cannot start eventstore for queries: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ func eventstoreExpect(t *testing.T, expects ...expect) *eventstore.Eventstore {
|
|||||||
for _, e := range expects {
|
for _, e := range expects {
|
||||||
e(m)
|
e(m)
|
||||||
}
|
}
|
||||||
es := eventstore.NewEventstore(m)
|
es := eventstore.NewEventstore(eventstore.TestConfig(m))
|
||||||
iam_repo.RegisterEventMappers(es)
|
iam_repo.RegisterEventMappers(es)
|
||||||
org.RegisterEventMappers(es)
|
org.RegisterEventMappers(es)
|
||||||
usr_repo.RegisterEventMappers(es)
|
usr_repo.RegisterEventMappers(es)
|
||||||
|
@ -2,10 +2,24 @@ package eventstore
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/internal/eventstore/repository"
|
||||||
z_sql "github.com/zitadel/zitadel/internal/eventstore/repository/sql"
|
z_sql "github.com/zitadel/zitadel/internal/eventstore/repository/sql"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Start(sqlClient *sql.DB) (*Eventstore, error) {
|
type Config struct {
|
||||||
return NewEventstore(z_sql.NewCRDB(sqlClient)), nil
|
PushTimeout time.Duration
|
||||||
|
Client *sql.DB
|
||||||
|
|
||||||
|
repo repository.Repository
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConfig(repo repository.Repository) *Config {
|
||||||
|
return &Config{repo: repo}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Start(config *Config) (*Eventstore, error) {
|
||||||
|
config.repo = z_sql.NewCRDB(config.Client)
|
||||||
|
return NewEventstore(config), nil
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/zitadel/zitadel/internal/api/authz"
|
"github.com/zitadel/zitadel/internal/api/authz"
|
||||||
"github.com/zitadel/zitadel/internal/errors"
|
"github.com/zitadel/zitadel/internal/errors"
|
||||||
@ -18,17 +19,19 @@ type Eventstore struct {
|
|||||||
repo repository.Repository
|
repo repository.Repository
|
||||||
interceptorMutex sync.Mutex
|
interceptorMutex sync.Mutex
|
||||||
eventInterceptors map[EventType]eventTypeInterceptors
|
eventInterceptors map[EventType]eventTypeInterceptors
|
||||||
|
PushTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventTypeInterceptors struct {
|
type eventTypeInterceptors struct {
|
||||||
eventMapper func(*repository.Event) (Event, error)
|
eventMapper func(*repository.Event) (Event, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEventstore(repo repository.Repository) *Eventstore {
|
func NewEventstore(config *Config) *Eventstore {
|
||||||
return &Eventstore{
|
return &Eventstore{
|
||||||
repo: repo,
|
repo: config.repo,
|
||||||
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
||||||
interceptorMutex: sync.Mutex{},
|
interceptorMutex: sync.Mutex{},
|
||||||
|
PushTimeout: config.PushTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,6 +48,13 @@ func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if es.PushTimeout > 0 {
|
||||||
|
var cancel func()
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, es.PushTimeout)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
err = es.repo.Push(ctx, events, constraints...)
|
err = es.repo.Push(ctx, events, constraints...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -9,7 +9,6 @@ import (
|
|||||||
"github.com/zitadel/zitadel/internal/api/authz"
|
"github.com/zitadel/zitadel/internal/api/authz"
|
||||||
"github.com/zitadel/zitadel/internal/eventstore"
|
"github.com/zitadel/zitadel/internal/eventstore"
|
||||||
"github.com/zitadel/zitadel/internal/eventstore/repository"
|
"github.com/zitadel/zitadel/internal/eventstore/repository"
|
||||||
"github.com/zitadel/zitadel/internal/eventstore/repository/sql"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
@ -287,7 +286,12 @@ func (rm *UserReadModel) Reduce() error {
|
|||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
func TestUserReadModel(t *testing.T) {
|
func TestUserReadModel(t *testing.T) {
|
||||||
es := eventstore.NewEventstore(sql.NewCRDB(testCRDBClient))
|
es, err := eventstore.Start(&eventstore.Config{Client: testCRDBClient})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unable to start eventstore: %v", err)
|
||||||
|
t.FailNow()
|
||||||
|
}
|
||||||
|
// es := eventstore.NewEventstore(&eventstore.Config{re})
|
||||||
es.RegisterFilterEventMapper(UserAddedEventMapper()).
|
es.RegisterFilterEventMapper(UserAddedEventMapper()).
|
||||||
RegisterFilterEventMapper(UserFirstNameChangedMapper()).
|
RegisterFilterEventMapper(UserFirstNameChangedMapper()).
|
||||||
RegisterFilterEventMapper(UserPasswordCheckedMapper()).
|
RegisterFilterEventMapper(UserPasswordCheckedMapper()).
|
||||||
|
@ -269,10 +269,11 @@ func TestStatementHandler_Update(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "fetch previous fails",
|
name: "fetch previous fails",
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).
|
es_repo_mock.NewRepo(t).
|
||||||
ExpectFilterEventsError(errFilter),
|
ExpectFilterEventsError(errFilter),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
aggregates: []eventstore.AggregateType{"testAgg"},
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
||||||
},
|
},
|
||||||
args: args{
|
args: args{
|
||||||
@ -301,9 +302,10 @@ func TestStatementHandler_Update(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "no successful stmts",
|
name: "no successful stmts",
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t),
|
es_repo_mock.NewRepo(t),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
aggregates: []eventstore.AggregateType{"testAgg"},
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
||||||
},
|
},
|
||||||
args: args{
|
args: args{
|
||||||
@ -339,9 +341,10 @@ func TestStatementHandler_Update(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "update current sequence fails",
|
name: "update current sequence fails",
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t),
|
es_repo_mock.NewRepo(t),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
aggregates: []eventstore.AggregateType{"agg"},
|
aggregates: []eventstore.AggregateType{"agg"},
|
||||||
},
|
},
|
||||||
args: args{
|
args: args{
|
||||||
@ -381,9 +384,10 @@ func TestStatementHandler_Update(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "commit fails",
|
name: "commit fails",
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t),
|
es_repo_mock.NewRepo(t),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
aggregates: []eventstore.AggregateType{"agg"},
|
aggregates: []eventstore.AggregateType{"agg"},
|
||||||
},
|
},
|
||||||
args: args{
|
args: args{
|
||||||
@ -423,9 +427,10 @@ func TestStatementHandler_Update(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "correct",
|
name: "correct",
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t),
|
es_repo_mock.NewRepo(t),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
aggregates: []eventstore.AggregateType{"testAgg"},
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
||||||
},
|
},
|
||||||
args: args{
|
args: args{
|
||||||
@ -455,9 +460,10 @@ func TestStatementHandler_Update(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "fetch previous stmts no additional stmts",
|
name: "fetch previous stmts no additional stmts",
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(),
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
aggregates: []eventstore.AggregateType{"testAgg"},
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
||||||
},
|
},
|
||||||
args: args{
|
args: args{
|
||||||
@ -486,7 +492,7 @@ func TestStatementHandler_Update(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "fetch previous stmts additional events",
|
name: "fetch previous stmts additional events",
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||||
&repository.Event{
|
&repository.Event{
|
||||||
AggregateType: "testAgg",
|
AggregateType: "testAgg",
|
||||||
@ -496,6 +502,7 @@ func TestStatementHandler_Update(t *testing.T) {
|
|||||||
},
|
},
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
aggregates: []eventstore.AggregateType{"testAgg"},
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
||||||
},
|
},
|
||||||
args: args{
|
args: args{
|
||||||
@ -617,9 +624,10 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
|
|||||||
stmtSeq: 6,
|
stmtSeq: 6,
|
||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEventsError(errFilter),
|
es_repo_mock.NewRepo(t).ExpectFilterEventsError(errFilter),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
aggregates: []eventstore.AggregateType{"testAgg"},
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
||||||
},
|
},
|
||||||
want: want{
|
want: want{
|
||||||
@ -642,9 +650,10 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
|
|||||||
stmtSeq: 6,
|
stmtSeq: 6,
|
||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(),
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
aggregates: []eventstore.AggregateType{"testAgg"},
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
||||||
},
|
},
|
||||||
want: want{
|
want: want{
|
||||||
@ -666,7 +675,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
|
|||||||
stmtSeq: 10,
|
stmtSeq: 10,
|
||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||||
&repository.Event{
|
&repository.Event{
|
||||||
ID: "id",
|
ID: "id",
|
||||||
@ -690,6 +699,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
|
|||||||
},
|
},
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
aggregates: []eventstore.AggregateType{"testAgg"},
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
||||||
},
|
},
|
||||||
want: want{
|
want: want{
|
||||||
@ -712,7 +722,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
|
|||||||
stmtSeq: 10,
|
stmtSeq: 10,
|
||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||||
&repository.Event{
|
&repository.Event{
|
||||||
ID: "id",
|
ID: "id",
|
||||||
@ -726,6 +736,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
|
|||||||
},
|
},
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
aggregates: []eventstore.AggregateType{"testAgg"},
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
||||||
},
|
},
|
||||||
want: want{
|
want: want{
|
||||||
|
@ -68,7 +68,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
|
|||||||
fields{
|
fields{
|
||||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||||
return eventstore.NewEventstore(
|
return eventstore.NewEventstore(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(),
|
eventstore.TestConfig(es_repo_mock.NewRepo(t).ExpectFilterEvents()),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
query: testQuery(
|
query: testQuery(
|
||||||
@ -93,7 +93,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
|
|||||||
"process error",
|
"process error",
|
||||||
fields{
|
fields{
|
||||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||||
return eventstore.NewEventstore(
|
return eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||||
&repository.Event{
|
&repository.Event{
|
||||||
ID: "id",
|
ID: "id",
|
||||||
@ -106,6 +106,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
|
|||||||
AggregateType: "testAgg",
|
AggregateType: "testAgg",
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
query: testQuery(
|
query: testQuery(
|
||||||
@ -131,7 +132,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
|
|||||||
"process ok",
|
"process ok",
|
||||||
fields{
|
fields{
|
||||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||||
return eventstore.NewEventstore(
|
return eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||||
&repository.Event{
|
&repository.Event{
|
||||||
ID: "id",
|
ID: "id",
|
||||||
@ -144,6 +145,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
|
|||||||
AggregateType: "testAgg",
|
AggregateType: "testAgg",
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
query: testQuery(
|
query: testQuery(
|
||||||
@ -170,7 +172,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
|
|||||||
"process limit exceeded ok",
|
"process limit exceeded ok",
|
||||||
fields{
|
fields{
|
||||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||||
return eventstore.NewEventstore(
|
return eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).
|
es_repo_mock.NewRepo(t).
|
||||||
ExpectFilterEvents(
|
ExpectFilterEvents(
|
||||||
&repository.Event{
|
&repository.Event{
|
||||||
@ -184,6 +186,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
|
|||||||
AggregateType: "testAgg",
|
AggregateType: "testAgg",
|
||||||
},
|
},
|
||||||
).ExpectFilterEvents(),
|
).ExpectFilterEvents(),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
query: testQuery(
|
query: testQuery(
|
||||||
@ -389,9 +392,10 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
|
|||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEventsError(ErrFilter),
|
es_repo_mock.NewRepo(t).ExpectFilterEventsError(ErrFilter),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
query: testQuery(
|
query: testQuery(
|
||||||
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||||
AddQuery().
|
AddQuery().
|
||||||
@ -415,7 +419,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(),
|
eventstore.TestConfig(es_repo_mock.NewRepo(t).ExpectFilterEvents()),
|
||||||
),
|
),
|
||||||
query: testQuery(
|
query: testQuery(
|
||||||
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||||
@ -439,7 +443,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
|
|||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||||
&repository.Event{
|
&repository.Event{
|
||||||
ID: "id",
|
ID: "id",
|
||||||
@ -463,6 +467,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
|
|||||||
},
|
},
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
query: testQuery(
|
query: testQuery(
|
||||||
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||||
AddQuery().
|
AddQuery().
|
||||||
@ -485,7 +490,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
|
|||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
eventstore: eventstore.NewEventstore(
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||||
&repository.Event{
|
&repository.Event{
|
||||||
ID: "id",
|
ID: "id",
|
||||||
@ -509,6 +514,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
|
|||||||
},
|
},
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
),
|
||||||
query: testQuery(
|
query: testQuery(
|
||||||
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||||
AddQuery().
|
AddQuery().
|
||||||
@ -694,8 +700,9 @@ func TestProjection_schedule(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields{
|
fields{
|
||||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||||
return eventstore.NewEventstore(
|
return eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEventsError(ErrFilter),
|
es_repo_mock.NewRepo(t).ExpectFilterEventsError(ErrFilter),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
triggerProjection: time.NewTimer(0),
|
triggerProjection: time.NewTimer(0),
|
||||||
@ -713,7 +720,7 @@ func TestProjection_schedule(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields{
|
fields{
|
||||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||||
return eventstore.NewEventstore(
|
return eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||||
&repository.Event{
|
&repository.Event{
|
||||||
AggregateType: "system",
|
AggregateType: "system",
|
||||||
@ -723,6 +730,7 @@ func TestProjection_schedule(t *testing.T) {
|
|||||||
Type: "system.projections.scheduler.succeeded",
|
Type: "system.projections.scheduler.succeeded",
|
||||||
}).
|
}).
|
||||||
ExpectInstanceIDsError(ErrFilter),
|
ExpectInstanceIDsError(ErrFilter),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
triggerProjection: time.NewTimer(0),
|
triggerProjection: time.NewTimer(0),
|
||||||
@ -740,7 +748,7 @@ func TestProjection_schedule(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields{
|
fields{
|
||||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||||
return eventstore.NewEventstore(
|
return eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||||
&repository.Event{
|
&repository.Event{
|
||||||
AggregateType: "system",
|
AggregateType: "system",
|
||||||
@ -749,6 +757,7 @@ func TestProjection_schedule(t *testing.T) {
|
|||||||
InstanceID: "",
|
InstanceID: "",
|
||||||
Type: "system.projections.scheduler.succeeded",
|
Type: "system.projections.scheduler.succeeded",
|
||||||
}).ExpectInstanceIDs("instanceID1"),
|
}).ExpectInstanceIDs("instanceID1"),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
triggerProjection: time.NewTimer(0),
|
triggerProjection: time.NewTimer(0),
|
||||||
@ -771,7 +780,7 @@ func TestProjection_schedule(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields{
|
fields{
|
||||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||||
return eventstore.NewEventstore(
|
return eventstore.NewEventstore(eventstore.TestConfig(
|
||||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||||
&repository.Event{
|
&repository.Event{
|
||||||
AggregateType: "system",
|
AggregateType: "system",
|
||||||
@ -780,6 +789,7 @@ func TestProjection_schedule(t *testing.T) {
|
|||||||
InstanceID: "",
|
InstanceID: "",
|
||||||
Type: "system.projections.scheduler.succeeded",
|
Type: "system.projections.scheduler.succeeded",
|
||||||
}).ExpectInstanceIDs("instanceID1"),
|
}).ExpectInstanceIDs("instanceID1"),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
triggerProjection: time.NewTimer(0),
|
triggerProjection: time.NewTimer(0),
|
||||||
|
@ -24,7 +24,7 @@ func eventstoreExpect(t *testing.T, expects ...expect) *eventstore.Eventstore {
|
|||||||
for _, e := range expects {
|
for _, e := range expects {
|
||||||
e(m)
|
e(m)
|
||||||
}
|
}
|
||||||
es := eventstore.NewEventstore(m)
|
es := eventstore.NewEventstore(eventstore.TestConfig(m))
|
||||||
iam_repo.RegisterEventMappers(es)
|
iam_repo.RegisterEventMappers(es)
|
||||||
org.RegisterEventMappers(es)
|
org.RegisterEventMappers(es)
|
||||||
usr_repo.RegisterEventMappers(es)
|
usr_repo.RegisterEventMappers(es)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user