diff --git a/cmd/mirror/auth.go b/cmd/mirror/auth.go index 0eba10d05f..3d7ae45bce 100644 --- a/cmd/mirror/auth.go +++ b/cmd/mirror/auth.go @@ -4,6 +4,7 @@ import ( "context" _ "embed" "io" + "strconv" "time" "github.com/jackc/pgx/v5/stdlib" @@ -41,12 +42,16 @@ func copyAuth(ctx context.Context, config *Migration) { logging.OnError(err).Fatal("unable to connect to destination database") defer destClient.Close() - copyAuthRequests(ctx, sourceClient, destClient) + copyAuthRequests(ctx, sourceClient, destClient, config.MaxAuthRequestAge) } -func copyAuthRequests(ctx context.Context, source, dest *database.DB) { +func copyAuthRequests(ctx context.Context, source, dest *database.DB, maxAuthRequestAge time.Duration) { start := time.Now() + logging.Info("creating index on auth.auth_requests.change_date to speed up copy in source database") + _, err := source.ExecContext(ctx, "CREATE INDEX CONCURRENTLY IF NOT EXISTS auth_requests_change_date ON auth.auth_requests (change_date)") + logging.OnError(err).Fatal("unable to create index on auth.auth_requests.change_date") + sourceConn, err := source.Conn(ctx) logging.OnError(err).Fatal("unable to acquire connection") defer sourceConn.Close() @@ -55,9 +60,9 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) { errs := make(chan error, 1) go func() { - err = sourceConn.Raw(func(driverConn interface{}) error { + err = sourceConn.Raw(func(driverConn any) error { conn := driverConn.(*stdlib.Conn).Conn() - _, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+") TO STDOUT") + _, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+" AND change_date > NOW() - INTERVAL '"+strconv.FormatFloat(maxAuthRequestAge.Seconds(), 'f', -1, 64)+" seconds') TO STDOUT") w.Close() return err }) @@ -69,7 +74,7 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) { defer destConn.Close() var affected int64 - err = destConn.Raw(func(driverConn interface{}) error { + err = destConn.Raw(func(driverConn any) error { conn := driverConn.(*stdlib.Conn).Conn() if shouldReplace { diff --git a/cmd/mirror/config.go b/cmd/mirror/config.go index 9d0113a1d7..5bb19f12de 100644 --- a/cmd/mirror/config.go +++ b/cmd/mirror/config.go @@ -23,7 +23,8 @@ type Migration struct { Source database.Config Destination database.Config - EventBulkSize uint32 + EventBulkSize uint32 + MaxAuthRequestAge time.Duration Log *logging.Config Machine *id.Config diff --git a/cmd/mirror/defaults.yaml b/cmd/mirror/defaults.yaml index 4b42c06534..4d8a0a4eae 100644 --- a/cmd/mirror/defaults.yaml +++ b/cmd/mirror/defaults.yaml @@ -1,61 +1,64 @@ Source: cockroach: - Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST - Port: 26257 # ZITADEL_DATABASE_COCKROACH_PORT - Database: zitadel # ZITADEL_DATABASE_COCKROACH_DATABASE - MaxOpenConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS - MaxIdleConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS - MaxConnLifetime: 30m # ZITADEL_DATABASE_COCKROACH_MAXCONNLIFETIME - MaxConnIdleTime: 5m # ZITADEL_DATABASE_COCKROACH_MAXCONNIDLETIME - Options: "" # ZITADEL_DATABASE_COCKROACH_OPTIONS + Host: localhost # ZITADEL_SOURCE_COCKROACH_HOST + Port: 26257 # ZITADEL_SOURCE_COCKROACH_PORT + Database: zitadel # ZITADEL_SOURCE_COCKROACH_DATABASE + MaxOpenConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXOPENCONNS + MaxIdleConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXIDLECONNS + MaxConnLifetime: 30m # ZITADEL_SOURCE_COCKROACH_MAXCONNLIFETIME + MaxConnIdleTime: 5m # ZITADEL_SOURCE_COCKROACH_MAXCONNIDLETIME + Options: "" # ZITADEL_SOURCE_COCKROACH_OPTIONS User: - Username: zitadel # ZITADEL_DATABASE_COCKROACH_USER_USERNAME - Password: "" # ZITADEL_DATABASE_COCKROACH_USER_PASSWORD + Username: zitadel # ZITADEL_SOURCE_COCKROACH_USER_USERNAME + Password: "" # ZITADEL_SOURCE_COCKROACH_USER_PASSWORD SSL: - Mode: disable # ZITADEL_DATABASE_COCKROACH_USER_SSL_MODE - RootCert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_ROOTCERT - Cert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_CERT - Key: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_KEY + Mode: disable # ZITADEL_SOURCE_COCKROACH_USER_SSL_MODE + RootCert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_ROOTCERT + Cert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_CERT + Key: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_KEY # Postgres is used as soon as a value is set # The values describe the possible fields to set values postgres: - Host: # ZITADEL_DATABASE_POSTGRES_HOST - Port: # ZITADEL_DATABASE_POSTGRES_PORT - Database: # ZITADEL_DATABASE_POSTGRES_DATABASE - MaxOpenConns: # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS - MaxIdleConns: # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS - MaxConnLifetime: # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME - MaxConnIdleTime: # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME - Options: # ZITADEL_DATABASE_POSTGRES_OPTIONS + Host: # ZITADEL_SOURCE_POSTGRES_HOST + Port: # ZITADEL_SOURCE_POSTGRES_PORT + Database: # ZITADEL_SOURCE_POSTGRES_DATABASE + MaxOpenConns: # ZITADEL_SOURCE_POSTGRES_MAXOPENCONNS + MaxIdleConns: # ZITADEL_SOURCE_POSTGRES_MAXIDLECONNS + MaxConnLifetime: # ZITADEL_SOURCE_POSTGRES_MAXCONNLIFETIME + MaxConnIdleTime: # ZITADEL_SOURCE_POSTGRES_MAXCONNIDLETIME + Options: # ZITADEL_SOURCE_POSTGRES_OPTIONS User: - Username: # ZITADEL_DATABASE_POSTGRES_USER_USERNAME - Password: # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD + Username: # ZITADEL_SOURCE_POSTGRES_USER_USERNAME + Password: # ZITADEL_SOURCE_POSTGRES_USER_PASSWORD SSL: - Mode: # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE - RootCert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT - Cert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT - Key: # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY + Mode: # ZITADEL_SOURCE_POSTGRES_USER_SSL_MODE + RootCert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_ROOTCERT + Cert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_CERT + Key: # ZITADEL_SOURCE_POSTGRES_USER_SSL_KEY Destination: postgres: - Host: localhost # ZITADEL_DATABASE_POSTGRES_HOST - Port: 5432 # ZITADEL_DATABASE_POSTGRES_PORT - Database: zitadel # ZITADEL_DATABASE_POSTGRES_DATABASE - MaxOpenConns: 5 # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS - MaxIdleConns: 2 # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS - MaxConnLifetime: 30m # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME - MaxConnIdleTime: 5m # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME - Options: "" # ZITADEL_DATABASE_POSTGRES_OPTIONS + Host: localhost # ZITADEL_DESTINATION_POSTGRES_HOST + Port: 5432 # ZITADEL_DESTINATION_POSTGRES_PORT + Database: zitadel # ZITADEL_DESTINATION_POSTGRES_DATABASE + MaxOpenConns: 5 # ZITADEL_DESTINATION_POSTGRES_MAXOPENCONNS + MaxIdleConns: 2 # ZITADEL_DESTINATION_POSTGRES_MAXIDLECONNS + MaxConnLifetime: 30m # ZITADEL_DESTINATION_POSTGRES_MAXCONNLIFETIME + MaxConnIdleTime: 5m # ZITADEL_DESTINATION_POSTGRES_MAXCONNIDLETIME + Options: "" # ZITADEL_DESTINATION_POSTGRES_OPTIONS User: - Username: zitadel # ZITADEL_DATABASE_POSTGRES_USER_USERNAME - Password: "" # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD + Username: zitadel # ZITADEL_DESTINATION_POSTGRES_USER_USERNAME + Password: "" # ZITADEL_DESTINATION_POSTGRES_USER_PASSWORD SSL: - Mode: disable # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE - RootCert: "" # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT - Cert: "" # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT - Key: "" # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY + Mode: disable # ZITADEL_DESTINATION_POSTGRES_USER_SSL_MODE + RootCert: "" # ZITADEL_DESTINATION_POSTGRES_USER_SSL_ROOTCERT + Cert: "" # ZITADEL_DESTINATION_POSTGRES_USER_SSL_CERT + Key: "" # ZITADEL_DESTINATION_POSTGRES_USER_SSL_KEY -EventBulkSize: 10000 +EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE +# The maximum duration an auth request was last updated before it gets ignored. +# Default is 30 days +MaxAuthRequestAge: 720h # ZITADEL_MAXAUTHREQUESTAGE Projections: # The maximum duration a transaction remains open @@ -64,14 +67,14 @@ Projections: TransactionDuration: 0s # ZITADEL_PROJECTIONS_TRANSACTIONDURATION # turn off scheduler during operation RequeueEvery: 0s - ConcurrentInstances: 7 - EventBulkLimit: 1000 - Customizations: + ConcurrentInstances: 7 # ZITADEL_PROJECTIONS_CONCURRENTINSTANCES + EventBulkLimit: 1000 # ZITADEL_PROJECTIONS_EVENTBULKLIMIT + Customizations: notifications: MaxFailureCount: 1 Eventstore: - MaxRetries: 3 + MaxRetries: 3 # ZITADEL_EVENTSTORE_MAXRETRIES Auth: Spooler: diff --git a/cmd/mirror/event_store.go b/cmd/mirror/event_store.go index 8ce53b150a..41c529c025 100644 --- a/cmd/mirror/event_store.go +++ b/cmd/mirror/event_store.go @@ -69,6 +69,7 @@ func positionQuery(db *db.DB) string { } func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) { + logging.Info("starting to copy events") start := time.Now() reader, writer := io.Pipe() @@ -130,7 +131,10 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) { if err != nil { return zerrors.ThrowUnknownf(err, "MIGRA-KTuSq", "unable to copy events from source during iteration %d", i) } + logging.WithFields("batch_count", i).Info("batch of events copied") + if tag.RowsAffected() < int64(bulkSize) { + logging.WithFields("batch_count", i).Info("last batch of events copied") return nil } @@ -202,6 +206,7 @@ func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, sou } func copyUniqueConstraints(ctx context.Context, source, dest *db.DB) { + logging.Info("starting to copy unique constraints") start := time.Now() reader, writer := io.Pipe() errs := make(chan error, 1) diff --git a/cmd/mirror/projections.go b/cmd/mirror/projections.go index 66b3fb1a26..4e12b29748 100644 --- a/cmd/mirror/projections.go +++ b/cmd/mirror/projections.go @@ -3,6 +3,7 @@ package mirror import ( "context" "database/sql" + "fmt" "net/http" "sync" "time" @@ -104,6 +105,7 @@ func projections( config *ProjectionsConfig, masterKey string, ) { + logging.Info("starting to fill projections") start := time.Now() client, err := database.Connect(config.Destination, false) @@ -255,8 +257,10 @@ func projections( go execProjections(ctx, instances, failedInstances, &wg) } - for _, instance := range queryInstanceIDs(ctx, client) { + existingInstances := queryInstanceIDs(ctx, client) + for i, instance := range existingInstances { instances <- instance + logging.WithFields("id", instance, "index", fmt.Sprintf("%d/%d", i, len(existingInstances))).Info("instance queued for projection") } close(instances) wg.Wait() @@ -268,7 +272,7 @@ func projections( func execProjections(ctx context.Context, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) { for instance := range instances { - logging.WithFields("instance", instance).Info("start projections") + logging.WithFields("instance", instance).Info("starting projections") ctx = internal_authz.WithInstanceID(ctx, instance) err := projection.ProjectInstance(ctx) @@ -311,7 +315,7 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc wg.Done() } -// returns the instance configured by flag +// queryInstanceIDs returns the instance configured by flag // or all instances which are not removed func queryInstanceIDs(ctx context.Context, source *database.DB) []string { if len(instanceIDs) > 0 { diff --git a/cmd/mirror/system.go b/cmd/mirror/system.go index 00b48eb491..57eb205436 100644 --- a/cmd/mirror/system.go +++ b/cmd/mirror/system.go @@ -46,6 +46,7 @@ func copySystem(ctx context.Context, config *Migration) { } func copyAssets(ctx context.Context, source, dest *database.DB) { + logging.Info("starting to copy assets") start := time.Now() sourceConn, err := source.Conn(ctx) @@ -70,7 +71,7 @@ func copyAssets(ctx context.Context, source, dest *database.DB) { logging.OnError(err).Fatal("unable to acquire dest connection") defer destConn.Close() - var eventCount int64 + var assetCount int64 err = destConn.Raw(func(driverConn interface{}) error { conn := driverConn.(*stdlib.Conn).Conn() @@ -82,16 +83,17 @@ func copyAssets(ctx context.Context, source, dest *database.DB) { } tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.assets (instance_id, asset_type, resource_owner, name, content_type, data, updated_at) FROM stdin") - eventCount = tag.RowsAffected() + assetCount = tag.RowsAffected() return err }) logging.OnError(err).Fatal("unable to copy assets to destination") logging.OnError(<-errs).Fatal("unable to copy assets from source") - logging.WithFields("took", time.Since(start), "count", eventCount).Info("assets migrated") + logging.WithFields("took", time.Since(start), "count", assetCount).Info("assets migrated") } func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) { + logging.Info("starting to copy encryption keys") start := time.Now() sourceConn, err := source.Conn(ctx) @@ -116,7 +118,7 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) { logging.OnError(err).Fatal("unable to acquire dest connection") defer destConn.Close() - var eventCount int64 + var keyCount int64 err = destConn.Raw(func(driverConn interface{}) error { conn := driverConn.(*stdlib.Conn).Conn() @@ -128,11 +130,11 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) { } tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.encryption_keys FROM stdin") - eventCount = tag.RowsAffected() + keyCount = tag.RowsAffected() return err }) logging.OnError(err).Fatal("unable to copy encryption keys to destination") logging.OnError(<-errs).Fatal("unable to copy encryption keys from source") - logging.WithFields("took", time.Since(start), "count", eventCount).Info("encryption keys migrated") + logging.WithFields("took", time.Since(start), "count", keyCount).Info("encryption keys migrated") } diff --git a/docs/docs/self-hosting/manage/cli/mirror.mdx b/docs/docs/self-hosting/manage/cli/mirror.mdx index 45bac9b279..ae81800e39 100644 --- a/docs/docs/self-hosting/manage/cli/mirror.mdx +++ b/docs/docs/self-hosting/manage/cli/mirror.mdx @@ -158,6 +158,9 @@ Destination: # As cockroachdb first copies the data into memory this parameter is used to iterate through the events table and fetch only the given amount of events per iteration EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE +# The maximum duration an auth request was last updated before it gets ignored. +# Default is 30 days +MaxAuthRequestAge: 720h # ZITADEL_MAXAUTHREQUESTAGE Projections: # Defines how many projections are allowed to run in parallel diff --git a/internal/admin/repository/eventsourcing/handler/handler.go b/internal/admin/repository/eventsourcing/handler/handler.go index ec268c25a1..76584b55b0 100644 --- a/internal/admin/repository/eventsourcing/handler/handler.go +++ b/internal/admin/repository/eventsourcing/handler/handler.go @@ -2,9 +2,13 @@ package handler import ( "context" + "fmt" "time" + "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" + "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler/v2" @@ -57,11 +61,13 @@ func Start(ctx context.Context) { } func ProjectInstance(ctx context.Context) error { - for _, projection := range projections { + for i, projection := range projections { + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting admin projection") _, err := projection.Trigger(ctx) if err != nil { return err } + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("admin projection done") } return nil } diff --git a/internal/auth/repository/eventsourcing/handler/handler.go b/internal/auth/repository/eventsourcing/handler/handler.go index 0d87ab06bb..74a27a8312 100644 --- a/internal/auth/repository/eventsourcing/handler/handler.go +++ b/internal/auth/repository/eventsourcing/handler/handler.go @@ -2,8 +2,12 @@ package handler import ( "context" + "fmt" "time" + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" @@ -72,11 +76,13 @@ func Projections() []*handler2.Handler { } func ProjectInstance(ctx context.Context) error { - for _, projection := range projections { + for i, projection := range projections { + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting auth projection") _, err := projection.Trigger(ctx) if err != nil { return err } + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("auth projection done") } return nil } diff --git a/internal/notification/projections.go b/internal/notification/projections.go index a2d4d4140e..9b6b975fa1 100644 --- a/internal/notification/projections.go +++ b/internal/notification/projections.go @@ -2,8 +2,12 @@ package notification import ( "context" + "fmt" "time" + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/eventstore" @@ -68,11 +72,13 @@ func Start(ctx context.Context) { } func ProjectInstance(ctx context.Context) error { - for _, projection := range projections { + for i, projection := range projections { + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting notification projection") _, err := projection.Trigger(ctx) if err != nil { return err } + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("notification projection done") } return nil } diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index f4e3bbe0d4..07953a27e8 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -2,6 +2,9 @@ package projection import ( "context" + "fmt" + + "github.com/zitadel/logging" internal_authz "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/crypto" @@ -90,6 +93,7 @@ var ( ) type projection interface { + ProjectionName() string Start(ctx context.Context) Init(ctx context.Context) error Trigger(ctx context.Context, opts ...handler.TriggerOpt) (_ context.Context, err error) @@ -206,21 +210,25 @@ func Start(ctx context.Context) { } func ProjectInstance(ctx context.Context) error { - for _, projection := range projections { + for i, projection := range projections { + logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting projection") _, err := projection.Trigger(ctx) if err != nil { return err } + logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).Info("projection done") } return nil } func ProjectInstanceFields(ctx context.Context) error { - for _, fieldProjection := range fields { + for i, fieldProjection := range fields { + logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("starting fields projection") err := fieldProjection.Trigger(ctx) if err != nil { return err } + logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).Info("fields projection done") } return nil }