From 037384d5cc2ce0d9055229612d2f553999118f5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20M=C3=B6hlmann?= Date: Wed, 8 Jan 2025 13:59:44 +0200 Subject: [PATCH] perf(eventstore): optimize commands to events function (#9092) # Which Problems Are Solved We were seeing high query costs in a the lateral join executed in the commands_to_events procedural function in the database. The high cost resulted in incremental CPU usage as a load test continued and less req/sec handled, sarting at 836 and ending at 130 req/sec. # How the Problems Are Solved 1. Set `PARALLEL SAFE`. I noticed that this option defaults to `UNSAFE`. But it's actually safe if the function doesn't `INSERT` 2. Set the returned `ROWS 10` parameter. 3. Function is re-written in Pl/PgSQL so that we eliminate expensive joins. 4. Introduced an intermediate state that does `SELECT DISTINCT` for the aggregate so that we don't have to do an expensive lateral join. # Additional Changes Use a `COALESCE` to get the owner from the last event, instead of a `CASE` switch. # Additional Context - Function was introduced in https://github.com/zitadel/zitadel/pull/8816 - Closes https://github.com/zitadel/zitadel/issues/8352 --------- Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com> --- cmd/setup/40.go | 2 +- cmd/setup/40/postgres/02_func.sql | 148 ++++++++++++++++-------------- 2 files changed, 80 insertions(+), 70 deletions(-) diff --git a/cmd/setup/40.go b/cmd/setup/40.go index a0d1afcf54..0a3a116d21 100644 --- a/cmd/setup/40.go +++ b/cmd/setup/40.go @@ -48,5 +48,5 @@ func (mig *InitPushFunc) Execute(ctx context.Context, _ eventstore.Event) (err e } func (mig *InitPushFunc) String() string { - return "40_init_push_func" + return "40_init_push_func_v2" } diff --git a/cmd/setup/40/postgres/02_func.sql b/cmd/setup/40/postgres/02_func.sql index 5f84f3908c..0d566ebb42 100644 --- a/cmd/setup/40/postgres/02_func.sql +++ b/cmd/setup/40/postgres/02_func.sql @@ -1,82 +1,92 @@ -CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$ -SELECT - c.instance_id - , c.aggregate_type - , c.aggregate_id - , c.command_type AS event_type - , cs.sequence + ROW_NUMBER() OVER (PARTITION BY c.instance_id, c.aggregate_type, c.aggregate_id ORDER BY c.in_tx_order) AS sequence - , c.revision - , NOW() AS created_at - , c.payload - , c.creator - , cs.owner - , EXTRACT(EPOCH FROM NOW()) AS position - , c.in_tx_order -FROM ( - SELECT - c.instance_id - , c.aggregate_type - , c.aggregate_id - , c.command_type - , c.revision - , c.payload - , c.creator - , c.owner - , ROW_NUMBER() OVER () AS in_tx_order - FROM - UNNEST(commands) AS c -) AS c -JOIN ( - SELECT - cmds.instance_id - , cmds.aggregate_type - , cmds.aggregate_id - , CASE WHEN (e.owner IS NOT NULL OR e.owner <> '') THEN e.owner ELSE command_owners.owner END AS owner - , COALESCE(MAX(e.sequence), 0) AS sequence - FROM ( +CREATE OR REPLACE FUNCTION eventstore.latest_aggregate_state( + instance_id TEXT + , aggregate_type TEXT + , aggregate_id TEXT + + , sequence OUT BIGINT + , owner OUT TEXT +) + LANGUAGE 'plpgsql' + STABLE PARALLEL SAFE +AS $$ + BEGIN + SELECT + COALESCE(e.sequence, 0) AS sequence + , e.owner + INTO + sequence + , owner + FROM + eventstore.events2 e + WHERE + e.instance_id = $1 + AND e.aggregate_type = $2 + AND e.aggregate_id = $3 + ORDER BY + e.sequence DESC + LIMIT 1; + + RETURN; + END; +$$; + +CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[]) + RETURNS SETOF eventstore.events2 + LANGUAGE 'plpgsql' + STABLE PARALLEL SAFE + ROWS 10 +AS $$ +DECLARE + "aggregate" RECORD; + current_sequence BIGINT; + current_owner TEXT; +BEGIN + FOR "aggregate" IN SELECT DISTINCT instance_id , aggregate_type , aggregate_id - , owner FROM UNNEST(commands) - ) AS cmds - LEFT JOIN eventstore.events2 AS e - ON cmds.instance_id = e.instance_id - AND cmds.aggregate_type = e.aggregate_type - AND cmds.aggregate_id = e.aggregate_id - JOIN ( + LOOP + SELECT + * + INTO + current_sequence + , current_owner + FROM eventstore.latest_aggregate_state( + "aggregate".instance_id + , "aggregate".aggregate_type + , "aggregate".aggregate_id + ); + + RETURN QUERY SELECT - DISTINCT ON ( - instance_id - , aggregate_type - , aggregate_id - ) - instance_id - , aggregate_type - , aggregate_id - , owner + c.instance_id + , c.aggregate_type + , c.aggregate_id + , c.command_type -- AS event_type + , COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence + , c.revision + , NOW() -- AS created_at + , c.payload + , c.creator + , COALESCE(current_owner, c.owner) -- AS owner + , EXTRACT(EPOCH FROM NOW()) -- AS position + , c.ordinality::INT -- AS in_tx_order FROM - UNNEST(commands) - ) AS command_owners ON - cmds.instance_id = command_owners.instance_id - AND cmds.aggregate_type = command_owners.aggregate_type - AND cmds.aggregate_id = command_owners.aggregate_id - GROUP BY - cmds.instance_id - , cmds.aggregate_type - , cmds.aggregate_id - , 4 -- owner -) AS cs - ON c.instance_id = cs.instance_id - AND c.aggregate_type = cs.aggregate_type - AND c.aggregate_id = cs.aggregate_id -ORDER BY - in_tx_order; -$$ LANGUAGE SQL; + UNNEST(commands) WITH ORDINALITY AS c + WHERE + c.instance_id = aggregate.instance_id + AND c.aggregate_type = aggregate.aggregate_type + AND c.aggregate_id = aggregate.aggregate_id; + END LOOP; + RETURN; +END; +$$; CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$ INSERT INTO eventstore.events2 SELECT * FROM eventstore.commands_to_events(commands) +ORDER BY in_tx_order RETURNING * $$ LANGUAGE SQL;