diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 9278d5c85a..4006738bf0 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -320,6 +320,9 @@ Actions: - localhost - "127.0.0.1" +Eventstore: + PushTimeout: 15s + DefaultInstance: InstanceName: DefaultLanguage: en diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index bc4883ad34..fddaee61ec 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -62,7 +62,7 @@ func Setup(config *Config, steps *Steps, masterKey string) { dbClient, err := database.Connect(config.Database, false) logging.OnError(err).Fatal("unable to connect to database") - eventstoreClient, err := eventstore.Start(dbClient) + eventstoreClient, err := eventstore.Start(&eventstore.Config{Client: dbClient}) logging.OnError(err).Fatal("unable to start eventstore") migration.RegisterMappers(eventstoreClient) diff --git a/cmd/start/config.go b/cmd/start/config.go index 078e3ce1de..db5e0e170e 100644 --- a/cmd/start/config.go +++ b/cmd/start/config.go @@ -22,6 +22,7 @@ import ( "github.com/zitadel/zitadel/internal/config/systemdefaults" "github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/query/projection" static_config "github.com/zitadel/zitadel/internal/static/config" @@ -60,6 +61,7 @@ type Config struct { CustomerPortal string Machine *id.Config Actions *actions.Config + Eventstore *eventstore.Config } func MustNewConfig(v *viper.Viper) *Config { diff --git a/cmd/start/start.go b/cmd/start/start.go index 486d551689..84caebe5cc 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -99,7 +99,8 @@ func startZitadel(config *Config, masterKey string) error { return err } - eventstoreClient, err := eventstore.Start(dbClient) + config.Eventstore.Client = dbClient + eventstoreClient, err := eventstore.Start(config.Eventstore) if err != nil { return fmt.Errorf("cannot start eventstore for queries: %w", err) } diff --git a/internal/command/main_test.go b/internal/command/main_test.go index 6974878473..aeb57cd4bb 100644 --- a/internal/command/main_test.go +++ b/internal/command/main_test.go @@ -29,7 +29,7 @@ func eventstoreExpect(t *testing.T, expects ...expect) *eventstore.Eventstore { for _, e := range expects { e(m) } - es := eventstore.NewEventstore(m) + es := eventstore.NewEventstore(eventstore.TestConfig(m)) iam_repo.RegisterEventMappers(es) org.RegisterEventMappers(es) usr_repo.RegisterEventMappers(es) diff --git a/internal/eventstore/config.go b/internal/eventstore/config.go index ee26b9fdbc..bd401e27df 100644 --- a/internal/eventstore/config.go +++ b/internal/eventstore/config.go @@ -2,10 +2,24 @@ package eventstore import ( "database/sql" + "time" + "github.com/zitadel/zitadel/internal/eventstore/repository" z_sql "github.com/zitadel/zitadel/internal/eventstore/repository/sql" ) -func Start(sqlClient *sql.DB) (*Eventstore, error) { - return NewEventstore(z_sql.NewCRDB(sqlClient)), nil +type Config struct { + PushTimeout time.Duration + Client *sql.DB + + repo repository.Repository +} + +func TestConfig(repo repository.Repository) *Config { + return &Config{repo: repo} +} + +func Start(config *Config) (*Eventstore, error) { + config.repo = z_sql.NewCRDB(config.Client) + return NewEventstore(config), nil } diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index 9624664986..0d702a7a2d 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -6,6 +6,7 @@ import ( "encoding/json" "reflect" "sync" + "time" "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/errors" @@ -18,17 +19,19 @@ type Eventstore struct { repo repository.Repository interceptorMutex sync.Mutex eventInterceptors map[EventType]eventTypeInterceptors + PushTimeout time.Duration } type eventTypeInterceptors struct { eventMapper func(*repository.Event) (Event, error) } -func NewEventstore(repo repository.Repository) *Eventstore { +func NewEventstore(config *Config) *Eventstore { return &Eventstore{ - repo: repo, + repo: config.repo, eventInterceptors: map[EventType]eventTypeInterceptors{}, interceptorMutex: sync.Mutex{}, + PushTimeout: config.PushTimeout, } } @@ -45,6 +48,13 @@ func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error if err != nil { return nil, err } + + if es.PushTimeout > 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, es.PushTimeout) + defer cancel() + } + err = es.repo.Push(ctx, events, constraints...) if err != nil { return nil, err diff --git a/internal/eventstore/example_test.go b/internal/eventstore/example_test.go index 2b6b4e7c92..8ee5aa51d6 100644 --- a/internal/eventstore/example_test.go +++ b/internal/eventstore/example_test.go @@ -9,7 +9,6 @@ import ( "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/repository" - "github.com/zitadel/zitadel/internal/eventstore/repository/sql" ) // ------------------------------------------------------------ @@ -287,7 +286,12 @@ func (rm *UserReadModel) Reduce() error { // ------------------------------------------------------------ func TestUserReadModel(t *testing.T) { - es := eventstore.NewEventstore(sql.NewCRDB(testCRDBClient)) + es, err := eventstore.Start(&eventstore.Config{Client: testCRDBClient}) + if err != nil { + t.Errorf("unable to start eventstore: %v", err) + t.FailNow() + } + // es := eventstore.NewEventstore(&eventstore.Config{re}) es.RegisterFilterEventMapper(UserAddedEventMapper()). RegisterFilterEventMapper(UserFirstNameChangedMapper()). RegisterFilterEventMapper(UserPasswordCheckedMapper()). diff --git a/internal/eventstore/handler/crdb/handler_stmt_test.go b/internal/eventstore/handler/crdb/handler_stmt_test.go index fa245c8eb1..401d86ec09 100644 --- a/internal/eventstore/handler/crdb/handler_stmt_test.go +++ b/internal/eventstore/handler/crdb/handler_stmt_test.go @@ -269,10 +269,11 @@ func TestStatementHandler_Update(t *testing.T) { { name: "fetch previous fails", fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t). ExpectFilterEventsError(errFilter), ), + ), aggregates: []eventstore.AggregateType{"testAgg"}, }, args: args{ @@ -301,9 +302,10 @@ func TestStatementHandler_Update(t *testing.T) { { name: "no successful stmts", fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t), ), + ), aggregates: []eventstore.AggregateType{"testAgg"}, }, args: args{ @@ -339,9 +341,10 @@ func TestStatementHandler_Update(t *testing.T) { { name: "update current sequence fails", fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t), ), + ), aggregates: []eventstore.AggregateType{"agg"}, }, args: args{ @@ -381,9 +384,10 @@ func TestStatementHandler_Update(t *testing.T) { { name: "commit fails", fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t), ), + ), aggregates: []eventstore.AggregateType{"agg"}, }, args: args{ @@ -423,9 +427,10 @@ func TestStatementHandler_Update(t *testing.T) { { name: "correct", fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t), ), + ), aggregates: []eventstore.AggregateType{"testAgg"}, }, args: args{ @@ -455,9 +460,10 @@ func TestStatementHandler_Update(t *testing.T) { { name: "fetch previous stmts no additional stmts", fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents(), ), + ), aggregates: []eventstore.AggregateType{"testAgg"}, }, args: args{ @@ -486,7 +492,7 @@ func TestStatementHandler_Update(t *testing.T) { { name: "fetch previous stmts additional events", fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents( &repository.Event{ AggregateType: "testAgg", @@ -496,6 +502,7 @@ func TestStatementHandler_Update(t *testing.T) { }, ), ), + ), aggregates: []eventstore.AggregateType{"testAgg"}, }, args: args{ @@ -617,9 +624,10 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) { stmtSeq: 6, }, fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEventsError(errFilter), ), + ), aggregates: []eventstore.AggregateType{"testAgg"}, }, want: want{ @@ -642,9 +650,10 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) { stmtSeq: 6, }, fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents(), ), + ), aggregates: []eventstore.AggregateType{"testAgg"}, }, want: want{ @@ -666,7 +675,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) { stmtSeq: 10, }, fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents( &repository.Event{ ID: "id", @@ -690,6 +699,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) { }, ), ), + ), aggregates: []eventstore.AggregateType{"testAgg"}, }, want: want{ @@ -712,7 +722,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) { stmtSeq: 10, }, fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents( &repository.Event{ ID: "id", @@ -726,6 +736,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) { }, ), ), + ), aggregates: []eventstore.AggregateType{"testAgg"}, }, want: want{ diff --git a/internal/eventstore/handler/handler_projection_test.go b/internal/eventstore/handler/handler_projection_test.go index c504a9dbd4..667a293246 100644 --- a/internal/eventstore/handler/handler_projection_test.go +++ b/internal/eventstore/handler/handler_projection_test.go @@ -68,7 +68,7 @@ func TestProjectionHandler_Trigger(t *testing.T) { fields{ eventstore: func(t *testing.T) *eventstore.Eventstore { return eventstore.NewEventstore( - es_repo_mock.NewRepo(t).ExpectFilterEvents(), + eventstore.TestConfig(es_repo_mock.NewRepo(t).ExpectFilterEvents()), ) }, query: testQuery( @@ -93,7 +93,7 @@ func TestProjectionHandler_Trigger(t *testing.T) { "process error", fields{ eventstore: func(t *testing.T) *eventstore.Eventstore { - return eventstore.NewEventstore( + return eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents( &repository.Event{ ID: "id", @@ -106,6 +106,7 @@ func TestProjectionHandler_Trigger(t *testing.T) { AggregateType: "testAgg", }, ), + ), ) }, query: testQuery( @@ -131,7 +132,7 @@ func TestProjectionHandler_Trigger(t *testing.T) { "process ok", fields{ eventstore: func(t *testing.T) *eventstore.Eventstore { - return eventstore.NewEventstore( + return eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents( &repository.Event{ ID: "id", @@ -144,6 +145,7 @@ func TestProjectionHandler_Trigger(t *testing.T) { AggregateType: "testAgg", }, ), + ), ) }, query: testQuery( @@ -170,7 +172,7 @@ func TestProjectionHandler_Trigger(t *testing.T) { "process limit exceeded ok", fields{ eventstore: func(t *testing.T) *eventstore.Eventstore { - return eventstore.NewEventstore( + return eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t). ExpectFilterEvents( &repository.Event{ @@ -184,6 +186,7 @@ func TestProjectionHandler_Trigger(t *testing.T) { AggregateType: "testAgg", }, ).ExpectFilterEvents(), + ), ) }, query: testQuery( @@ -389,9 +392,10 @@ func TestProjectionHandler_FetchEvents(t *testing.T) { ctx: context.Background(), }, fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEventsError(ErrFilter), ), + ), query: testQuery( eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). AddQuery(). @@ -415,7 +419,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) { }, fields: fields{ eventstore: eventstore.NewEventstore( - es_repo_mock.NewRepo(t).ExpectFilterEvents(), + eventstore.TestConfig(es_repo_mock.NewRepo(t).ExpectFilterEvents()), ), query: testQuery( eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). @@ -439,7 +443,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) { ctx: context.Background(), }, fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents( &repository.Event{ ID: "id", @@ -463,6 +467,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) { }, ), ), + ), query: testQuery( eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). AddQuery(). @@ -485,7 +490,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) { ctx: context.Background(), }, fields: fields{ - eventstore: eventstore.NewEventstore( + eventstore: eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents( &repository.Event{ ID: "id", @@ -509,6 +514,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) { }, ), ), + ), query: testQuery( eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). AddQuery(). @@ -694,8 +700,9 @@ func TestProjection_schedule(t *testing.T) { }, fields{ eventstore: func(t *testing.T) *eventstore.Eventstore { - return eventstore.NewEventstore( + return eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEventsError(ErrFilter), + ), ) }, triggerProjection: time.NewTimer(0), @@ -713,7 +720,7 @@ func TestProjection_schedule(t *testing.T) { }, fields{ eventstore: func(t *testing.T) *eventstore.Eventstore { - return eventstore.NewEventstore( + return eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents( &repository.Event{ AggregateType: "system", @@ -723,6 +730,7 @@ func TestProjection_schedule(t *testing.T) { Type: "system.projections.scheduler.succeeded", }). ExpectInstanceIDsError(ErrFilter), + ), ) }, triggerProjection: time.NewTimer(0), @@ -740,7 +748,7 @@ func TestProjection_schedule(t *testing.T) { }, fields{ eventstore: func(t *testing.T) *eventstore.Eventstore { - return eventstore.NewEventstore( + return eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents( &repository.Event{ AggregateType: "system", @@ -749,6 +757,7 @@ func TestProjection_schedule(t *testing.T) { InstanceID: "", Type: "system.projections.scheduler.succeeded", }).ExpectInstanceIDs("instanceID1"), + ), ) }, triggerProjection: time.NewTimer(0), @@ -771,7 +780,7 @@ func TestProjection_schedule(t *testing.T) { }, fields{ eventstore: func(t *testing.T) *eventstore.Eventstore { - return eventstore.NewEventstore( + return eventstore.NewEventstore(eventstore.TestConfig( es_repo_mock.NewRepo(t).ExpectFilterEvents( &repository.Event{ AggregateType: "system", @@ -780,6 +789,7 @@ func TestProjection_schedule(t *testing.T) { InstanceID: "", Type: "system.projections.scheduler.succeeded", }).ExpectInstanceIDs("instanceID1"), + ), ) }, triggerProjection: time.NewTimer(0), diff --git a/internal/query/projection/main_test.go b/internal/query/projection/main_test.go index 6d7001902d..1c09cf3ffa 100644 --- a/internal/query/projection/main_test.go +++ b/internal/query/projection/main_test.go @@ -24,7 +24,7 @@ func eventstoreExpect(t *testing.T, expects ...expect) *eventstore.Eventstore { for _, e := range expects { e(m) } - es := eventstore.NewEventstore(m) + es := eventstore.NewEventstore(eventstore.TestConfig(m)) iam_repo.RegisterEventMappers(es) org.RegisterEventMappers(es) usr_repo.RegisterEventMappers(es)