diff --git a/cmd/setup/40.go b/cmd/setup/40.go index 0a3a116d21..39191a9b8d 100644 --- a/cmd/setup/40.go +++ b/cmd/setup/40.go @@ -48,5 +48,5 @@ func (mig *InitPushFunc) Execute(ctx context.Context, _ eventstore.Event) (err e } func (mig *InitPushFunc) String() string { - return "40_init_push_func_v2" + return "40_init_push_func_v3" } diff --git a/cmd/setup/40/cockroach/40_init_push_func.sql b/cmd/setup/40/cockroach/40_init_push_func.sql index c2e2e92b07..802dc759c9 100644 --- a/cmd/setup/40/cockroach/40_init_push_func.sql +++ b/cmd/setup/40/cockroach/40_init_push_func.sql @@ -10,8 +10,132 @@ CREATE TYPE IF NOT EXISTS eventstore.command AS ( , owner TEXT ); +CREATE OR REPLACE FUNCTION eventstore.latest_aggregate_state( + instance_id TEXT + , aggregate_type TEXT + , aggregate_id TEXT + + , sequence OUT BIGINT + , owner OUT TEXT +) + LANGUAGE 'plpgsql' +AS $$ + BEGIN + SELECT + COALESCE(e.sequence, 0) AS sequence + , e.owner + INTO + sequence + , owner + FROM + eventstore.events2 e + WHERE + e.instance_id = $1 + AND e.aggregate_type = $2 + AND e.aggregate_id = $3 + ORDER BY + e.sequence DESC + LIMIT 1; + + RETURN; + END; +$$; + +CREATE OR REPLACE FUNCTION eventstore.commands_to_events2(commands eventstore.command[]) + RETURNS eventstore.events2[] + LANGUAGE 'plpgsql' +AS $$ +DECLARE + current_sequence BIGINT; + current_owner TEXT; + + instance_id TEXT; + aggregate_type TEXT; + aggregate_id TEXT; + + _events eventstore.events2[]; + + _aggregates CURSOR FOR + select + DISTINCT ("c").instance_id + , ("c").aggregate_type + , ("c").aggregate_id + FROM + UNNEST(commands) AS c; +BEGIN + OPEN _aggregates; + LOOP + FETCH NEXT IN _aggregates INTO instance_id, aggregate_type, aggregate_id; + -- crdb does not support EXIT WHEN NOT FOUND + EXIT WHEN instance_id IS NULL; + + SELECT + * + INTO + current_sequence + , current_owner + FROM eventstore.latest_aggregate_state( + instance_id + , aggregate_type + , aggregate_id + ); + + -- RETURN QUERY is not supported by crdb: https://github.com/cockroachdb/cockroach/issues/105240 + SELECT + ARRAY_CAT(_events, ARRAY_AGG(e)) + INTO + _events + FROM ( + SELECT + ("c").instance_id + , ("c").aggregate_type + , ("c").aggregate_id + , ("c").command_type -- AS event_type + , COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence + , ("c").revision + , NOW() -- AS created_at + , ("c").payload + , ("c").creator + , COALESCE(current_owner, ("c").owner) -- AS owner + , EXTRACT(EPOCH FROM NOW()) -- AS position + , ordinality::INT -- AS in_tx_order + FROM + UNNEST(commands) WITH ORDINALITY AS c + WHERE + ("c").instance_id = instance_id + AND ("c").aggregate_type = aggregate_type + AND ("c").aggregate_id = aggregate_id + ) AS e; + END LOOP; + CLOSE _aggregates; + RETURN _events; +END; +$$; + +CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 AS $$ + INSERT INTO eventstore.events2 + SELECT + ("e").instance_id + , ("e").aggregate_type + , ("e").aggregate_id + , ("e").event_type + , ("e").sequence + , ("e").revision + , ("e").created_at + , ("e").payload + , ("e").creator + , ("e").owner + , ("e")."position" + , ("e").in_tx_order + FROM + UNNEST(eventstore.commands_to_events2(commands)) e + ORDER BY + in_tx_order + RETURNING * +$$ LANGUAGE SQL; + /* -select * from eventstore.commands_to_events( +select (c).* from UNNEST(eventstore.commands_to_events2( ARRAY[ ROW('', 'system', 'SYSTEM', 'ct1', 1, '{"key": "value"}', 'c1', 'SYSTEM') , ROW('', 'system', 'SYSTEM', 'ct2', 1, '{"key": "value"}', 'c1', 'SYSTEM') @@ -20,88 +144,6 @@ ARRAY[ , ROW('289525561255060732', 'oidc_session', 'V2_289575178579535100', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844') , ROW('', 'system', 'SYSTEM', 'ct3', 1, '{"key": "value"}', 'c1', 'SYSTEM') ]::eventstore.command[] -); +) )c; */ -CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$ -SELECT - ("c").instance_id - , ("c").aggregate_type - , ("c").aggregate_id - , ("c").command_type AS event_type - , cs.sequence + ROW_NUMBER() OVER (PARTITION BY ("c").instance_id, ("c").aggregate_type, ("c").aggregate_id ORDER BY ("c").in_tx_order) AS sequence - , ("c").revision - , hlc_to_timestamp(cluster_logical_timestamp()) AS created_at - , ("c").payload - , ("c").creator - , cs.owner - , cluster_logical_timestamp() AS position - , ("c").in_tx_order -FROM ( - SELECT - ("c").instance_id - , ("c").aggregate_type - , ("c").aggregate_id - , ("c").command_type - , ("c").revision - , ("c").payload - , ("c").creator - , ("c").owner - , ROW_NUMBER() OVER () AS in_tx_order - FROM - UNNEST(commands) AS "c" -) AS "c" -JOIN ( - SELECT - cmds.instance_id - , cmds.aggregate_type - , cmds.aggregate_id - , CASE WHEN (e.owner <> '') THEN e.owner ELSE command_owners.owner END AS owner - , COALESCE(MAX(e.sequence), 0) AS sequence - FROM ( - SELECT DISTINCT - ("cmds").instance_id - , ("cmds").aggregate_type - , ("cmds").aggregate_id - , ("cmds").owner - FROM UNNEST(commands) AS "cmds" - ) AS cmds - LEFT JOIN eventstore.events2 AS e - ON cmds.instance_id = e.instance_id - AND cmds.aggregate_type = e.aggregate_type - AND cmds.aggregate_id = e.aggregate_id - JOIN ( - SELECT - DISTINCT ON ( - ("c").instance_id - , ("c").aggregate_type - , ("c").aggregate_id - ) - ("c").instance_id - , ("c").aggregate_type - , ("c").aggregate_id - , ("c").owner - FROM - UNNEST(commands) AS "c" - ) AS command_owners ON - cmds.instance_id = command_owners.instance_id - AND cmds.aggregate_type = command_owners.aggregate_type - AND cmds.aggregate_id = command_owners.aggregate_id - GROUP BY - cmds.instance_id - , cmds.aggregate_type - , cmds.aggregate_id - , 4 -- owner -) AS cs - ON ("c").instance_id = cs.instance_id - AND ("c").aggregate_type = cs.aggregate_type - AND ("c").aggregate_id = cs.aggregate_id -ORDER BY - in_tx_order -$$ LANGUAGE SQL; - -CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 AS $$ - INSERT INTO eventstore.events2 - SELECT * FROM eventstore.commands_to_events(commands) - RETURNING * -$$ LANGUAGE SQL; \ No newline at end of file diff --git a/e2e/config/localhost/docker-compose.yaml b/e2e/config/localhost/docker-compose.yaml index 040cbc81c0..f90ee158f0 100644 --- a/e2e/config/localhost/docker-compose.yaml +++ b/e2e/config/localhost/docker-compose.yaml @@ -30,7 +30,7 @@ services: db: restart: 'always' - image: 'cockroachdb/cockroach:v24.2.1' + image: 'cockroachdb/cockroach:latest' command: 'start-single-node --insecure --http-addr :9090' healthcheck: test: ['CMD', 'curl', '-f', 'http://localhost:9090/health?ready=1']