perf(eventstore): optimize commands to events function (#9092)

# Which Problems Are Solved

We were seeing high query costs in the lateral join executed in the
commands_to_events procedural function in the database. The high cost
resulted in incremental CPU usage as a load test continued and fewer
req/sec handled, starting at 836 and ending at 130 req/sec.

# How the Problems Are Solved

1. Set `PARALLEL SAFE`. I noticed that this option defaults to `UNSAFE`.
But it's actually safe if the function doesn't `INSERT`.
2. Set the `ROWS 10` parameter (the planner's estimate of returned rows).
3. The function is re-written in PL/pgSQL so that we eliminate expensive
joins.
4. Introduced an intermediate state that does `SELECT DISTINCT` for the
aggregate so that we don't have to do an expensive lateral join.

# Additional Changes

Use a `COALESCE` to get the owner from the last event, instead of a
`CASE` switch.

# Additional Context

- Function was introduced in
https://github.com/zitadel/zitadel/pull/8816
- Closes https://github.com/zitadel/zitadel/issues/8352

---------

Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com>
This commit is contained in:
Tim Möhlmann 2025-01-08 13:59:44 +02:00 committed by GitHub
parent c966446f80
commit df2c6f1d4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 80 additions and 70 deletions

View File

@ -48,5 +48,5 @@ func (mig *InitPushFunc) Execute(ctx context.Context, _ eventstore.Event) (err e
} }
func (mig *InitPushFunc) String() string { func (mig *InitPushFunc) String() string {
return "40_init_push_func" return "40_init_push_func_v2"
} }

View File

@ -1,82 +1,92 @@
CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$ CREATE OR REPLACE FUNCTION eventstore.latest_aggregate_state(
SELECT instance_id TEXT
c.instance_id , aggregate_type TEXT
, c.aggregate_type , aggregate_id TEXT
, c.aggregate_id
, c.command_type AS event_type , sequence OUT BIGINT
, cs.sequence + ROW_NUMBER() OVER (PARTITION BY c.instance_id, c.aggregate_type, c.aggregate_id ORDER BY c.in_tx_order) AS sequence , owner OUT TEXT
, c.revision )
, NOW() AS created_at LANGUAGE 'plpgsql'
, c.payload STABLE PARALLEL SAFE
, c.creator AS $$
, cs.owner BEGIN
, EXTRACT(EPOCH FROM NOW()) AS position SELECT
, c.in_tx_order COALESCE(e.sequence, 0) AS sequence
FROM ( , e.owner
SELECT INTO
c.instance_id sequence
, c.aggregate_type , owner
, c.aggregate_id FROM
, c.command_type eventstore.events2 e
, c.revision WHERE
, c.payload e.instance_id = $1
, c.creator AND e.aggregate_type = $2
, c.owner AND e.aggregate_id = $3
, ROW_NUMBER() OVER () AS in_tx_order ORDER BY
FROM e.sequence DESC
UNNEST(commands) AS c LIMIT 1;
) AS c
JOIN ( RETURN;
SELECT END;
cmds.instance_id $$;
, cmds.aggregate_type
, cmds.aggregate_id CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[])
, CASE WHEN (e.owner IS NOT NULL OR e.owner <> '') THEN e.owner ELSE command_owners.owner END AS owner RETURNS SETOF eventstore.events2
, COALESCE(MAX(e.sequence), 0) AS sequence LANGUAGE 'plpgsql'
FROM ( STABLE PARALLEL SAFE
ROWS 10
AS $$
DECLARE
"aggregate" RECORD;
current_sequence BIGINT;
current_owner TEXT;
BEGIN
FOR "aggregate" IN
SELECT DISTINCT SELECT DISTINCT
instance_id instance_id
, aggregate_type , aggregate_type
, aggregate_id , aggregate_id
, owner
FROM UNNEST(commands) FROM UNNEST(commands)
) AS cmds LOOP
LEFT JOIN eventstore.events2 AS e SELECT
ON cmds.instance_id = e.instance_id *
AND cmds.aggregate_type = e.aggregate_type INTO
AND cmds.aggregate_id = e.aggregate_id current_sequence
JOIN ( , current_owner
FROM eventstore.latest_aggregate_state(
"aggregate".instance_id
, "aggregate".aggregate_type
, "aggregate".aggregate_id
);
RETURN QUERY
SELECT SELECT
DISTINCT ON ( c.instance_id
instance_id , c.aggregate_type
, aggregate_type , c.aggregate_id
, aggregate_id , c.command_type -- AS event_type
) , COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence
instance_id , c.revision
, aggregate_type , NOW() -- AS created_at
, aggregate_id , c.payload
, owner , c.creator
, COALESCE(current_owner, c.owner) -- AS owner
, EXTRACT(EPOCH FROM NOW()) -- AS position
, c.ordinality::INT -- AS in_tx_order
FROM FROM
UNNEST(commands) UNNEST(commands) WITH ORDINALITY AS c
) AS command_owners ON WHERE
cmds.instance_id = command_owners.instance_id c.instance_id = aggregate.instance_id
AND cmds.aggregate_type = command_owners.aggregate_type AND c.aggregate_type = aggregate.aggregate_type
AND cmds.aggregate_id = command_owners.aggregate_id AND c.aggregate_id = aggregate.aggregate_id;
GROUP BY END LOOP;
cmds.instance_id RETURN;
, cmds.aggregate_type END;
, cmds.aggregate_id $$;
, 4 -- owner
) AS cs
ON c.instance_id = cs.instance_id
AND c.aggregate_type = cs.aggregate_type
AND c.aggregate_id = cs.aggregate_id
ORDER BY
in_tx_order;
$$ LANGUAGE SQL;
CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$ CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$
INSERT INTO eventstore.events2 INSERT INTO eventstore.events2
SELECT * FROM eventstore.commands_to_events(commands) SELECT * FROM eventstore.commands_to_events(commands)
ORDER BY in_tx_order
RETURNING * RETURNING *
$$ LANGUAGE SQL; $$ LANGUAGE SQL;