diff --git a/internal/eventstore/handler/crdb/handler_stmt.go b/internal/eventstore/handler/crdb/handler_stmt.go index 60c703f8f4..e49fe007be 100644 --- a/internal/eventstore/handler/crdb/handler_stmt.go +++ b/internal/eventstore/handler/crdb/handler_stmt.go @@ -93,7 +93,7 @@ func NewStatementHandler( reduceScheduledPseudoEvent: reduceScheduledPseudoEvent, } - h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, h.initialized, reduceScheduledPseudoEvent) + h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.searchQuery, h.Lock, h.Unlock, h.initialized, reduceScheduledPseudoEvent) return h } @@ -106,18 +106,20 @@ func (h *StatementHandler) Start() { } } -func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string) (*eventstore.SearchQueryBuilder, uint64, error) { +func (h *StatementHandler) searchQuery(ctx context.Context, instanceIDs []string) (*eventstore.SearchQueryBuilder, uint64, error) { + if h.reduceScheduledPseudoEvent { + return nil, 1, nil + } + return h.dbSearchQuery(ctx, instanceIDs) +} + +func (h *StatementHandler) dbSearchQuery(ctx context.Context, instanceIDs []string) (*eventstore.SearchQueryBuilder, uint64, error) { sequences, err := h.currentSequences(ctx, h.client.QueryContext, instanceIDs) if err != nil { return nil, 0, err } - bulkLimit := h.bulkLimit - if h.reduceScheduledPseudoEvent { - bulkLimit = 1 - } - - queryBuilder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).Limit(bulkLimit).AllowTimeTravel() + queryBuilder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).Limit(h.bulkLimit).AllowTimeTravel() for _, aggregateType := range h.aggregates { for _, instanceID := range instanceIDs { @@ -128,13 +130,6 @@ func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string break } } - if h.reduceScheduledPseudoEvent { - queryBuilder. - AddQuery(). - SequenceGreater(seq). - InstanceID(instanceID) - continue - } queryBuilder. AddQuery(). AggregateTypes(aggregateType). @@ -142,8 +137,7 @@ func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string InstanceID(instanceID) } } - - return queryBuilder, bulkLimit, nil + return queryBuilder, h.bulkLimit, nil } // Update implements handler.Update diff --git a/internal/eventstore/handler/crdb/handler_stmt_test.go b/internal/eventstore/handler/crdb/handler_stmt_test.go index 8c0d1537f4..eb72aab059 100644 --- a/internal/eventstore/handler/crdb/handler_stmt_test.go +++ b/internal/eventstore/handler/crdb/handler_stmt_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/zitadel/zitadel/internal/repository/pseudo" + "github.com/DATA-DOG/go-sqlmock" "github.com/stretchr/testify/assert" @@ -60,7 +62,7 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { type fields struct { sequenceTable string projectionName string - aggregates []eventstore.AggregateType + reducers []handler.AggregateReducer bulkLimit uint64 } type args struct { @@ -77,7 +79,7 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { fields: fields{ sequenceTable: "my_sequences", projectionName: "my_projection", - aggregates: []eventstore.AggregateType{"testAgg"}, + reducers: failingAggregateReducers("testAgg"), bulkLimit: 5, }, args: args{ @@ -99,7 +101,7 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { fields: fields{ sequenceTable: "my_sequences", projectionName: "my_projection", - aggregates: []eventstore.AggregateType{"testAgg"}, + reducers: failingAggregateReducers("testAgg"), bulkLimit: 5, }, args: args{ @@ -129,7 +131,7 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { fields: fields{ sequenceTable: "my_sequences", projectionName: "my_projection", - aggregates: []eventstore.AggregateType{"testAgg"}, + reducers: failingAggregateReducers("testAgg"), bulkLimit: 5, }, args: args{ @@ -158,6 +160,32 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { Limit(5), }, }, + { + name: "scheduled pseudo event", + fields: fields{ + sequenceTable: "my_sequences", + projectionName: "my_projection", + reducers: []handler.AggregateReducer{{ + Aggregate: pseudo.AggregateType, + EventRedusers: []handler.EventReducer{ + { + Event: pseudo.ScheduledEventType, + Reduce: testReduceErr(errors.New("should not be called")), + }, + }, + }}, + bulkLimit: 5, + }, + args: args{ + instanceIDs: []string{"instanceID1", "instanceID2"}, + }, + want: want{ + limit: 1, + isErr: func(err error) bool { + return err == nil + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -168,6 +196,7 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { defer client.Close() id.Configure(&id.Config{Identification: id.Identification{PrivateIp: id.PrivateIp{Enabled: true}}}) + h := NewStatementHandler(context.Background(), StatementHandlerConfig{ ProjectionHandlerConfig: handler.ProjectionHandlerConfig{ ProjectionName: tt.fields.projectionName, @@ -177,15 +206,15 @@ func TestProjectionHandler_SearchQuery(t *testing.T) { Client: &database.DB{ DB: client, }, - }) - h.aggregates = tt.fields.aggregates + Reducers: tt.fields.reducers, + }) for _, expectation := range tt.want.expectations { expectation(mock) } - query, limit, err := h.SearchQuery(context.Background(), tt.args.instanceIDs) + query, limit, err := h.searchQuery(context.Background(), tt.args.instanceIDs) if !tt.want.isErr(err) { t.Errorf("ProjectionHandler.prepareBulkStmts() error = %v", err) return @@ -1768,3 +1797,17 @@ func testReduceErr(err error) handler.Reduce { return nil, err } } + +func failingAggregateReducers(aggregates ...eventstore.AggregateType) []handler.AggregateReducer { + reducers := make([]handler.AggregateReducer, len(aggregates)) + for idx := range aggregates { + reducers[idx] = handler.AggregateReducer{ + Aggregate: aggregates[idx], + EventRedusers: []handler.EventReducer{{ + Event: "any.event", + Reduce: testReduceErr(errors.New("should not be called")), + }}, + } + } + return reducers +} diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index c954cf1bc5..9149598d99 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -164,6 +164,13 @@ func (h *ProjectionHandler) Process(ctx context.Context, events ...eventstore.Ev // FetchEvents checks the current sequences and filters for newer events func (h *ProjectionHandler) FetchEvents(ctx context.Context, instances ...string) ([]eventstore.Event, bool, error) { + if h.reduceScheduledPseudoEvent { + return h.fetchPseudoEvents(ctx, instances...) + } + return h.fetchDBEvents(ctx, instances...) +} + +func (h *ProjectionHandler) fetchDBEvents(ctx context.Context, instances ...string) ([]eventstore.Event, bool, error) { eventQuery, eventsLimit, err := h.searchQuery(ctx, instances) if err != nil { return nil, false, err @@ -172,10 +179,11 @@ func (h *ProjectionHandler) FetchEvents(ctx context.Context, instances ...string if err != nil { return nil, false, err } - if h.reduceScheduledPseudoEvent { - events = []eventstore.Event{pseudo.NewScheduledEvent(ctx, time.Now(), events[0], instances...)} - } - return events, int(eventsLimit) == len(events) && !h.reduceScheduledPseudoEvent, err + return events, int(eventsLimit) == len(events), err +} + +func (h *ProjectionHandler) fetchPseudoEvents(ctx context.Context, instances ...string) ([]eventstore.Event, bool, error) { + return []eventstore.Event{pseudo.NewScheduledEvent(ctx, time.Now(), instances...)}, false, nil } func (h *ProjectionHandler) subscribe(ctx context.Context) { diff --git a/internal/repository/pseudo/events.go b/internal/repository/pseudo/events.go index 12b84d499d..c1e5900fa1 100644 --- a/internal/repository/pseudo/events.go +++ b/internal/repository/pseudo/events.go @@ -16,18 +16,13 @@ var _ eventstore.Event = (*ScheduledEvent)(nil) type ScheduledEvent struct { *eventstore.BaseEvent `json:"-"` - // TODO: `json:"-"` - Timestamp time.Time `json:"timestamp"` - // TODO: `json:"-"` - InstanceIDs []string `json:"instanceIDs"` - // TODO: `json:"-"` - TriggeringEvent eventstore.Event `json:"triggeringEvent"` + Timestamp time.Time `json:"-"` + InstanceIDs []string `json:"-"` } func NewScheduledEvent( ctx context.Context, timestamp time.Time, - triggeringEvent eventstore.Event, instanceIDs ...string, ) *ScheduledEvent { return &ScheduledEvent{ @@ -36,8 +31,7 @@ func NewScheduledEvent( &NewAggregate().Aggregate, ScheduledEventType, ), - Timestamp: timestamp, - InstanceIDs: instanceIDs, - TriggeringEvent: triggeringEvent, + Timestamp: timestamp, + InstanceIDs: instanceIDs, } }