mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-12 11:47:34 +00:00
fix(setup): use template for in_tx_order type (#9346)
# Which Problems Are Solved
Systems running with PostgreSQL before Zitadel v2.39 are likely to have
a wrong type for the `in_tx_order` column in the `eventstore.event2`
table. The migration at the time used the `event_sequence` as default
value without typecast, which results in a `bigint` type for that
column. However, when creating the table from scratch, we explicitly
specify the type to be `integer`.
Starting from Zitadel v2.67 we use a Pl/PgSQL function to push events.
The function requires the types from `eventstore.events2` to the same as
the `select` destinations used in the function. In the function
`in_tx_order` is also expected to by of `integer` type.
CochroachDB systems are not affected because `bigint` is an alias to the
`int` type. In other words, CockroachDB uses `int8` when specifying type
`int`. Therefore the types already match.
# How the Problems Are Solved
Retrieve the actual column type currently in use. A template is used to
assign the type to the `ordinality` column returned as `in_tx_order`.
# Additional Changes
- Detailed logging on migration failure
# Additional Context
- Closes #9180
---------
Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com>
(cherry picked from commit bcc6a689fa
)
This commit is contained in:

committed by
Livio Spring

parent
464a4718df
commit
92265dca21
137
cmd/setup/40/cockroach/02_func.sql
Normal file
137
cmd/setup/40/cockroach/02_func.sql
Normal file
@@ -0,0 +1,137 @@
|
||||
CREATE OR REPLACE FUNCTION eventstore.latest_aggregate_state(
|
||||
instance_id TEXT
|
||||
, aggregate_type TEXT
|
||||
, aggregate_id TEXT
|
||||
|
||||
, sequence OUT BIGINT
|
||||
, owner OUT TEXT
|
||||
)
|
||||
LANGUAGE 'plpgsql'
|
||||
AS $$
|
||||
BEGIN
|
||||
SELECT
|
||||
COALESCE(e.sequence, 0) AS sequence
|
||||
, e.owner
|
||||
INTO
|
||||
sequence
|
||||
, owner
|
||||
FROM
|
||||
eventstore.events2 e
|
||||
WHERE
|
||||
e.instance_id = $1
|
||||
AND e.aggregate_type = $2
|
||||
AND e.aggregate_id = $3
|
||||
ORDER BY
|
||||
e.sequence DESC
|
||||
LIMIT 1;
|
||||
|
||||
RETURN;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.commands_to_events2(commands eventstore.command[])
|
||||
RETURNS eventstore.events2[]
|
||||
LANGUAGE 'plpgsql'
|
||||
AS $$
|
||||
DECLARE
|
||||
current_sequence BIGINT;
|
||||
current_owner TEXT;
|
||||
|
||||
instance_id TEXT;
|
||||
aggregate_type TEXT;
|
||||
aggregate_id TEXT;
|
||||
|
||||
_events eventstore.events2[];
|
||||
|
||||
_aggregates CURSOR FOR
|
||||
select
|
||||
DISTINCT ("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
FROM
|
||||
UNNEST(commands) AS c;
|
||||
BEGIN
|
||||
OPEN _aggregates;
|
||||
LOOP
|
||||
FETCH NEXT IN _aggregates INTO instance_id, aggregate_type, aggregate_id;
|
||||
-- crdb does not support EXIT WHEN NOT FOUND
|
||||
EXIT WHEN instance_id IS NULL;
|
||||
|
||||
SELECT
|
||||
*
|
||||
INTO
|
||||
current_sequence
|
||||
, current_owner
|
||||
FROM eventstore.latest_aggregate_state(
|
||||
instance_id
|
||||
, aggregate_type
|
||||
, aggregate_id
|
||||
);
|
||||
|
||||
-- RETURN QUERY is not supported by crdb: https://github.com/cockroachdb/cockroach/issues/105240
|
||||
SELECT
|
||||
ARRAY_CAT(_events, ARRAY_AGG(e))
|
||||
INTO
|
||||
_events
|
||||
FROM (
|
||||
SELECT
|
||||
("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
, ("c").command_type -- AS event_type
|
||||
, COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence
|
||||
, ("c").revision
|
||||
, NOW() -- AS created_at
|
||||
, ("c").payload
|
||||
, ("c").creator
|
||||
, COALESCE(current_owner, ("c").owner) -- AS owner
|
||||
, cluster_logical_timestamp() -- AS position
|
||||
, ordinality::{{ .InTxOrderType }} -- AS in_tx_order
|
||||
FROM
|
||||
UNNEST(commands) WITH ORDINALITY AS c
|
||||
WHERE
|
||||
("c").instance_id = instance_id
|
||||
AND ("c").aggregate_type = aggregate_type
|
||||
AND ("c").aggregate_id = aggregate_id
|
||||
) AS e;
|
||||
END LOOP;
|
||||
CLOSE _aggregates;
|
||||
RETURN _events;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 AS $$
|
||||
INSERT INTO eventstore.events2
|
||||
SELECT
|
||||
("e").instance_id
|
||||
, ("e").aggregate_type
|
||||
, ("e").aggregate_id
|
||||
, ("e").event_type
|
||||
, ("e").sequence
|
||||
, ("e").revision
|
||||
, ("e").created_at
|
||||
, ("e").payload
|
||||
, ("e").creator
|
||||
, ("e").owner
|
||||
, ("e")."position"
|
||||
, ("e").in_tx_order
|
||||
FROM
|
||||
UNNEST(eventstore.commands_to_events2(commands)) e
|
||||
ORDER BY
|
||||
in_tx_order
|
||||
RETURNING *
|
||||
$$ LANGUAGE SQL;
|
||||
|
||||
/*
|
||||
select (c).* from UNNEST(eventstore.commands_to_events2(
|
||||
ARRAY[
|
||||
ROW('', 'system', 'SYSTEM', 'ct1', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||
, ROW('', 'system', 'SYSTEM', 'ct2', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||
, ROW('289525561255060732', 'org', '289575074711790844', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
||||
, ROW('289525561255060732', 'user', '289575075164906748', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
||||
, ROW('289525561255060732', 'oidc_session', 'V2_289575178579535100', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
||||
, ROW('', 'system', 'SYSTEM', 'ct3', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||
]::eventstore.command[]
|
||||
) )c;
|
||||
*/
|
||||
|
Reference in New Issue
Block a user