fix(projection): prevent skipped events written within the same microsecond (#10710)

This PR fixes a bug where projections could skip events if they were
written within the same microsecond, which can occur under high load when
events are written in different transactions.

## Problem

The event query ordering was not fully deterministic. Events created at
the exact same time (same `position`) and with the same in-transaction
order (`in_tx_order`) were not guaranteed to be returned in the same order
on subsequent queries. This could lead to some events being skipped by the
projection logic.

## Solution

To solve this, the `ORDER BY` clause for event queries has been extended
to include `instance_id`, `aggregate_type`, and `aggregate_id`. This
ensures a stable and deterministic ordering for all events, even if they
share the same timestamp.

## Additional changes:

* Replaced a manual slice search with the more idiomatic
`slices.Contains` to skip already projected instances.
* Changed the handling of already locked projections to log a debug
message and skip execution instead of returning an error.
* Ensured the database transaction is explicitly committed.

(cherry picked from commit 25ab6b2397)
This commit is contained in:
Silvan
2025-09-12 13:26:03 +02:00
committed by Livio Spring
parent 23d98e9d11
commit cb4a874be1
3 changed files with 12 additions and 17 deletions

View File

@@ -20,7 +20,6 @@ import (
"github.com/zitadel/zitadel/internal/migration"
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/repository/pseudo"
"github.com/zitadel/zitadel/internal/zerrors"
)
type EventStore interface {
@@ -316,7 +315,7 @@ func (h *Handler) subscribe(ctx context.Context) {
solvedInstances := make([]string, 0, len(events))
queueCtx := call.WithTimestamp(ctx)
for _, e := range events {
if instanceSolved(solvedInstances, e.Aggregate().InstanceID) {
if slices.Contains(solvedInstances, e.Aggregate().InstanceID) {
continue
}
queueCtx = authz.WithInstanceID(queueCtx, e.Aggregate().InstanceID)
@@ -330,15 +329,6 @@ func (h *Handler) subscribe(ctx context.Context) {
}
}
// instanceSolved reports whether instanceID already appears in
// solvedInstances. A nil or empty slice never contains an ID.
func instanceSolved(solvedInstances []string, instanceID string) bool {
	for i := 0; i < len(solvedInstances); i++ {
		if solvedInstances[i] != instanceID {
			continue
		}
		// First match is enough; duplicates do not change the answer.
		return true
	}
	return false
}
func checkAdditionalEvents(eventQueue chan eventstore.Event, event eventstore.Event) []eventstore.Event {
events := make([]eventstore.Event, 1)
events[0] = event
@@ -562,6 +552,10 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
h.log().OnError(rollbackErr).Debug("unable to rollback tx")
return
}
commitErr := tx.Commit()
if !errors.Is(commitErr, sql.ErrTxDone) {
err = commitErr
}
}()
var hasLocked bool
@@ -570,7 +564,8 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
return false, err
}
if !hasLocked {
return false, zerrors.ThrowInternal(nil, "V2-lpiK0", "projection already locked")
h.log().Debug("skip execution, projection already locked")
return false, nil
}
currentState, err := h.currentState(ctx, tx)
@@ -578,7 +573,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
return additionalIteration, err
}
// stop execution if currentState.position >= config.maxPosition
if !config.maxPosition.Equal(decimal.Decimal{}) && currentState.position.GreaterThanOrEqual(config.maxPosition) {
if !config.maxPosition.IsZero() && currentState.position.GreaterThanOrEqual(config.maxPosition) {
return false, nil
}