fix(projection): Prevent race condition with event push (#10676)

A timing issue (a race condition) was identified in our event processing
system. Under specific circumstances, it was possible for the system to
skip processing certain events, leading to potential data
inconsistencies.

## Which problems are solved

The system tracks its progress through the event log using timestamps.
The issue occurred because we were using the timestamp from the start of
a database transaction. If a query to read new events began after the
transaction started but before the new event was committed, the query
would not see the new event and would fail to process it.

## How the problems are solved

The fix is to change which timestamp is used for tracking. We now use
the precise timestamp of when the event is actually written to the
database. This ensures that the event's timestamp is always correctly
ordered, closing the timing gap and preventing the race condition.

This change enhances the reliability and integrity of our event
processing pipeline. It guarantees that all events are processed in the
correct order and eliminates the risk of skipped events, ensuring data
is always consistent across the system.

## Additional information

original fix: https://github.com/zitadel/zitadel/pull/10560

(cherry picked from commit 136363deda)
This commit is contained in:
Silvan
2025-09-10 12:06:51 +02:00
committed by Livio Spring
parent 19d1ab9c94
commit 23d98e9d11
5 changed files with 106 additions and 2 deletions

View File

@@ -40,7 +40,9 @@ DECLARE
"aggregate" RECORD;
current_sequence BIGINT;
current_owner TEXT;
created_at TIMESTAMPTZ;
BEGIN
created_at := statement_timestamp();
FOR "aggregate" IN
SELECT DISTINCT
instance_id
@@ -67,11 +69,11 @@ BEGIN
, c.command_type -- AS event_type
, COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence
, c.revision
, NOW() -- AS created_at
, created_at
, c.payload
, c.creator
, COALESCE(current_owner, c.owner) -- AS owner
, EXTRACT(EPOCH FROM NOW()) -- AS position
, EXTRACT(EPOCH FROM created_at) -- AS position
, c.ordinality::{{ .InTxOrderType }} -- AS in_tx_order
FROM
UNNEST(commands) WITH ORDINALITY AS c

44
cmd/setup/64.go Normal file
View File

@@ -0,0 +1,44 @@
package setup
import (
"context"
"database/sql"
_ "embed"
"fmt"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
)
var (
//go:embed 64.sql
changePushPosition string
)
type ChangePushPosition struct {
dbClient *database.DB
}
func (mig *ChangePushPosition) Execute(ctx context.Context, _ eventstore.Event) error {
inTxOrderType, err := mig.inTxOrderType(ctx)
if err != nil {
return err
}
stmt := fmt.Sprintf(changePushPosition, inTxOrderType)
_, err = mig.dbClient.ExecContext(ctx, stmt)
return err
}
func (mig *ChangePushPosition) String() string {
return "64_change_push_position"
}
func (mig *ChangePushPosition) inTxOrderType(ctx context.Context) (typeName string, err error) {
err = mig.dbClient.QueryRowContext(ctx, func(row *sql.Row) error {
return row.Scan(&typeName)
}, `SELECT data_type FROM information_schema.columns WHERE table_schema = 'eventstore' AND table_name = 'events2' AND column_name = 'in_tx_order'`)
if err != nil {
return "", fmt.Errorf("get in_tx_order_type: %w", err)
}
return typeName, nil
}

55
cmd/setup/64.sql Normal file
View File

@@ -0,0 +1,55 @@
CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[])
RETURNS SETOF eventstore.events2
LANGUAGE 'plpgsql'
STABLE PARALLEL SAFE
ROWS 10
AS $$
DECLARE
"aggregate" RECORD;
current_sequence BIGINT;
current_owner TEXT;
created_at TIMESTAMPTZ;
BEGIN
created_at := statement_timestamp();
FOR "aggregate" IN
SELECT DISTINCT
instance_id
, aggregate_type
, aggregate_id
FROM UNNEST(commands)
LOOP
SELECT
*
INTO
current_sequence
, current_owner
FROM eventstore.latest_aggregate_state(
"aggregate".instance_id
, "aggregate".aggregate_type
, "aggregate".aggregate_id
);
RETURN QUERY
SELECT
c.instance_id
, c.aggregate_type
, c.aggregate_id
, c.command_type -- AS event_type
, COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence
, c.revision
, created_at
, c.payload
, c.creator
, COALESCE(current_owner, c.owner) -- AS owner
, EXTRACT(EPOCH FROM created_at) -- AS position
, c.ordinality::%s -- AS in_tx_order
FROM
UNNEST(commands) WITH ORDINALITY AS c
WHERE
c.instance_id = aggregate.instance_id
AND c.aggregate_type = aggregate.aggregate_type
AND c.aggregate_id = aggregate.aggregate_id;
END LOOP;
RETURN;
END;
$$;

View File

@@ -160,6 +160,7 @@ type Steps struct {
s61IDPTemplate6SAMLSignatureAlgorithm *IDPTemplate6SAMLSignatureAlgorithm
s62HTTPProviderAddSigningKey *HTTPProviderAddSigningKey
s63AlterResourceCounts *AlterResourceCounts
s64ChangePushPosition *ChangePushPosition
}
func MustNewSteps(v *viper.Viper) *Steps {

View File

@@ -221,6 +221,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s61IDPTemplate6SAMLSignatureAlgorithm = &IDPTemplate6SAMLSignatureAlgorithm{dbClient: dbClient}
steps.s62HTTPProviderAddSigningKey = &HTTPProviderAddSigningKey{dbClient: dbClient}
steps.s63AlterResourceCounts = &AlterResourceCounts{dbClient: dbClient}
steps.s64ChangePushPosition = &ChangePushPosition{dbClient: dbClient}
err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil, nil)
logging.OnError(err).Fatal("unable to start projections")
@@ -272,6 +273,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s61IDPTemplate6SAMLSignatureAlgorithm,
steps.s62HTTPProviderAddSigningKey,
steps.s63AlterResourceCounts,
steps.s64ChangePushPosition,
} {
setupErr = executeMigration(ctx, eventstoreClient, step, "migration failed")
if setupErr != nil {