diff --git a/cmd/setup/40.go b/cmd/setup/40.go index a0d1afcf54..0a3a116d21 100644 --- a/cmd/setup/40.go +++ b/cmd/setup/40.go @@ -48,5 +48,5 @@ func (mig *InitPushFunc) Execute(ctx context.Context, _ eventstore.Event) (err e } func (mig *InitPushFunc) String() string { - return "40_init_push_func" + return "40_init_push_func_v2" } diff --git a/cmd/setup/40/postgres/02_func.sql b/cmd/setup/40/postgres/02_func.sql index 5f84f3908c..0d566ebb42 100644 --- a/cmd/setup/40/postgres/02_func.sql +++ b/cmd/setup/40/postgres/02_func.sql @@ -1,82 +1,92 @@ -CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$ -SELECT - c.instance_id - , c.aggregate_type - , c.aggregate_id - , c.command_type AS event_type - , cs.sequence + ROW_NUMBER() OVER (PARTITION BY c.instance_id, c.aggregate_type, c.aggregate_id ORDER BY c.in_tx_order) AS sequence - , c.revision - , NOW() AS created_at - , c.payload - , c.creator - , cs.owner - , EXTRACT(EPOCH FROM NOW()) AS position - , c.in_tx_order -FROM ( - SELECT - c.instance_id - , c.aggregate_type - , c.aggregate_id - , c.command_type - , c.revision - , c.payload - , c.creator - , c.owner - , ROW_NUMBER() OVER () AS in_tx_order - FROM - UNNEST(commands) AS c -) AS c -JOIN ( - SELECT - cmds.instance_id - , cmds.aggregate_type - , cmds.aggregate_id - , CASE WHEN (e.owner IS NOT NULL OR e.owner <> '') THEN e.owner ELSE command_owners.owner END AS owner - , COALESCE(MAX(e.sequence), 0) AS sequence - FROM ( +CREATE OR REPLACE FUNCTION eventstore.latest_aggregate_state( + instance_id TEXT + , aggregate_type TEXT + , aggregate_id TEXT + + , sequence OUT BIGINT + , owner OUT TEXT +) + LANGUAGE 'plpgsql' + STABLE PARALLEL SAFE +AS $$ + BEGIN + SELECT + COALESCE(e.sequence, 0) AS sequence + , e.owner + INTO + sequence + , owner + FROM + eventstore.events2 e + WHERE + e.instance_id = $1 + AND e.aggregate_type = $2 + AND e.aggregate_id = $3 + ORDER BY + e.sequence DESC + LIMIT 1; + + RETURN; + END; +$$; + +CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[]) + RETURNS SETOF eventstore.events2 + LANGUAGE 'plpgsql' + STABLE PARALLEL SAFE + ROWS 10 +AS $$ +DECLARE + "aggregate" RECORD; + current_sequence BIGINT; + current_owner TEXT; +BEGIN + FOR "aggregate" IN SELECT DISTINCT instance_id , aggregate_type , aggregate_id - , owner FROM UNNEST(commands) - ) AS cmds - LEFT JOIN eventstore.events2 AS e - ON cmds.instance_id = e.instance_id - AND cmds.aggregate_type = e.aggregate_type - AND cmds.aggregate_id = e.aggregate_id - JOIN ( + LOOP + SELECT + * + INTO + current_sequence + , current_owner + FROM eventstore.latest_aggregate_state( + "aggregate".instance_id + , "aggregate".aggregate_type + , "aggregate".aggregate_id + ); + + RETURN QUERY SELECT - DISTINCT ON ( - instance_id - , aggregate_type - , aggregate_id - ) - instance_id - , aggregate_type - , aggregate_id - , owner + c.instance_id + , c.aggregate_type + , c.aggregate_id + , c.command_type -- AS event_type + , COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence + , c.revision + , NOW() -- AS created_at + , c.payload + , c.creator + , COALESCE(current_owner, c.owner) -- AS owner + , EXTRACT(EPOCH FROM NOW()) -- AS position + , c.ordinality::INT -- AS in_tx_order FROM - UNNEST(commands) - ) AS command_owners ON - cmds.instance_id = command_owners.instance_id - AND cmds.aggregate_type = command_owners.aggregate_type - AND cmds.aggregate_id = command_owners.aggregate_id - GROUP BY - cmds.instance_id - , cmds.aggregate_type - , cmds.aggregate_id - , 4 -- owner -) AS cs - ON c.instance_id = cs.instance_id - AND c.aggregate_type = cs.aggregate_type - AND c.aggregate_id = cs.aggregate_id -ORDER BY - in_tx_order; -$$ LANGUAGE SQL; + UNNEST(commands) WITH ORDINALITY AS c + WHERE + c.instance_id = aggregate.instance_id + AND c.aggregate_type = aggregate.aggregate_type + AND c.aggregate_id = aggregate.aggregate_id; + END LOOP; + RETURN; +END; +$$; CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$ INSERT INTO eventstore.events2 SELECT * FROM eventstore.commands_to_events(commands) +ORDER BY in_tx_order RETURNING * $$ LANGUAGE SQL;