mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 15:57:32 +00:00
Merge branch 'next-rc' into next
# Conflicts: # build/workflow.Dockerfile # cmd/setup/config.go # cmd/setup/setup.go # console/package.json # console/src/app/services/grpc.service.ts # console/yarn.lock # deploy/knative/cockroachdb-statefulset-single-node.yaml # e2e/config/localhost/docker-compose.yaml # go.mod # go.sum # internal/command/oidc_session_test.go # internal/query/idp_template_test.go
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -19,7 +19,7 @@ func MustNewConfig(v *viper.Viper) *Config {
|
||||
config := new(Config)
|
||||
err := v.Unmarshal(config,
|
||||
viper.DecodeHook(mapstructure.ComposeDecodeHookFunc(
|
||||
database.DecodeHook,
|
||||
database.DecodeHook(false),
|
||||
mapstructure.TextUnmarshallerHookFunc(),
|
||||
)),
|
||||
)
|
||||
|
@@ -12,20 +12,17 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed sql/cockroach/*
|
||||
//go:embed sql/postgres/*
|
||||
//go:embed sql/*.sql
|
||||
stmts embed.FS
|
||||
|
||||
createUserStmt string
|
||||
grantStmt string
|
||||
settingsStmt string
|
||||
databaseStmt string
|
||||
createEventstoreStmt string
|
||||
createProjectionsStmt string
|
||||
createSystemStmt string
|
||||
createEncryptionKeysStmt string
|
||||
createEventsStmt string
|
||||
createSystemSequenceStmt string
|
||||
createUniqueConstraints string
|
||||
|
||||
roleAlreadyExistsCode = "42710"
|
||||
@@ -39,7 +36,7 @@ func New() *cobra.Command {
|
||||
Long: `Sets up the minimum requirements to start ZITADEL.
|
||||
|
||||
Prerequisites:
|
||||
- database (PostgreSql or cockroachdb)
|
||||
- PostgreSql database
|
||||
|
||||
The user provided by flags needs privileges to
|
||||
- create the database if it does not exist
|
||||
@@ -53,7 +50,7 @@ The user provided by flags needs privileges to
|
||||
},
|
||||
}
|
||||
|
||||
cmd.AddCommand(newZitadel(), newDatabase(), newUser(), newGrant(), newSettings())
|
||||
cmd.AddCommand(newZitadel(), newDatabase(), newUser(), newGrant())
|
||||
return cmd
|
||||
}
|
||||
|
||||
@@ -62,7 +59,6 @@ func InitAll(ctx context.Context, config *Config) {
|
||||
VerifyUser(config.Database.Username(), config.Database.Password()),
|
||||
VerifyDatabase(config.Database.DatabaseName()),
|
||||
VerifyGrant(config.Database.DatabaseName(), config.Database.Username()),
|
||||
VerifySettings(config.Database.DatabaseName(), config.Database.Username()),
|
||||
)
|
||||
logging.OnError(err).Fatal("unable to initialize the database")
|
||||
|
||||
@@ -73,7 +69,7 @@ func InitAll(ctx context.Context, config *Config) {
|
||||
func initialise(ctx context.Context, config database.Config, steps ...func(context.Context, *database.DB) error) error {
|
||||
logging.Info("initialization started")
|
||||
|
||||
err := ReadStmts(config.Type())
|
||||
err := ReadStmts()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -97,58 +93,48 @@ func Init(ctx context.Context, db *database.DB, steps ...func(context.Context, *
|
||||
return nil
|
||||
}
|
||||
|
||||
func ReadStmts(typ string) (err error) {
|
||||
createUserStmt, err = readStmt(typ, "01_user")
|
||||
func ReadStmts() (err error) {
|
||||
createUserStmt, err = readStmt("01_user")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
databaseStmt, err = readStmt(typ, "02_database")
|
||||
databaseStmt, err = readStmt("02_database")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
grantStmt, err = readStmt(typ, "03_grant_user")
|
||||
grantStmt, err = readStmt("03_grant_user")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
createEventstoreStmt, err = readStmt(typ, "04_eventstore")
|
||||
createEventstoreStmt, err = readStmt("04_eventstore")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
createProjectionsStmt, err = readStmt(typ, "05_projections")
|
||||
createProjectionsStmt, err = readStmt("05_projections")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
createSystemStmt, err = readStmt(typ, "06_system")
|
||||
createSystemStmt, err = readStmt("06_system")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
createEncryptionKeysStmt, err = readStmt(typ, "07_encryption_keys_table")
|
||||
createEncryptionKeysStmt, err = readStmt("07_encryption_keys_table")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
createEventsStmt, err = readStmt(typ, "08_events_table")
|
||||
createEventsStmt, err = readStmt("08_events_table")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
createSystemSequenceStmt, err = readStmt(typ, "09_system_sequence")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
createUniqueConstraints, err = readStmt(typ, "10_unique_constraints_table")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
settingsStmt, err = readStmt(typ, "11_settings")
|
||||
createUniqueConstraints, err = readStmt("10_unique_constraints_table")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -156,7 +142,7 @@ func ReadStmts(typ string) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func readStmt(typ, step string) (string, error) {
|
||||
stmt, err := stmts.ReadFile("sql/" + typ + "/" + step + ".sql")
|
||||
func readStmt(step string) (string, error) {
|
||||
stmt, err := stmts.ReadFile("sql/" + step + ".sql")
|
||||
return string(stmt), err
|
||||
}
|
||||
|
@@ -1,2 +1,2 @@
|
||||
-- replace %[1]s with the name of the user
|
||||
CREATE USER IF NOT EXISTS "%[1]s"
|
||||
CREATE USER "%[1]s"
|
@@ -1,2 +1,2 @@
|
||||
-- replace %[1]s with the name of the database
|
||||
CREATE DATABASE IF NOT EXISTS "%[1]s";
|
||||
CREATE DATABASE "%[1]s"
|
@@ -11,6 +11,5 @@ The sql-files in this folder initialize the ZITADEL database and user. These obj
|
||||
- 05_projections.sql: creates the schema needed to read the data
|
||||
- 06_system.sql: creates the schema needed for ZITADEL itself
|
||||
- 07_encryption_keys_table.sql: creates the table for encryption keys (for event data)
|
||||
- files 08_enable_hash_sharded_indexes.sql and 09_events_table.sql must run in the same session
|
||||
- 08_enable_hash_sharded_indexes.sql enables the [hash sharded index](https://www.cockroachlabs.com/docs/stable/hash-sharded-indexes.html) feature for this session
|
||||
- 09_events_table.sql creates the table for eventsourcing
|
||||
- 08_events_table.sql creates the table for eventsourcing
|
||||
- 10_unique_constraints_table.sql creates the table to check unique constraints for events
|
||||
|
@@ -1,4 +0,0 @@
|
||||
-- replace the first %[1]s with the database
|
||||
-- replace the second \%[2]s with the user
|
||||
GRANT ALL ON DATABASE "%[1]s" TO "%[2]s";
|
||||
GRANT SYSTEM VIEWACTIVITY TO "%[2]s";
|
@@ -1,116 +0,0 @@
|
||||
CREATE TABLE IF NOT EXISTS eventstore.events2 (
|
||||
instance_id TEXT NOT NULL
|
||||
, aggregate_type TEXT NOT NULL
|
||||
, aggregate_id TEXT NOT NULL
|
||||
|
||||
, event_type TEXT NOT NULL
|
||||
, "sequence" BIGINT NOT NULL
|
||||
, revision SMALLINT NOT NULL
|
||||
, created_at TIMESTAMPTZ NOT NULL
|
||||
, payload JSONB
|
||||
, creator TEXT NOT NULL
|
||||
, "owner" TEXT NOT NULL
|
||||
|
||||
, "position" DECIMAL NOT NULL
|
||||
, in_tx_order INTEGER NOT NULL
|
||||
|
||||
, PRIMARY KEY (instance_id, aggregate_type, aggregate_id, "sequence")
|
||||
, INDEX es_active_instances (created_at DESC) STORING ("position")
|
||||
, INDEX es_wm (aggregate_id, instance_id, aggregate_type, event_type)
|
||||
, INDEX es_projection (instance_id, aggregate_type, event_type, "position" DESC)
|
||||
);
|
||||
|
||||
-- represents an event to be created.
|
||||
CREATE TYPE IF NOT EXISTS eventstore.command AS (
|
||||
instance_id TEXT
|
||||
, aggregate_type TEXT
|
||||
, aggregate_id TEXT
|
||||
, command_type TEXT
|
||||
, revision INT2
|
||||
, payload JSONB
|
||||
, creator TEXT
|
||||
, owner TEXT
|
||||
);
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.commands_to_events(commands eventstore.command[]) RETURNS SETOF eventstore.events2 VOLATILE AS $$
|
||||
SELECT
|
||||
("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
, ("c").command_type AS event_type
|
||||
, cs.sequence + ROW_NUMBER() OVER (PARTITION BY ("c").instance_id, ("c").aggregate_type, ("c").aggregate_id ORDER BY ("c").in_tx_order) AS sequence
|
||||
, ("c").revision
|
||||
, hlc_to_timestamp(cluster_logical_timestamp()) AS created_at
|
||||
, ("c").payload
|
||||
, ("c").creator
|
||||
, cs.owner
|
||||
, cluster_logical_timestamp() AS position
|
||||
, ("c").in_tx_order
|
||||
FROM (
|
||||
SELECT
|
||||
("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
, ("c").command_type
|
||||
, ("c").revision
|
||||
, ("c").payload
|
||||
, ("c").creator
|
||||
, ("c").owner
|
||||
, ROW_NUMBER() OVER () AS in_tx_order
|
||||
FROM
|
||||
UNNEST(commands) AS "c"
|
||||
) AS "c"
|
||||
JOIN (
|
||||
SELECT
|
||||
cmds.instance_id
|
||||
, cmds.aggregate_type
|
||||
, cmds.aggregate_id
|
||||
, CASE WHEN (e.owner IS NOT NULL OR e.owner <> '') THEN e.owner ELSE command_owners.owner END AS owner
|
||||
, COALESCE(MAX(e.sequence), 0) AS sequence
|
||||
FROM (
|
||||
SELECT DISTINCT
|
||||
("cmds").instance_id
|
||||
, ("cmds").aggregate_type
|
||||
, ("cmds").aggregate_id
|
||||
, ("cmds").owner
|
||||
FROM UNNEST(commands) AS "cmds"
|
||||
) AS cmds
|
||||
LEFT JOIN eventstore.events2 AS e
|
||||
ON cmds.instance_id = e.instance_id
|
||||
AND cmds.aggregate_type = e.aggregate_type
|
||||
AND cmds.aggregate_id = e.aggregate_id
|
||||
JOIN (
|
||||
SELECT
|
||||
DISTINCT ON (
|
||||
("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
)
|
||||
("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
, ("c").owner
|
||||
FROM
|
||||
UNNEST(commands) AS "c"
|
||||
) AS command_owners ON
|
||||
cmds.instance_id = command_owners.instance_id
|
||||
AND cmds.aggregate_type = command_owners.aggregate_type
|
||||
AND cmds.aggregate_id = command_owners.aggregate_id
|
||||
GROUP BY
|
||||
cmds.instance_id
|
||||
, cmds.aggregate_type
|
||||
, cmds.aggregate_id
|
||||
, 4 -- owner
|
||||
) AS cs
|
||||
ON ("c").instance_id = cs.instance_id
|
||||
AND ("c").aggregate_type = cs.aggregate_type
|
||||
AND ("c").aggregate_id = cs.aggregate_id
|
||||
ORDER BY
|
||||
in_tx_order
|
||||
$$ LANGUAGE SQL;
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 AS $$
|
||||
INSERT INTO eventstore.events2
|
||||
SELECT * FROM eventstore.commands_to_events(commands)
|
||||
RETURNING *
|
||||
$$ LANGUAGE SQL;
|
@@ -1 +0,0 @@
|
||||
CREATE SEQUENCE IF NOT EXISTS eventstore.system_seq
|
@@ -1,6 +0,0 @@
|
||||
CREATE TABLE IF NOT EXISTS eventstore.unique_constraints (
|
||||
instance_id TEXT,
|
||||
unique_type TEXT,
|
||||
unique_field TEXT,
|
||||
PRIMARY KEY (instance_id, unique_type, unique_field)
|
||||
)
|
@@ -1,4 +0,0 @@
|
||||
-- replace the first %[1]q with the database in double quotes
|
||||
-- replace the second \%[2]q with the user in double quotes$
|
||||
-- For more information see technical advisory 10009 (https://zitadel.com/docs/support/advisory/a10009)
|
||||
ALTER ROLE %[2]q IN DATABASE %[1]q SET enable_durable_locking_for_serializable = on;
|
@@ -1 +0,0 @@
|
||||
CREATE USER "%[1]s"
|
@@ -1 +0,0 @@
|
||||
CREATE DATABASE "%[1]s"
|
@@ -1,3 +0,0 @@
|
||||
CREATE SCHEMA IF NOT EXISTS eventstore;
|
||||
|
||||
GRANT ALL ON ALL TABLES IN SCHEMA eventstore TO "%[1]s";
|
@@ -1,3 +0,0 @@
|
||||
CREATE SCHEMA IF NOT EXISTS projections;
|
||||
|
||||
GRANT ALL ON ALL TABLES IN SCHEMA projections TO "%[1]s";
|
@@ -1,3 +0,0 @@
|
||||
CREATE SCHEMA IF NOT EXISTS system;
|
||||
|
||||
GRANT ALL ON ALL TABLES IN SCHEMA system TO "%[1]s";
|
@@ -1,6 +0,0 @@
|
||||
CREATE TABLE IF NOT EXISTS system.encryption_keys (
|
||||
id TEXT NOT NULL
|
||||
, key TEXT NOT NULL
|
||||
|
||||
, PRIMARY KEY (id)
|
||||
);
|
@@ -1 +0,0 @@
|
||||
CREATE SEQUENCE IF NOT EXISTS eventstore.system_seq;
|
@@ -19,7 +19,7 @@ func newDatabase() *cobra.Command {
|
||||
Long: `Sets up the ZITADEL database.
|
||||
|
||||
Prerequisites:
|
||||
- cockroachDB or postgreSQL
|
||||
- postgreSQL
|
||||
|
||||
The user provided by flags needs privileges to
|
||||
- create the database if it does not exist
|
||||
|
@@ -8,7 +8,7 @@ import (
|
||||
)
|
||||
|
||||
func Test_verifyDB(t *testing.T) {
|
||||
err := ReadStmts("cockroach") //TODO: check all dialects
|
||||
err := ReadStmts()
|
||||
if err != nil {
|
||||
t.Errorf("unable to read stmts: %v", err)
|
||||
t.FailNow()
|
||||
@@ -27,7 +27,7 @@ func Test_verifyDB(t *testing.T) {
|
||||
name: "doesn't exists, create fails",
|
||||
args: args{
|
||||
db: prepareDB(t,
|
||||
expectExec("-- replace zitadel with the name of the database\nCREATE DATABASE IF NOT EXISTS \"zitadel\"", sql.ErrTxDone),
|
||||
expectExec("-- replace zitadel with the name of the database\nCREATE DATABASE \"zitadel\"", sql.ErrTxDone),
|
||||
),
|
||||
database: "zitadel",
|
||||
},
|
||||
@@ -37,7 +37,7 @@ func Test_verifyDB(t *testing.T) {
|
||||
name: "doesn't exists, create successful",
|
||||
args: args{
|
||||
db: prepareDB(t,
|
||||
expectExec("-- replace zitadel with the name of the database\nCREATE DATABASE IF NOT EXISTS \"zitadel\"", nil),
|
||||
expectExec("-- replace zitadel with the name of the database\nCREATE DATABASE \"zitadel\"", nil),
|
||||
),
|
||||
database: "zitadel",
|
||||
},
|
||||
@@ -47,7 +47,7 @@ func Test_verifyDB(t *testing.T) {
|
||||
name: "already exists",
|
||||
args: args{
|
||||
db: prepareDB(t,
|
||||
expectExec("-- replace zitadel with the name of the database\nCREATE DATABASE IF NOT EXISTS \"zitadel\"", nil),
|
||||
expectExec("-- replace zitadel with the name of the database\nCREATE DATABASE \"zitadel\"", nil),
|
||||
),
|
||||
database: "zitadel",
|
||||
},
|
||||
|
@@ -19,7 +19,7 @@ func newGrant() *cobra.Command {
|
||||
Long: `Sets ALL grant to the database user.
|
||||
|
||||
Prerequisites:
|
||||
- cockroachDB or postgreSQL
|
||||
- postgreSQL
|
||||
`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
config := MustNewConfig(viper.GetViper())
|
||||
|
@@ -1,45 +0,0 @@
|
||||
package initialise
|
||||
|
||||
import (
|
||||
"context"
|
||||
_ "embed"
|
||||
"fmt"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/zitadel/logging"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
)
|
||||
|
||||
func newSettings() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "settings",
|
||||
Short: "Ensures proper settings on the database",
|
||||
Long: `Ensures proper settings on the database.
|
||||
|
||||
Prerequisites:
|
||||
- cockroachDB or postgreSQL
|
||||
|
||||
Cockroach
|
||||
- Sets enable_durable_locking_for_serializable to on for the zitadel user and database
|
||||
`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
config := MustNewConfig(viper.GetViper())
|
||||
|
||||
err := initialise(cmd.Context(), config.Database, VerifySettings(config.Database.DatabaseName(), config.Database.Username()))
|
||||
logging.OnError(err).Fatal("unable to set settings")
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func VerifySettings(databaseName, username string) func(context.Context, *database.DB) error {
|
||||
return func(ctx context.Context, db *database.DB) error {
|
||||
if db.Type() == "postgres" {
|
||||
return nil
|
||||
}
|
||||
logging.WithFields("user", username, "database", databaseName).Info("verify settings")
|
||||
|
||||
return exec(ctx, db, fmt.Sprintf(settingsStmt, databaseName, username), nil)
|
||||
}
|
||||
}
|
@@ -19,7 +19,7 @@ func newUser() *cobra.Command {
|
||||
Long: `Sets up the ZITADEL database user.
|
||||
|
||||
Prerequisites:
|
||||
- cockroachDB or postgreSQL
|
||||
- postgreSQL
|
||||
|
||||
The user provided by flags needs privileges to
|
||||
- create the database if it does not exist
|
||||
|
@@ -8,7 +8,7 @@ import (
|
||||
)
|
||||
|
||||
func Test_verifyUser(t *testing.T) {
|
||||
err := ReadStmts("cockroach") //TODO: check all dialects
|
||||
err := ReadStmts()
|
||||
if err != nil {
|
||||
t.Errorf("unable to read stmts: %v", err)
|
||||
t.FailNow()
|
||||
@@ -28,7 +28,7 @@ func Test_verifyUser(t *testing.T) {
|
||||
name: "doesn't exists, create fails",
|
||||
args: args{
|
||||
db: prepareDB(t,
|
||||
expectExec("-- replace zitadel-user with the name of the user\nCREATE USER IF NOT EXISTS \"zitadel-user\"", sql.ErrTxDone),
|
||||
expectExec("-- replace zitadel-user with the name of the user\nCREATE USER \"zitadel-user\"", sql.ErrTxDone),
|
||||
),
|
||||
username: "zitadel-user",
|
||||
password: "",
|
||||
@@ -39,7 +39,7 @@ func Test_verifyUser(t *testing.T) {
|
||||
name: "correct without password",
|
||||
args: args{
|
||||
db: prepareDB(t,
|
||||
expectExec("-- replace zitadel-user with the name of the user\nCREATE USER IF NOT EXISTS \"zitadel-user\"", nil),
|
||||
expectExec("-- replace zitadel-user with the name of the user\nCREATE USER \"zitadel-user\"", nil),
|
||||
),
|
||||
username: "zitadel-user",
|
||||
password: "",
|
||||
@@ -50,7 +50,7 @@ func Test_verifyUser(t *testing.T) {
|
||||
name: "correct with password",
|
||||
args: args{
|
||||
db: prepareDB(t,
|
||||
expectExec("-- replace zitadel-user with the name of the user\nCREATE USER IF NOT EXISTS \"zitadel-user\" WITH PASSWORD 'password'", nil),
|
||||
expectExec("-- replace zitadel-user with the name of the user\nCREATE USER \"zitadel-user\" WITH PASSWORD 'password'", nil),
|
||||
),
|
||||
username: "zitadel-user",
|
||||
password: "password",
|
||||
@@ -61,7 +61,7 @@ func Test_verifyUser(t *testing.T) {
|
||||
name: "already exists",
|
||||
args: args{
|
||||
db: prepareDB(t,
|
||||
expectExec("-- replace zitadel-user with the name of the user\nCREATE USER IF NOT EXISTS \"zitadel-user\" WITH PASSWORD 'password'", nil),
|
||||
expectExec("-- replace zitadel-user with the name of the user\nCREATE USER \"zitadel-user\" WITH PASSWORD 'password'", nil),
|
||||
),
|
||||
username: "zitadel-user",
|
||||
password: "",
|
||||
|
@@ -21,7 +21,7 @@ func newZitadel() *cobra.Command {
|
||||
Long: `initialize ZITADEL internals.
|
||||
|
||||
Prerequisites:
|
||||
- cockroachDB or postgreSQL with user and database
|
||||
- postgreSQL with user and database
|
||||
`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
config := MustNewConfig(viper.GetViper())
|
||||
@@ -32,7 +32,7 @@ Prerequisites:
|
||||
}
|
||||
|
||||
func VerifyZitadel(ctx context.Context, db *database.DB, config database.Config) error {
|
||||
err := ReadStmts(config.Type())
|
||||
err := ReadStmts()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -68,11 +68,6 @@ func VerifyZitadel(ctx context.Context, db *database.DB, config database.Config)
|
||||
return err
|
||||
}
|
||||
|
||||
logging.WithFields().Info("verify system sequence")
|
||||
if err := exec(ctx, conn, createSystemSequenceStmt, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logging.WithFields().Info("verify unique constraints")
|
||||
if err := exec(ctx, conn, createUniqueConstraints, nil); err != nil {
|
||||
return err
|
||||
|
@@ -9,7 +9,7 @@ import (
|
||||
)
|
||||
|
||||
func Test_verifyEvents(t *testing.T) {
|
||||
err := ReadStmts("cockroach") //TODO: check all dialects
|
||||
err := ReadStmts()
|
||||
if err != nil {
|
||||
t.Errorf("unable to read stmts: %v", err)
|
||||
t.FailNow()
|
||||
|
@@ -40,7 +40,7 @@ func newKey() *cobra.Command {
|
||||
Long: `create new encryption key(s) (encrypted by the provided master key)
|
||||
provide key(s) by YAML file and/or by argument
|
||||
Requirements:
|
||||
- cockroachdb`,
|
||||
- postgreSQL`,
|
||||
Example: `new -f keys.yaml
|
||||
new key1=somekey key2=anotherkey
|
||||
new -f keys.yaml key2=anotherkey`,
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
_ "embed"
|
||||
"io"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/stdlib"
|
||||
@@ -41,12 +42,16 @@ func copyAuth(ctx context.Context, config *Migration) {
|
||||
logging.OnError(err).Fatal("unable to connect to destination database")
|
||||
defer destClient.Close()
|
||||
|
||||
copyAuthRequests(ctx, sourceClient, destClient)
|
||||
copyAuthRequests(ctx, sourceClient, destClient, config.MaxAuthRequestAge)
|
||||
}
|
||||
|
||||
func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
|
||||
func copyAuthRequests(ctx context.Context, source, dest *database.DB, maxAuthRequestAge time.Duration) {
|
||||
start := time.Now()
|
||||
|
||||
logging.Info("creating index on auth.auth_requests.change_date to speed up copy in source database")
|
||||
_, err := source.ExecContext(ctx, "CREATE INDEX CONCURRENTLY IF NOT EXISTS auth_requests_change_date ON auth.auth_requests (change_date)")
|
||||
logging.OnError(err).Fatal("unable to create index on auth.auth_requests.change_date")
|
||||
|
||||
sourceConn, err := source.Conn(ctx)
|
||||
logging.OnError(err).Fatal("unable to acquire connection")
|
||||
defer sourceConn.Close()
|
||||
@@ -55,9 +60,9 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
|
||||
errs := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
err = sourceConn.Raw(func(driverConn interface{}) error {
|
||||
err = sourceConn.Raw(func(driverConn any) error {
|
||||
conn := driverConn.(*stdlib.Conn).Conn()
|
||||
_, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+") TO STDOUT")
|
||||
_, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+" AND change_date > NOW() - INTERVAL '"+strconv.FormatFloat(maxAuthRequestAge.Seconds(), 'f', -1, 64)+" seconds') TO STDOUT")
|
||||
w.Close()
|
||||
return err
|
||||
})
|
||||
@@ -69,7 +74,7 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
|
||||
defer destConn.Close()
|
||||
|
||||
var affected int64
|
||||
err = destConn.Raw(func(driverConn interface{}) error {
|
||||
err = destConn.Raw(func(driverConn any) error {
|
||||
conn := driverConn.(*stdlib.Conn).Conn()
|
||||
|
||||
if shouldReplace {
|
||||
|
@@ -23,7 +23,8 @@ type Migration struct {
|
||||
Source database.Config
|
||||
Destination database.Config
|
||||
|
||||
EventBulkSize uint32
|
||||
EventBulkSize uint32
|
||||
MaxAuthRequestAge time.Duration
|
||||
|
||||
Log *logging.Config
|
||||
Machine *id.Config
|
||||
@@ -76,7 +77,7 @@ func mustNewConfig(v *viper.Viper, config any) {
|
||||
mapstructure.StringToTimeDurationHookFunc(),
|
||||
mapstructure.StringToTimeHookFunc(time.RFC3339),
|
||||
mapstructure.StringToSliceHookFunc(","),
|
||||
database.DecodeHook,
|
||||
database.DecodeHook(true),
|
||||
actions.HTTPConfigDecodeHook,
|
||||
hook.EnumHookFunc(internal_authz.MemberTypeString),
|
||||
mapstructure.TextUnmarshallerHookFunc(),
|
||||
|
@@ -1,84 +1,64 @@
|
||||
Source:
|
||||
cockroach:
|
||||
Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST
|
||||
Port: 26257 # ZITADEL_DATABASE_COCKROACH_PORT
|
||||
Database: zitadel # ZITADEL_DATABASE_COCKROACH_DATABASE
|
||||
MaxOpenConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS
|
||||
MaxIdleConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS
|
||||
EventPushConnRatio: 0.33 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO
|
||||
ProjectionSpoolerConnRatio: 0.33 # ZITADEL_DATABASE_COCKROACH_PROJECTIONSPOOLERCONNRATIO
|
||||
MaxConnLifetime: 30m # ZITADEL_DATABASE_COCKROACH_MAXCONNLIFETIME
|
||||
MaxConnIdleTime: 5m # ZITADEL_DATABASE_COCKROACH_MAXCONNIDLETIME
|
||||
Options: "" # ZITADEL_DATABASE_COCKROACH_OPTIONS
|
||||
Host: localhost # ZITADEL_SOURCE_COCKROACH_HOST
|
||||
Port: 26257 # ZITADEL_SOURCE_COCKROACH_PORT
|
||||
Database: zitadel # ZITADEL_SOURCE_COCKROACH_DATABASE
|
||||
MaxOpenConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXOPENCONNS
|
||||
MaxIdleConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXIDLECONNS
|
||||
MaxConnLifetime: 30m # ZITADEL_SOURCE_COCKROACH_MAXCONNLIFETIME
|
||||
MaxConnIdleTime: 5m # ZITADEL_SOURCE_COCKROACH_MAXCONNIDLETIME
|
||||
Options: "" # ZITADEL_SOURCE_COCKROACH_OPTIONS
|
||||
User:
|
||||
Username: zitadel # ZITADEL_DATABASE_COCKROACH_USER_USERNAME
|
||||
Password: "" # ZITADEL_DATABASE_COCKROACH_USER_PASSWORD
|
||||
Username: zitadel # ZITADEL_SOURCE_COCKROACH_USER_USERNAME
|
||||
Password: "" # ZITADEL_SOURCE_COCKROACH_USER_PASSWORD
|
||||
SSL:
|
||||
Mode: disable # ZITADEL_DATABASE_COCKROACH_USER_SSL_MODE
|
||||
RootCert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_ROOTCERT
|
||||
Cert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_CERT
|
||||
Key: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_KEY
|
||||
Mode: disable # ZITADEL_SOURCE_COCKROACH_USER_SSL_MODE
|
||||
RootCert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_ROOTCERT
|
||||
Cert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_CERT
|
||||
Key: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_KEY
|
||||
# Postgres is used as soon as a value is set
|
||||
# The values describe the possible fields to set values
|
||||
postgres:
|
||||
Host: # ZITADEL_DATABASE_POSTGRES_HOST
|
||||
Port: # ZITADEL_DATABASE_POSTGRES_PORT
|
||||
Database: # ZITADEL_DATABASE_POSTGRES_DATABASE
|
||||
MaxOpenConns: # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS
|
||||
MaxIdleConns: # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS
|
||||
MaxConnLifetime: # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME
|
||||
MaxConnIdleTime: # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME
|
||||
Options: # ZITADEL_DATABASE_POSTGRES_OPTIONS
|
||||
Host: # ZITADEL_SOURCE_POSTGRES_HOST
|
||||
Port: # ZITADEL_SOURCE_POSTGRES_PORT
|
||||
Database: # ZITADEL_SOURCE_POSTGRES_DATABASE
|
||||
MaxOpenConns: # ZITADEL_SOURCE_POSTGRES_MAXOPENCONNS
|
||||
MaxIdleConns: # ZITADEL_SOURCE_POSTGRES_MAXIDLECONNS
|
||||
MaxConnLifetime: # ZITADEL_SOURCE_POSTGRES_MAXCONNLIFETIME
|
||||
MaxConnIdleTime: # ZITADEL_SOURCE_POSTGRES_MAXCONNIDLETIME
|
||||
Options: # ZITADEL_SOURCE_POSTGRES_OPTIONS
|
||||
User:
|
||||
Username: # ZITADEL_DATABASE_POSTGRES_USER_USERNAME
|
||||
Password: # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD
|
||||
Username: # ZITADEL_SOURCE_POSTGRES_USER_USERNAME
|
||||
Password: # ZITADEL_SOURCE_POSTGRES_USER_PASSWORD
|
||||
SSL:
|
||||
Mode: # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE
|
||||
RootCert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT
|
||||
Cert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT
|
||||
Key: # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY
|
||||
Mode: # ZITADEL_SOURCE_POSTGRES_USER_SSL_MODE
|
||||
RootCert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_ROOTCERT
|
||||
Cert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_CERT
|
||||
Key: # ZITADEL_SOURCE_POSTGRES_USER_SSL_KEY
|
||||
|
||||
Destination:
|
||||
cockroach:
|
||||
Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST
|
||||
Port: 26257 # ZITADEL_DATABASE_COCKROACH_PORT
|
||||
Database: zitadel # ZITADEL_DATABASE_COCKROACH_DATABASE
|
||||
MaxOpenConns: 0 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS
|
||||
MaxIdleConns: 0 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS
|
||||
MaxConnLifetime: 30m # ZITADEL_DATABASE_COCKROACH_MAXCONNLIFETIME
|
||||
MaxConnIdleTime: 5m # ZITADEL_DATABASE_COCKROACH_MAXCONNIDLETIME
|
||||
EventPushConnRatio: 0.01 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO
|
||||
ProjectionSpoolerConnRatio: 0.5 # ZITADEL_DATABASE_COCKROACH_PROJECTIONSPOOLERCONNRATIO
|
||||
Options: "" # ZITADEL_DATABASE_COCKROACH_OPTIONS
|
||||
User:
|
||||
Username: zitadel # ZITADEL_DATABASE_COCKROACH_USER_USERNAME
|
||||
Password: "" # ZITADEL_DATABASE_COCKROACH_USER_PASSWORD
|
||||
SSL:
|
||||
Mode: disable # ZITADEL_DATABASE_COCKROACH_USER_SSL_MODE
|
||||
RootCert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_ROOTCERT
|
||||
Cert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_CERT
|
||||
Key: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_KEY
|
||||
# Postgres is used as soon as a value is set
|
||||
# The values describe the possible fields to set values
|
||||
postgres:
|
||||
Host: # ZITADEL_DATABASE_POSTGRES_HOST
|
||||
Port: # ZITADEL_DATABASE_POSTGRES_PORT
|
||||
Database: # ZITADEL_DATABASE_POSTGRES_DATABASE
|
||||
MaxOpenConns: # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS
|
||||
MaxIdleConns: # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS
|
||||
MaxConnLifetime: # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME
|
||||
MaxConnIdleTime: # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME
|
||||
Options: # ZITADEL_DATABASE_POSTGRES_OPTIONS
|
||||
Host: localhost # ZITADEL_DESTINATION_POSTGRES_HOST
|
||||
Port: 5432 # ZITADEL_DESTINATION_POSTGRES_PORT
|
||||
Database: zitadel # ZITADEL_DESTINATION_POSTGRES_DATABASE
|
||||
MaxOpenConns: 5 # ZITADEL_DESTINATION_POSTGRES_MAXOPENCONNS
|
||||
MaxIdleConns: 2 # ZITADEL_DESTINATION_POSTGRES_MAXIDLECONNS
|
||||
MaxConnLifetime: 30m # ZITADEL_DESTINATION_POSTGRES_MAXCONNLIFETIME
|
||||
MaxConnIdleTime: 5m # ZITADEL_DESTINATION_POSTGRES_MAXCONNIDLETIME
|
||||
Options: "" # ZITADEL_DESTINATION_POSTGRES_OPTIONS
|
||||
User:
|
||||
Username: # ZITADEL_DATABASE_POSTGRES_USER_USERNAME
|
||||
Password: # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD
|
||||
Username: zitadel # ZITADEL_DESTINATION_POSTGRES_USER_USERNAME
|
||||
Password: "" # ZITADEL_DESTINATION_POSTGRES_USER_PASSWORD
|
||||
SSL:
|
||||
Mode: # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE
|
||||
RootCert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT
|
||||
Cert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT
|
||||
Key: # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY
|
||||
Mode: disable # ZITADEL_DESTINATION_POSTGRES_USER_SSL_MODE
|
||||
RootCert: "" # ZITADEL_DESTINATION_POSTGRES_USER_SSL_ROOTCERT
|
||||
Cert: "" # ZITADEL_DESTINATION_POSTGRES_USER_SSL_CERT
|
||||
Key: "" # ZITADEL_DESTINATION_POSTGRES_USER_SSL_KEY
|
||||
|
||||
EventBulkSize: 10000
|
||||
EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE
|
||||
# The maximum duration an auth request was last updated before it gets ignored.
|
||||
# Default is 30 days
|
||||
MaxAuthRequestAge: 720h # ZITADEL_MAXAUTHREQUESTAGE
|
||||
|
||||
Projections:
|
||||
# The maximum duration a transaction remains open
|
||||
@@ -87,14 +67,14 @@ Projections:
|
||||
TransactionDuration: 0s # ZITADEL_PROJECTIONS_TRANSACTIONDURATION
|
||||
# turn off scheduler during operation
|
||||
RequeueEvery: 0s
|
||||
ConcurrentInstances: 7
|
||||
EventBulkLimit: 1000
|
||||
Customizations:
|
||||
ConcurrentInstances: 7 # ZITADEL_PROJECTIONS_CONCURRENTINSTANCES
|
||||
EventBulkLimit: 1000 # ZITADEL_PROJECTIONS_EVENTBULKLIMIT
|
||||
Customizations:
|
||||
notifications:
|
||||
MaxFailureCount: 1
|
||||
|
||||
Eventstore:
|
||||
MaxRetries: 3
|
||||
MaxRetries: 3 # ZITADEL_EVENTSTORE_MAXRETRIES
|
||||
|
||||
Auth:
|
||||
Spooler:
|
||||
|
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/v2/eventstore"
|
||||
"github.com/zitadel/zitadel/internal/v2/projection"
|
||||
"github.com/zitadel/zitadel/internal/v2/readmodel"
|
||||
"github.com/zitadel/zitadel/internal/v2/system"
|
||||
mirror_event "github.com/zitadel/zitadel/internal/v2/system/mirror"
|
||||
@@ -30,39 +29,6 @@ func queryLastSuccessfulMigration(ctx context.Context, destinationES *eventstore
|
||||
return lastSuccess, nil
|
||||
}
|
||||
|
||||
func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ float64, err error) {
|
||||
var cmd *eventstore.Command
|
||||
if len(instanceIDs) > 0 {
|
||||
cmd, err = mirror_event.NewStartedInstancesCommand(destination, instanceIDs)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
} else {
|
||||
cmd = mirror_event.NewStartedSystemCommand(destination)
|
||||
}
|
||||
|
||||
var position projection.HighestPosition
|
||||
|
||||
err = sourceES.Push(
|
||||
ctx,
|
||||
eventstore.NewPushIntent(
|
||||
system.AggregateInstance,
|
||||
eventstore.AppendAggregate(
|
||||
system.AggregateOwner,
|
||||
system.AggregateType,
|
||||
id,
|
||||
eventstore.CurrentSequenceMatches(0),
|
||||
eventstore.AppendCommands(cmd),
|
||||
),
|
||||
eventstore.PushReducer(&position),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return position.Position, nil
|
||||
}
|
||||
|
||||
func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position float64) error {
|
||||
return destinationES.Push(
|
||||
ctx,
|
||||
|
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/zitadel/logging"
|
||||
|
||||
db "github.com/zitadel/zitadel/internal/database"
|
||||
"github.com/zitadel/zitadel/internal/database/dialect"
|
||||
"github.com/zitadel/zitadel/internal/id"
|
||||
"github.com/zitadel/zitadel/internal/v2/database"
|
||||
"github.com/zitadel/zitadel/internal/v2/eventstore"
|
||||
@@ -57,9 +58,9 @@ func copyEventstore(ctx context.Context, config *Migration) {
|
||||
|
||||
func positionQuery(db *db.DB) string {
|
||||
switch db.Type() {
|
||||
case "postgres":
|
||||
case dialect.DatabaseTypePostgres:
|
||||
return "SELECT EXTRACT(EPOCH FROM clock_timestamp())"
|
||||
case "cockroach":
|
||||
case dialect.DatabaseTypeCockroach:
|
||||
return "SELECT cluster_logical_timestamp()"
|
||||
default:
|
||||
logging.WithFields("db_type", db.Type()).Fatal("database type not recognized")
|
||||
@@ -68,6 +69,7 @@ func positionQuery(db *db.DB) string {
|
||||
}
|
||||
|
||||
func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
|
||||
logging.Info("starting to copy events")
|
||||
start := time.Now()
|
||||
reader, writer := io.Pipe()
|
||||
|
||||
@@ -80,9 +82,6 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
|
||||
destConn, err := dest.Conn(ctx)
|
||||
logging.OnError(err).Fatal("unable to acquire dest connection")
|
||||
|
||||
sourceES := eventstore.NewEventstoreFromOne(postgres.New(source, &postgres.Config{
|
||||
MaxRetries: 3,
|
||||
}))
|
||||
destinationES := eventstore.NewEventstoreFromOne(postgres.New(dest, &postgres.Config{
|
||||
MaxRetries: 3,
|
||||
}))
|
||||
@@ -90,8 +89,14 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
|
||||
previousMigration, err := queryLastSuccessfulMigration(ctx, destinationES, source.DatabaseName())
|
||||
logging.OnError(err).Fatal("unable to query latest successful migration")
|
||||
|
||||
maxPosition, err := writeMigrationStart(ctx, sourceES, migrationID, dest.DatabaseName())
|
||||
logging.OnError(err).Fatal("unable to write migration started event")
|
||||
var maxPosition float64
|
||||
err = source.QueryRowContext(ctx,
|
||||
func(row *sql.Row) error {
|
||||
return row.Scan(&maxPosition)
|
||||
},
|
||||
"SELECT MAX(position) FROM eventstore.events2 "+instanceClause(),
|
||||
)
|
||||
logging.OnError(err).Fatal("unable to query max position from source")
|
||||
|
||||
logging.WithFields("from", previousMigration.Position, "to", maxPosition).Info("start event migration")
|
||||
|
||||
@@ -126,7 +131,10 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
|
||||
if err != nil {
|
||||
return zerrors.ThrowUnknownf(err, "MIGRA-KTuSq", "unable to copy events from source during iteration %d", i)
|
||||
}
|
||||
logging.WithFields("batch_count", i).Info("batch of events copied")
|
||||
|
||||
if tag.RowsAffected() < int64(bulkSize) {
|
||||
logging.WithFields("batch_count", i).Info("last batch of events copied")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -198,6 +206,7 @@ func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, sou
|
||||
}
|
||||
|
||||
func copyUniqueConstraints(ctx context.Context, source, dest *db.DB) {
|
||||
logging.Info("starting to copy unique constraints")
|
||||
start := time.Now()
|
||||
reader, writer := io.Pipe()
|
||||
errs := make(chan error, 1)
|
||||
|
@@ -56,7 +56,6 @@ Order of execution:
|
||||
copyEventstore(cmd.Context(), config)
|
||||
|
||||
projections(cmd.Context(), projectionConfig, masterKey)
|
||||
verifyMigration(cmd.Context(), config)
|
||||
},
|
||||
}
|
||||
|
||||
|
@@ -3,6 +3,7 @@ package mirror
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -84,6 +85,7 @@ type ProjectionsConfig struct {
|
||||
ExternalDomain string
|
||||
ExternalSecure bool
|
||||
InternalAuthZ internal_authz.Config
|
||||
SystemAuthZ internal_authz.Config
|
||||
SystemDefaults systemdefaults.SystemDefaults
|
||||
Telemetry *handlers.TelemetryPusherConfig
|
||||
Login login.Config
|
||||
@@ -103,6 +105,7 @@ func projections(
|
||||
config *ProjectionsConfig,
|
||||
masterKey string,
|
||||
) {
|
||||
logging.Info("starting to fill projections")
|
||||
start := time.Now()
|
||||
|
||||
client, err := database.Connect(config.Destination, false)
|
||||
@@ -117,8 +120,11 @@ func projections(
|
||||
staticStorage, err := config.AssetStorage.NewStorage(client.DB)
|
||||
logging.OnError(err).Fatal("unable create static storage")
|
||||
|
||||
config.Eventstore.Querier = old_es.NewCRDB(client)
|
||||
config.Eventstore.Pusher = new_es.NewEventstore(client)
|
||||
newEventstore := new_es.NewEventstore(client)
|
||||
config.Eventstore.Querier = old_es.NewPostgres(client)
|
||||
config.Eventstore.Pusher = newEventstore
|
||||
config.Eventstore.Searcher = newEventstore
|
||||
|
||||
es := eventstore.NewEventstore(config.Eventstore)
|
||||
esV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(client, &es_v4_pg.Config{
|
||||
MaxRetries: config.Eventstore.MaxRetries,
|
||||
@@ -147,7 +153,7 @@ func projections(
|
||||
sessionTokenVerifier,
|
||||
func(q *query.Queries) domain.PermissionCheck {
|
||||
return func(ctx context.Context, permission, orgID, resourceID string) (err error) {
|
||||
return internal_authz.CheckPermission(ctx, &authz_es.UserMembershipRepo{Queries: q}, config.InternalAuthZ.RolePermissionMappings, permission, orgID, resourceID)
|
||||
return internal_authz.CheckPermission(ctx, &authz_es.UserMembershipRepo{Queries: q}, config.SystemAuthZ.RolePermissionMappings, config.InternalAuthZ.RolePermissionMappings, permission, orgID, resourceID)
|
||||
}
|
||||
},
|
||||
0,
|
||||
@@ -184,7 +190,7 @@ func projections(
|
||||
keys.Target,
|
||||
&http.Client{},
|
||||
func(ctx context.Context, permission, orgID, resourceID string) (err error) {
|
||||
return internal_authz.CheckPermission(ctx, authZRepo, config.InternalAuthZ.RolePermissionMappings, permission, orgID, resourceID)
|
||||
return internal_authz.CheckPermission(ctx, authZRepo, config.SystemAuthZ.RolePermissionMappings, config.InternalAuthZ.RolePermissionMappings, permission, orgID, resourceID)
|
||||
},
|
||||
sessionTokenVerifier,
|
||||
config.OIDC.DefaultAccessTokenLifetime,
|
||||
@@ -220,7 +226,6 @@ func projections(
|
||||
keys.SMS,
|
||||
keys.OIDC,
|
||||
config.OIDC.DefaultBackChannelLogoutLifetime,
|
||||
client,
|
||||
nil,
|
||||
)
|
||||
|
||||
@@ -248,12 +253,14 @@ func projections(
|
||||
}
|
||||
}()
|
||||
|
||||
for i := 0; i < int(config.Projections.ConcurrentInstances); i++ {
|
||||
for range int(config.Projections.ConcurrentInstances) {
|
||||
go execProjections(ctx, instances, failedInstances, &wg)
|
||||
}
|
||||
|
||||
for _, instance := range queryInstanceIDs(ctx, client) {
|
||||
existingInstances := queryInstanceIDs(ctx, client)
|
||||
for i, instance := range existingInstances {
|
||||
instances <- instance
|
||||
logging.WithFields("id", instance, "index", fmt.Sprintf("%d/%d", i, len(existingInstances))).Info("instance queued for projection")
|
||||
}
|
||||
close(instances)
|
||||
wg.Wait()
|
||||
@@ -265,42 +272,50 @@ func projections(
|
||||
|
||||
func execProjections(ctx context.Context, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) {
|
||||
for instance := range instances {
|
||||
logging.WithFields("instance", instance).Info("start projections")
|
||||
logging.WithFields("instance", instance).Info("starting projections")
|
||||
ctx = internal_authz.WithInstanceID(ctx, instance)
|
||||
|
||||
err := projection.ProjectInstance(ctx)
|
||||
if err != nil {
|
||||
logging.WithFields("instance", instance).OnError(err).Info("trigger failed")
|
||||
logging.WithFields("instance", instance).WithError(err).Info("trigger failed")
|
||||
failedInstances <- instance
|
||||
continue
|
||||
}
|
||||
|
||||
err = projection.ProjectInstanceFields(ctx)
|
||||
if err != nil {
|
||||
logging.WithFields("instance", instance).WithError(err).Info("trigger fields failed")
|
||||
failedInstances <- instance
|
||||
continue
|
||||
}
|
||||
|
||||
err = admin_handler.ProjectInstance(ctx)
|
||||
if err != nil {
|
||||
logging.WithFields("instance", instance).OnError(err).Info("trigger admin handler failed")
|
||||
logging.WithFields("instance", instance).WithError(err).Info("trigger admin handler failed")
|
||||
failedInstances <- instance
|
||||
continue
|
||||
}
|
||||
|
||||
err = auth_handler.ProjectInstance(ctx)
|
||||
if err != nil {
|
||||
logging.WithFields("instance", instance).OnError(err).Info("trigger auth handler failed")
|
||||
logging.WithFields("instance", instance).WithError(err).Info("trigger auth handler failed")
|
||||
failedInstances <- instance
|
||||
continue
|
||||
}
|
||||
|
||||
err = notification.ProjectInstance(ctx)
|
||||
if err != nil {
|
||||
logging.WithFields("instance", instance).OnError(err).Info("trigger notification failed")
|
||||
logging.WithFields("instance", instance).WithError(err).Info("trigger notification failed")
|
||||
failedInstances <- instance
|
||||
continue
|
||||
}
|
||||
|
||||
logging.WithFields("instance", instance).Info("projections done")
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
// returns the instance configured by flag
|
||||
// queryInstanceIDs returns the instance configured by flag
|
||||
// or all instances which are not removed
|
||||
func queryInstanceIDs(ctx context.Context, source *database.DB) []string {
|
||||
if len(instanceIDs) > 0 {
|
||||
|
@@ -46,6 +46,7 @@ func copySystem(ctx context.Context, config *Migration) {
|
||||
}
|
||||
|
||||
func copyAssets(ctx context.Context, source, dest *database.DB) {
|
||||
logging.Info("starting to copy assets")
|
||||
start := time.Now()
|
||||
|
||||
sourceConn, err := source.Conn(ctx)
|
||||
@@ -70,7 +71,7 @@ func copyAssets(ctx context.Context, source, dest *database.DB) {
|
||||
logging.OnError(err).Fatal("unable to acquire dest connection")
|
||||
defer destConn.Close()
|
||||
|
||||
var eventCount int64
|
||||
var assetCount int64
|
||||
err = destConn.Raw(func(driverConn interface{}) error {
|
||||
conn := driverConn.(*stdlib.Conn).Conn()
|
||||
|
||||
@@ -82,16 +83,17 @@ func copyAssets(ctx context.Context, source, dest *database.DB) {
|
||||
}
|
||||
|
||||
tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.assets (instance_id, asset_type, resource_owner, name, content_type, data, updated_at) FROM stdin")
|
||||
eventCount = tag.RowsAffected()
|
||||
assetCount = tag.RowsAffected()
|
||||
|
||||
return err
|
||||
})
|
||||
logging.OnError(err).Fatal("unable to copy assets to destination")
|
||||
logging.OnError(<-errs).Fatal("unable to copy assets from source")
|
||||
logging.WithFields("took", time.Since(start), "count", eventCount).Info("assets migrated")
|
||||
logging.WithFields("took", time.Since(start), "count", assetCount).Info("assets migrated")
|
||||
}
|
||||
|
||||
func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
|
||||
logging.Info("starting to copy encryption keys")
|
||||
start := time.Now()
|
||||
|
||||
sourceConn, err := source.Conn(ctx)
|
||||
@@ -116,7 +118,7 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
|
||||
logging.OnError(err).Fatal("unable to acquire dest connection")
|
||||
defer destConn.Close()
|
||||
|
||||
var eventCount int64
|
||||
var keyCount int64
|
||||
err = destConn.Raw(func(driverConn interface{}) error {
|
||||
conn := driverConn.(*stdlib.Conn).Conn()
|
||||
|
||||
@@ -128,11 +130,11 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
|
||||
}
|
||||
|
||||
tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.encryption_keys FROM stdin")
|
||||
eventCount = tag.RowsAffected()
|
||||
keyCount = tag.RowsAffected()
|
||||
|
||||
return err
|
||||
})
|
||||
logging.OnError(err).Fatal("unable to copy encryption keys to destination")
|
||||
logging.OnError(<-errs).Fatal("unable to copy encryption keys from source")
|
||||
logging.WithFields("took", time.Since(start), "count", eventCount).Info("encryption keys migrated")
|
||||
logging.WithFields("took", time.Since(start), "count", keyCount).Info("encryption keys migrated")
|
||||
}
|
||||
|
@@ -3,7 +3,7 @@ package setup
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"embed"
|
||||
_ "embed"
|
||||
"strings"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
@@ -12,31 +12,20 @@ import (
|
||||
var (
|
||||
//go:embed 07/logstore.sql
|
||||
createLogstoreSchema07 string
|
||||
//go:embed 07/cockroach/access.sql
|
||||
//go:embed 07/postgres/access.sql
|
||||
createAccessLogsTable07 embed.FS
|
||||
//go:embed 07/cockroach/execution.sql
|
||||
//go:embed 07/postgres/execution.sql
|
||||
createExecutionLogsTable07 embed.FS
|
||||
//go:embed 07/access.sql
|
||||
createAccessLogsTable07 string
|
||||
//go:embed 07/execution.sql
|
||||
createExecutionLogsTable07 string
|
||||
)
|
||||
|
||||
type LogstoreTables struct {
|
||||
dbClient *sql.DB
|
||||
username string
|
||||
dbType string
|
||||
}
|
||||
|
||||
func (mig *LogstoreTables) Execute(ctx context.Context, _ eventstore.Event) error {
|
||||
accessStmt, err := readStmt(createAccessLogsTable07, "07", mig.dbType, "access.sql")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
executionStmt, err := readStmt(createExecutionLogsTable07, "07", mig.dbType, "execution.sql")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stmt := strings.ReplaceAll(createLogstoreSchema07, "%[1]s", mig.username) + accessStmt + executionStmt
|
||||
_, err = mig.dbClient.ExecContext(ctx, stmt)
|
||||
stmt := strings.ReplaceAll(createLogstoreSchema07, "%[1]s", mig.username) + createAccessLogsTable07 + createExecutionLogsTable07
|
||||
_, err := mig.dbClient.ExecContext(ctx, stmt)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@@ -1,14 +0,0 @@
|
||||
CREATE TABLE IF NOT EXISTS logstore.access (
|
||||
log_date TIMESTAMPTZ NOT NULL
|
||||
, protocol INT NOT NULL
|
||||
, request_url TEXT NOT NULL
|
||||
, response_status INT NOT NULL
|
||||
, request_headers JSONB
|
||||
, response_headers JSONB
|
||||
, instance_id TEXT NOT NULL
|
||||
, project_id TEXT NOT NULL
|
||||
, requested_domain TEXT
|
||||
, requested_host TEXT
|
||||
|
||||
, INDEX protocol_date_desc (instance_id, protocol, log_date DESC) STORING (request_url, response_status, request_headers)
|
||||
);
|
@@ -1,11 +0,0 @@
|
||||
CREATE TABLE IF NOT EXISTS logstore.execution (
|
||||
log_date TIMESTAMPTZ NOT NULL
|
||||
, took INTERVAL
|
||||
, message TEXT NOT NULL
|
||||
, loglevel INT NOT NULL
|
||||
, instance_id TEXT NOT NULL
|
||||
, action_id TEXT NOT NULL
|
||||
, metadata JSONB
|
||||
|
||||
, INDEX log_date_desc (instance_id, log_date DESC) STORING (took)
|
||||
);
|
@@ -2,16 +2,15 @@ package setup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
_ "embed"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed 08/cockroach/08.sql
|
||||
//go:embed 08/postgres/08.sql
|
||||
tokenIndexes08 embed.FS
|
||||
//go:embed 08/08.sql
|
||||
tokenIndexes08 string
|
||||
)
|
||||
|
||||
type AuthTokenIndexes struct {
|
||||
@@ -19,11 +18,7 @@ type AuthTokenIndexes struct {
|
||||
}
|
||||
|
||||
func (mig *AuthTokenIndexes) Execute(ctx context.Context, _ eventstore.Event) error {
|
||||
stmt, err := readStmt(tokenIndexes08, "08", mig.dbClient.Type(), "08.sql")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = mig.dbClient.ExecContext(ctx, stmt)
|
||||
_, err := mig.dbClient.ExecContext(ctx, tokenIndexes08)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@@ -1,5 +0,0 @@
|
||||
CREATE INDEX IF NOT EXISTS inst_refresh_tkn_idx ON auth.tokens(instance_id, refresh_token_id);
|
||||
CREATE INDEX IF NOT EXISTS inst_app_tkn_idx ON auth.tokens(instance_id, application_id);
|
||||
CREATE INDEX IF NOT EXISTS inst_ro_tkn_idx ON auth.tokens(instance_id, resource_owner);
|
||||
DROP INDEX IF EXISTS auth.tokens@user_user_agent_idx;
|
||||
CREATE INDEX IF NOT EXISTS inst_usr_agnt_tkn_idx ON auth.tokens(instance_id, user_id, user_agent_id);
|
@@ -3,7 +3,7 @@ package setup
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"embed"
|
||||
_ "embed"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/cockroach-go/v2/crdb"
|
||||
@@ -18,9 +18,8 @@ var (
|
||||
correctCreationDate10CreateTable string
|
||||
//go:embed 10/10_fill_table.sql
|
||||
correctCreationDate10FillTable string
|
||||
//go:embed 10/cockroach/10_update.sql
|
||||
//go:embed 10/postgres/10_update.sql
|
||||
correctCreationDate10Update embed.FS
|
||||
//go:embed 10/10_update.sql
|
||||
correctCreationDate10Update string
|
||||
//go:embed 10/10_count_wrong_events.sql
|
||||
correctCreationDate10CountWrongEvents string
|
||||
//go:embed 10/10_empty_table.sql
|
||||
@@ -40,11 +39,6 @@ func (mig *CorrectCreationDate) Execute(ctx context.Context, _ eventstore.Event)
|
||||
logging.WithFields("mig", mig.String(), "iteration", i).Debug("start iteration")
|
||||
var affected int64
|
||||
err = crdb.ExecuteTx(ctx, mig.dbClient.DB, nil, func(tx *sql.Tx) error {
|
||||
if mig.dbClient.Type() == "cockroach" {
|
||||
if _, err := tx.Exec("SET experimental_enable_temp_tables=on"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
_, err := tx.ExecContext(ctx, correctCreationDate10CreateTable)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -66,11 +60,7 @@ func (mig *CorrectCreationDate) Execute(ctx context.Context, _ eventstore.Event)
|
||||
return err
|
||||
}
|
||||
|
||||
updateStmt, err := readStmt(correctCreationDate10Update, "10", mig.dbClient.Type(), "10_update.sql")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = tx.ExecContext(ctx, updateStmt)
|
||||
_, err = tx.ExecContext(ctx, correctCreationDate10Update)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -1 +0,0 @@
|
||||
UPDATE eventstore.events e SET (creation_date, "position") = (we.next_cd, we.next_cd::DECIMAL) FROM wrong_events we WHERE e.event_sequence = we.event_sequence AND e.instance_id = we.instance_id;
|
@@ -15,8 +15,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed 14/cockroach/*.sql
|
||||
//go:embed 14/postgres/*.sql
|
||||
//go:embed 14/*.sql
|
||||
newEventsTable embed.FS
|
||||
)
|
||||
|
||||
@@ -40,7 +39,7 @@ func (mig *NewEventsTable) Execute(ctx context.Context, _ eventstore.Event) erro
|
||||
return err
|
||||
}
|
||||
|
||||
statements, err := readStatements(newEventsTable, "14", mig.dbClient.Type())
|
||||
statements, err := readStatements(newEventsTable, "14")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -1,33 +0,0 @@
|
||||
CREATE TABLE eventstore.events2 (
|
||||
instance_id,
|
||||
aggregate_type,
|
||||
aggregate_id,
|
||||
|
||||
event_type,
|
||||
"sequence",
|
||||
revision,
|
||||
created_at,
|
||||
payload,
|
||||
creator,
|
||||
"owner",
|
||||
|
||||
"position",
|
||||
in_tx_order,
|
||||
|
||||
PRIMARY KEY (instance_id, aggregate_type, aggregate_id, "sequence")
|
||||
) AS SELECT
|
||||
instance_id,
|
||||
aggregate_type,
|
||||
aggregate_id,
|
||||
|
||||
event_type,
|
||||
event_sequence,
|
||||
substr(aggregate_version, 2)::SMALLINT,
|
||||
creation_date,
|
||||
event_data,
|
||||
editor_user,
|
||||
resource_owner,
|
||||
|
||||
creation_date::DECIMAL,
|
||||
event_sequence
|
||||
FROM eventstore.events_old;
|
@@ -1,7 +0,0 @@
|
||||
ALTER TABLE eventstore.events2 ALTER COLUMN event_type SET NOT NULL;
|
||||
ALTER TABLE eventstore.events2 ALTER COLUMN revision SET NOT NULL;
|
||||
ALTER TABLE eventstore.events2 ALTER COLUMN created_at SET NOT NULL;
|
||||
ALTER TABLE eventstore.events2 ALTER COLUMN creator SET NOT NULL;
|
||||
ALTER TABLE eventstore.events2 ALTER COLUMN "owner" SET NOT NULL;
|
||||
ALTER TABLE eventstore.events2 ALTER COLUMN "position" SET NOT NULL;
|
||||
ALTER TABLE eventstore.events2 ALTER COLUMN in_tx_order SET NOT NULL;
|
@@ -1,3 +0,0 @@
|
||||
CREATE INDEX IF NOT EXISTS es_active_instances ON eventstore.events2 (created_at DESC) STORING ("position");
|
||||
CREATE INDEX IF NOT EXISTS es_wm ON eventstore.events2 (aggregate_id, instance_id, aggregate_type, event_type);
|
||||
CREATE INDEX IF NOT EXISTS es_projection ON eventstore.events2 (instance_id, aggregate_type, event_type, "position");
|
@@ -1 +0,0 @@
|
||||
ALTER TABLE eventstore.events RENAME TO events_old;
|
@@ -11,8 +11,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed 15/cockroach/*.sql
|
||||
//go:embed 15/postgres/*.sql
|
||||
//go:embed 15/*.sql
|
||||
currentProjectionState embed.FS
|
||||
)
|
||||
|
||||
@@ -21,7 +20,7 @@ type CurrentProjectionState struct {
|
||||
}
|
||||
|
||||
func (mig *CurrentProjectionState) Execute(ctx context.Context, _ eventstore.Event) error {
|
||||
statements, err := readStatements(currentProjectionState, "15", mig.dbClient.Type())
|
||||
statements, err := readStatements(currentProjectionState, "15")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -1,26 +0,0 @@
|
||||
INSERT INTO projections.failed_events2 (
|
||||
projection_name
|
||||
, instance_id
|
||||
, aggregate_type
|
||||
, aggregate_id
|
||||
, event_creation_date
|
||||
, failed_sequence
|
||||
, failure_count
|
||||
, error
|
||||
, last_failed
|
||||
) SELECT
|
||||
fe.projection_name
|
||||
, fe.instance_id
|
||||
, e.aggregate_type
|
||||
, e.aggregate_id
|
||||
, e.created_at
|
||||
, e.sequence
|
||||
, fe.failure_count
|
||||
, fe.error
|
||||
, fe.last_failed
|
||||
FROM
|
||||
projections.failed_events fe
|
||||
JOIN eventstore.events2 e ON
|
||||
e.instance_id = fe.instance_id
|
||||
AND e.sequence = fe.failed_sequence
|
||||
ON CONFLICT DO NOTHING;
|
@@ -1,29 +0,0 @@
|
||||
INSERT INTO projections.current_states (
|
||||
projection_name
|
||||
, instance_id
|
||||
, event_date
|
||||
, "position"
|
||||
, last_updated
|
||||
) (SELECT
|
||||
cs.projection_name
|
||||
, cs.instance_id
|
||||
, e.created_at
|
||||
, e.position
|
||||
, cs.timestamp
|
||||
FROM
|
||||
projections.current_sequences cs
|
||||
JOIN eventstore.events2 e ON
|
||||
e.instance_id = cs.instance_id
|
||||
AND e.aggregate_type = cs.aggregate_type
|
||||
AND e.sequence = cs.current_sequence
|
||||
AND cs.current_sequence = (
|
||||
SELECT
|
||||
MAX(cs2.current_sequence)
|
||||
FROM
|
||||
projections.current_sequences cs2
|
||||
WHERE
|
||||
cs.projection_name = cs2.projection_name
|
||||
AND cs.instance_id = cs2.instance_id
|
||||
)
|
||||
)
|
||||
ON CONFLICT DO NOTHING;
|
@@ -1,28 +0,0 @@
|
||||
INSERT INTO projections.current_states (
|
||||
projection_name
|
||||
, instance_id
|
||||
, event_date
|
||||
, "position"
|
||||
, last_updated
|
||||
) (SELECT
|
||||
cs.view_name
|
||||
, cs.instance_id
|
||||
, e.created_at
|
||||
, e.position
|
||||
, cs.last_successful_spooler_run
|
||||
FROM
|
||||
adminapi.current_sequences cs
|
||||
JOIN eventstore.events2 e ON
|
||||
e.instance_id = cs.instance_id
|
||||
AND e.sequence = cs.current_sequence
|
||||
AND cs.current_sequence = (
|
||||
SELECT
|
||||
MAX(cs2.current_sequence)
|
||||
FROM
|
||||
adminapi.current_sequences cs2
|
||||
WHERE
|
||||
cs.view_name = cs2.view_name
|
||||
AND cs.instance_id = cs2.instance_id
|
||||
)
|
||||
)
|
||||
ON CONFLICT DO NOTHING;
|
@@ -1,28 +0,0 @@
|
||||
INSERT INTO projections.current_states (
|
||||
projection_name
|
||||
, instance_id
|
||||
, event_date
|
||||
, "position"
|
||||
, last_updated
|
||||
) (SELECT
|
||||
cs.view_name
|
||||
, cs.instance_id
|
||||
, e.created_at
|
||||
, e.position
|
||||
, cs.last_successful_spooler_run
|
||||
FROM
|
||||
auth.current_sequences cs
|
||||
JOIN eventstore.events2 e ON
|
||||
e.instance_id = cs.instance_id
|
||||
AND e.sequence = cs.current_sequence
|
||||
AND cs.current_sequence = (
|
||||
SELECT
|
||||
MAX(cs2.current_sequence)
|
||||
FROM
|
||||
auth.current_sequences cs2
|
||||
WHERE
|
||||
cs.view_name = cs2.view_name
|
||||
AND cs.instance_id = cs2.instance_id
|
||||
)
|
||||
)
|
||||
ON CONFLICT DO NOTHING;
|
@@ -1,16 +0,0 @@
|
||||
CREATE TABLE IF NOT EXISTS projections.failed_events2 (
|
||||
projection_name TEXT NOT NULL
|
||||
, instance_id TEXT NOT NULL
|
||||
|
||||
, aggregate_type TEXT NOT NULL
|
||||
, aggregate_id TEXT NOT NULL
|
||||
, event_creation_date TIMESTAMPTZ NOT NULL
|
||||
, failed_sequence INT8 NOT NULL
|
||||
|
||||
, failure_count INT2 NULL DEFAULT 0
|
||||
, error TEXT
|
||||
, last_failed TIMESTAMPTZ
|
||||
|
||||
, PRIMARY KEY (projection_name, instance_id, aggregate_type, aggregate_id, failed_sequence)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS fe2_instance_id_idx on projections.failed_events2 (instance_id);
|
@@ -1,26 +0,0 @@
|
||||
INSERT INTO projections.failed_events2 (
|
||||
projection_name
|
||||
, instance_id
|
||||
, aggregate_type
|
||||
, aggregate_id
|
||||
, event_creation_date
|
||||
, failed_sequence
|
||||
, failure_count
|
||||
, error
|
||||
, last_failed
|
||||
) SELECT
|
||||
fe.view_name
|
||||
, fe.instance_id
|
||||
, e.aggregate_type
|
||||
, e.aggregate_id
|
||||
, e.created_at
|
||||
, e.sequence
|
||||
, fe.failure_count
|
||||
, fe.err_msg
|
||||
, fe.last_failed
|
||||
FROM
|
||||
adminapi.failed_events fe
|
||||
JOIN eventstore.events2 e ON
|
||||
e.instance_id = fe.instance_id
|
||||
AND e.sequence = fe.failed_sequence
|
||||
ON CONFLICT DO NOTHING;
|
@@ -1,26 +0,0 @@
|
||||
INSERT INTO projections.failed_events2 (
|
||||
projection_name
|
||||
, instance_id
|
||||
, aggregate_type
|
||||
, aggregate_id
|
||||
, event_creation_date
|
||||
, failed_sequence
|
||||
, failure_count
|
||||
, error
|
||||
, last_failed
|
||||
) SELECT
|
||||
fe.view_name
|
||||
, fe.instance_id
|
||||
, e.aggregate_type
|
||||
, e.aggregate_id
|
||||
, e.created_at
|
||||
, e.sequence
|
||||
, fe.failure_count
|
||||
, fe.err_msg
|
||||
, fe.last_failed
|
||||
FROM
|
||||
auth.failed_events fe
|
||||
JOIN eventstore.events2 e ON
|
||||
e.instance_id = fe.instance_id
|
||||
AND e.sequence = fe.failed_sequence
|
||||
ON CONFLICT DO NOTHING;
|
@@ -1,15 +0,0 @@
|
||||
CREATE TABLE IF NOT EXISTS projections.current_states (
|
||||
projection_name TEXT NOT NULL
|
||||
, instance_id TEXT NOT NULL
|
||||
|
||||
, last_updated TIMESTAMPTZ
|
||||
|
||||
, aggregate_id TEXT
|
||||
, aggregate_type TEXT
|
||||
, "sequence" INT8
|
||||
, event_date TIMESTAMPTZ
|
||||
, "position" DECIMAL
|
||||
|
||||
, PRIMARY KEY (projection_name, instance_id)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS cs_instance_id_idx ON projections.current_states (instance_id);
|
@@ -3,17 +3,14 @@ package setup
|
||||
import (
|
||||
"context"
|
||||
_ "embed"
|
||||
"fmt"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed 34/cockroach/34_cache_schema.sql
|
||||
addCacheSchemaCockroach string
|
||||
//go:embed 34/postgres/34_cache_schema.sql
|
||||
addCacheSchemaPostgres string
|
||||
//go:embed 34/34_cache_schema.sql
|
||||
addCacheSchema string
|
||||
)
|
||||
|
||||
type AddCacheSchema struct {
|
||||
@@ -21,14 +18,7 @@ type AddCacheSchema struct {
|
||||
}
|
||||
|
||||
func (mig *AddCacheSchema) Execute(ctx context.Context, _ eventstore.Event) (err error) {
|
||||
switch mig.dbClient.Type() {
|
||||
case "cockroach":
|
||||
_, err = mig.dbClient.ExecContext(ctx, addCacheSchemaCockroach)
|
||||
case "postgres":
|
||||
_, err = mig.dbClient.ExecContext(ctx, addCacheSchemaPostgres)
|
||||
default:
|
||||
err = fmt.Errorf("add cache schema: unsupported db type %q", mig.dbClient.Type())
|
||||
}
|
||||
_, err = mig.dbClient.ExecContext(ctx, addCacheSchema)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@@ -1,27 +0,0 @@
|
||||
create schema if not exists cache;
|
||||
|
||||
create table if not exists cache.objects (
|
||||
cache_name varchar not null,
|
||||
id uuid not null default gen_random_uuid(),
|
||||
created_at timestamptz not null default now(),
|
||||
last_used_at timestamptz not null default now(),
|
||||
payload jsonb not null,
|
||||
|
||||
primary key(cache_name, id)
|
||||
);
|
||||
|
||||
create table if not exists cache.string_keys(
|
||||
cache_name varchar not null check (cache_name <> ''),
|
||||
index_id integer not null check (index_id > 0),
|
||||
index_key varchar not null check (index_key <> ''),
|
||||
object_id uuid not null,
|
||||
|
||||
primary key (cache_name, index_id, index_key),
|
||||
constraint fk_object
|
||||
foreign key(cache_name, object_id)
|
||||
references cache.objects(cache_name, id)
|
||||
on delete cascade
|
||||
);
|
||||
|
||||
create index if not exists string_keys_object_id_idx
|
||||
on cache.string_keys (cache_name, object_id); -- for delete cascade
|
@@ -21,7 +21,7 @@ type AddPositionToIndexEsWm struct {
|
||||
}
|
||||
|
||||
func (mig *AddPositionToIndexEsWm) Execute(ctx context.Context, _ eventstore.Event) error {
|
||||
statements, err := readStatements(addPositionToEsWmIndex, "35", "")
|
||||
statements, err := readStatements(addPositionToEsWmIndex, "35")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -24,8 +24,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed 40/cockroach/*.sql
|
||||
//go:embed 40/postgres/*.sql
|
||||
//go:embed 40/*.sql
|
||||
initPushFunc embed.FS
|
||||
)
|
||||
|
||||
@@ -112,5 +111,5 @@ func (mig *InitPushFunc) inTxOrderType(ctx context.Context) (typeName string, er
|
||||
}
|
||||
|
||||
func (mig *InitPushFunc) filePath(fileName string) string {
|
||||
return path.Join("40", mig.dbClient.Type(), fileName)
|
||||
return path.Join("40", fileName)
|
||||
}
|
||||
|
@@ -1,10 +0,0 @@
|
||||
CREATE TYPE IF NOT EXISTS eventstore.command AS (
|
||||
instance_id TEXT
|
||||
, aggregate_type TEXT
|
||||
, aggregate_id TEXT
|
||||
, command_type TEXT
|
||||
, revision INT2
|
||||
, payload JSONB
|
||||
, creator TEXT
|
||||
, owner TEXT
|
||||
);
|
@@ -1,137 +0,0 @@
|
||||
CREATE OR REPLACE FUNCTION eventstore.latest_aggregate_state(
|
||||
instance_id TEXT
|
||||
, aggregate_type TEXT
|
||||
, aggregate_id TEXT
|
||||
|
||||
, sequence OUT BIGINT
|
||||
, owner OUT TEXT
|
||||
)
|
||||
LANGUAGE 'plpgsql'
|
||||
AS $$
|
||||
BEGIN
|
||||
SELECT
|
||||
COALESCE(e.sequence, 0) AS sequence
|
||||
, e.owner
|
||||
INTO
|
||||
sequence
|
||||
, owner
|
||||
FROM
|
||||
eventstore.events2 e
|
||||
WHERE
|
||||
e.instance_id = $1
|
||||
AND e.aggregate_type = $2
|
||||
AND e.aggregate_id = $3
|
||||
ORDER BY
|
||||
e.sequence DESC
|
||||
LIMIT 1;
|
||||
|
||||
RETURN;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.commands_to_events2(commands eventstore.command[])
|
||||
RETURNS eventstore.events2[]
|
||||
LANGUAGE 'plpgsql'
|
||||
AS $$
|
||||
DECLARE
|
||||
current_sequence BIGINT;
|
||||
current_owner TEXT;
|
||||
|
||||
instance_id TEXT;
|
||||
aggregate_type TEXT;
|
||||
aggregate_id TEXT;
|
||||
|
||||
_events eventstore.events2[];
|
||||
|
||||
_aggregates CURSOR FOR
|
||||
select
|
||||
DISTINCT ("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
FROM
|
||||
UNNEST(commands) AS c;
|
||||
BEGIN
|
||||
OPEN _aggregates;
|
||||
LOOP
|
||||
FETCH NEXT IN _aggregates INTO instance_id, aggregate_type, aggregate_id;
|
||||
-- crdb does not support EXIT WHEN NOT FOUND
|
||||
EXIT WHEN instance_id IS NULL;
|
||||
|
||||
SELECT
|
||||
*
|
||||
INTO
|
||||
current_sequence
|
||||
, current_owner
|
||||
FROM eventstore.latest_aggregate_state(
|
||||
instance_id
|
||||
, aggregate_type
|
||||
, aggregate_id
|
||||
);
|
||||
|
||||
-- RETURN QUERY is not supported by crdb: https://github.com/cockroachdb/cockroach/issues/105240
|
||||
SELECT
|
||||
ARRAY_CAT(_events, ARRAY_AGG(e))
|
||||
INTO
|
||||
_events
|
||||
FROM (
|
||||
SELECT
|
||||
("c").instance_id
|
||||
, ("c").aggregate_type
|
||||
, ("c").aggregate_id
|
||||
, ("c").command_type -- AS event_type
|
||||
, COALESCE(current_sequence, 0) + ROW_NUMBER() OVER () -- AS sequence
|
||||
, ("c").revision
|
||||
, NOW() -- AS created_at
|
||||
, ("c").payload
|
||||
, ("c").creator
|
||||
, COALESCE(current_owner, ("c").owner) -- AS owner
|
||||
, cluster_logical_timestamp() -- AS position
|
||||
, ordinality::{{ .InTxOrderType }} -- AS in_tx_order
|
||||
FROM
|
||||
UNNEST(commands) WITH ORDINALITY AS c
|
||||
WHERE
|
||||
("c").instance_id = instance_id
|
||||
AND ("c").aggregate_type = aggregate_type
|
||||
AND ("c").aggregate_id = aggregate_id
|
||||
) AS e;
|
||||
END LOOP;
|
||||
CLOSE _aggregates;
|
||||
RETURN _events;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.push(commands eventstore.command[]) RETURNS SETOF eventstore.events2 AS $$
|
||||
INSERT INTO eventstore.events2
|
||||
SELECT
|
||||
("e").instance_id
|
||||
, ("e").aggregate_type
|
||||
, ("e").aggregate_id
|
||||
, ("e").event_type
|
||||
, ("e").sequence
|
||||
, ("e").revision
|
||||
, ("e").created_at
|
||||
, ("e").payload
|
||||
, ("e").creator
|
||||
, ("e").owner
|
||||
, ("e")."position"
|
||||
, ("e").in_tx_order
|
||||
FROM
|
||||
UNNEST(eventstore.commands_to_events2(commands)) e
|
||||
ORDER BY
|
||||
in_tx_order
|
||||
RETURNING *
|
||||
$$ LANGUAGE SQL;
|
||||
|
||||
/*
|
||||
select (c).* from UNNEST(eventstore.commands_to_events2(
|
||||
ARRAY[
|
||||
ROW('', 'system', 'SYSTEM', 'ct1', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||
, ROW('', 'system', 'SYSTEM', 'ct2', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||
, ROW('289525561255060732', 'org', '289575074711790844', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
||||
, ROW('289525561255060732', 'user', '289575075164906748', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
||||
, ROW('289525561255060732', 'oidc_session', 'V2_289575178579535100', 'ct3', 1, '{"key": "value"}', 'c1', '289575074711790844')
|
||||
, ROW('', 'system', 'SYSTEM', 'ct3', 1, '{"key": "value"}', 'c1', 'SYSTEM')
|
||||
]::eventstore.command[]
|
||||
) )c;
|
||||
*/
|
||||
|
@@ -1,5 +0,0 @@
|
||||
SELECT data_type
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = 'eventstore'
|
||||
AND table_name = 'events2'
|
||||
AND column_name = 'in_tx_order';
|
@@ -12,8 +12,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed 43/cockroach/*.sql
|
||||
//go:embed 43/postgres/*.sql
|
||||
//go:embed 43/*.sql
|
||||
createFieldsDomainIndex embed.FS
|
||||
)
|
||||
|
||||
@@ -22,7 +21,7 @@ type CreateFieldsDomainIndex struct {
|
||||
}
|
||||
|
||||
func (mig *CreateFieldsDomainIndex) Execute(ctx context.Context, _ eventstore.Event) error {
|
||||
statements, err := readStatements(createFieldsDomainIndex, "43", mig.dbClient.Type())
|
||||
statements, err := readStatements(createFieldsDomainIndex, "43")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -1,3 +0,0 @@
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS fields_instance_domains_idx
|
||||
ON eventstore.fields (object_id)
|
||||
WHERE object_type = 'instance_domain' AND field_name = 'domain';
|
@@ -21,7 +21,7 @@ type ReplaceCurrentSequencesIndex struct {
|
||||
}
|
||||
|
||||
func (mig *ReplaceCurrentSequencesIndex) Execute(ctx context.Context, _ eventstore.Event) error {
|
||||
statements, err := readStatements(replaceCurrentSequencesIndex, "44", "")
|
||||
statements, err := readStatements(replaceCurrentSequencesIndex, "44")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -21,7 +21,7 @@ var (
|
||||
)
|
||||
|
||||
func (mig *InitPermissionFunctions) Execute(ctx context.Context, _ eventstore.Event) error {
|
||||
statements, err := readStatements(permissionFunctions, "46", "")
|
||||
statements, err := readStatements(permissionFunctions, "46")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -21,7 +21,7 @@ var (
|
||||
)
|
||||
|
||||
func (mig *InitPermittedOrgsFunction) Execute(ctx context.Context, _ eventstore.Event) error {
|
||||
statements, err := readStatements(permittedOrgsFunction, "49", "")
|
||||
statements, err := readStatements(permittedOrgsFunction, "49")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
37
cmd/setup/53.go
Normal file
37
cmd/setup/53.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package setup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
"fmt"
|
||||
|
||||
"github.com/zitadel/logging"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
)
|
||||
|
||||
type InitPermittedOrgsFunction53 struct {
|
||||
dbClient *database.DB
|
||||
}
|
||||
|
||||
//go:embed 53/*.sql
|
||||
var permittedOrgsFunction53 embed.FS
|
||||
|
||||
func (mig *InitPermittedOrgsFunction53) Execute(ctx context.Context, _ eventstore.Event) error {
|
||||
statements, err := readStatements(permittedOrgsFunction53, "53")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, stmt := range statements {
|
||||
logging.WithFields("file", stmt.file, "migration", mig.String()).Info("execute statement")
|
||||
if _, err := mig.dbClient.ExecContext(ctx, stmt.query); err != nil {
|
||||
return fmt.Errorf("%s %s: %w", mig.String(), stmt.file, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*InitPermittedOrgsFunction53) String() string {
|
||||
return "53_init_permitted_orgs_function"
|
||||
}
|
43
cmd/setup/53/01-get-permissions-from-JSON.sql
Normal file
43
cmd/setup/53/01-get-permissions-from-JSON.sql
Normal file
@@ -0,0 +1,43 @@
|
||||
DROP FUNCTION IF EXISTS eventstore.get_system_permissions;
|
||||
|
||||
CREATE OR REPLACE FUNCTION eventstore.get_system_permissions(
|
||||
permissions_json JSONB
|
||||
/*
|
||||
[
|
||||
{
|
||||
"member_type": "System",
|
||||
"aggregate_id": "",
|
||||
"object_id": "",
|
||||
"permissions": ["iam.read", "iam.write", "iam.polic.read"]
|
||||
},
|
||||
{
|
||||
"member_type": "IAM",
|
||||
"aggregate_id": "310716990375453665",
|
||||
"object_id": "",
|
||||
"permissions": ["iam.read", "iam.write", "iam.polic.read"]
|
||||
}
|
||||
]
|
||||
*/
|
||||
, permm TEXT
|
||||
)
|
||||
RETURNS TABLE (
|
||||
member_type TEXT,
|
||||
aggregate_id TEXT,
|
||||
object_id TEXT
|
||||
)
|
||||
LANGUAGE 'plpgsql'
|
||||
AS $$
|
||||
BEGIN
|
||||
RETURN QUERY
|
||||
SELECT res.member_type, res.aggregate_id, res.object_id FROM (
|
||||
SELECT
|
||||
(perm)->>'member_type' AS member_type,
|
||||
(perm)->>'aggregate_id' AS aggregate_id,
|
||||
(perm)->>'object_id' AS object_id,
|
||||
permission
|
||||
FROM jsonb_array_elements(permissions_json) AS perm
|
||||
CROSS JOIN jsonb_array_elements_text(perm->'permissions') AS permission) AS res
|
||||
WHERE res. permission= permm;
|
||||
END;
|
||||
$$;
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user