diff --git a/cmd/setup/40/02_func.sql b/cmd/setup/40/02_func.sql index 851547c2402..47681e721b7 100644 --- a/cmd/setup/40/02_func.sql +++ b/cmd/setup/40/02_func.sql @@ -40,7 +40,9 @@ DECLARE "aggregate" RECORD; current_sequence BIGINT; current_owner TEXT; + created_at TIMESTAMPTZ; BEGIN + created_at := statement_timestamp(); FOR "aggregate" IN SELECT DISTINCT instance_id @@ -67,11 +69,11 @@ BEGIN , c.command_type -- AS event_type , COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence , c.revision - , NOW() -- AS created_at + , created_at , c.payload , c.creator , COALESCE(current_owner, c.owner) -- AS owner - , EXTRACT(EPOCH FROM NOW()) -- AS position + , EXTRACT(EPOCH FROM created_at) -- AS position , c.ordinality::{{ .InTxOrderType }} -- AS in_tx_order FROM UNNEST(commands) WITH ORDINALITY AS c diff --git a/cmd/setup/64.go b/cmd/setup/64.go new file mode 100644 index 00000000000..906e3855154 --- /dev/null +++ b/cmd/setup/64.go @@ -0,0 +1,44 @@ +package setup + +import ( + "context" + "database/sql" + _ "embed" + "fmt" + + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/eventstore" +) + +var ( + //go:embed 64.sql + changePushPosition string +) + +type ChangePushPosition struct { + dbClient *database.DB +} + +func (mig *ChangePushPosition) Execute(ctx context.Context, _ eventstore.Event) error { + inTxOrderType, err := mig.inTxOrderType(ctx) + if err != nil { + return err + } + stmt := fmt.Sprintf(changePushPosition, inTxOrderType) + _, err = mig.dbClient.ExecContext(ctx, stmt) + return err +} + +func (mig *ChangePushPosition) String() string { + return "64_change_push_position" +} + +func (mig *ChangePushPosition) inTxOrderType(ctx context.Context) (typeName string, err error) { + err = mig.dbClient.QueryRowContext(ctx, func(row *sql.Row) error { + return row.Scan(&typeName) + }, `SELECT data_type FROM information_schema.columns WHERE table_schema = 'eventstore' AND table_name = 'events2' AND column_name = 'in_tx_order'`) + if err != nil { + return "", fmt.Errorf("get in_tx_order_type: %w", err) + } + return typeName, nil +} diff --git a/cmd/setup/64.sql b/cmd/setup/64.sql new file mode 100644 index 00000000000..6da95fdf9d6 --- /dev/null +++ b/cmd/setup/64.sql @@ -0,0 +1,55 @@ +CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[]) + RETURNS SETOF eventstore.events2 + LANGUAGE 'plpgsql' + STABLE PARALLEL SAFE + ROWS 10 +AS $$ +DECLARE + "aggregate" RECORD; + current_sequence BIGINT; + current_owner TEXT; + created_at TIMESTAMPTZ; +BEGIN + created_at := statement_timestamp(); + FOR "aggregate" IN + SELECT DISTINCT + instance_id + , aggregate_type + , aggregate_id + FROM UNNEST(commands) + LOOP + SELECT + * + INTO + current_sequence + , current_owner + FROM eventstore.latest_aggregate_state( + "aggregate".instance_id + , "aggregate".aggregate_type + , "aggregate".aggregate_id + ); + + RETURN QUERY + SELECT + c.instance_id + , c.aggregate_type + , c.aggregate_id + , c.command_type -- AS event_type + , COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence + , c.revision + , created_at + , c.payload + , c.creator + , COALESCE(current_owner, c.owner) -- AS owner + , EXTRACT(EPOCH FROM created_at) -- AS position + , c.ordinality::%s -- AS in_tx_order + FROM + UNNEST(commands) WITH ORDINALITY AS c + WHERE + c.instance_id = aggregate.instance_id + AND c.aggregate_type = aggregate.aggregate_type + AND c.aggregate_id = aggregate.aggregate_id; + END LOOP; + RETURN; +END; +$$; diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 1cf39ecab14..00cfc5339c7 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -160,6 +160,7 @@ type Steps struct { s61IDPTemplate6SAMLSignatureAlgorithm *IDPTemplate6SAMLSignatureAlgorithm s62HTTPProviderAddSigningKey *HTTPProviderAddSigningKey s63AlterResourceCounts *AlterResourceCounts + s64ChangePushPosition *ChangePushPosition } func MustNewSteps(v *viper.Viper) *Steps { diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 320523e6867..279415154d1 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -221,6 +221,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s61IDPTemplate6SAMLSignatureAlgorithm = &IDPTemplate6SAMLSignatureAlgorithm{dbClient: dbClient} steps.s62HTTPProviderAddSigningKey = &HTTPProviderAddSigningKey{dbClient: dbClient} steps.s63AlterResourceCounts = &AlterResourceCounts{dbClient: dbClient} + steps.s64ChangePushPosition = &ChangePushPosition{dbClient: dbClient} err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil, nil) logging.OnError(err).Fatal("unable to start projections") @@ -272,6 +273,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s61IDPTemplate6SAMLSignatureAlgorithm, steps.s62HTTPProviderAddSigningKey, steps.s63AlterResourceCounts, + steps.s64ChangePushPosition, } { setupErr = executeMigration(ctx, eventstoreClient, step, "migration failed") if setupErr != nil {