mirror of
https://github.com/zitadel/zitadel.git
synced 2025-11-01 00:46:23 +00:00
fix(projection): Prevent race condition with event push (#10676)
A timing issue (a race condition) was identified in our event processing system. Under specific circumstances, it was possible for the system to skip processing certain events, leading to potential data inconsistencies.

## Which problems are solved

The system tracks its progress through the event log using timestamps. The issue occurred because we were using the timestamp from the start of the database transaction (`NOW()`). If a query to read new events began after the transaction started but before the new event was committed, the query would not see the new event; because the event's timestamp was older than the point the reader had already advanced to, the event was never processed afterwards.

## How the problems are solved

The fix is to change which timestamp is used for tracking. Instead of the transaction start time, we now use the timestamp of the statement that actually writes the events to the database (`statement_timestamp()`), for both `created_at` and `position`. This ensures that the event's timestamp is always correctly ordered, closing the timing gap and preventing the race condition.

This change enhances the reliability and integrity of our event processing pipeline. It guarantees that all events are processed in the correct order and eliminates the risk of skipped events, ensuring data is always consistent across the system.

## Additional information

original fix: https://github.com/zitadel/zitadel/pull/10560
This commit is contained in:
@@ -40,7 +40,9 @@ DECLARE
|
||||
"aggregate" RECORD;
|
||||
current_sequence BIGINT;
|
||||
current_owner TEXT;
|
||||
created_at TIMESTAMPTZ;
|
||||
BEGIN
|
||||
created_at := statement_timestamp();
|
||||
FOR "aggregate" IN
|
||||
SELECT DISTINCT
|
||||
instance_id
|
||||
@@ -67,11 +69,11 @@ BEGIN
|
||||
, c.command_type -- AS event_type
|
||||
, COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence
|
||||
, c.revision
|
||||
, NOW() -- AS created_at
|
||||
, created_at
|
||||
, c.payload
|
||||
, c.creator
|
||||
, COALESCE(current_owner, c.owner) -- AS owner
|
||||
, EXTRACT(EPOCH FROM NOW()) -- AS position
|
||||
, EXTRACT(EPOCH FROM created_at) -- AS position
|
||||
, c.ordinality::{{ .InTxOrderType }} -- AS in_tx_order
|
||||
FROM
|
||||
UNNEST(commands) WITH ORDINALITY AS c
|
||||
|
||||
44
cmd/setup/64.go
Normal file
44
cmd/setup/64.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package setup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"fmt"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed 64.sql
|
||||
changePushPosition string
|
||||
)
|
||||
|
||||
type ChangePushPosition struct {
|
||||
dbClient *database.DB
|
||||
}
|
||||
|
||||
func (mig *ChangePushPosition) Execute(ctx context.Context, _ eventstore.Event) error {
|
||||
inTxOrderType, err := mig.inTxOrderType(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stmt := fmt.Sprintf(changePushPosition, inTxOrderType)
|
||||
_, err = mig.dbClient.ExecContext(ctx, stmt)
|
||||
return err
|
||||
}
|
||||
|
||||
func (mig *ChangePushPosition) String() string {
|
||||
return "64_change_push_position"
|
||||
}
|
||||
|
||||
func (mig *ChangePushPosition) inTxOrderType(ctx context.Context) (typeName string, err error) {
|
||||
err = mig.dbClient.QueryRowContext(ctx, func(row *sql.Row) error {
|
||||
return row.Scan(&typeName)
|
||||
}, `SELECT data_type FROM information_schema.columns WHERE table_schema = 'eventstore' AND table_name = 'events2' AND column_name = 'in_tx_order'`)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("get in_tx_order_type: %w", err)
|
||||
}
|
||||
return typeName, nil
|
||||
}
|
||||
55
cmd/setup/64.sql
Normal file
55
cmd/setup/64.sql
Normal file
@@ -0,0 +1,55 @@
|
||||
-- Converts an array of commands into the event rows to be stored.
--
-- All rows produced by one call share a single timestamp taken from
-- statement_timestamp(); that value is used for created_at and, as epoch
-- seconds, for position.
--
-- NOTE: this file is rendered with Go's fmt package before execution. The one
-- placeholder below (the cast of c.ordinality) is replaced with the type of
-- the in_tx_order column. Do not add any other percent signs to this file.
CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[])
    RETURNS SETOF eventstore.events2
    LANGUAGE 'plpgsql'
    STABLE PARALLEL SAFE
    ROWS 10
AS $$
DECLARE
    agg RECORD;
    current_sequence BIGINT;
    current_owner TEXT;
    -- Captured once on entry so every returned row carries the same value.
    created_at TIMESTAMPTZ := statement_timestamp();
BEGIN
    -- Handle each distinct aggregate referenced by the commands separately.
    FOR agg IN
        SELECT DISTINCT
            instance_id,
            aggregate_type,
            aggregate_id
        FROM UNNEST(commands)
    LOOP
        -- Load the state returned by latest_aggregate_state for this aggregate.
        SELECT *
        INTO current_sequence, current_owner
        FROM eventstore.latest_aggregate_state(
            agg.instance_id,
            agg.aggregate_type,
            agg.aggregate_id
        );

        RETURN QUERY
        SELECT
            c.instance_id,
            c.aggregate_type,
            c.aggregate_id,
            c.command_type, -- AS event_type
            -- Continue numbering after the current sequence (0 if there is none).
            COALESCE(current_sequence, 0) + ROW_NUMBER() OVER (), -- AS sequence
            c.revision,
            created_at,
            c.payload,
            c.creator,
            -- A known owner takes precedence over the one given by the command.
            COALESCE(current_owner, c.owner), -- AS owner
            EXTRACT(EPOCH FROM created_at), -- AS position
            c.ordinality::%s -- AS in_tx_order
        FROM
            UNNEST(commands) WITH ORDINALITY AS c
        WHERE
            c.instance_id = agg.instance_id
            AND c.aggregate_type = agg.aggregate_type
            AND c.aggregate_id = agg.aggregate_id;
    END LOOP;

    RETURN;
END;
$$;
|
||||
@@ -160,6 +160,7 @@ type Steps struct {
|
||||
s61IDPTemplate6SAMLSignatureAlgorithm *IDPTemplate6SAMLSignatureAlgorithm
|
||||
s62HTTPProviderAddSigningKey *HTTPProviderAddSigningKey
|
||||
s63AlterResourceCounts *AlterResourceCounts
|
||||
s64ChangePushPosition *ChangePushPosition
|
||||
}
|
||||
|
||||
func MustNewSteps(v *viper.Viper) *Steps {
|
||||
|
||||
@@ -221,6 +221,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
|
||||
steps.s61IDPTemplate6SAMLSignatureAlgorithm = &IDPTemplate6SAMLSignatureAlgorithm{dbClient: dbClient}
|
||||
steps.s62HTTPProviderAddSigningKey = &HTTPProviderAddSigningKey{dbClient: dbClient}
|
||||
steps.s63AlterResourceCounts = &AlterResourceCounts{dbClient: dbClient}
|
||||
steps.s64ChangePushPosition = &ChangePushPosition{dbClient: dbClient}
|
||||
|
||||
err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil, nil)
|
||||
logging.OnError(err).Fatal("unable to start projections")
|
||||
@@ -272,6 +273,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
|
||||
steps.s61IDPTemplate6SAMLSignatureAlgorithm,
|
||||
steps.s62HTTPProviderAddSigningKey,
|
||||
steps.s63AlterResourceCounts,
|
||||
steps.s64ChangePushPosition,
|
||||
} {
|
||||
setupErr = executeMigration(ctx, eventstoreClient, step, "migration failed")
|
||||
if setupErr != nil {
|
||||
|
||||
Reference in New Issue
Block a user