From 52b5e6063bad0f991ed0d5bebb2a21066d0bb22d Mon Sep 17 00:00:00 2001 From: adlerhurst <27845747+adlerhurst@users.noreply.github.com> Date: Tue, 7 Jan 2025 09:35:59 +0100 Subject: [PATCH] proper example code for instance and instance domain --- cmd/setup/43.go | 47 ----- cmd/setup/43/01_table_definition.sql | 13 -- cmd/setup/43/cockroach/02_drop_trigger.sql | 1 - cmd/setup/43/cockroach/03_create_function.sql | 29 ---- cmd/setup/43/cockroach/04_create_trigger.sql | 3 - cmd/setup/43/postgres/02_create_trigger.sql | 35 ---- cmd/setup/44.go | 164 ------------------ cmd/setup/44/cockroach/02_drop_trigger.sql | 1 - .../cockroach/04_create_trigger_function.sql | 23 --- cmd/setup/44/postgres/02_create_trigger.sql | 26 --- cmd/setup/45.go | 41 +++++ .../45/cockroach/01_subscriptions_table.sql | 9 + cmd/setup/45/cockroach/02_queue_table.sql | 13 ++ .../45/cockroach/03_queue_events_function.sql | 35 ++++ .../04_write_event_queue_trigger.sql | 4 + .../45/postgres/03_queue_events_function.sql | 35 ++++ cmd/setup/46.go | 41 +++++ .../00_subscribe_instance_events.sql | 9 + .../cockroach/01_instances_table.sql} | 2 - .../02_01_create_reduce_added_function.sql} | 4 +- ..._reduce_default_language_set_function.sql} | 0 ...03_create_reduce_project_set_function.sql} | 0 ...04_create_reduce_console_set_function.sql} | 0 ...reate_reduce_default_org_set_function.sql} | 0 .../02_06_create_reduce_updated_function.sql} | 0 .../02_07_create_reduce_removed_function.sql} | 0 .../cockroach/03_create_trigger_function.sql | 41 +++++ .../cockroach/04_create_trigger.sql} | 4 +- .../46/cockroach/05_copy_previous_events.sql | 41 +++++ .../02_01_create_reduce_added_function.sql | 23 +++ cmd/setup/47.go | 41 +++++ .../00_subscribe_instance_domain_events.sql | 5 + .../cockroach/01_instance_domains_table.sql | 15 ++ .../02_01_create_reduce_added_function.sql | 25 +++ ..._02_create_reduce_primary_set_function.sql | 15 ++ .../02_03_create_reduce_removed_function.sql | 12 ++ .../cockroach/03_create_trigger_function.sql | 33 ++++ cmd/setup/47/cockroach/04_create_trigger.sql | 5 + .../47/cockroach/05_copy_previous_events.sql | 41 +++++ .../02_01_create_reduce_added_function.sql | 23 +++ cmd/setup/config.go | 5 +- cmd/setup/setup.go | 10 +- 42 files changed, 520 insertions(+), 354 deletions(-) delete mode 100644 cmd/setup/43.go delete mode 100644 cmd/setup/43/01_table_definition.sql delete mode 100644 cmd/setup/43/cockroach/02_drop_trigger.sql delete mode 100644 cmd/setup/43/cockroach/03_create_function.sql delete mode 100644 cmd/setup/43/cockroach/04_create_trigger.sql delete mode 100644 cmd/setup/43/postgres/02_create_trigger.sql delete mode 100644 cmd/setup/44.go delete mode 100644 cmd/setup/44/cockroach/02_drop_trigger.sql delete mode 100644 cmd/setup/44/cockroach/04_create_trigger_function.sql delete mode 100644 cmd/setup/44/postgres/02_create_trigger.sql create mode 100644 cmd/setup/45.go create mode 100644 cmd/setup/45/cockroach/01_subscriptions_table.sql create mode 100644 cmd/setup/45/cockroach/02_queue_table.sql create mode 100644 cmd/setup/45/cockroach/03_queue_events_function.sql create mode 100644 cmd/setup/45/cockroach/04_write_event_queue_trigger.sql create mode 100644 cmd/setup/45/postgres/03_queue_events_function.sql create mode 100644 cmd/setup/46.go create mode 100644 cmd/setup/46/cockroach/00_subscribe_instance_events.sql rename cmd/setup/{44/01_table_definition.sql => 46/cockroach/01_instances_table.sql} (92%) rename cmd/setup/{44/cockroach/03_01_create_reduce_added_function.sql => 46/cockroach/02_01_create_reduce_added_function.sql} (82%) rename cmd/setup/{44/cockroach/03_02_create_reduce_default_language_set_function.sql => 46/cockroach/02_02_create_reduce_default_language_set_function.sql} (100%) rename cmd/setup/{44/cockroach/03_03_create_reduce_project_set_function.sql => 46/cockroach/02_03_create_reduce_project_set_function.sql} (100%) rename cmd/setup/{44/cockroach/03_04_create_reduce_console_set_function.sql => 46/cockroach/02_04_create_reduce_console_set_function.sql} (100%) rename cmd/setup/{44/cockroach/03_05_create_reduce_default_org_set_function.sql => 46/cockroach/02_05_create_reduce_default_org_set_function.sql} (100%) rename cmd/setup/{44/cockroach/03_06_create_reduce_updated_function.sql => 46/cockroach/02_06_create_reduce_updated_function.sql} (100%) rename cmd/setup/{44/cockroach/03_07_create_reduce_removed_function.sql => 46/cockroach/02_07_create_reduce_removed_function.sql} (100%) create mode 100644 cmd/setup/46/cockroach/03_create_trigger_function.sql rename cmd/setup/{44/cockroach/05_create_trigger.sql => 46/cockroach/04_create_trigger.sql} (55%) create mode 100644 cmd/setup/46/cockroach/05_copy_previous_events.sql create mode 100644 cmd/setup/46/postgres/02_01_create_reduce_added_function.sql create mode 100644 cmd/setup/47.go create mode 100644 cmd/setup/47/cockroach/00_subscribe_instance_domain_events.sql create mode 100644 cmd/setup/47/cockroach/01_instance_domains_table.sql create mode 100644 cmd/setup/47/cockroach/02_01_create_reduce_added_function.sql create mode 100644 cmd/setup/47/cockroach/02_02_create_reduce_primary_set_function.sql create mode 100644 cmd/setup/47/cockroach/02_03_create_reduce_removed_function.sql create mode 100644 cmd/setup/47/cockroach/03_create_trigger_function.sql create mode 100644 cmd/setup/47/cockroach/04_create_trigger.sql create mode 100644 cmd/setup/47/cockroach/05_copy_previous_events.sql create mode 100644 cmd/setup/47/postgres/02_01_create_reduce_added_function.sql diff --git a/cmd/setup/43.go b/cmd/setup/43.go deleted file mode 100644 index e28ee7944c..0000000000 --- a/cmd/setup/43.go +++ /dev/null @@ -1,47 +0,0 @@ -package setup - -import ( - "context" - "embed" - _ "embed" - - "github.com/zitadel/logging" - "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/eventstore" -) - -var ( - //go:embed 43/01_table_definition.sql - createOutboxTable string - - //go:embed 43/cockroach/*.sql - //go:embed 43/postgres/*.sql - createOutboxTriggers embed.FS -) - -type CreateOutbox struct { - dbClient *database.DB -} - -func (mig *CreateOutbox) Execute(ctx context.Context, _ eventstore.Event) error { - _, err := mig.dbClient.ExecContext(ctx, createOutboxTable) - if err != nil { - return err - } - statements, err := readStatements(createOutboxTriggers, "43", mig.dbClient.Type()) - if err != nil { - return err - } - for _, stmt := range statements { - logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement") - _, err = mig.dbClient.ExecContext(ctx, stmt.query) - if err != nil { - return err - } - } - return nil -} - -func (mig *CreateOutbox) String() string { - return "43_create_outbox" -} diff --git a/cmd/setup/43/01_table_definition.sql b/cmd/setup/43/01_table_definition.sql deleted file mode 100644 index b780fa1a7a..0000000000 --- a/cmd/setup/43/01_table_definition.sql +++ /dev/null @@ -1,13 +0,0 @@ -CREATE TABLE IF NOT EXISTS event_outbox ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid() - , instance_id TEXT NOT NULL - , aggregate_type TEXT NOT NULL - , aggregate_id TEXT NOT NULL - , event_type TEXT NOT NULL - , event_revision INT2 NOT NULL - , created_at TIMESTAMPTZ NOT NULL DEFAULT TRANSACTION_TIMESTAMP() - , payload JSONB NULL - , creator TEXT NOT NULL - , position NUMERIC NOT NULL - , in_position_order INT4 NOT NULL -); \ No newline at end of file diff --git a/cmd/setup/43/cockroach/02_drop_trigger.sql b/cmd/setup/43/cockroach/02_drop_trigger.sql deleted file mode 100644 index fdc18b07c0..0000000000 --- a/cmd/setup/43/cockroach/02_drop_trigger.sql +++ /dev/null @@ -1 +0,0 @@ -DROP TRIGGER IF EXISTS copy_to_outbox ON eventstore.events2; diff --git a/cmd/setup/43/cockroach/03_create_function.sql b/cmd/setup/43/cockroach/03_create_function.sql deleted file mode 100644 index 0ea1b2c06e..0000000000 --- a/cmd/setup/43/cockroach/03_create_function.sql +++ /dev/null @@ -1,29 +0,0 @@ -CREATE OR REPLACE FUNCTION copy_events_to_outbox() -RETURNS TRIGGER AS $$ -BEGIN - INSERT INTO event_outbox ( - instance_id - , aggregate_type - , aggregate_id - , event_type - , event_revision - , created_at - , payload - , creator - , position - , in_position_order - ) VALUES ( - (NEW).instance_id - , (NEW).aggregate_type - , (NEW).aggregate_id - , (NEW).event_type - , (NEW).revision - , (NEW).created_at - , (NEW).payload - , (NEW).creator - , (NEW).position::NUMERIC - , (NEW).in_tx_order - ); - RETURN NULL; -END; -$$ LANGUAGE plpgsql; diff --git a/cmd/setup/43/cockroach/04_create_trigger.sql b/cmd/setup/43/cockroach/04_create_trigger.sql deleted file mode 100644 index d15cd2cfa9..0000000000 --- a/cmd/setup/43/cockroach/04_create_trigger.sql +++ /dev/null @@ -1,3 +0,0 @@ -CREATE TRIGGER copy_to_outbox -AFTER INSERT ON eventstore.events2 -FOR EACH ROW EXECUTE FUNCTION copy_events_to_outbox(); diff --git a/cmd/setup/43/postgres/02_create_trigger.sql b/cmd/setup/43/postgres/02_create_trigger.sql deleted file mode 100644 index 70ff70c400..0000000000 --- a/cmd/setup/43/postgres/02_create_trigger.sql +++ /dev/null @@ -1,35 +0,0 @@ -DROP TRIGGER IF EXISTS copy_to_outbox ON eventstore.events2; - -CREATE OR REPLACE FUNCTION copy_events_to_outbox() -RETURNS TRIGGER AS $$ -BEGIN - INSERT INTO event_outbox ( - instance_id - , aggregate_type - , aggregate_id - , event_type - , event_revision - , created_at - , payload - , creator - , position - , in_position_order - ) VALUES ( - (NEW).instance_id - , (NEW).aggregate_type - , (NEW).aggregate_id - , (NEW).event_type - , (NEW).revision - , (NEW).created_at - , (NEW).payload - , (NEW).creator - , pg_current_xact_id()::TEXT::NUMERIC - , (NEW).in_tx_order - ); - RETURN NULL; -END; -$$ LANGUAGE plpgsql; - -CREATE TRIGGER copy_to_outbox -AFTER INSERT ON eventstore.events2 -FOR EACH ROW EXECUTE FUNCTION copy_events_to_outbox(); \ No newline at end of file diff --git a/cmd/setup/44.go b/cmd/setup/44.go deleted file mode 100644 index 51afc3d21b..0000000000 --- a/cmd/setup/44.go +++ /dev/null @@ -1,164 +0,0 @@ -package setup - -import ( - "context" - "embed" - _ "embed" - - "github.com/zitadel/logging" - "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/repository/instance" -) - -var ( - //go:embed 44/01_table_definition.sql - createTransactionalInstance string - - //go:embed 44/cockroach/*.sql - //go:embed 44/postgres/*.sql - createReduceInstanceTrigger embed.FS -) - -type CreateTransactionalInstance struct { - dbClient *database.DB - eventstore *eventstore.Eventstore - BulkLimit uint64 -} - -func (mig *CreateTransactionalInstance) Execute(ctx context.Context, _ eventstore.Event) (err error) { - _, err = mig.dbClient.ExecContext(ctx, createTransactionalInstance) - if err != nil { - return err - } - statements, err := readStatements(createReduceInstanceTrigger, "44", mig.dbClient.Type()) - if err != nil { - return err - } - for _, stmt := range statements { - logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement") - _, err = mig.dbClient.ExecContext(ctx, stmt.query) - if err != nil { - return err - } - } - - reducer := new(instanceEvents) - for { - err = mig.eventstore.FilterToReducer(ctx, - eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). - AwaitOpenTransactions(). - Limit(mig.BulkLimit). - Offset(reducer.offset). - OrderAsc(). - AddQuery(). - AggregateTypes(instance.AggregateType). - EventTypes( - instance.InstanceAddedEventType, - instance.InstanceChangedEventType, - instance.InstanceRemovedEventType, - instance.DefaultLanguageSetEventType, - instance.ProjectSetEventType, - instance.ConsoleSetEventType, - instance.DefaultOrgSetEventType, - ). - Builder(), - reducer, - ) - if err != nil || len(reducer.events) == 0 { - return err - } - - tx, err := mig.dbClient.BeginTx(ctx, nil) - if err != nil { - return err - } - for _, event := range reducer.events { - switch e := event.(type) { - case *instance.InstanceAddedEvent: - _, err = tx.ExecContext(ctx, - "SELECT reduce_instance_added($1, $2, $3, $4)", - e.Aggregate().ID, - e.Name, - e.CreatedAt(), - e.Position(), - ) - case *instance.InstanceChangedEvent: - _, err = tx.ExecContext(ctx, - "SELECT reduce_instance_updated($1, $2, $3, $4)", - e.Aggregate().ID, - e.Name, - e.CreatedAt(), - e.Position(), - ) - case *instance.InstanceRemovedEvent: - _, err = tx.ExecContext(ctx, - "SELECT reduce_instance_removed($1)", - e.Aggregate().ID, - ) - case *instance.DefaultLanguageSetEvent: - _, err = tx.ExecContext(ctx, - "SELECT reduce_instance_removed($1, $2, $3, $4)", - e.Aggregate().ID, - e.Language, - e.CreatedAt(), - e.Position(), - ) - case *instance.ProjectSetEvent: - _, err = tx.ExecContext(ctx, - "SELECT reduce_instance_project_set($1, $2, $3, $4)", - e.Aggregate().ID, - e.ProjectID, - e.CreatedAt(), - e.Position(), - ) - case *instance.ConsoleSetEvent: - _, err = tx.ExecContext(ctx, - "SELECT reduce_instance_console_set($1, $2, $3, $4, $5)", - e.Aggregate().ID, - e.AppID, - e.ClientID, - e.CreatedAt(), - e.Position(), - ) - case *instance.DefaultOrgSetEvent: - _, err = tx.ExecContext(ctx, - "SELECT reduce_instance_default_org_set($1, $2, $3, $4)", - e.Aggregate().ID, - e.OrgID, - e.CreatedAt(), - e.Position(), - ) - } - if err != nil { - _ = tx.Rollback() - return err - } - if err = tx.Commit(); err != nil { - return err - } - } - - reducer.events = nil - reducer.offset += uint32(len(reducer.events)) - } -} - -func (mig *CreateTransactionalInstance) String() string { - return "44_create_transactional_instance" -} - -type instanceEvents struct { - offset uint32 - events []eventstore.Event -} - -// AppendEvents implements eventstore.reducer. -func (i *instanceEvents) AppendEvents(events ...eventstore.Event) { - i.events = append(i.events, events...) -} - -// Reduce implements eventstore.reducer. -func (i *instanceEvents) Reduce() error { - return nil -} diff --git a/cmd/setup/44/cockroach/02_drop_trigger.sql b/cmd/setup/44/cockroach/02_drop_trigger.sql deleted file mode 100644 index 59af2c71ab..0000000000 --- a/cmd/setup/44/cockroach/02_drop_trigger.sql +++ /dev/null @@ -1 +0,0 @@ -DROP TRIGGER IF EXISTS reduce_instance_added ON eventstore.events2; diff --git a/cmd/setup/44/cockroach/04_create_trigger_function.sql b/cmd/setup/44/cockroach/04_create_trigger_function.sql deleted file mode 100644 index 111467d938..0000000000 --- a/cmd/setup/44/cockroach/04_create_trigger_function.sql +++ /dev/null @@ -1,23 +0,0 @@ -CREATE OR REPLACE FUNCTION reduce_instance_events() -RETURNS TRIGGER -LANGUAGE PLpgSQL -AS $$ -BEGIN - IF (NEW).event_type = 'instance.added' THEN - SELECT reduce_instance_added(NEW::eventstore.events2); - -- ELSIF (NEW).event_type = 'instance.changed' THEN - -- SELECT reduce_instance_changed(NEW::eventstore.events2); - -- ELSIF (NEW).event_type = 'instance.removed' THEN - -- SELECT reduce_instance_removed(NEW::eventstore.events2); - -- ELSIF (NEW).event_type = 'instance.default.language.set' THEN - -- SELECT reduce_instance_default_language_set(NEW::eventstore.events2); - -- ELSIF (NEW).event_type = 'instance.default.org.set' THEN - -- SELECT reduce_instance_default_org_set(NEW::eventstore.events2); - -- ELSIF (NEW).event_type = 'instance.iam.project.set' THEN - -- SELECT reduce_instance_project_set(NEW::eventstore.events2); - -- ELSIF (NEW).event_type = 'instance.iam.console.set' THEN - -- SELECT reduce_instance_console_set(NEW::eventstore.events2); - END IF; - RETURN NULL; -END -$$; diff --git a/cmd/setup/44/postgres/02_create_trigger.sql b/cmd/setup/44/postgres/02_create_trigger.sql deleted file mode 100644 index 8bdb160e4c..0000000000 --- a/cmd/setup/44/postgres/02_create_trigger.sql +++ /dev/null @@ -1,26 +0,0 @@ -DROP TRIGGER IF EXISTS reduce_instance_added ON eventstore.events2; - -CREATE OR REPLACE FUNCTION zitadel.reduce_instance_added() -RETURNS TRIGGER AS $$ -BEGIN - INSERT INTO zitadel.instances ( - id - , name - , change_date - , creation_date - ) VALUES ( - (NEW).aggregate_id - , (NEW).payload->>'name' - , (NEW).created_at - , (NEW).created_at - ) - ON CONFLICT (id) DO NOTHING; - RETURN NULL; -END; -$$ LANGUAGE plpgsql; - -CREATE TRIGGER reduce_instance_added -AFTER INSERT ON eventstore.events2 -FOR EACH ROW -WHEN (NEW).event_type = 'instance.added' -EXECUTE FUNCTION reduce_instance_added(); diff --git a/cmd/setup/45.go b/cmd/setup/45.go new file mode 100644 index 0000000000..d561f2e4bd --- /dev/null +++ b/cmd/setup/45.go @@ -0,0 +1,41 @@ +package setup + +import ( + "context" + "embed" + _ "embed" + + "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/eventstore" +) + +var ( + //go:embed 45/cockroach/*.sql + //go:embed 45/postgres/*.sql + eventQueue embed.FS +) + +type EventQueue struct { + dbClient *database.DB +} + +func (mig *EventQueue) Execute(ctx context.Context, _ eventstore.Event) (err error) { + statements, err := readStatements(eventQueue, "45", mig.dbClient.Type()) + if err != nil { + return err + } + for _, stmt := range statements { + logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement") + _, err = mig.dbClient.ExecContext(ctx, stmt.query) + if err != nil { + return err + } + } + + return nil +} + +func (mig *EventQueue) String() string { + return "45_event_queue" +} diff --git a/cmd/setup/45/cockroach/01_subscriptions_table.sql b/cmd/setup/45/cockroach/01_subscriptions_table.sql new file mode 100644 index 0000000000..b2cbbbb87c --- /dev/null +++ b/cmd/setup/45/cockroach/01_subscriptions_table.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS subscriptions ( + subscriber TEXT NOT NULL + , instance_id TEXT -- if null susbcription is for all instances + , "all" BOOLEAN + , aggregate_type TEXT + , event_type TEXT + + , CONSTRAINT min_args CHECK ("all" OR aggregate_type IS NOT NULL) +); \ No newline at end of file diff --git a/cmd/setup/45/cockroach/02_queue_table.sql b/cmd/setup/45/cockroach/02_queue_table.sql new file mode 100644 index 0000000000..0b8254ae58 --- /dev/null +++ b/cmd/setup/45/cockroach/02_queue_table.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS "queue" ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid() + , subscriber TEXT NOT NULL + , instance_id TEXT NOT NULL + , aggregate_type TEXT NOT NULL + , aggregate_id TEXT NOT NULL + , sequence INT8 NOT NULL + + , position NUMERIC NOT NULL + , in_position_order INT2 NOT NULL + + , CONSTRAINT events_fk FOREIGN KEY (instance_id, aggregate_type, aggregate_id, "sequence") REFERENCES eventstore.events2 (instance_id, aggregate_type, aggregate_id, "sequence") +); \ No newline at end of file diff --git a/cmd/setup/45/cockroach/03_queue_events_function.sql b/cmd/setup/45/cockroach/03_queue_events_function.sql new file mode 100644 index 0000000000..0a4cc198da --- /dev/null +++ b/cmd/setup/45/cockroach/03_queue_events_function.sql @@ -0,0 +1,35 @@ +CREATE OR REPLACE FUNCTION queue_events() +RETURNS TRIGGER +LANGUAGE PLpgSQL +AS $$ +BEGIN + INSERT INTO "queue" ( + subscriber + , instance_id + , aggregate_type + , aggregate_id + , sequence + , position + , in_position_order + ) SELECT + DISTINCT subscriber + , (NEW).instance_id + , (NEW).aggregate_type + , (NEW).aggregate_id + , (NEW)."sequence" + , (NEW).position + , (NEW).in_tx_order + FROM + subscriptions + WHERE + (instance_id IS NULL OR (NEW).instance_id = instance_id) + AND ("all" OR ( + aggregate_type = (NEW).aggregate_type + AND ( + event_type IS NULL + OR (NEW).event_type = event_type + )) + ); + RETURN NULL; +END; +$$; \ No newline at end of file diff --git a/cmd/setup/45/cockroach/04_write_event_queue_trigger.sql b/cmd/setup/45/cockroach/04_write_event_queue_trigger.sql new file mode 100644 index 0000000000..99073ac7ad --- /dev/null +++ b/cmd/setup/45/cockroach/04_write_event_queue_trigger.sql @@ -0,0 +1,4 @@ +CREATE TRIGGER write_event_queue +AFTER INSERT ON eventstore.events2 +FOR EACH ROW +EXECUTE FUNCTION queue_events(); \ No newline at end of file diff --git a/cmd/setup/45/postgres/03_queue_events_function.sql b/cmd/setup/45/postgres/03_queue_events_function.sql new file mode 100644 index 0000000000..b803271836 --- /dev/null +++ b/cmd/setup/45/postgres/03_queue_events_function.sql @@ -0,0 +1,35 @@ +CREATE OR REPLACE FUNCTION queue_events() +RETURNS TRIGGER +LANGUAGE PLpgSQL +AS $$ +BEGIN + INSERT INTO "queue" ( + subscriber + , instance_id + , aggregate_type + , aggregate_id + , sequence + , position + , in_position_order + ) SELECT + subscriber + , NEW.instance_id + , NEW.aggregate_type + , NEW.aggregate_id + , NEW."sequence" + , NEW.position + , NEW.in_tx_order + FROM + subscriptions + WHERE + (instance_id IS NULL OR NEW.instance_id = instance_id) + AND ("all" OR ( + aggregate_type = NEW.aggregate_type + AND ( + event_type IS NULL + OR NEW.event_type = event_type + )) + ); + RETURN NULL; +END; +$$; \ No newline at end of file diff --git a/cmd/setup/46.go b/cmd/setup/46.go new file mode 100644 index 0000000000..1706b2acaa --- /dev/null +++ b/cmd/setup/46.go @@ -0,0 +1,41 @@ +package setup + +import ( + "context" + "embed" + _ "embed" + + "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/eventstore" +) + +var ( + //go:embed 46/cockroach/*.sql + //go:embed 46/postgres/*.sql + transactionalInstanceTable embed.FS +) + +type TransactionalInstanceTable struct { + dbClient *database.DB +} + +func (mig *TransactionalInstanceTable) Execute(ctx context.Context, _ eventstore.Event) (err error) { + statements, err := readStatements(transactionalInstanceTable, "46", mig.dbClient.Type()) + if err != nil { + return err + } + for _, stmt := range statements { + logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement") + _, err = mig.dbClient.ExecContext(ctx, stmt.query) + if err != nil { + return err + } + } + + return nil +} + +func (mig *TransactionalInstanceTable) String() string { + return "46_transactional_instance_table" +} diff --git a/cmd/setup/46/cockroach/00_subscribe_instance_events.sql b/cmd/setup/46/cockroach/00_subscribe_instance_events.sql new file mode 100644 index 0000000000..1eb9859083 --- /dev/null +++ b/cmd/setup/46/cockroach/00_subscribe_instance_events.sql @@ -0,0 +1,9 @@ +INSERT INTO subscriptions (subscriber, aggregate_type, event_type) VALUES + ('transactional-instances', 'instance', 'instance.added') + , ('transactional-instances', 'instance', 'instance.changed') + , ('transactional-instances', 'instance', 'instance.removed') + , ('transactional-instances', 'instance', 'instance.default.language.set') + , ('transactional-instances', 'instance', 'instance.default.org.set') + , ('transactional-instances', 'instance', 'instance.iam.project.set') + , ('transactional-instances', 'instance', 'instance.iam.console.set') +; \ No newline at end of file diff --git a/cmd/setup/44/01_table_definition.sql b/cmd/setup/46/cockroach/01_instances_table.sql similarity index 92% rename from cmd/setup/44/01_table_definition.sql rename to cmd/setup/46/cockroach/01_instances_table.sql index 4c3e60fd96..c60332f876 100644 --- a/cmd/setup/44/01_table_definition.sql +++ b/cmd/setup/46/cockroach/01_instances_table.sql @@ -11,5 +11,3 @@ CREATE TABLE IF NOT EXISTS instances ( , console_app_id TEXT , default_language TEXT ); - --- | sequence INT8 NOT NULL, \ No newline at end of file diff --git a/cmd/setup/44/cockroach/03_01_create_reduce_added_function.sql b/cmd/setup/46/cockroach/02_01_create_reduce_added_function.sql similarity index 82% rename from cmd/setup/44/cockroach/03_01_create_reduce_added_function.sql rename to cmd/setup/46/cockroach/02_01_create_reduce_added_function.sql index 8794870c35..a4c7612a6e 100644 --- a/cmd/setup/44/cockroach/03_01_create_reduce_added_function.sql +++ b/cmd/setup/46/cockroach/02_01_create_reduce_added_function.sql @@ -1,6 +1,6 @@ -CREATE OR REPLACE FUNCTION reduce_instance_added("event" RECORD) +CREATE OR REPLACE FUNCTION reduce_instance_added("event" eventstore.events2) RETURNS VOID -LANGUAGE SQL +LANGUAGE PLpgSQL AS $$ BEGIN INSERT INTO instances ( diff --git a/cmd/setup/44/cockroach/03_02_create_reduce_default_language_set_function.sql b/cmd/setup/46/cockroach/02_02_create_reduce_default_language_set_function.sql similarity index 100% rename from cmd/setup/44/cockroach/03_02_create_reduce_default_language_set_function.sql rename to cmd/setup/46/cockroach/02_02_create_reduce_default_language_set_function.sql diff --git a/cmd/setup/44/cockroach/03_03_create_reduce_project_set_function.sql b/cmd/setup/46/cockroach/02_03_create_reduce_project_set_function.sql similarity index 100% rename from cmd/setup/44/cockroach/03_03_create_reduce_project_set_function.sql rename to cmd/setup/46/cockroach/02_03_create_reduce_project_set_function.sql diff --git a/cmd/setup/44/cockroach/03_04_create_reduce_console_set_function.sql b/cmd/setup/46/cockroach/02_04_create_reduce_console_set_function.sql similarity index 100% rename from cmd/setup/44/cockroach/03_04_create_reduce_console_set_function.sql rename to cmd/setup/46/cockroach/02_04_create_reduce_console_set_function.sql diff --git a/cmd/setup/44/cockroach/03_05_create_reduce_default_org_set_function.sql b/cmd/setup/46/cockroach/02_05_create_reduce_default_org_set_function.sql similarity index 100% rename from cmd/setup/44/cockroach/03_05_create_reduce_default_org_set_function.sql rename to cmd/setup/46/cockroach/02_05_create_reduce_default_org_set_function.sql diff --git a/cmd/setup/44/cockroach/03_06_create_reduce_updated_function.sql b/cmd/setup/46/cockroach/02_06_create_reduce_updated_function.sql similarity index 100% rename from cmd/setup/44/cockroach/03_06_create_reduce_updated_function.sql rename to cmd/setup/46/cockroach/02_06_create_reduce_updated_function.sql diff --git a/cmd/setup/44/cockroach/03_07_create_reduce_removed_function.sql b/cmd/setup/46/cockroach/02_07_create_reduce_removed_function.sql similarity index 100% rename from cmd/setup/44/cockroach/03_07_create_reduce_removed_function.sql rename to cmd/setup/46/cockroach/02_07_create_reduce_removed_function.sql diff --git a/cmd/setup/46/cockroach/03_create_trigger_function.sql b/cmd/setup/46/cockroach/03_create_trigger_function.sql new file mode 100644 index 0000000000..d3440f29fc --- /dev/null +++ b/cmd/setup/46/cockroach/03_create_trigger_function.sql @@ -0,0 +1,41 @@ +CREATE OR REPLACE FUNCTION reduce_instance_events() +RETURNS TRIGGER +LANGUAGE PLpgSQL +AS $$ +DECLARE + "event" eventstore.events2; +BEGIN + SELECT + * + INTO + event + FROM + eventstore.events2 e + WHERE + e.instance_id = (NEW).instance_id + AND e.aggregate_type = (NEW).aggregate_type + AND e.aggregate_id = (NEW).aggregate_id + AND e."sequence" = (NEW)."sequence" + ; + + IF ("event").event_type = 'instance.added' THEN + SELECT reduce_instance_added("event"); + ELSIF ("event").event_type = 'instance.changed' THEN + SELECT reduce_instance_changed("event"); + ELSIF ("event").event_type = 'instance.removed' THEN + SELECT reduce_instance_removed("event"); + ELSIF ("event").event_type = 'instance.default.language.set' THEN + SELECT reduce_instance_default_language_set("event"); + ELSIF ("event").event_type = 'instance.default.org.set' THEN + SELECT reduce_instance_default_org_set("event"); + ELSIF ("event").event_type = 'instance.iam.project.set' THEN + SELECT reduce_instance_project_set("event"); + ELSIF ("event").event_type = 'instance.iam.console.set' THEN + SELECT reduce_instance_console_set("event"); + END IF; + + DELETE FROM "queue" WHERE id = (NEW).id; + + RETURN NULL; +END +$$; diff --git a/cmd/setup/44/cockroach/05_create_trigger.sql b/cmd/setup/46/cockroach/04_create_trigger.sql similarity index 55% rename from cmd/setup/44/cockroach/05_create_trigger.sql rename to cmd/setup/46/cockroach/04_create_trigger.sql index 42f3119ae6..55f7e60dd9 100644 --- a/cmd/setup/44/cockroach/05_create_trigger.sql +++ b/cmd/setup/46/cockroach/04_create_trigger.sql @@ -1,5 +1,5 @@ CREATE TRIGGER reduce_instance_events -AFTER INSERT ON eventstore.events2 +AFTER INSERT ON "queue" FOR EACH ROW -WHEN (NEW).aggregate_type = 'instance' +WHEN (NEW).subscriber = 'transactional-instances' EXECUTE FUNCTION reduce_instance_events(); diff --git a/cmd/setup/46/cockroach/05_copy_previous_events.sql b/cmd/setup/46/cockroach/05_copy_previous_events.sql new file mode 100644 index 0000000000..11b62005e9 --- /dev/null +++ b/cmd/setup/46/cockroach/05_copy_previous_events.sql @@ -0,0 +1,41 @@ +WITH active_instances AS ( + SELECT + instance_id + FROM + eventstore.events2 + WHERE + aggregate_type = 'instance' + AND event_type = 'instance.added' + AND instance_id NOT IN ( + SELECT + instance_id + FROM + eventstore.events2 + WHERE + aggregate_type = 'instance' + AND event_type = 'instance.removed' + ) +) +INSERT INTO "queue" ( + subscriber + , instance_id + , aggregate_type + , aggregate_id + , sequence + , position + , in_position_order +) SELECT + 'transactional-instances' + , e.instance_id + , e.aggregate_type + , e.aggregate_id + , e."sequence" + , e.position + , e.in_tx_order +FROM + eventstore.events2 e +WHERE + e.instance_id IN (SELECT instance_id FROM active_instances) + AND e.aggregate_type = 'instance' + AND e.event_type IN ('instance.added', 'instance.changed', 'instance.removed', 'instance.default.language.set', 'instance.default.org.set', 'instance.iam.project.set', 'instance.iam.console.set') + AND e.aggregate_id IN (SELECT instance_id FROM active_instances); diff --git a/cmd/setup/46/postgres/02_01_create_reduce_added_function.sql b/cmd/setup/46/postgres/02_01_create_reduce_added_function.sql new file mode 100644 index 0000000000..a4c7612a6e --- /dev/null +++ b/cmd/setup/46/postgres/02_01_create_reduce_added_function.sql @@ -0,0 +1,23 @@ +CREATE OR REPLACE FUNCTION reduce_instance_added("event" eventstore.events2) +RETURNS VOID +LANGUAGE PLpgSQL +AS $$ +BEGIN + INSERT INTO instances ( + id + , "name" + , creation_date + , change_date + , latest_position + , latest_in_position_order + ) VALUES ( + (event).aggregate_id + , (event).payload->>'name' + , (event).created_at + , (event).created_at + , (event).position + , (event).in_tx_order::INT2 + ) + ON CONFLICT (id) DO NOTHING; +END; +$$; diff --git a/cmd/setup/47.go b/cmd/setup/47.go new file mode 100644 index 0000000000..60d6bdc9f4 --- /dev/null +++ b/cmd/setup/47.go @@ -0,0 +1,41 @@ +package setup + +import ( + "context" + "embed" + _ "embed" + + "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/eventstore" +) + +var ( + //go:embed 47/cockroach/*.sql + //go:embed 47/postgres/*.sql + transactionalInstanceDomainTable embed.FS +) + +type TransactionalInstanceDomainTable struct { + dbClient *database.DB +} + +func (mig *TransactionalInstanceDomainTable) Execute(ctx context.Context, _ eventstore.Event) (err error) { + statements, err := readStatements(transactionalInstanceDomainTable, "47", mig.dbClient.Type()) + if err != nil { + return err + } + for _, stmt := range statements { + logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement") + _, err = mig.dbClient.ExecContext(ctx, stmt.query) + if err != nil { + return err + } + } + + return nil +} + +func (mig *TransactionalInstanceDomainTable) String() string { + return "47_transactional_instance_domain_table" +} diff --git a/cmd/setup/47/cockroach/00_subscribe_instance_domain_events.sql b/cmd/setup/47/cockroach/00_subscribe_instance_domain_events.sql new file mode 100644 index 0000000000..164b51c883 --- /dev/null +++ b/cmd/setup/47/cockroach/00_subscribe_instance_domain_events.sql @@ -0,0 +1,5 @@ +INSERT INTO subscriptions (subscriber, aggregate_type, event_type) VALUES + ('transactional-instance-domains', 'instance', 'instance.domain.added') + , ('transactional-instance-domains', 'instance', 'instance.domain.primary.set') + , ('transactional-instance-domains', 'instance', 'instance.domain.removed') +; \ No newline at end of file diff --git a/cmd/setup/47/cockroach/01_instance_domains_table.sql b/cmd/setup/47/cockroach/01_instance_domains_table.sql new file mode 100644 index 0000000000..10019f9c56 --- /dev/null +++ b/cmd/setup/47/cockroach/01_instance_domains_table.sql @@ -0,0 +1,15 @@ +CREATE TABLE IF NOT EXISTS instance_domains ( + instance_id TEXT + , domain TEXT + , is_generated BOOLEAN NOT NULL + , is_primary BOOLEAN NOT NULL DEFAULT FALSE + + , change_date TIMESTAMPTZ NOT NULL + , creation_date TIMESTAMPTZ NOT NULL + + , latest_position NUMERIC NOT NULL + , latest_in_position_order INT2 NOT NULL + + , PRIMARY KEY (instance_id, domain) + , CONSTRAINT fk_instance_id FOREIGN KEY (instance_id) REFERENCES instances (id) +); diff --git a/cmd/setup/47/cockroach/02_01_create_reduce_added_function.sql b/cmd/setup/47/cockroach/02_01_create_reduce_added_function.sql new file mode 100644 index 0000000000..86c7649ed1 --- /dev/null +++ b/cmd/setup/47/cockroach/02_01_create_reduce_added_function.sql @@ -0,0 +1,25 @@ +CREATE OR REPLACE FUNCTION reduce_instance_domain_added("event" eventstore.events2) +RETURNS VOID +LANGUAGE PLpgSQL +AS $$ +BEGIN + INSERT INTO instance_domains ( + instance_id + , domain + , is_generated + , creation_date + , change_date + , latest_position + , latest_in_position_order + ) VALUES ( + (event).aggregate_id + , (event).payload->>'domain' + , COALESCE((event).payload->'generated', 'false')::BOOLEAN + , (event).created_at + , (event).created_at + , (event).position + , (event).in_tx_order::INT2 + ) + ON CONFLICT (instance_id, domain) DO NOTHING; +END; +$$; diff --git a/cmd/setup/47/cockroach/02_02_create_reduce_primary_set_function.sql b/cmd/setup/47/cockroach/02_02_create_reduce_primary_set_function.sql new file mode 100644 index 0000000000..b3b1f65c9e --- /dev/null +++ b/cmd/setup/47/cockroach/02_02_create_reduce_primary_set_function.sql @@ -0,0 +1,15 @@ +CREATE OR REPLACE FUNCTION reduce_instance_domain_primary_set("event" eventstore.events2) +RETURNS VOID +LANGUAGE PLpgSQL +AS $$ +BEGIN + UPDATE instance_domains SET + is_primary = ((event).payload->>'domain' = domain) + , change_date = (event).created_at + , latest_position = (event).position + , latest_in_position_order = (event).in_tx_order::INT2 + WHERE + instance_id = (event).aggregate_id + AND (latest_position, latest_in_position_order) < ((event).position, (event).in_tx_order::INT2); +END; +$$; diff --git a/cmd/setup/47/cockroach/02_03_create_reduce_removed_function.sql b/cmd/setup/47/cockroach/02_03_create_reduce_removed_function.sql new file mode 100644 index 0000000000..955a3fb90e --- /dev/null +++ b/cmd/setup/47/cockroach/02_03_create_reduce_removed_function.sql @@ -0,0 +1,12 @@ +CREATE OR REPLACE FUNCTION reduce_instance_domain_removed("event" eventstore.events2) +RETURNS VOID +LANGUAGE PLpgSQL +AS $$ +BEGIN + DELETE FROM + instance_domains + WHERE + instance_id = (event).aggregate_id + AND domain = (event).payload->>'domain'; +END; +$$; diff --git a/cmd/setup/47/cockroach/03_create_trigger_function.sql b/cmd/setup/47/cockroach/03_create_trigger_function.sql new file mode 100644 index 0000000000..8ba5442796 --- /dev/null +++ b/cmd/setup/47/cockroach/03_create_trigger_function.sql @@ -0,0 +1,33 @@ +CREATE OR REPLACE FUNCTION reduce_instance_domain_events() +RETURNS TRIGGER +LANGUAGE PLpgSQL +AS $$ +DECLARE + "event" eventstore.events2; +BEGIN + SELECT + * + INTO + event + FROM + eventstore.events2 e + WHERE + e.instance_id = (NEW).instance_id + AND e.aggregate_type = (NEW).aggregate_type + AND e.aggregate_id = (NEW).aggregate_id + AND e."sequence" = (NEW)."sequence" + ; + + IF ("event").event_type = 'instance.domain.added' THEN + SELECT reduce_instance_domain_added("event"); + ELSIF ("event").event_type = 'instance.domain.primary.set' THEN + SELECT reduce_instance_domain_primary_set("event"); + ELSIF ("event").event_type = 'instance.domain.removed' THEN + SELECT reduce_instance_domain_removed("event"); + END IF; + + DELETE FROM "queue" WHERE id = (NEW).id; + + RETURN NULL; +END +$$; diff --git a/cmd/setup/47/cockroach/04_create_trigger.sql b/cmd/setup/47/cockroach/04_create_trigger.sql new file mode 100644 index 0000000000..ab4e5f4747 --- /dev/null +++ b/cmd/setup/47/cockroach/04_create_trigger.sql @@ -0,0 +1,5 @@ +CREATE TRIGGER reduce_instance_domain_events +AFTER INSERT ON "queue" +FOR EACH ROW +WHEN (NEW).subscriber = 'transactional-instance-domains' +EXECUTE FUNCTION reduce_instance_domain_events(); diff --git a/cmd/setup/47/cockroach/05_copy_previous_events.sql b/cmd/setup/47/cockroach/05_copy_previous_events.sql new file mode 100644 index 0000000000..c0eb3a33bf --- /dev/null +++ b/cmd/setup/47/cockroach/05_copy_previous_events.sql @@ -0,0 +1,41 @@ +WITH active_instances AS ( + SELECT + instance_id + FROM + eventstore.events2 + WHERE + aggregate_type = 'instance' + AND event_type = 'instance.added' + AND instance_id NOT IN ( + SELECT + instance_id + FROM + eventstore.events2 + WHERE + aggregate_type = 'instance' + AND event_type = 'instance.removed' + ) +) +INSERT INTO "queue" ( + subscriber + , instance_id + , aggregate_type + , aggregate_id + , sequence + , position + , in_position_order +) SELECT + 'transactional-instance-domains' + , e.instance_id + , e.aggregate_type + , e.aggregate_id + , e."sequence" + , e.position + , e.in_tx_order +FROM + eventstore.events2 e +WHERE + e.instance_id IN (SELECT instance_id FROM active_instances) + AND e.aggregate_type = 'instance' + AND e.event_type IN ('instance.domain.added', 'instance.domain.primary.set', 'instance.domain.removed') + AND e.aggregate_id IN (SELECT instance_id FROM active_instances); diff --git a/cmd/setup/47/postgres/02_01_create_reduce_added_function.sql b/cmd/setup/47/postgres/02_01_create_reduce_added_function.sql new file mode 100644 index 0000000000..a4c7612a6e --- /dev/null +++ b/cmd/setup/47/postgres/02_01_create_reduce_added_function.sql @@ -0,0 +1,23 @@ +CREATE OR REPLACE FUNCTION reduce_instance_added("event" eventstore.events2) +RETURNS VOID +LANGUAGE PLpgSQL +AS $$ +BEGIN + INSERT INTO instances ( + id + , "name" + , creation_date + , change_date + , latest_position + , latest_in_position_order + ) VALUES ( + (event).aggregate_id + , (event).payload->>'name' + , (event).created_at + , (event).created_at + , (event).position + , (event).in_tx_order::INT2 + ) + ON CONFLICT (id) DO NOTHING; +END; +$$; diff --git a/cmd/setup/config.go b/cmd/setup/config.go index b8ce196c71..18b95fe3c8 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -128,8 +128,9 @@ type Steps struct { s38BackChannelLogoutNotificationStart *BackChannelLogoutNotificationStart s40InitPushFunc *InitPushFunc s42Apps7OIDCConfigsLoginVersion *Apps7OIDCConfigsLoginVersion - s43CreateOutbox *CreateOutbox - s44CreateTransactionalInstance *CreateTransactionalInstance + s45EventQueue *EventQueue + s46TransactionalInstanceTable *TransactionalInstanceTable + s47TransactionalInstanceDomainTable *TransactionalInstanceDomainTable } func MustNewSteps(v *viper.Viper) *Steps { diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index c0f6130f7f..3797410770 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -171,8 +171,9 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: esPusherDBClient, esClient: eventstoreClient} steps.s40InitPushFunc = &InitPushFunc{dbClient: esPusherDBClient} steps.s42Apps7OIDCConfigsLoginVersion = &Apps7OIDCConfigsLoginVersion{dbClient: esPusherDBClient} - steps.s43CreateOutbox = &CreateOutbox{dbClient: queryDBClient} - steps.s44CreateTransactionalInstance = &CreateTransactionalInstance{dbClient: queryDBClient, eventstore: eventstoreClient, BulkLimit: config.InitProjections.BulkLimit} + steps.s45EventQueue = &EventQueue{dbClient: queryDBClient} + steps.s46TransactionalInstanceTable = &TransactionalInstanceTable{dbClient: queryDBClient} + steps.s47TransactionalInstanceDomainTable = &TransactionalInstanceDomainTable{dbClient: queryDBClient} err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil) logging.OnError(err).Fatal("unable to start projections") @@ -200,8 +201,9 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) for _, step := range []migration.Migration{ steps.s14NewEventsTable, steps.s40InitPushFunc, - steps.s43CreateOutbox, - steps.s44CreateTransactionalInstance, + steps.s45EventQueue, + steps.s46TransactionalInstanceTable, + steps.s47TransactionalInstanceDomainTable, steps.s1ProjectionTable, steps.s2AssetsTable, steps.s28AddFieldTable,