mirror of
https://github.com/zitadel/zitadel.git
synced 2025-12-23 07:39:02 +00:00
fix(projection): prevent skipped events written within the same microsecond (#10710)
This PR fixes a bug where projections could skip events if they were
written within the same microsecond, which can happen under high load
when different transactions write events concurrently.
## Problem
The event query ordering was not fully deterministic. Events created at
the exact same time (same `position`) and with the same in-transaction
order (`in_tx_order`) were not guaranteed to be returned in the same order
on subsequent queries. This could lead to some events being skipped by the
projection logic.
## Solution
To solve this, the `ORDER BY` clause for event queries has been extended
to include `instance_id`, `aggregate_type`, and `aggregate_id`. This
ensures a stable and deterministic ordering for all events, even if they
share the same timestamp.
## Additional changes:
* Replaced a manual slice search with the more idiomatic
`slices.Contains` to skip already projected instances.
* Changed the handling of already locked projections to log a debug
message and skip execution instead of returning an error.
* Ensured the database transaction is explicitly committed.
(cherry picked from commit 25ab6b2397)
This commit is contained in:
@@ -20,7 +20,6 @@ import (
|
|||||||
"github.com/zitadel/zitadel/internal/migration"
|
"github.com/zitadel/zitadel/internal/migration"
|
||||||
"github.com/zitadel/zitadel/internal/repository/instance"
|
"github.com/zitadel/zitadel/internal/repository/instance"
|
||||||
"github.com/zitadel/zitadel/internal/repository/pseudo"
|
"github.com/zitadel/zitadel/internal/repository/pseudo"
|
||||||
"github.com/zitadel/zitadel/internal/zerrors"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type EventStore interface {
|
type EventStore interface {
|
||||||
@@ -316,7 +315,7 @@ func (h *Handler) subscribe(ctx context.Context) {
|
|||||||
solvedInstances := make([]string, 0, len(events))
|
solvedInstances := make([]string, 0, len(events))
|
||||||
queueCtx := call.WithTimestamp(ctx)
|
queueCtx := call.WithTimestamp(ctx)
|
||||||
for _, e := range events {
|
for _, e := range events {
|
||||||
if instanceSolved(solvedInstances, e.Aggregate().InstanceID) {
|
if slices.Contains(solvedInstances, e.Aggregate().InstanceID) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
queueCtx = authz.WithInstanceID(queueCtx, e.Aggregate().InstanceID)
|
queueCtx = authz.WithInstanceID(queueCtx, e.Aggregate().InstanceID)
|
||||||
@@ -330,15 +329,6 @@ func (h *Handler) subscribe(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func instanceSolved(solvedInstances []string, instanceID string) bool {
|
|
||||||
for _, solvedInstance := range solvedInstances {
|
|
||||||
if solvedInstance == instanceID {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkAdditionalEvents(eventQueue chan eventstore.Event, event eventstore.Event) []eventstore.Event {
|
func checkAdditionalEvents(eventQueue chan eventstore.Event, event eventstore.Event) []eventstore.Event {
|
||||||
events := make([]eventstore.Event, 1)
|
events := make([]eventstore.Event, 1)
|
||||||
events[0] = event
|
events[0] = event
|
||||||
@@ -562,6 +552,10 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
|
|||||||
h.log().OnError(rollbackErr).Debug("unable to rollback tx")
|
h.log().OnError(rollbackErr).Debug("unable to rollback tx")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
commitErr := tx.Commit()
|
||||||
|
if !errors.Is(commitErr, sql.ErrTxDone) {
|
||||||
|
err = commitErr
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var hasLocked bool
|
var hasLocked bool
|
||||||
@@ -570,7 +564,8 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
|
|||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
if !hasLocked {
|
if !hasLocked {
|
||||||
return false, zerrors.ThrowInternal(nil, "V2-lpiK0", "projection already locked")
|
h.log().Debug("skip execution, projection already locked")
|
||||||
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
currentState, err := h.currentState(ctx, tx)
|
currentState, err := h.currentState(ctx, tx)
|
||||||
@@ -578,7 +573,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
|
|||||||
return additionalIteration, err
|
return additionalIteration, err
|
||||||
}
|
}
|
||||||
// stop execution if currentState.position >= config.maxPosition
|
// stop execution if currentState.position >= config.maxPosition
|
||||||
if !config.maxPosition.Equal(decimal.Decimal{}) && currentState.position.GreaterThanOrEqual(config.maxPosition) {
|
if !config.maxPosition.IsZero() && currentState.position.GreaterThanOrEqual(config.maxPosition) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -91,9 +91,9 @@ func (db *Postgres) orderByEventSequence(desc, shouldOrderBySequence, useV1 bool
|
|||||||
}
|
}
|
||||||
|
|
||||||
if desc {
|
if desc {
|
||||||
return ` ORDER BY "position" DESC, in_tx_order DESC`
|
return ` ORDER BY "position" DESC, in_tx_order DESC, instance_id, aggregate_type, aggregate_id`
|
||||||
}
|
}
|
||||||
return ` ORDER BY "position", in_tx_order`
|
return ` ORDER BY "position", in_tx_order, instance_id, aggregate_type, aggregate_id`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *Postgres) eventQuery(useV1 bool) string {
|
func (db *Postgres) eventQuery(useV1 bool) string {
|
||||||
|
|||||||
@@ -903,7 +903,7 @@ func Test_query_events_mocked(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
mock: newMockClient(t).expectQuery(
|
mock: newMockClient(t).expectQuery(
|
||||||
regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" >= $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" >= $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`),
|
regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" >= $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" >= $8) ORDER BY "position" DESC, in_tx_order DESC, instance_id, aggregate_type, aggregate_id LIMIT $9`),
|
||||||
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), decimal.NewFromFloat(123.456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", decimal.NewFromFloat(123.456), uint64(5)},
|
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), decimal.NewFromFloat(123.456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", decimal.NewFromFloat(123.456), uint64(5)},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
@@ -932,7 +932,7 @@ func Test_query_events_mocked(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
mock: newMockClient(t).expectQuery(
|
mock: newMockClient(t).expectQuery(
|
||||||
regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND created_at > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND created_at > $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`),
|
regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND created_at > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND created_at > $8) ORDER BY "position" DESC, in_tx_order DESC, instance_id, aggregate_type, aggregate_id LIMIT $9`),
|
||||||
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), time.Unix(123, 456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", time.Unix(123, 456), uint64(5)},
|
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), time.Unix(123, 456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", time.Unix(123, 456), uint64(5)},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user