From cb4a874be127a76817b11d5b8f6c05ff5d428b7d Mon Sep 17 00:00:00 2001 From: Silvan <27845747+adlerhurst@users.noreply.github.com> Date: Fri, 12 Sep 2025 13:26:03 +0200 Subject: [PATCH] fix(projection): prevent skipped events written within the same microsecond (#10710) This PR fixes a bug where projections could skip events if they were written within the same microsecond, which can occur during high load on different transactions. ## Problem The event query ordering was not fully deterministic. Events created at the exact same time (same `position`) and with the same in-transaction order (`in_tx_order`) were not guaranteed to be returned in the same order on subsequent queries. This could lead to some events being skipped by the projection logic. ## Solution To solve this, the `ORDER BY` clause for event queries has been extended to include `instance_id`, `aggregate_type`, and `aggregate_id`. This ensures a stable and deterministic ordering for all events, even if they share the same timestamp. ## Additional changes: * Replaced a manual slice search with the more idiomatic `slices.Contains` to skip already projected instances. * Changed the handling of already locked projections to log a debug message and skip execution instead of returning an error. * Ensured the database transaction is explicitly committed. 
(cherry picked from commit 25ab6b23977368b65f36a8bb91a6650a54ffbb25) --- internal/eventstore/handler/v2/handler.go | 21 +++++++------------ .../eventstore/repository/sql/postgres.go | 4 ++-- .../eventstore/repository/sql/query_test.go | 4 ++-- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index ce548428910..bb90a07cbc2 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -20,7 +20,6 @@ import ( "github.com/zitadel/zitadel/internal/migration" "github.com/zitadel/zitadel/internal/repository/instance" "github.com/zitadel/zitadel/internal/repository/pseudo" - "github.com/zitadel/zitadel/internal/zerrors" ) type EventStore interface { @@ -316,7 +315,7 @@ func (h *Handler) subscribe(ctx context.Context) { solvedInstances := make([]string, 0, len(events)) queueCtx := call.WithTimestamp(ctx) for _, e := range events { - if instanceSolved(solvedInstances, e.Aggregate().InstanceID) { + if slices.Contains(solvedInstances, e.Aggregate().InstanceID) { continue } queueCtx = authz.WithInstanceID(queueCtx, e.Aggregate().InstanceID) @@ -330,15 +329,6 @@ func (h *Handler) subscribe(ctx context.Context) { } } -func instanceSolved(solvedInstances []string, instanceID string) bool { - for _, solvedInstance := range solvedInstances { - if solvedInstance == instanceID { - return true - } - } - return false -} - func checkAdditionalEvents(eventQueue chan eventstore.Event, event eventstore.Event) []eventstore.Event { events := make([]eventstore.Event, 1) events[0] = event @@ -562,6 +552,10 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add h.log().OnError(rollbackErr).Debug("unable to rollback tx") return } + commitErr := tx.Commit() + if !errors.Is(commitErr, sql.ErrTxDone) { + err = commitErr + } }() var hasLocked bool @@ -570,7 +564,8 @@ func (h *Handler) processEvents(ctx context.Context, config 
*triggerConfig) (add return false, err } if !hasLocked { - return false, zerrors.ThrowInternal(nil, "V2-lpiK0", "projection already locked") + h.log().Debug("skip execution, projection already locked") + return false, nil } currentState, err := h.currentState(ctx, tx) @@ -578,7 +573,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add return additionalIteration, err } // stop execution if currentState.position >= config.maxPosition - if !config.maxPosition.Equal(decimal.Decimal{}) && currentState.position.GreaterThanOrEqual(config.maxPosition) { + if !config.maxPosition.IsZero() && currentState.position.GreaterThanOrEqual(config.maxPosition) { return false, nil } diff --git a/internal/eventstore/repository/sql/postgres.go b/internal/eventstore/repository/sql/postgres.go index 5d56d913190..0bd9910dc86 100644 --- a/internal/eventstore/repository/sql/postgres.go +++ b/internal/eventstore/repository/sql/postgres.go @@ -91,9 +91,9 @@ func (db *Postgres) orderByEventSequence(desc, shouldOrderBySequence, useV1 bool } if desc { - return ` ORDER BY "position" DESC, in_tx_order DESC` + return ` ORDER BY "position" DESC, in_tx_order DESC, instance_id, aggregate_type, aggregate_id` } - return ` ORDER BY "position", in_tx_order` + return ` ORDER BY "position", in_tx_order, instance_id, aggregate_type, aggregate_id` } func (db *Postgres) eventQuery(useV1 bool) string { diff --git a/internal/eventstore/repository/sql/query_test.go b/internal/eventstore/repository/sql/query_test.go index 4c6e30daf57..641ff952d65 100644 --- a/internal/eventstore/repository/sql/query_test.go +++ b/internal/eventstore/repository/sql/query_test.go @@ -903,7 +903,7 @@ func Test_query_events_mocked(t *testing.T) { }, fields: fields{ mock: newMockClient(t).expectQuery( - regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND 
aggregate_type = $2 AND event_type = $3 AND "position" >= $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" >= $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`), + regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" >= $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" >= $8) ORDER BY "position" DESC, in_tx_order DESC, instance_id, aggregate_type, aggregate_id LIMIT $9`), []driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), decimal.NewFromFloat(123.456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", decimal.NewFromFloat(123.456), uint64(5)}, ), }, @@ -932,7 +932,7 @@ func Test_query_events_mocked(t *testing.T) { }, fields: fields{ mock: newMockClient(t).expectQuery( - regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND created_at > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND created_at > $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`), + regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND created_at > $4 AND aggregate_id 
NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND created_at > $8) ORDER BY "position" DESC, in_tx_order DESC, instance_id, aggregate_type, aggregate_id LIMIT $9`), []driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), time.Unix(123, 456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", time.Unix(123, 456), uint64(5)}, ), },