diff --git a/internal/command/existing_label_policies_model.go b/internal/command/existing_label_policies_model.go index af48427740..dda39980f1 100644 --- a/internal/command/existing_label_policies_model.go +++ b/internal/command/existing_label_policies_model.go @@ -38,7 +38,7 @@ func (rm *ExistingLabelPoliciesReadModel) Reduce() error { } } } - return nil + return rm.WriteModel.Reduce() } func (rm *ExistingLabelPoliciesReadModel) Query() *eventstore.SearchQueryBuilder { diff --git a/internal/command/instance_domain_model.go b/internal/command/instance_domain_model.go index 06af63f8b2..2126741a88 100644 --- a/internal/command/instance_domain_model.go +++ b/internal/command/instance_domain_model.go @@ -55,7 +55,7 @@ func (wm *InstanceDomainWriteModel) Reduce() error { wm.State = domain.InstanceDomainStateRemoved } } - return nil + return wm.WriteModel.Reduce() } func (wm *InstanceDomainWriteModel) Query() *eventstore.SearchQueryBuilder { diff --git a/internal/command/instance_model.go b/internal/command/instance_model.go index 6db058aff9..98d49926bf 100644 --- a/internal/command/instance_model.go +++ b/internal/command/instance_model.go @@ -60,7 +60,7 @@ func (wm *InstanceWriteModel) Reduce() error { wm.DefaultLanguage = e.Language } } - return nil + return wm.WriteModel.Reduce() } func (wm *InstanceWriteModel) Query() *eventstore.SearchQueryBuilder { diff --git a/internal/command/org_domain_model.go b/internal/command/org_domain_model.go index a23d740696..43ab9e52dd 100644 --- a/internal/command/org_domain_model.go +++ b/internal/command/org_domain_model.go @@ -84,7 +84,7 @@ func (wm *OrgDomainWriteModel) Reduce() error { wm.ValidationCode = nil } } - return nil + return wm.WriteModel.Reduce() } func (wm *OrgDomainWriteModel) Query() *eventstore.SearchQueryBuilder { @@ -154,7 +154,7 @@ func (wm *OrgDomainsWriteModel) Reduce() error { } } } - return nil + return wm.WriteModel.Reduce() } func (wm *OrgDomainsWriteModel) Query() *eventstore.SearchQueryBuilder { @@ -216,7 +216,7 @@ func (wm *OrgDomainVerifiedWriteModel) Reduce() error { wm.Verified = false } } - return nil + return wm.WriteModel.Reduce() } func (wm *OrgDomainVerifiedWriteModel) Query() *eventstore.SearchQueryBuilder { diff --git a/internal/command/preparation/command.go b/internal/command/preparation/command.go index 3d0ac69925..96a3da8909 100644 --- a/internal/command/preparation/command.go +++ b/internal/command/preparation/command.go @@ -25,6 +25,8 @@ var ( ) // PrepareCommands checks the passed validations and if ok creates the commands +// +// Deprecated: filter causes unneeded allocation. Use [eventstore.FilterToQueryReducer] instead. func PrepareCommands(ctx context.Context, filter FilterToQueryReducer, validations ...Validation) (cmds []eventstore.Command, err error) { commanders, err := validate(validations) if err != nil { diff --git a/internal/command/quota_model.go b/internal/command/quota_model.go index 3c4c87c401..8c2efa7dfe 100644 --- a/internal/command/quota_model.go +++ b/internal/command/quota_model.go @@ -79,7 +79,7 @@ func (wm *quotaWriteModel) Reduce() error { } // wm.WriteModel.Reduce() sets the aggregateID to the first event's aggregateID, but we need the last one wm.AggregateID = wm.rollingAggregateID - return nil + return wm.WriteModel.Reduce() } // NewChanges returns all changes that need to be applied to the aggregate. diff --git a/internal/command/system_model.go b/internal/command/system_model.go index 6637013d55..4021e5e46d 100644 --- a/internal/command/system_model.go +++ b/internal/command/system_model.go @@ -94,7 +94,7 @@ func (wm *SystemConfigWriteModel) Reduce() error { } } } - return nil + return wm.WriteModel.Reduce() } func (wm *SystemConfigWriteModel) Query() *eventstore.SearchQueryBuilder { diff --git a/internal/command/unique_constraints_model.go b/internal/command/unique_constraints_model.go index c09f6343c2..9b71b0e7a1 100644 --- a/internal/command/unique_constraints_model.go +++ b/internal/command/unique_constraints_model.go @@ -183,7 +183,7 @@ func (rm *UniqueConstraintReadModel) Reduce() error { rm.removeUniqueConstraint(e.Aggregate().ID, e.UserID, member.UniqueMember) } } - return nil + return rm.WriteModel.Reduce() } func (rm *UniqueConstraintReadModel) Query() *eventstore.SearchQueryBuilder { diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index 2a8d8a02d7..0bf31afb48 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -12,7 +12,9 @@ import ( // Eventstore abstracts all functions needed to store valid events // and filters the stored events type Eventstore struct { - interceptorMutex sync.Mutex + // TODO: get rid of this mutex, + // or if we scale to >4vCPU use a sync.Map + interceptorMutex sync.RWMutex eventInterceptors map[EventType]eventTypeInterceptors eventTypes []string aggregateTypes []string @@ -33,7 +35,6 @@ type eventTypeInterceptors struct { func NewEventstore(config *Config) *Eventstore { return &Eventstore{ eventInterceptors: map[EventType]eventTypeInterceptors{}, - interceptorMutex: sync.Mutex{}, PushTimeout: config.PushTimeout, pusher: config.Pusher, @@ -83,28 +84,33 @@ func (es *Eventstore) AggregateTypes() []string { // Filter filters the stored events based on the searchQuery // and maps the events to the defined event structs -func (es *Eventstore) Filter(ctx context.Context, queryFactory *SearchQueryBuilder) ([]Event, error) { - // make sure that the instance id is always set - if queryFactory.instanceID == nil && authz.GetInstance(ctx).InstanceID() != "" { - queryFactory.InstanceID(authz.GetInstance(ctx).InstanceID()) - } - - events, err := es.querier.Filter(ctx, queryFactory) +// +// Deprecated: Use [FilterToQueryReducer] instead to avoid allocations. +func (es *Eventstore) Filter(ctx context.Context, searchQuery *SearchQueryBuilder) ([]Event, error) { + events := make([]Event, 0, searchQuery.GetLimit()) + searchQuery.ensureInstanceID(ctx) + err := es.querier.FilterToReducer(ctx, searchQuery, func(event Event) error { + event, err := es.mapEvent(event) + if err != nil { + return err + } + events = append(events, event) + return nil + }) if err != nil { return nil, err } - - return es.mapEvents(events) + return events, nil } func (es *Eventstore) mapEvents(events []Event) (mappedEvents []Event, err error) { mappedEvents = make([]Event, len(events)) - es.interceptorMutex.Lock() - defer es.interceptorMutex.Unlock() + es.interceptorMutex.RLock() + defer es.interceptorMutex.RUnlock() for i, event := range events { - mappedEvents[i], err = es.mapEvent(event) + mappedEvents[i], err = es.mapEventLocked(event) if err != nil { return nil, err } @@ -114,6 +120,12 @@ func (es *Eventstore) mapEvents(events []Event) (mappedEvents []Event, err error } func (es *Eventstore) mapEvent(event Event) (Event, error) { + es.interceptorMutex.RLock() + defer es.interceptorMutex.RUnlock() + return es.mapEventLocked(event) +} + +func (es *Eventstore) mapEventLocked(event Event) (Event, error) { interceptors, ok := es.eventInterceptors[event.Type()] if !ok || interceptors.eventMapper == nil { return BaseEventFromRepo(event), nil @@ -121,6 +133,14 @@ func (es *Eventstore) mapEvent(event Event) (Event, error) { return interceptors.eventMapper(event) } +// TODO: refactor so we can change to the following interface: +/* +type reducer interface { + // Reduce applies an event on the object. + Reduce(Event) error +} +*/ + type reducer interface { //Reduce handles the events of the internal events list // it only appends the newly added events @@ -131,14 +151,15 @@ type reducer interface { // FilterToReducer filters the events based on the search query, appends all events to the reducer and calls it's reduce function func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r reducer) error { - events, err := es.Filter(ctx, searchQuery) - if err != nil { - return err - } - - r.AppendEvents(events...) - - return r.Reduce() + searchQuery.ensureInstanceID(ctx) + return es.querier.FilterToReducer(ctx, searchQuery, func(event Event) error { + event, err := es.mapEvent(event) + if err != nil { + return err + } + r.AppendEvents(event) + return r.Reduce() + }) } // LatestSequence filters the latest sequence for the given search query @@ -180,13 +201,7 @@ type QueryReducer interface { // FilterToQueryReducer filters the events based on the search query of the query function, // appends all events to the reducer and calls it's reduce function func (es *Eventstore) FilterToQueryReducer(ctx context.Context, r QueryReducer) error { - events, err := es.Filter(ctx, r.Query()) - if err != nil { - return err - } - r.AppendEvents(events...) - - return r.Reduce() + return es.FilterToReducer(ctx, r.Query(), r) } // RegisterFilterEventMapper registers a function for mapping an eventstore event to an event @@ -207,11 +222,13 @@ func (es *Eventstore) RegisterFilterEventMapper(aggregateType AggregateType, eve return es } +type Reducer func(event Event) error + type Querier interface { // Health checks if the connection to the storage is available Health(ctx context.Context) error - // Filter returns all events matching the given search query - Filter(ctx context.Context, searchQuery *SearchQueryBuilder) (events []Event, err error) + // FilterToReducer calls r for every event returned from the storage + FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r Reducer) error // LatestSequence returns the latest sequence found by the search query LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) // InstanceIDs returns the instance ids found by the search query diff --git a/internal/eventstore/eventstore_test.go b/internal/eventstore/eventstore_test.go index 8738a47afc..8fe9fd8592 100644 --- a/internal/eventstore/eventstore_test.go +++ b/internal/eventstore/eventstore_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "reflect" - "sync" "testing" "time" @@ -402,6 +401,7 @@ func (repo *testQuerier) Health(ctx context.Context) error { func (repo *testQuerier) CreateInstance(ctx context.Context, instance string) error { return nil } + func (repo *testQuerier) Filter(ctx context.Context, searchQuery *SearchQueryBuilder) ([]Event, error) { if repo.err != nil { return nil, repo.err @@ -409,6 +409,18 @@ func (repo *testQuerier) Filter(ctx context.Context, searchQuery *SearchQueryBui return repo.events, nil } +func (repo *testQuerier) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, reduce Reducer) error { + if repo.err != nil { + return repo.err + } + for _, event := range repo.events { + if err := reduce(event); err != nil { + return err + } + } + return nil +} + func (repo *testQuerier) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) { if repo.err != nil { return 0, repo.err @@ -684,7 +696,6 @@ func TestEventstore_Push(t *testing.T) { t.Run(tt.name, func(t *testing.T) { es := &Eventstore{ pusher: tt.fields.pusher, - interceptorMutex: sync.Mutex{}, eventInterceptors: map[EventType]eventTypeInterceptors{}, } for eventType, mapper := range tt.fields.eventMapper { @@ -816,7 +827,6 @@ func TestEventstore_FilterEvents(t *testing.T) { t.Run(tt.name, func(t *testing.T) { es := &Eventstore{ querier: tt.fields.repo, - interceptorMutex: sync.Mutex{}, eventInterceptors: map[EventType]eventTypeInterceptors{}, } @@ -1121,7 +1131,6 @@ func TestEventstore_FilterToReducer(t *testing.T) { t.Run(tt.name, func(t *testing.T) { es := &Eventstore{ querier: tt.fields.repo, - interceptorMutex: sync.Mutex{}, eventInterceptors: map[EventType]eventTypeInterceptors{}, } for eventType, mapper := range tt.fields.eventMapper { @@ -1238,7 +1247,6 @@ func TestEventstore_mapEvents(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { es := &Eventstore{ - interceptorMutex: sync.Mutex{}, eventInterceptors: map[EventType]eventTypeInterceptors{}, } for eventType, mapper := range tt.fields.eventMapper { diff --git a/internal/eventstore/repository/mock/repository.mock.go b/internal/eventstore/repository/mock/repository.mock.go index f2b4fb089a..67007e69ec 100644 --- a/internal/eventstore/repository/mock/repository.mock.go +++ b/internal/eventstore/repository/mock/repository.mock.go @@ -35,19 +35,18 @@ func (m *MockQuerier) EXPECT() *MockQuerierMockRecorder { return m.recorder } -// Filter mocks base method. -func (m *MockQuerier) Filter(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) ([]eventstore.Event, error) { +// FilterToReducer mocks base method. +func (m *MockQuerier) FilterToReducer(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder, arg2 eventstore.Reducer) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Filter", arg0, arg1) - ret0, _ := ret[0].([]eventstore.Event) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret := m.ctrl.Call(m, "FilterToReducer", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 } -// Filter indicates an expected call of Filter. -func (mr *MockQuerierMockRecorder) Filter(arg0, arg1 interface{}) *gomock.Call { +// FilterToReducer indicates an expected call of FilterToReducer. +func (mr *MockQuerierMockRecorder) FilterToReducer(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filter", reflect.TypeOf((*MockQuerier)(nil).Filter), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterToReducer", reflect.TypeOf((*MockQuerier)(nil).FilterToReducer), arg0, arg1, arg2) } // Health mocks base method. diff --git a/internal/eventstore/repository/mock/repository.mock.impl.go b/internal/eventstore/repository/mock/repository.mock.impl.go index b6d5eabbd8..1df54f8c56 100644 --- a/internal/eventstore/repository/mock/repository.mock.impl.go +++ b/internal/eventstore/repository/mock/repository.mock.impl.go @@ -30,21 +30,30 @@ func NewRepo(t *testing.T) *MockRepository { func (m *MockRepository) ExpectFilterNoEventsNoError() *MockRepository { m.MockQuerier.ctrl.T.Helper() - m.MockQuerier.EXPECT().Filter(gomock.Any(), gomock.Any()).Return(nil, nil) + m.MockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) return m } func (m *MockRepository) ExpectFilterEvents(events ...eventstore.Event) *MockRepository { m.MockQuerier.ctrl.T.Helper() - m.MockQuerier.EXPECT().Filter(gomock.Any(), gomock.Any()).Return(events, nil) + m.MockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ *eventstore.SearchQueryBuilder, reduce eventstore.Reducer) error { + for _, event := range events { + if err := reduce(event); err != nil { + return err + } + } + return nil + }, + ) return m } func (m *MockRepository) ExpectFilterEventsError(err error) *MockRepository { m.MockQuerier.ctrl.T.Helper() - m.MockQuerier.EXPECT().Filter(gomock.Any(), gomock.Any()).Return(nil, err) + m.MockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(err) return m } diff --git a/internal/eventstore/repository/sql/crdb.go b/internal/eventstore/repository/sql/crdb.go index 6d0cda612c..2d92e45ef7 100644 --- a/internal/eventstore/repository/sql/crdb.go +++ b/internal/eventstore/repository/sql/crdb.go @@ -247,22 +247,18 @@ func (db *CRDB) handleUniqueConstraints(ctx context.Context, tx *sql.Tx, uniqueC return nil } -// Filter returns all events matching the given search query -func (crdb *CRDB) Filter(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (events []eventstore.Event, err error) { - events = make([]eventstore.Event, 0, searchQuery.GetLimit()) - err = query(ctx, crdb, searchQuery, &events, false) +// FilterToReducer finds all events matching the given search query and passes them to the reduce function. +func (crdb *CRDB) FilterToReducer(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder, reduce eventstore.Reducer) error { + err := query(ctx, crdb, searchQuery, reduce, false) + if err == nil { + return nil + } pgErr := new(pgconn.PgError) // check events2 not exists - if err != nil && errors.As(err, &pgErr) { - if pgErr.Code == "42P01" { - err = query(ctx, crdb, searchQuery, &events, true) - } + if errors.As(err, &pgErr) && pgErr.Code == "42P01" { + return query(ctx, crdb, searchQuery, reduce, true) } - if err != nil { - return nil, err - } - - return events, nil + return err } // LatestSequence returns the latest sequence found by the search query diff --git a/internal/eventstore/repository/sql/query.go b/internal/eventstore/repository/sql/query.go index 144a414fd1..aab8abbcfe 100644 --- a/internal/eventstore/repository/sql/query.go +++ b/internal/eventstore/repository/sql/query.go @@ -168,12 +168,11 @@ func instanceIDsScanner(scanner scan, dest interface{}) (err error) { func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error) { return func(scanner scan, dest interface{}) (err error) { - events, ok := dest.(*[]eventstore.Event) + reduce, ok := dest.(eventstore.Reducer) if !ok { - return z_errors.ThrowInvalidArgument(nil, "SQL-4GP6F", "type must be event") + return z_errors.ThrowInvalidArgumentf(nil, "SQL-4GP6F", "events scanner: invalid type %T", dest) } event := new(repository.Event) - data := sql.RawBytes{} position := new(sql.NullFloat64) if useV1 { @@ -181,7 +180,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error) &event.CreationDate, &event.Typ, &event.Seq, - &data, + &event.Data, &event.EditorUser, &event.ResourceOwner, &event.InstanceID, @@ -196,7 +195,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error) &event.Typ, &event.Seq, position, - &data, + &event.Data, &event.EditorUser, &event.ResourceOwner, &event.InstanceID, @@ -211,14 +210,8 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error) logging.New().WithError(err).Warn("unable to scan row") return z_errors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row") } - - event.Data = make([]byte, len(data)) - copy(event.Data, data) event.Pos = position.Float64 - - *events = append(*events, event) - - return nil + return reduce(event) } } diff --git a/internal/eventstore/repository/sql/query_test.go b/internal/eventstore/repository/sql/query_test.go index 478191ec1e..8d6f670384 100644 --- a/internal/eventstore/repository/sql/query_test.go +++ b/internal/eventstore/repository/sql/query_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database/cockroach" @@ -74,6 +75,8 @@ func Test_getCondition(t *testing.T) { } func Test_prepareColumns(t *testing.T) { + var reducedEvents []eventstore.Event + type fields struct { dbRow []interface{} } @@ -146,13 +149,16 @@ func Test_prepareColumns(t *testing.T) { name: "events", args: args{ columns: eventstore.ColumnsEvent, - dest: &[]eventstore.Event{}, - useV1: true, + dest: eventstore.Reducer(func(event eventstore.Event) error { + reducedEvents = append(reducedEvents, event) + return nil + }), + useV1: true, }, res: res{ query: `SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events`, expected: []eventstore.Event{ - &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Data: make(sql.RawBytes, 0)}, + &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Data: nil}, }, }, fields: fields{ @@ -163,12 +169,15 @@ func Test_prepareColumns(t *testing.T) { name: "events v2", args: args{ columns: eventstore.ColumnsEvent, - dest: &[]eventstore.Event{}, + dest: eventstore.Reducer(func(event eventstore.Event) error { + reducedEvents = append(reducedEvents, event) + return nil + }), }, res: res{ query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`, expected: []eventstore.Event{ - &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 42, Data: make(sql.RawBytes, 0), Version: "v1"}, + &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 42, Data: nil, Version: "v1"}, }, }, fields: fields{ @@ -179,12 +188,15 @@ func Test_prepareColumns(t *testing.T) { name: "event null position", args: args{ columns: eventstore.ColumnsEvent, - dest: &[]eventstore.Event{}, + dest: eventstore.Reducer(func(event eventstore.Event) error { + reducedEvents = append(reducedEvents, event) + return nil + }), }, res: res{ query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`, expected: []eventstore.Event{ - &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 0, Data: make(sql.RawBytes, 0), Version: "v1"}, + &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 0, Data: nil, Version: "v1"}, }, }, fields: fields{ @@ -207,9 +219,12 @@ func Test_prepareColumns(t *testing.T) { name: "event query error", args: args{ columns: eventstore.ColumnsEvent, - dest: &[]eventstore.Event{}, - dbErr: sql.ErrConnDone, - useV1: true, + dest: eventstore.Reducer(func(event eventstore.Event) error { + reducedEvents = append(reducedEvents, event) + return nil + }), + dbErr: sql.ErrConnDone, + useV1: true, }, res: res{ query: `SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events`, @@ -242,6 +257,12 @@ func Test_prepareColumns(t *testing.T) { equalizer.Equal(tt.args.dest.(*sql.NullTime).Time) return } + if _, ok := tt.args.dest.(eventstore.Reducer); ok { + assert.Equal(t, tt.res.expected, reducedEvents) + reducedEvents = nil + return + } + got := reflect.Indirect(reflect.ValueOf(tt.args.dest)).Interface() if !reflect.DeepEqual(got, tt.res.expected) { t.Errorf("unexpected result from rowScanner \nwant: %+v \ngot: %+v", tt.res.expected, got) @@ -625,7 +646,10 @@ func Test_query_events_with_crdb(t *testing.T) { } events := []eventstore.Event{} - if err := query(context.Background(), db, tt.args.searchQuery, &events, true); (err != nil) != tt.wantErr { + if err := query(context.Background(), db, tt.args.searchQuery, eventstore.Reducer(func(event eventstore.Event) error { + events = append(events, event) + return nil + }), true); (err != nil) != tt.wantErr { t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/internal/eventstore/search_query.go b/internal/eventstore/search_query.go index 3cdd7f4d81..2b102db9fb 100644 --- a/internal/eventstore/search_query.go +++ b/internal/eventstore/search_query.go @@ -1,9 +1,11 @@ package eventstore import ( + "context" "database/sql" "time" + "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/errors" ) @@ -82,6 +84,13 @@ func (q SearchQueryBuilder) GetCreationDateAfter() time.Time { return q.creationDateAfter } +// ensureInstanceID makes sure that the instance id is always set +func (b *SearchQueryBuilder) ensureInstanceID(ctx context.Context) { + if b.instanceID == nil && authz.GetInstance(ctx).InstanceID() != "" { + b.InstanceID(authz.GetInstance(ctx).InstanceID()) + } +} + type SearchQuery struct { builder *SearchQueryBuilder aggregateTypes []AggregateType