mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 18:57:32 +00:00
perf(eventstore): fast push on crdb (#9186)
# Which Problems Are Solved The performance of the initial push function can further be increased # How the Problems Are Solved `eventstore.push`- and `eventstore.commands_to_events`-functions were rewritten # Additional Changes none # Additional Context same optimizations as for postgres: https://github.com/zitadel/zitadel/pull/9092
This commit is contained in:
@@ -48,5 +48,5 @@ func (mig *InitPushFunc) Execute(ctx context.Context, _ eventstore.Event) (err e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mig *InitPushFunc) String() string {
|
func (mig *InitPushFunc) String() string {
|
||||||
return "40_init_push_func_v2"
|
return "40_init_push_func_v3"
|
||||||
}
|
}
|
||||||
|
@@ -10,8 +10,132 @@ CREATE TYPE IF NOT EXISTS eventstore.command AS (
|
|||||||
, owner TEXT
|
, owner TEXT
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION eventstore.latest_aggregate_state(
|
||||||
|
instance_id TEXT
|
||||||
|
, aggregate_type TEXT
|
||||||
|
, aggregate_id TEXT
|
||||||
|
|
||||||
|
, sequence OUT BIGINT
|
||||||
|
, owner OUT TEXT
|
||||||
|
)
|
||||||
|
LANGUAGE 'plpgsql'
|
||||||
|
AS $$
|
||||||
|
BEGIN
|
||||||
|
SELECT
|
||||||
|
COALESCE(e.sequence, 0) AS sequence
|
||||||
|
, e.owner
|
||||||
|
INTO
|
||||||
|
sequence
|
||||||
|
, owner
|
||||||
|
FROM
|
||||||
|
eventstore.events2 e
|
||||||
|
WHERE
|
||||||
|
e.instance_id = $1
|
||||||
|
AND e.aggregate_type = $2
|
||||||
|
AND e.aggregate_id = $3
|
||||||
|
ORDER BY
|
||||||
|
e.sequence DESC
|
||||||
|
LIMIT 1;
|
||||||
|
|
||||||
|
RETURN;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION eventstore.commands_to_events2(commands eventstore.command[])
|
||||||
|
RETURNS eventstore.events2[]
|
||||||
|
LANGUAGE 'plpgsql'
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
current_sequence BIGINT;
|
||||||
|
current_owner TEXT;
|
||||||
|
|
||||||
|
instance_id TEXT;
|
||||||
|
aggregate_type TEXT;
|
||||||
|
aggregate_id TEXT;
|
||||||
|
|
||||||
|
_events eventstore.events2[];
|
||||||
|
|
||||||
|
_aggregates CURSOR FOR
|
||||||
|
select
|
||||||
|
DISTINCT ("c").instance_id
|
||||||
|
, ("c").aggregate_type
|
||||||
|
, ("c").aggregate_id
|
||||||
|
FROM
|
||||||
|
UNNEST(commands) AS c;
|
||||||
|
BEGIN
|
||||||
|
OPEN _aggregates;
|
||||||
|
LOOP
|
||||||
|
FETCH NEXT IN _aggregates INTO instance_id, aggregate_type, aggregate_id;
|
||||||
|
-- crdb does not support EXIT WHEN NOT FOUND
|
||||||
|
EXIT WHEN instance_id IS NULL;
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
*
|
||||||
|
INTO
|
||||||
|
current_sequence
|
||||||
|
, current_owner
|
||||||
|
FROM eventstore.latest_aggregate_state(
|
||||||
|
instance_id
|
||||||
|
, aggregate_type
|
||||||
|
, aggregate_id
|
||||||
|
);
|
||||||
|
|
||||||
|
-- RETURN QUERY is not supported by crdb: https://github.com/cockroachdb/cockroach/issues/105240
|
||||||
|
SELECT
|
||||||
|
ARRAY_CAT(_events, ARRAY_AGG(e))
|
||||||
|
INTO
|
||||||
|
_events
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
("c").instance_id
|
||||||
|
, ("c").aggregate_type
|
||||||
|
, ("c").aggregate_id
|
||||||
|
, ("c").command_type -- AS event_type
|
||||||
|
, COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence
|
||||||
|
, ("c").revision
|
||||||
|
, NOW() -- AS created_at
|
||||||
|
, ("c").payload
|
||||||
|
, ("c").creator
|
||||||
|
, COALESCE(current_owner, ("c").owner) -- AS owner
|
||||||
|
, EXTRACT(EPOCH FROM NOW()) -- AS position
|
||||||
|
, ordinality::INT -- AS in_tx_order
|
||||||
|
FROM
|
||||||
|
UNNEST(commands) WITH ORDINALITY AS c
|
||||||
|
WHERE
|
||||||
|
("c").instance_id = instance_id
|
||||||
|
AND ("c").aggregate_type = aggregate_type
|
||||||
|
AND ("c").aggregate_id = aggregate_id
|
||||||
|
) AS e;
|
||||||
|
END LOOP;
|
||||||
|
CLOSE _aggregates;
|
||||||
|
RETURN _events;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 AS $$
|
||||||
|
INSERT INTO eventstore.events2
|
||||||
|
SELECT
|
||||||
|
("e").instance_id
|
||||||
|
, ("e").aggregate_type
|
||||||
|
, ("e").aggregate_id
|
||||||
|
, ("e").event_type
|
||||||
|
, ("e").sequence
|
||||||
|
, ("e").revision
|
||||||
|
, ("e").created_at
|
||||||
|
, ("e").payload
|
||||||
|
, ("e").creator
|
||||||
|
, ("e").owner
|
||||||
|
, ("e")."position"
|
||||||
|
, ("e").in_tx_order
|
||||||
|
FROM
|
||||||
|
UNNEST(eventstore.commands_to_events2(commands)) e
|
||||||
|
ORDER BY
|
||||||
|
in_tx_order
|
||||||
|
RETURNING *
|
||||||
|
$$ LANGUAGE SQL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
select * from eventstore.commands_to_events(
|
select (c).* from UNNEST(eventstore.commands_to_events2(
|
||||||
ARRAY[
|
ARRAY[
|
||||||
ROW('', 'system', 'SYSTEM', 'ct1', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
ROW('', 'system', 'SYSTEM', 'ct1', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||||
, ROW('', 'system', 'SYSTEM', 'ct2', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
, ROW('', 'system', 'SYSTEM', 'ct2', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||||
@@ -20,88 +144,6 @@ ARRAY[
|
|||||||
, ROW('289525561255060732', 'oidc_session', 'V2_289575178579535100', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
, ROW('289525561255060732', 'oidc_session', 'V2_289575178579535100', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
||||||
, ROW('', 'system', 'SYSTEM', 'ct3', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
, ROW('', 'system', 'SYSTEM', 'ct3', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||||
]::eventstore.command[]
|
]::eventstore.command[]
|
||||||
);
|
) )c;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$
|
|
||||||
SELECT
|
|
||||||
("c").instance_id
|
|
||||||
, ("c").aggregate_type
|
|
||||||
, ("c").aggregate_id
|
|
||||||
, ("c").command_type AS event_type
|
|
||||||
, cs.sequence + ROW_NUMBER() OVER (PARTITION BY ("c").instance_id, ("c").aggregate_type, ("c").aggregate_id ORDER BY ("c").in_tx_order) AS sequence
|
|
||||||
, ("c").revision
|
|
||||||
, hlc_to_timestamp(cluster_logical_timestamp()) AS created_at
|
|
||||||
, ("c").payload
|
|
||||||
, ("c").creator
|
|
||||||
, cs.owner
|
|
||||||
, cluster_logical_timestamp() AS position
|
|
||||||
, ("c").in_tx_order
|
|
||||||
FROM (
|
|
||||||
SELECT
|
|
||||||
("c").instance_id
|
|
||||||
, ("c").aggregate_type
|
|
||||||
, ("c").aggregate_id
|
|
||||||
, ("c").command_type
|
|
||||||
, ("c").revision
|
|
||||||
, ("c").payload
|
|
||||||
, ("c").creator
|
|
||||||
, ("c").owner
|
|
||||||
, ROW_NUMBER() OVER () AS in_tx_order
|
|
||||||
FROM
|
|
||||||
UNNEST(commands) AS "c"
|
|
||||||
) AS "c"
|
|
||||||
JOIN (
|
|
||||||
SELECT
|
|
||||||
cmds.instance_id
|
|
||||||
, cmds.aggregate_type
|
|
||||||
, cmds.aggregate_id
|
|
||||||
, CASE WHEN (e.owner <> '') THEN e.owner ELSE command_owners.owner END AS owner
|
|
||||||
, COALESCE(MAX(e.sequence), 0) AS sequence
|
|
||||||
FROM (
|
|
||||||
SELECT DISTINCT
|
|
||||||
("cmds").instance_id
|
|
||||||
, ("cmds").aggregate_type
|
|
||||||
, ("cmds").aggregate_id
|
|
||||||
, ("cmds").owner
|
|
||||||
FROM UNNEST(commands) AS "cmds"
|
|
||||||
) AS cmds
|
|
||||||
LEFT JOIN eventstore.events2 AS e
|
|
||||||
ON cmds.instance_id = e.instance_id
|
|
||||||
AND cmds.aggregate_type = e.aggregate_type
|
|
||||||
AND cmds.aggregate_id = e.aggregate_id
|
|
||||||
JOIN (
|
|
||||||
SELECT
|
|
||||||
DISTINCT ON (
|
|
||||||
("c").instance_id
|
|
||||||
, ("c").aggregate_type
|
|
||||||
, ("c").aggregate_id
|
|
||||||
)
|
|
||||||
("c").instance_id
|
|
||||||
, ("c").aggregate_type
|
|
||||||
, ("c").aggregate_id
|
|
||||||
, ("c").owner
|
|
||||||
FROM
|
|
||||||
UNNEST(commands) AS "c"
|
|
||||||
) AS command_owners ON
|
|
||||||
cmds.instance_id = command_owners.instance_id
|
|
||||||
AND cmds.aggregate_type = command_owners.aggregate_type
|
|
||||||
AND cmds.aggregate_id = command_owners.aggregate_id
|
|
||||||
GROUP BY
|
|
||||||
cmds.instance_id
|
|
||||||
, cmds.aggregate_type
|
|
||||||
, cmds.aggregate_id
|
|
||||||
, 4 -- owner
|
|
||||||
) AS cs
|
|
||||||
ON ("c").instance_id = cs.instance_id
|
|
||||||
AND ("c").aggregate_type = cs.aggregate_type
|
|
||||||
AND ("c").aggregate_id = cs.aggregate_id
|
|
||||||
ORDER BY
|
|
||||||
in_tx_order
|
|
||||||
$$ LANGUAGE SQL;
|
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 AS $$
|
|
||||||
INSERT INTO eventstore.events2
|
|
||||||
SELECT * FROM eventstore.commands_to_events(commands)
|
|
||||||
RETURNING *
|
|
||||||
$$ LANGUAGE SQL;
|
|
@@ -30,7 +30,7 @@ services:
|
|||||||
|
|
||||||
db:
|
db:
|
||||||
restart: 'always'
|
restart: 'always'
|
||||||
image: 'cockroachdb/cockroach:v24.2.1'
|
image: 'cockroachdb/cockroach:latest'
|
||||||
command: 'start-single-node --insecure --http-addr :9090'
|
command: 'start-single-node --insecure --http-addr :9090'
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ['CMD', 'curl', '-f', 'http://localhost:9090/health?ready=1']
|
test: ['CMD', 'curl', '-f', 'http://localhost:9090/health?ready=1']
|
||||||
|
Reference in New Issue
Block a user