mirror of
https://github.com/zitadel/zitadel.git
synced 2025-12-27 15:26:32 +00:00
56 lines
1.6 KiB
MySQL
56 lines
1.6 KiB
MySQL
|
|
CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[])
|
||
|
|
RETURNS SETOF eventstore.events2
|
||
|
|
LANGUAGE 'plpgsql'
|
||
|
|
STABLE PARALLEL SAFE
|
||
|
|
ROWS 10
|
||
|
|
AS $$
|
||
|
|
DECLARE
|
||
|
|
"aggregate" RECORD;
|
||
|
|
current_sequence BIGINT;
|
||
|
|
current_owner TEXT;
|
||
|
|
created_at TIMESTAMPTZ;
|
||
|
|
BEGIN
|
||
|
|
created_at := statement_timestamp();
|
||
|
|
FOR "aggregate" IN
|
||
|
|
SELECT DISTINCT
|
||
|
|
instance_id
|
||
|
|
, aggregate_type
|
||
|
|
, aggregate_id
|
||
|
|
FROM UNNEST(commands)
|
||
|
|
LOOP
|
||
|
|
SELECT
|
||
|
|
*
|
||
|
|
INTO
|
||
|
|
current_sequence
|
||
|
|
, current_owner
|
||
|
|
FROM eventstore.latest_aggregate_state(
|
||
|
|
"aggregate".instance_id
|
||
|
|
, "aggregate".aggregate_type
|
||
|
|
, "aggregate".aggregate_id
|
||
|
|
);
|
||
|
|
|
||
|
|
RETURN QUERY
|
||
|
|
SELECT
|
||
|
|
c.instance_id
|
||
|
|
, c.aggregate_type
|
||
|
|
, c.aggregate_id
|
||
|
|
, c.command_type -- AS event_type
|
||
|
|
, COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence
|
||
|
|
, c.revision
|
||
|
|
, created_at
|
||
|
|
, c.payload
|
||
|
|
, c.creator
|
||
|
|
, COALESCE(current_owner, c.owner) -- AS owner
|
||
|
|
, EXTRACT(EPOCH FROM created_at) -- AS position
|
||
|
|
, c.ordinality::%s -- AS in_tx_order
|
||
|
|
FROM
|
||
|
|
UNNEST(commands) WITH ORDINALITY AS c
|
||
|
|
WHERE
|
||
|
|
c.instance_id = aggregate.instance_id
|
||
|
|
AND c.aggregate_type = aggregate.aggregate_type
|
||
|
|
AND c.aggregate_id = aggregate.aggregate_id;
|
||
|
|
END LOOP;
|
||
|
|
RETURN;
|
||
|
|
END;
|
||
|
|
$$;
|