diff --git a/cmd/setup/40.go b/cmd/setup/40.go index ff1188776f..b16b9226f7 100644 --- a/cmd/setup/40.go +++ b/cmd/setup/40.go @@ -2,8 +2,13 @@ package setup import ( "context" + "database/sql" "embed" "fmt" + "io/fs" + "path" + "strings" + "text/template" "github.com/zitadel/logging" @@ -11,6 +16,13 @@ import ( "github.com/zitadel/zitadel/internal/eventstore" ) +// query filenames +const ( + fileInTxOrderType = "00_in_tx_order_type.sql" + fileType = "01_type.sql" + fileFunc = "02_func.sql" +) + var ( //go:embed 40/cockroach/*.sql //go:embed 40/postgres/*.sql @@ -22,10 +34,6 @@ type InitPushFunc struct { } func (mig *InitPushFunc) Execute(ctx context.Context, _ eventstore.Event) (err error) { - statements, err := readStatements(initPushFunc, "40", mig.dbClient.Type()) - if err != nil { - return err - } conn, err := mig.dbClient.Conn(ctx) if err != nil { return err @@ -36,7 +44,10 @@ func (mig *InitPushFunc) Execute(ctx context.Context, _ eventstore.Event) (err e // Force the pool to reopen connections to apply the new types mig.dbClient.Pool.Reset() }() - + statements, err := mig.prepareStatements(ctx) + if err != nil { + return err + } for _, stmt := range statements { logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement") if _, err := conn.ExecContext(ctx, stmt.query); err != nil { @@ -50,3 +61,56 @@ func (mig *InitPushFunc) Execute(ctx context.Context, _ eventstore.Event) (err e func (mig *InitPushFunc) String() string { return "40_init_push_func_v4" } + +func (mig *InitPushFunc) prepareStatements(ctx context.Context) ([]statement, error) { + funcTmpl, err := template.ParseFS(initPushFunc, mig.filePath(fileFunc)) + if err != nil { + return nil, fmt.Errorf("prepare steps: %w", err) + } + typeName, err := mig.inTxOrderType(ctx) + if err != nil { + return nil, fmt.Errorf("prepare steps: %w", err) + } + var funcStep strings.Builder + err = funcTmpl.Execute(&funcStep, struct { + InTxOrderType string + }{ + InTxOrderType: typeName, + }) + if err != nil { + return nil, fmt.Errorf("prepare steps: %w", err) + } + typeStatement, err := fs.ReadFile(initPushFunc, mig.filePath(fileType)) + if err != nil { + return nil, fmt.Errorf("prepare steps: %w", err) + } + return []statement{ + { + file: fileType, + query: string(typeStatement), + }, + { + file: fileFunc, + query: funcStep.String(), + }, + }, nil +} + +func (mig *InitPushFunc) inTxOrderType(ctx context.Context) (typeName string, err error) { + query, err := fs.ReadFile(initPushFunc, mig.filePath(fileInTxOrderType)) + if err != nil { + return "", fmt.Errorf("get in_tx_order_type: %w", err) + } + + err = mig.dbClient.QueryRowContext(ctx, func(row *sql.Row) error { + return row.Scan(&typeName) + }, string(query)) + if err != nil { + return "", fmt.Errorf("get in_tx_order_type: %w", err) + } + return typeName, nil +} + +func (mig *InitPushFunc) filePath(fileName string) string { + return path.Join("40", mig.dbClient.Type(), fileName) +} diff --git a/cmd/setup/40/cockroach/00_in_tx_order_type.sql b/cmd/setup/40/cockroach/00_in_tx_order_type.sql new file mode 100644 index 0000000000..68b7daf984 --- /dev/null +++ b/cmd/setup/40/cockroach/00_in_tx_order_type.sql @@ -0,0 +1,5 @@ +SELECT data_type +FROM information_schema.columns +WHERE table_schema = 'eventstore' +AND table_name = 'events2' +AND column_name = 'in_tx_order'; diff --git a/cmd/setup/40/cockroach/01_type.sql b/cmd/setup/40/cockroach/01_type.sql new file mode 100644 index 0000000000..e26af2f828 --- /dev/null +++ b/cmd/setup/40/cockroach/01_type.sql @@ -0,0 +1,10 @@ +CREATE TYPE IF NOT EXISTS eventstore.command AS ( + instance_id TEXT + , aggregate_type TEXT + , aggregate_id TEXT + , command_type TEXT + , revision INT2 + , payload JSONB + , creator TEXT + , owner TEXT +); diff --git a/cmd/setup/40/cockroach/40_init_push_func.sql b/cmd/setup/40/cockroach/02_func.sql similarity index 92% rename from cmd/setup/40/cockroach/40_init_push_func.sql rename to cmd/setup/40/cockroach/02_func.sql index 9a08b5d355..9cb45529ad 100644 --- a/cmd/setup/40/cockroach/40_init_push_func.sql +++ b/cmd/setup/40/cockroach/02_func.sql @@ -1,15 +1,3 @@ --- represents an event to be created. -CREATE TYPE IF NOT EXISTS eventstore.command AS ( - instance_id TEXT - , aggregate_type TEXT - , aggregate_id TEXT - , command_type TEXT - , revision INT2 - , payload JSONB - , creator TEXT - , owner TEXT -); - CREATE OR REPLACE FUNCTION eventstore.latest_aggregate_state( instance_id TEXT , aggregate_type TEXT @@ -98,7 +86,7 @@ BEGIN , ("c").creator , COALESCE(current_owner, ("c").owner) -- AS owner , cluster_logical_timestamp() -- AS position - , ordinality::INT -- AS in_tx_order + , ordinality::{{ .InTxOrderType }} -- AS in_tx_order FROM UNNEST(commands) WITH ORDINALITY AS c WHERE diff --git a/cmd/setup/40/postgres/00_in_tx_order_type.sql b/cmd/setup/40/postgres/00_in_tx_order_type.sql new file mode 100644 index 0000000000..68b7daf984 --- /dev/null +++ b/cmd/setup/40/postgres/00_in_tx_order_type.sql @@ -0,0 +1,5 @@ +SELECT data_type +FROM information_schema.columns +WHERE table_schema = 'eventstore' +AND table_name = 'events2' +AND column_name = 'in_tx_order'; diff --git a/cmd/setup/40/postgres/02_func.sql b/cmd/setup/40/postgres/02_func.sql index 0d566ebb42..851547c240 100644 --- a/cmd/setup/40/postgres/02_func.sql +++ b/cmd/setup/40/postgres/02_func.sql @@ -72,7 +72,7 @@ BEGIN , c.creator , COALESCE(current_owner, c.owner) -- AS owner , EXTRACT(EPOCH FROM NOW()) -- AS position - , c.ordinality::INT -- AS in_tx_order + , c.ordinality::{{ .InTxOrderType }} -- AS in_tx_order FROM UNNEST(commands) WITH ORDINALITY AS c WHERE diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 2394af2024..33298033a9 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -4,9 +4,11 @@ import ( "context" "embed" _ "embed" + "errors" "net/http" "path" + "github.com/jackc/pgx/v5/pgconn" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/zitadel/logging" @@ -271,7 +273,23 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) func mustExecuteMigration(ctx context.Context, eventstoreClient *eventstore.Eventstore, step migration.Migration, errorMsg string) { err := migration.Migrate(ctx, eventstoreClient, step) - logging.WithFields("name", step.String()).OnError(err).Fatal(errorMsg) + if err == nil { + return + } + logFields := []any{ + "name", step.String(), + } + pgErr := new(pgconn.PgError) + if errors.As(err, &pgErr) { + logFields = append(logFields, + "severity", pgErr.Severity, + "code", pgErr.Code, + "message", pgErr.Message, + "detail", pgErr.Detail, + "hint", pgErr.Hint, + ) + } + logging.WithFields(logFields...).WithError(err).Fatal(errorMsg) } // readStmt reads a single file from the embedded FS,