mirror of
https://github.com/zitadel/zitadel.git
synced 2025-01-11 13:33:41 +00:00
107 lines
3.6 KiB
MySQL
107 lines
3.6 KiB
MySQL
|
-- represents an event to be created.
|
||
|
CREATE TYPE IF NOT EXISTS eventstore.command AS (
|
||
|
instance_id TEXT
|
||
|
, aggregate_type TEXT
|
||
|
, aggregate_id TEXT
|
||
|
, command_type TEXT
|
||
|
, revision INT2
|
||
|
, payload JSONB
|
||
|
, creator TEXT
|
||
|
, owner TEXT
|
||
|
);
|
||
|
|
||
|
/*
|
||
|
select * from eventstore.commands_to_events(
|
||
|
ARRAY[
|
||
|
ROW('', 'system', 'SYSTEM', 'ct1', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||
|
, ROW('', 'system', 'SYSTEM', 'ct2', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||
|
, ROW('289525561255060732', 'org', '289575074711790844', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
||
|
, ROW('289525561255060732', 'user', '289575075164906748', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
||
|
, ROW('289525561255060732', 'oidc_session', 'V2_289575178579535100', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
||
|
, ROW('', 'system', 'SYSTEM', 'ct3', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||
|
]::eventstore.command[]
|
||
|
);
|
||
|
*/
|
||
|
|
||
|
CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$
|
||
|
SELECT
|
||
|
("c").instance_id
|
||
|
, ("c").aggregate_type
|
||
|
, ("c").aggregate_id
|
||
|
, ("c").command_type AS event_type
|
||
|
, cs.sequence + ROW_NUMBER() OVER (PARTITION BY ("c").instance_id, ("c").aggregate_type, ("c").aggregate_id ORDER BY ("c").in_tx_order) AS sequence
|
||
|
, ("c").revision
|
||
|
, hlc_to_timestamp(cluster_logical_timestamp()) AS created_at
|
||
|
, ("c").payload
|
||
|
, ("c").creator
|
||
|
, cs.owner
|
||
|
, cluster_logical_timestamp() AS position
|
||
|
, ("c").in_tx_order
|
||
|
FROM (
|
||
|
SELECT
|
||
|
("c").instance_id
|
||
|
, ("c").aggregate_type
|
||
|
, ("c").aggregate_id
|
||
|
, ("c").command_type
|
||
|
, ("c").revision
|
||
|
, ("c").payload
|
||
|
, ("c").creator
|
||
|
, ("c").owner
|
||
|
, ROW_NUMBER() OVER () AS in_tx_order
|
||
|
FROM
|
||
|
UNNEST(commands) AS "c"
|
||
|
) AS "c"
|
||
|
JOIN (
|
||
|
SELECT
|
||
|
cmds.instance_id
|
||
|
, cmds.aggregate_type
|
||
|
, cmds.aggregate_id
|
||
|
, CASE WHEN (e.owner <> '') THEN e.owner ELSE command_owners.owner END AS owner
|
||
|
, COALESCE(MAX(e.sequence), 0) AS sequence
|
||
|
FROM (
|
||
|
SELECT DISTINCT
|
||
|
("cmds").instance_id
|
||
|
, ("cmds").aggregate_type
|
||
|
, ("cmds").aggregate_id
|
||
|
, ("cmds").owner
|
||
|
FROM UNNEST(commands) AS "cmds"
|
||
|
) AS cmds
|
||
|
LEFT JOIN eventstore.events2 AS e
|
||
|
ON cmds.instance_id = e.instance_id
|
||
|
AND cmds.aggregate_type = e.aggregate_type
|
||
|
AND cmds.aggregate_id = e.aggregate_id
|
||
|
JOIN (
|
||
|
SELECT
|
||
|
DISTINCT ON (
|
||
|
("c").instance_id
|
||
|
, ("c").aggregate_type
|
||
|
, ("c").aggregate_id
|
||
|
)
|
||
|
("c").instance_id
|
||
|
, ("c").aggregate_type
|
||
|
, ("c").aggregate_id
|
||
|
, ("c").owner
|
||
|
FROM
|
||
|
UNNEST(commands) AS "c"
|
||
|
) AS command_owners ON
|
||
|
cmds.instance_id = command_owners.instance_id
|
||
|
AND cmds.aggregate_type = command_owners.aggregate_type
|
||
|
AND cmds.aggregate_id = command_owners.aggregate_id
|
||
|
GROUP BY
|
||
|
cmds.instance_id
|
||
|
, cmds.aggregate_type
|
||
|
, cmds.aggregate_id
|
||
|
, 4 -- owner
|
||
|
) AS cs
|
||
|
ON ("c").instance_id = cs.instance_id
|
||
|
AND ("c").aggregate_type = cs.aggregate_type
|
||
|
AND ("c").aggregate_id = cs.aggregate_id
|
||
|
ORDER BY
|
||
|
in_tx_order
|
||
|
$$ LANGUAGE SQL;
|
||
|
|
||
|
CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 AS $$
|
||
|
INSERT INTO eventstore.events2
|
||
|
SELECT * FROM eventstore.commands_to_events(commands)
|
||
|
RETURNING *
|
||
|
$$ LANGUAGE SQL;
|