mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 20:57:31 +00:00
perf(eventstore): fast push on crdb (#9186)
# Which Problems Are Solved
The performance of the initial push function can further be increased
# How the Problems Are Solved
`eventstore.push`- and `eventstore.commands_to_events`-functions were
rewritten
# Additional Changes
none
# Additional Context
same optimizations as for postgres:
https://github.com/zitadel/zitadel/pull/9092
(cherry picked from commit 690147b30e
)
This commit is contained in:
@@ -48,5 +48,5 @@ func (mig *InitPushFunc) Execute(ctx context.Context, _ eventstore.Event) (err e
|
||||
}
|
||||
|
||||
func (mig *InitPushFunc) String() string {
|
||||
return "40_init_push_func_v2"
|
||||
return "40_init_push_func_v3"
|
||||
}
|
||||
|
@@ -10,8 +10,132 @@ CREATE TYPE IF NOT EXISTS eventstore.command AS (
|
||||
, owner TEXT
|
||||
);
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.latest_aggregate_state(
|
||||
instance_id TEXT
|
||||
, aggregate_type TEXT
|
||||
, aggregate_id TEXT
|
||||
|
||||
, sequence OUT BIGINT
|
||||
, owner OUT TEXT
|
||||
)
|
||||
LANGUAGE 'plpgsql'
|
||||
AS $$
|
||||
BEGIN
|
||||
SELECT
|
||||
COALESCE(e.sequence, 0) AS sequence
|
||||
, e.owner
|
||||
INTO
|
||||
sequence
|
||||
, owner
|
||||
FROM
|
||||
eventstore.events2 e
|
||||
WHERE
|
||||
e.instance_id = $1
|
||||
AND e.aggregate_type = $2
|
||||
AND e.aggregate_id = $3
|
||||
ORDER BY
|
||||
e.sequence DESC
|
||||
LIMIT 1;
|
||||
|
||||
RETURN;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.commands_to_events2(commands eventstore.command[])
|
||||
RETURNS eventstore.events2[]
|
||||
LANGUAGE 'plpgsql'
|
||||
AS $$
|
||||
DECLARE
|
||||
current_sequence BIGINT;
|
||||
current_owner TEXT;
|
||||
|
||||
instance_id TEXT;
|
||||
aggregate_type TEXT;
|
||||
aggregate_id TEXT;
|
||||
|
||||
_events eventstore.events2[];
|
||||
|
||||
_aggregates CURSOR FOR
|
||||
select
|
||||
DISTINCT ("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
FROM
|
||||
UNNEST(commands) AS c;
|
||||
BEGIN
|
||||
OPEN _aggregates;
|
||||
LOOP
|
||||
FETCH NEXT IN _aggregates INTO instance_id, aggregate_type, aggregate_id;
|
||||
-- crdb does not support EXIT WHEN NOT FOUND
|
||||
EXIT WHEN instance_id IS NULL;
|
||||
|
||||
SELECT
|
||||
*
|
||||
INTO
|
||||
current_sequence
|
||||
, current_owner
|
||||
FROM eventstore.latest_aggregate_state(
|
||||
instance_id
|
||||
, aggregate_type
|
||||
, aggregate_id
|
||||
);
|
||||
|
||||
-- RETURN QUERY is not supported by crdb: https://github.com/cockroachdb/cockroach/issues/105240
|
||||
SELECT
|
||||
ARRAY_CAT(_events, ARRAY_AGG(e))
|
||||
INTO
|
||||
_events
|
||||
FROM (
|
||||
SELECT
|
||||
("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
, ("c").command_type -- AS event_type
|
||||
, COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence
|
||||
, ("c").revision
|
||||
, NOW() -- AS created_at
|
||||
, ("c").payload
|
||||
, ("c").creator
|
||||
, COALESCE(current_owner, ("c").owner) -- AS owner
|
||||
, EXTRACT(EPOCH FROM NOW()) -- AS position
|
||||
, ordinality::INT -- AS in_tx_order
|
||||
FROM
|
||||
UNNEST(commands) WITH ORDINALITY AS c
|
||||
WHERE
|
||||
("c").instance_id = instance_id
|
||||
AND ("c").aggregate_type = aggregate_type
|
||||
AND ("c").aggregate_id = aggregate_id
|
||||
) AS e;
|
||||
END LOOP;
|
||||
CLOSE _aggregates;
|
||||
RETURN _events;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 AS $$
|
||||
INSERT INTO eventstore.events2
|
||||
SELECT
|
||||
("e").instance_id
|
||||
, ("e").aggregate_type
|
||||
, ("e").aggregate_id
|
||||
, ("e").event_type
|
||||
, ("e").sequence
|
||||
, ("e").revision
|
||||
, ("e").created_at
|
||||
, ("e").payload
|
||||
, ("e").creator
|
||||
, ("e").owner
|
||||
, ("e")."position"
|
||||
, ("e").in_tx_order
|
||||
FROM
|
||||
UNNEST(eventstore.commands_to_events2(commands)) e
|
||||
ORDER BY
|
||||
in_tx_order
|
||||
RETURNING *
|
||||
$$ LANGUAGE SQL;
|
||||
|
||||
/*
|
||||
select * from eventstore.commands_to_events(
|
||||
select (c).* from UNNEST(eventstore.commands_to_events2(
|
||||
ARRAY[
|
||||
ROW('', 'system', 'SYSTEM', 'ct1', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||
, ROW('', 'system', 'SYSTEM', 'ct2', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||
@@ -20,88 +144,6 @@ ARRAY[
|
||||
, ROW('289525561255060732', 'oidc_session', 'V2_289575178579535100', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
||||
, ROW('', 'system', 'SYSTEM', 'ct3', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||
]::eventstore.command[]
|
||||
);
|
||||
) )c;
|
||||
*/
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$
|
||||
SELECT
|
||||
("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
, ("c").command_type AS event_type
|
||||
, cs.sequence + ROW_NUMBER() OVER (PARTITION BY ("c").instance_id, ("c").aggregate_type, ("c").aggregate_id ORDER BY ("c").in_tx_order) AS sequence
|
||||
, ("c").revision
|
||||
, hlc_to_timestamp(cluster_logical_timestamp()) AS created_at
|
||||
, ("c").payload
|
||||
, ("c").creator
|
||||
, cs.owner
|
||||
, cluster_logical_timestamp() AS position
|
||||
, ("c").in_tx_order
|
||||
FROM (
|
||||
SELECT
|
||||
("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
, ("c").command_type
|
||||
, ("c").revision
|
||||
, ("c").payload
|
||||
, ("c").creator
|
||||
, ("c").owner
|
||||
, ROW_NUMBER() OVER () AS in_tx_order
|
||||
FROM
|
||||
UNNEST(commands) AS "c"
|
||||
) AS "c"
|
||||
JOIN (
|
||||
SELECT
|
||||
cmds.instance_id
|
||||
, cmds.aggregate_type
|
||||
, cmds.aggregate_id
|
||||
, CASE WHEN (e.owner <> '') THEN e.owner ELSE command_owners.owner END AS owner
|
||||
, COALESCE(MAX(e.sequence), 0) AS sequence
|
||||
FROM (
|
||||
SELECT DISTINCT
|
||||
("cmds").instance_id
|
||||
, ("cmds").aggregate_type
|
||||
, ("cmds").aggregate_id
|
||||
, ("cmds").owner
|
||||
FROM UNNEST(commands) AS "cmds"
|
||||
) AS cmds
|
||||
LEFT JOIN eventstore.events2 AS e
|
||||
ON cmds.instance_id = e.instance_id
|
||||
AND cmds.aggregate_type = e.aggregate_type
|
||||
AND cmds.aggregate_id = e.aggregate_id
|
||||
JOIN (
|
||||
SELECT
|
||||
DISTINCT ON (
|
||||
("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
)
|
||||
("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
, ("c").owner
|
||||
FROM
|
||||
UNNEST(commands) AS "c"
|
||||
) AS command_owners ON
|
||||
cmds.instance_id = command_owners.instance_id
|
||||
AND cmds.aggregate_type = command_owners.aggregate_type
|
||||
AND cmds.aggregate_id = command_owners.aggregate_id
|
||||
GROUP BY
|
||||
cmds.instance_id
|
||||
, cmds.aggregate_type
|
||||
, cmds.aggregate_id
|
||||
, 4 -- owner
|
||||
) AS cs
|
||||
ON ("c").instance_id = cs.instance_id
|
||||
AND ("c").aggregate_type = cs.aggregate_type
|
||||
AND ("c").aggregate_id = cs.aggregate_id
|
||||
ORDER BY
|
||||
in_tx_order
|
||||
$$ LANGUAGE SQL;
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 AS $$
|
||||
INSERT INTO eventstore.events2
|
||||
SELECT * FROM eventstore.commands_to_events(commands)
|
||||
RETURNING *
|
||||
$$ LANGUAGE SQL;
|
@@ -30,7 +30,7 @@ services:
|
||||
|
||||
db:
|
||||
restart: 'always'
|
||||
image: 'cockroachdb/cockroach:v24.2.1'
|
||||
image: 'cockroachdb/cockroach:latest'
|
||||
command: 'start-single-node --insecure --http-addr :9090'
|
||||
healthcheck:
|
||||
test: ['CMD', 'curl', '-f', 'http://localhost:9090/health?ready=1']
|
||||
|
Reference in New Issue
Block a user