proper example code for instance and instance domain

This commit is contained in:
adlerhurst 2025-01-07 09:35:59 +01:00
parent 296f8cb5f1
commit 52b5e6063b
No known key found for this signature in database
42 changed files with 520 additions and 354 deletions

View File

@ -1,47 +0,0 @@
package setup
import (
"context"
"embed"
_ "embed"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
)
var (
//go:embed 43/01_table_definition.sql
createOutboxTable string
//go:embed 43/cockroach/*.sql
//go:embed 43/postgres/*.sql
createOutboxTriggers embed.FS
)
type CreateOutbox struct {
dbClient *database.DB
}
func (mig *CreateOutbox) Execute(ctx context.Context, _ eventstore.Event) error {
_, err := mig.dbClient.ExecContext(ctx, createOutboxTable)
if err != nil {
return err
}
statements, err := readStatements(createOutboxTriggers, "43", mig.dbClient.Type())
if err != nil {
return err
}
for _, stmt := range statements {
logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement")
_, err = mig.dbClient.ExecContext(ctx, stmt.query)
if err != nil {
return err
}
}
return nil
}
func (mig *CreateOutbox) String() string {
return "43_create_outbox"
}

View File

@ -1,13 +0,0 @@
CREATE TABLE IF NOT EXISTS event_outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid()
, instance_id TEXT NOT NULL
, aggregate_type TEXT NOT NULL
, aggregate_id TEXT NOT NULL
, event_type TEXT NOT NULL
, event_revision INT2 NOT NULL
, created_at TIMESTAMPTZ NOT NULL DEFAULT TRANSACTION_TIMESTAMP()
, payload JSONB NULL
, creator TEXT NOT NULL
, position NUMERIC NOT NULL
, in_position_order INT4 NOT NULL
);

View File

@ -1 +0,0 @@
DROP TRIGGER IF EXISTS copy_to_outbox ON eventstore.events2;

View File

@ -1,29 +0,0 @@
CREATE OR REPLACE FUNCTION copy_events_to_outbox()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO event_outbox (
instance_id
, aggregate_type
, aggregate_id
, event_type
, event_revision
, created_at
, payload
, creator
, position
, in_position_order
) VALUES (
(NEW).instance_id
, (NEW).aggregate_type
, (NEW).aggregate_id
, (NEW).event_type
, (NEW).revision
, (NEW).created_at
, (NEW).payload
, (NEW).creator
, (NEW).position::NUMERIC
, (NEW).in_tx_order
);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;

View File

@ -1,3 +0,0 @@
CREATE TRIGGER copy_to_outbox
AFTER INSERT ON eventstore.events2
FOR EACH ROW EXECUTE FUNCTION copy_events_to_outbox();

View File

@ -1,35 +0,0 @@
DROP TRIGGER IF EXISTS copy_to_outbox ON eventstore.events2;
CREATE OR REPLACE FUNCTION copy_events_to_outbox()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO event_outbox (
instance_id
, aggregate_type
, aggregate_id
, event_type
, event_revision
, created_at
, payload
, creator
, position
, in_position_order
) VALUES (
(NEW).instance_id
, (NEW).aggregate_type
, (NEW).aggregate_id
, (NEW).event_type
, (NEW).revision
, (NEW).created_at
, (NEW).payload
, (NEW).creator
, pg_current_xact_id()::TEXT::NUMERIC
, (NEW).in_tx_order
);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER copy_to_outbox
AFTER INSERT ON eventstore.events2
FOR EACH ROW EXECUTE FUNCTION copy_events_to_outbox();

View File

@ -1,164 +0,0 @@
package setup
import (
"context"
"embed"
_ "embed"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/instance"
)
var (
//go:embed 44/01_table_definition.sql
createTransactionalInstance string
//go:embed 44/cockroach/*.sql
//go:embed 44/postgres/*.sql
createReduceInstanceTrigger embed.FS
)
type CreateTransactionalInstance struct {
dbClient *database.DB
eventstore *eventstore.Eventstore
BulkLimit uint64
}
func (mig *CreateTransactionalInstance) Execute(ctx context.Context, _ eventstore.Event) (err error) {
_, err = mig.dbClient.ExecContext(ctx, createTransactionalInstance)
if err != nil {
return err
}
statements, err := readStatements(createReduceInstanceTrigger, "44", mig.dbClient.Type())
if err != nil {
return err
}
for _, stmt := range statements {
logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement")
_, err = mig.dbClient.ExecContext(ctx, stmt.query)
if err != nil {
return err
}
}
reducer := new(instanceEvents)
for {
err = mig.eventstore.FilterToReducer(ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AwaitOpenTransactions().
Limit(mig.BulkLimit).
Offset(reducer.offset).
OrderAsc().
AddQuery().
AggregateTypes(instance.AggregateType).
EventTypes(
instance.InstanceAddedEventType,
instance.InstanceChangedEventType,
instance.InstanceRemovedEventType,
instance.DefaultLanguageSetEventType,
instance.ProjectSetEventType,
instance.ConsoleSetEventType,
instance.DefaultOrgSetEventType,
).
Builder(),
reducer,
)
if err != nil || len(reducer.events) == 0 {
return err
}
tx, err := mig.dbClient.BeginTx(ctx, nil)
if err != nil {
return err
}
for _, event := range reducer.events {
switch e := event.(type) {
case *instance.InstanceAddedEvent:
_, err = tx.ExecContext(ctx,
"SELECT reduce_instance_added($1, $2, $3, $4)",
e.Aggregate().ID,
e.Name,
e.CreatedAt(),
e.Position(),
)
case *instance.InstanceChangedEvent:
_, err = tx.ExecContext(ctx,
"SELECT reduce_instance_updated($1, $2, $3, $4)",
e.Aggregate().ID,
e.Name,
e.CreatedAt(),
e.Position(),
)
case *instance.InstanceRemovedEvent:
_, err = tx.ExecContext(ctx,
"SELECT reduce_instance_removed($1)",
e.Aggregate().ID,
)
case *instance.DefaultLanguageSetEvent:
_, err = tx.ExecContext(ctx,
"SELECT reduce_instance_removed($1, $2, $3, $4)",
e.Aggregate().ID,
e.Language,
e.CreatedAt(),
e.Position(),
)
case *instance.ProjectSetEvent:
_, err = tx.ExecContext(ctx,
"SELECT reduce_instance_project_set($1, $2, $3, $4)",
e.Aggregate().ID,
e.ProjectID,
e.CreatedAt(),
e.Position(),
)
case *instance.ConsoleSetEvent:
_, err = tx.ExecContext(ctx,
"SELECT reduce_instance_console_set($1, $2, $3, $4, $5)",
e.Aggregate().ID,
e.AppID,
e.ClientID,
e.CreatedAt(),
e.Position(),
)
case *instance.DefaultOrgSetEvent:
_, err = tx.ExecContext(ctx,
"SELECT reduce_instance_default_org_set($1, $2, $3, $4)",
e.Aggregate().ID,
e.OrgID,
e.CreatedAt(),
e.Position(),
)
}
if err != nil {
_ = tx.Rollback()
return err
}
if err = tx.Commit(); err != nil {
return err
}
}
reducer.events = nil
reducer.offset += uint32(len(reducer.events))
}
}
func (mig *CreateTransactionalInstance) String() string {
return "44_create_transactional_instance"
}
type instanceEvents struct {
offset uint32
events []eventstore.Event
}
// AppendEvents implements eventstore.reducer.
func (i *instanceEvents) AppendEvents(events ...eventstore.Event) {
i.events = append(i.events, events...)
}
// Reduce implements eventstore.reducer.
func (i *instanceEvents) Reduce() error {
return nil
}

View File

@ -1 +0,0 @@
DROP TRIGGER IF EXISTS reduce_instance_added ON eventstore.events2;

View File

@ -1,23 +0,0 @@
CREATE OR REPLACE FUNCTION reduce_instance_events()
RETURNS TRIGGER
LANGUAGE PLpgSQL
AS $$
BEGIN
IF (NEW).event_type = 'instance.added' THEN
SELECT reduce_instance_added(NEW::eventstore.events2);
-- ELSIF (NEW).event_type = 'instance.changed' THEN
-- SELECT reduce_instance_changed(NEW::eventstore.events2);
-- ELSIF (NEW).event_type = 'instance.removed' THEN
-- SELECT reduce_instance_removed(NEW::eventstore.events2);
-- ELSIF (NEW).event_type = 'instance.default.language.set' THEN
-- SELECT reduce_instance_default_language_set(NEW::eventstore.events2);
-- ELSIF (NEW).event_type = 'instance.default.org.set' THEN
-- SELECT reduce_instance_default_org_set(NEW::eventstore.events2);
-- ELSIF (NEW).event_type = 'instance.iam.project.set' THEN
-- SELECT reduce_instance_project_set(NEW::eventstore.events2);
-- ELSIF (NEW).event_type = 'instance.iam.console.set' THEN
-- SELECT reduce_instance_console_set(NEW::eventstore.events2);
END IF;
RETURN NULL;
END
$$;

View File

@ -1,26 +0,0 @@
DROP TRIGGER IF EXISTS reduce_instance_added ON eventstore.events2;
CREATE OR REPLACE FUNCTION zitadel.reduce_instance_added()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO zitadel.instances (
id
, name
, change_date
, creation_date
) VALUES (
(NEW).aggregate_id
, (NEW).payload->>'name'
, (NEW).created_at
, (NEW).created_at
)
ON CONFLICT (id) DO NOTHING;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER reduce_instance_added
AFTER INSERT ON eventstore.events2
FOR EACH ROW
WHEN (NEW).event_type = 'instance.added'
EXECUTE FUNCTION reduce_instance_added();

41
cmd/setup/45.go Normal file
View File

@ -0,0 +1,41 @@
package setup
import (
"context"
"embed"
_ "embed"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
)
var (
//go:embed 45/cockroach/*.sql
//go:embed 45/postgres/*.sql
eventQueue embed.FS
)
type EventQueue struct {
dbClient *database.DB
}
func (mig *EventQueue) Execute(ctx context.Context, _ eventstore.Event) (err error) {
statements, err := readStatements(eventQueue, "45", mig.dbClient.Type())
if err != nil {
return err
}
for _, stmt := range statements {
logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement")
_, err = mig.dbClient.ExecContext(ctx, stmt.query)
if err != nil {
return err
}
}
return nil
}
func (mig *EventQueue) String() string {
return "45_event_queue"
}

View File

@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS subscriptions (
subscriber TEXT NOT NULL
, instance_id TEXT -- if null susbcription is for all instances
, "all" BOOLEAN
, aggregate_type TEXT
, event_type TEXT
, CONSTRAINT min_args CHECK ("all" OR aggregate_type IS NOT NULL)
);

View File

@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS "queue" (
id UUID PRIMARY KEY DEFAULT gen_random_uuid()
, subscriber TEXT NOT NULL
, instance_id TEXT NOT NULL
, aggregate_type TEXT NOT NULL
, aggregate_id TEXT NOT NULL
, sequence INT8 NOT NULL
, position NUMERIC NOT NULL
, in_position_order INT2 NOT NULL
, CONSTRAINT events_fk FOREIGN KEY (instance_id, aggregate_type, aggregate_id, "sequence") REFERENCES eventstore.events2 (instance_id, aggregate_type, aggregate_id, "sequence")
);

View File

@ -0,0 +1,35 @@
CREATE OR REPLACE FUNCTION queue_events()
RETURNS TRIGGER
LANGUAGE PLpgSQL
AS $$
BEGIN
INSERT INTO "queue" (
subscriber
, instance_id
, aggregate_type
, aggregate_id
, sequence
, position
, in_position_order
) SELECT
DISTINCT subscriber
, (NEW).instance_id
, (NEW).aggregate_type
, (NEW).aggregate_id
, (NEW)."sequence"
, (NEW).position
, (NEW).in_tx_order
FROM
subscriptions
WHERE
(instance_id IS NULL OR (NEW).instance_id = instance_id)
AND ("all" OR (
aggregate_type = (NEW).aggregate_type
AND (
event_type IS NULL
OR (NEW).event_type = event_type
))
);
RETURN NULL;
END;
$$;

View File

@ -0,0 +1,4 @@
CREATE TRIGGER write_event_queue
AFTER INSERT ON eventstore.events2
FOR EACH ROW
EXECUTE FUNCTION queue_events();

View File

@ -0,0 +1,35 @@
CREATE OR REPLACE FUNCTION queue_events()
RETURNS TRIGGER
LANGUAGE PLpgSQL
AS $$
BEGIN
INSERT INTO "queue" (
subscriber
, instance_id
, aggregate_type
, aggregate_id
, sequence
, position
, in_position_order
) SELECT
subscriber
, NEW.instance_id
, NEW.aggregate_type
, NEW.aggregate_id
, NEW."sequence"
, NEW.position
, NEW.in_tx_order
FROM
subscriptions
WHERE
(instance_id IS NULL OR NEW.instance_id = instance_id)
AND ("all" OR (
aggregate_type = NEW.aggregate_type
AND (
event_type IS NULL
OR NEW.event_type = event_type
))
);
RETURN NULL;
END;
$$;

41
cmd/setup/46.go Normal file
View File

@ -0,0 +1,41 @@
package setup
import (
"context"
"embed"
_ "embed"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
)
var (
//go:embed 46/cockroach/*.sql
//go:embed 46/postgres/*.sql
transactionalInstanceTable embed.FS
)
type TransactionalInstanceTable struct {
dbClient *database.DB
}
func (mig *TransactionalInstanceTable) Execute(ctx context.Context, _ eventstore.Event) (err error) {
statements, err := readStatements(transactionalInstanceTable, "46", mig.dbClient.Type())
if err != nil {
return err
}
for _, stmt := range statements {
logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement")
_, err = mig.dbClient.ExecContext(ctx, stmt.query)
if err != nil {
return err
}
}
return nil
}
func (mig *TransactionalInstanceTable) String() string {
return "46_transactional_instance_table"
}

View File

@ -0,0 +1,9 @@
INSERT INTO subscriptions (subscriber, aggregate_type, event_type) VALUES
('transactional-instances', 'instance', 'instance.added')
, ('transactional-instances', 'instance', 'instance.changed')
, ('transactional-instances', 'instance', 'instance.removed')
, ('transactional-instances', 'instance', 'instance.default.language.set')
, ('transactional-instances', 'instance', 'instance.default.org.set')
, ('transactional-instances', 'instance', 'instance.iam.project.set')
, ('transactional-instances', 'instance', 'instance.iam.console.set')
;

View File

@ -11,5 +11,3 @@ CREATE TABLE IF NOT EXISTS instances (
, console_app_id TEXT , console_app_id TEXT
, default_language TEXT , default_language TEXT
); );
-- | sequence INT8 NOT NULL,

View File

@ -1,6 +1,6 @@
CREATE OR REPLACE FUNCTION reduce_instance_added("event" RECORD) CREATE OR REPLACE FUNCTION reduce_instance_added("event" eventstore.events2)
RETURNS VOID RETURNS VOID
LANGUAGE SQL LANGUAGE PLpgSQL
AS $$ AS $$
BEGIN BEGIN
INSERT INTO instances ( INSERT INTO instances (

View File

@ -0,0 +1,41 @@
CREATE OR REPLACE FUNCTION reduce_instance_events()
RETURNS TRIGGER
LANGUAGE PLpgSQL
AS $$
DECLARE
"event" eventstore.events2;
BEGIN
SELECT
*
INTO
event
FROM
eventstore.events2 e
WHERE
e.instance_id = (NEW).instance_id
AND e.aggregate_type = (NEW).aggregate_type
AND e.aggregate_id = (NEW).aggregate_id
AND e."sequence" = (NEW)."sequence"
;
IF ("event").event_type = 'instance.added' THEN
SELECT reduce_instance_added("event");
ELSIF ("event").event_type = 'instance.changed' THEN
SELECT reduce_instance_changed("event");
ELSIF ("event").event_type = 'instance.removed' THEN
SELECT reduce_instance_removed("event");
ELSIF ("event").event_type = 'instance.default.language.set' THEN
SELECT reduce_instance_default_language_set("event");
ELSIF ("event").event_type = 'instance.default.org.set' THEN
SELECT reduce_instance_default_org_set("event");
ELSIF ("event").event_type = 'instance.iam.project.set' THEN
SELECT reduce_instance_project_set("event");
ELSIF ("event").event_type = 'instance.iam.console.set' THEN
SELECT reduce_instance_console_set("event");
END IF;
DELETE FROM "queue" WHERE id = (NEW).id;
RETURN NULL;
END
$$;

View File

@ -1,5 +1,5 @@
CREATE TRIGGER reduce_instance_events CREATE TRIGGER reduce_instance_events
AFTER INSERT ON eventstore.events2 AFTER INSERT ON "queue"
FOR EACH ROW FOR EACH ROW
WHEN (NEW).aggregate_type = 'instance' WHEN (NEW).subscriber = 'transactional-instances'
EXECUTE FUNCTION reduce_instance_events(); EXECUTE FUNCTION reduce_instance_events();

View File

@ -0,0 +1,41 @@
WITH active_instances AS (
SELECT
instance_id
FROM
eventstore.events2
WHERE
aggregate_type = 'instance'
AND event_type = 'instance.added'
AND instance_id NOT IN (
SELECT
instance_id
FROM
eventstore.events2
WHERE
aggregate_type = 'instance'
AND event_type = 'instance.removed'
)
)
INSERT INTO "queue" (
subscriber
, instance_id
, aggregate_type
, aggregate_id
, sequence
, position
, in_position_order
) SELECT
'transactional-instances'
, e.instance_id
, e.aggregate_type
, e.aggregate_id
, e."sequence"
, e.position
, e.in_tx_order
FROM
eventstore.events2 e
WHERE
e.instance_id IN (SELECT instance_id FROM active_instances)
AND e.aggregate_type = 'instance'
AND e.event_type IN ('instance.added', 'instance.changed', 'instance.removed', 'instance.default.language.set', 'instance.default.org.set', 'instance.iam.project.set', 'instance.iam.console.set')
AND e.aggregate_id IN (SELECT instance_id FROM active_instances);

View File

@ -0,0 +1,23 @@
CREATE OR REPLACE FUNCTION reduce_instance_added("event" eventstore.events2)
RETURNS VOID
LANGUAGE PLpgSQL
AS $$
BEGIN
INSERT INTO instances (
id
, "name"
, creation_date
, change_date
, latest_position
, latest_in_position_order
) VALUES (
(event).aggregate_id
, (event).payload->>'name'
, (event).created_at
, (event).created_at
, (event).position
, (event).in_tx_order::INT2
)
ON CONFLICT (id) DO NOTHING;
END;
$$;

41
cmd/setup/47.go Normal file
View File

@ -0,0 +1,41 @@
package setup
import (
"context"
"embed"
_ "embed"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
)
var (
//go:embed 47/cockroach/*.sql
//go:embed 47/postgres/*.sql
transactionalInstanceDomainTable embed.FS
)
type TransactionalInstanceDomainTable struct {
dbClient *database.DB
}
func (mig *TransactionalInstanceDomainTable) Execute(ctx context.Context, _ eventstore.Event) (err error) {
statements, err := readStatements(transactionalInstanceDomainTable, "47", mig.dbClient.Type())
if err != nil {
return err
}
for _, stmt := range statements {
logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement")
_, err = mig.dbClient.ExecContext(ctx, stmt.query)
if err != nil {
return err
}
}
return nil
}
func (mig *TransactionalInstanceDomainTable) String() string {
return "47_transactional_instance_domain_table"
}

View File

@ -0,0 +1,5 @@
INSERT INTO subscriptions (subscriber, aggregate_type, event_type) VALUES
('transactional-instance-domains', 'instance', 'instance.domain.added')
, ('transactional-instance-domains', 'instance', 'instance.domain.primary.set')
, ('transactional-instance-domains', 'instance', 'instance.domain.removed')
;

View File

@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS instance_domains (
instance_id TEXT
, domain TEXT
, is_generated BOOLEAN NOT NULL
, is_primary BOOLEAN NOT NULL DEFAULT FALSE
, change_date TIMESTAMPTZ NOT NULL
, creation_date TIMESTAMPTZ NOT NULL
, latest_position NUMERIC NOT NULL
, latest_in_position_order INT2 NOT NULL
, PRIMARY KEY (instance_id, domain)
, CONSTRAINT fk_instance_id FOREIGN KEY (instance_id) REFERENCES instances (id)
);

View File

@ -0,0 +1,25 @@
CREATE OR REPLACE FUNCTION reduce_instance_domain_added("event" eventstore.events2)
RETURNS VOID
LANGUAGE PLpgSQL
AS $$
BEGIN
INSERT INTO instance_domains (
instance_id
, domain
, is_generated
, creation_date
, change_date
, latest_position
, latest_in_position_order
) VALUES (
(event).aggregate_id
, (event).payload->>'domain'
, COALESCE((event).payload->'generated', 'false')::BOOLEAN
, (event).created_at
, (event).created_at
, (event).position
, (event).in_tx_order::INT2
)
ON CONFLICT (instance_id, domain) DO NOTHING;
END;
$$;

View File

@ -0,0 +1,15 @@
CREATE OR REPLACE FUNCTION reduce_instance_domain_primary_set("event" eventstore.events2)
RETURNS VOID
LANGUAGE PLpgSQL
AS $$
BEGIN
UPDATE instance_domains SET
is_primary = ((event).payload->>'domain' = domain)
, change_date = (event).created_at
, latest_position = (event).position
, latest_in_position_order = (event).in_tx_order::INT2
WHERE
instance_id = (event).aggregate_id
AND (latest_position, latest_in_position_order) < ((event).position, (event).in_tx_order::INT2);
END;
$$;

View File

@ -0,0 +1,12 @@
CREATE OR REPLACE FUNCTION reduce_instance_domain_removed("event" eventstore.events2)
RETURNS VOID
LANGUAGE PLpgSQL
AS $$
BEGIN
DELETE FROM
instance_domains
WHERE
instance_id = (event).aggregate_id
AND domain = (event).payload->>'domain';
END;
$$;

View File

@ -0,0 +1,33 @@
CREATE OR REPLACE FUNCTION reduce_instance_domain_events()
RETURNS TRIGGER
LANGUAGE PLpgSQL
AS $$
DECLARE
"event" eventstore.events2;
BEGIN
SELECT
*
INTO
event
FROM
eventstore.events2 e
WHERE
e.instance_id = (NEW).instance_id
AND e.aggregate_type = (NEW).aggregate_type
AND e.aggregate_id = (NEW).aggregate_id
AND e."sequence" = (NEW)."sequence"
;
IF ("event").event_type = 'instance.domain.added' THEN
SELECT reduce_instance_domain_added("event");
ELSIF ("event").event_type = 'instance.domain.primary.set' THEN
SELECT reduce_instance_domain_primary_set("event");
ELSIF ("event").event_type = 'instance.domain.removed' THEN
SELECT reduce_instance_domain_removed("event");
END IF;
DELETE FROM "queue" WHERE id = (NEW).id;
RETURN NULL;
END
$$;

View File

@ -0,0 +1,5 @@
CREATE TRIGGER reduce_instance_domain_events
AFTER INSERT ON "queue"
FOR EACH ROW
WHEN (NEW).subscriber = 'transactional-instance-domains'
EXECUTE FUNCTION reduce_instance_domain_events();

View File

@ -0,0 +1,41 @@
WITH active_instances AS (
SELECT
instance_id
FROM
eventstore.events2
WHERE
aggregate_type = 'instance'
AND event_type = 'instance.added'
AND instance_id NOT IN (
SELECT
instance_id
FROM
eventstore.events2
WHERE
aggregate_type = 'instance'
AND event_type = 'instance.removed'
)
)
INSERT INTO "queue" (
subscriber
, instance_id
, aggregate_type
, aggregate_id
, sequence
, position
, in_position_order
) SELECT
'transactional-instance-domains'
, e.instance_id
, e.aggregate_type
, e.aggregate_id
, e."sequence"
, e.position
, e.in_tx_order
FROM
eventstore.events2 e
WHERE
e.instance_id IN (SELECT instance_id FROM active_instances)
AND e.aggregate_type = 'instance'
AND e.event_type IN ('instance.domain.added', 'instance.domain.primary.set', 'instance.domain.removed')
AND e.aggregate_id IN (SELECT instance_id FROM active_instances);

View File

@ -0,0 +1,23 @@
CREATE OR REPLACE FUNCTION reduce_instance_added("event" eventstore.events2)
RETURNS VOID
LANGUAGE PLpgSQL
AS $$
BEGIN
INSERT INTO instances (
id
, "name"
, creation_date
, change_date
, latest_position
, latest_in_position_order
) VALUES (
(event).aggregate_id
, (event).payload->>'name'
, (event).created_at
, (event).created_at
, (event).position
, (event).in_tx_order::INT2
)
ON CONFLICT (id) DO NOTHING;
END;
$$;

View File

@ -128,8 +128,9 @@ type Steps struct {
s38BackChannelLogoutNotificationStart *BackChannelLogoutNotificationStart s38BackChannelLogoutNotificationStart *BackChannelLogoutNotificationStart
s40InitPushFunc *InitPushFunc s40InitPushFunc *InitPushFunc
s42Apps7OIDCConfigsLoginVersion *Apps7OIDCConfigsLoginVersion s42Apps7OIDCConfigsLoginVersion *Apps7OIDCConfigsLoginVersion
s43CreateOutbox *CreateOutbox s45EventQueue *EventQueue
s44CreateTransactionalInstance *CreateTransactionalInstance s46TransactionalInstanceTable *TransactionalInstanceTable
s47TransactionalInstanceDomainTable *TransactionalInstanceDomainTable
} }
func MustNewSteps(v *viper.Viper) *Steps { func MustNewSteps(v *viper.Viper) *Steps {

View File

@ -171,8 +171,9 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: esPusherDBClient, esClient: eventstoreClient} steps.s38BackChannelLogoutNotificationStart = &BackChannelLogoutNotificationStart{dbClient: esPusherDBClient, esClient: eventstoreClient}
steps.s40InitPushFunc = &InitPushFunc{dbClient: esPusherDBClient} steps.s40InitPushFunc = &InitPushFunc{dbClient: esPusherDBClient}
steps.s42Apps7OIDCConfigsLoginVersion = &Apps7OIDCConfigsLoginVersion{dbClient: esPusherDBClient} steps.s42Apps7OIDCConfigsLoginVersion = &Apps7OIDCConfigsLoginVersion{dbClient: esPusherDBClient}
steps.s43CreateOutbox = &CreateOutbox{dbClient: queryDBClient} steps.s45EventQueue = &EventQueue{dbClient: queryDBClient}
steps.s44CreateTransactionalInstance = &CreateTransactionalInstance{dbClient: queryDBClient, eventstore: eventstoreClient, BulkLimit: config.InitProjections.BulkLimit} steps.s46TransactionalInstanceTable = &TransactionalInstanceTable{dbClient: queryDBClient}
steps.s47TransactionalInstanceDomainTable = &TransactionalInstanceDomainTable{dbClient: queryDBClient}
err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil) err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil)
logging.OnError(err).Fatal("unable to start projections") logging.OnError(err).Fatal("unable to start projections")
@ -200,8 +201,9 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
for _, step := range []migration.Migration{ for _, step := range []migration.Migration{
steps.s14NewEventsTable, steps.s14NewEventsTable,
steps.s40InitPushFunc, steps.s40InitPushFunc,
steps.s43CreateOutbox, steps.s45EventQueue,
steps.s44CreateTransactionalInstance, steps.s46TransactionalInstanceTable,
steps.s47TransactionalInstanceDomainTable,
steps.s1ProjectionTable, steps.s1ProjectionTable,
steps.s2AssetsTable, steps.s2AssetsTable,
steps.s28AddFieldTable, steps.s28AddFieldTable,