fix(mirror): add max auth request age configuration (#9812)
# Which Problems Are Solved

The `auth.auth_requests` table is never cleaned up, so long-running Zitadel installations can accumulate a large number of rows. The mirror command can therefore take a long time, because CockroachDB first copies the data into memory (or onto disk), and mirror prints no output while it runs, so users cannot tell whether Zitadel is stuck.

# How the Problems Are Solved

Enhance logging throughout the projection processes and introduce a configuration option for the maximum age of authentication requests.

# Additional Changes

None

# Additional Context

closes https://github.com/zitadel/zitadel/issues/9764

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
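The age filter works by appending an interval predicate to the `COPY` query so that only recently changed auth requests are mirrored. A minimal, self-contained sketch of that conversion; `ageClause` is an illustrative helper name, but the formatting call matches the change in the first hunk below:

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// ageClause builds the predicate the mirror command now appends to the
// auth_requests COPY query; the helper name is illustrative, the conversion
// mirrors the diff below.
func ageClause(maxAuthRequestAge time.Duration) string {
	seconds := strconv.FormatFloat(maxAuthRequestAge.Seconds(), 'f', -1, 64)
	return " AND change_date > NOW() - INTERVAL '" + seconds + " seconds'"
}

func main() {
	// 720h is the shipped default, i.e. 30 days.
	fmt.Println(ageClause(720 * time.Hour))
	// Output:  AND change_date > NOW() - INTERVAL '2592000 seconds'
}
```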
```diff
@@ -4,6 +4,7 @@ import (
 	"context"
 	_ "embed"
 	"io"
+	"strconv"
 	"time"
 
 	"github.com/jackc/pgx/v5/stdlib"
@@ -41,12 +42,16 @@ func copyAuth(ctx context.Context, config *Migration) {
 	logging.OnError(err).Fatal("unable to connect to destination database")
 	defer destClient.Close()
 
-	copyAuthRequests(ctx, sourceClient, destClient)
+	copyAuthRequests(ctx, sourceClient, destClient, config.MaxAuthRequestAge)
 }
 
-func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
+func copyAuthRequests(ctx context.Context, source, dest *database.DB, maxAuthRequestAge time.Duration) {
 	start := time.Now()
 
+	logging.Info("creating index on auth.auth_requests.change_date to speed up copy in source database")
+	_, err := source.ExecContext(ctx, "CREATE INDEX CONCURRENTLY IF NOT EXISTS auth_requests_change_date ON auth.auth_requests (change_date)")
+	logging.OnError(err).Fatal("unable to create index on auth.auth_requests.change_date")
+
 	sourceConn, err := source.Conn(ctx)
 	logging.OnError(err).Fatal("unable to acquire connection")
 	defer sourceConn.Close()
@@ -55,9 +60,9 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
 	errs := make(chan error, 1)
 
 	go func() {
-		err = sourceConn.Raw(func(driverConn interface{}) error {
+		err = sourceConn.Raw(func(driverConn any) error {
 			conn := driverConn.(*stdlib.Conn).Conn()
-			_, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+") TO STDOUT")
+			_, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+" AND change_date > NOW() - INTERVAL '"+strconv.FormatFloat(maxAuthRequestAge.Seconds(), 'f', -1, 64)+" seconds') TO STDOUT")
 			w.Close()
 			return err
 		})
@@ -69,7 +74,7 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
 	defer destConn.Close()
 
 	var affected int64
-	err = destConn.Raw(func(driverConn interface{}) error {
+	err = destConn.Raw(func(driverConn any) error {
 		conn := driverConn.(*stdlib.Conn).Conn()
 
 		if shouldReplace {
```
```diff
@@ -23,7 +23,8 @@ type Migration struct {
 	Source      database.Config
 	Destination database.Config
 
 	EventBulkSize uint32
+	MaxAuthRequestAge time.Duration
 
 	Log     *logging.Config
 	Machine *id.Config
```
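`MaxAuthRequestAge` is a plain `time.Duration`, so the `720h` default used in the YAML below decodes with Go's duration syntax. A trivial sketch of what that value works out to (Zitadel's actual config decoding is not shown here):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "720h" is the shipped default for MaxAuthRequestAge.
	maxAge, err := time.ParseDuration("720h")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%v = %.0f days\n", maxAge, maxAge.Hours()/24) // 720h0m0s = 30 days
}
```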
```diff
@@ -1,61 +1,64 @@
 Source:
   cockroach:
-    Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST
+    Host: localhost # ZITADEL_SOURCE_COCKROACH_HOST
-    Port: 26257 # ZITADEL_DATABASE_COCKROACH_PORT
+    Port: 26257 # ZITADEL_SOURCE_COCKROACH_PORT
-    Database: zitadel # ZITADEL_DATABASE_COCKROACH_DATABASE
+    Database: zitadel # ZITADEL_SOURCE_COCKROACH_DATABASE
-    MaxOpenConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS
+    MaxOpenConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXOPENCONNS
-    MaxIdleConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS
+    MaxIdleConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXIDLECONNS
-    MaxConnLifetime: 30m # ZITADEL_DATABASE_COCKROACH_MAXCONNLIFETIME
+    MaxConnLifetime: 30m # ZITADEL_SOURCE_COCKROACH_MAXCONNLIFETIME
-    MaxConnIdleTime: 5m # ZITADEL_DATABASE_COCKROACH_MAXCONNIDLETIME
+    MaxConnIdleTime: 5m # ZITADEL_SOURCE_COCKROACH_MAXCONNIDLETIME
-    Options: "" # ZITADEL_DATABASE_COCKROACH_OPTIONS
+    Options: "" # ZITADEL_SOURCE_COCKROACH_OPTIONS
     User:
-      Username: zitadel # ZITADEL_DATABASE_COCKROACH_USER_USERNAME
+      Username: zitadel # ZITADEL_SOURCE_COCKROACH_USER_USERNAME
-      Password: "" # ZITADEL_DATABASE_COCKROACH_USER_PASSWORD
+      Password: "" # ZITADEL_SOURCE_COCKROACH_USER_PASSWORD
       SSL:
-        Mode: disable # ZITADEL_DATABASE_COCKROACH_USER_SSL_MODE
+        Mode: disable # ZITADEL_SOURCE_COCKROACH_USER_SSL_MODE
-        RootCert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_ROOTCERT
+        RootCert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_ROOTCERT
-        Cert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_CERT
+        Cert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_CERT
-        Key: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_KEY
+        Key: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_KEY
   # Postgres is used as soon as a value is set
   # The values describe the possible fields to set values
   postgres:
-    Host: # ZITADEL_DATABASE_POSTGRES_HOST
+    Host: # ZITADEL_SOURCE_POSTGRES_HOST
-    Port: # ZITADEL_DATABASE_POSTGRES_PORT
+    Port: # ZITADEL_SOURCE_POSTGRES_PORT
-    Database: # ZITADEL_DATABASE_POSTGRES_DATABASE
+    Database: # ZITADEL_SOURCE_POSTGRES_DATABASE
-    MaxOpenConns: # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS
+    MaxOpenConns: # ZITADEL_SOURCE_POSTGRES_MAXOPENCONNS
-    MaxIdleConns: # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS
+    MaxIdleConns: # ZITADEL_SOURCE_POSTGRES_MAXIDLECONNS
-    MaxConnLifetime: # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME
+    MaxConnLifetime: # ZITADEL_SOURCE_POSTGRES_MAXCONNLIFETIME
-    MaxConnIdleTime: # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME
+    MaxConnIdleTime: # ZITADEL_SOURCE_POSTGRES_MAXCONNIDLETIME
-    Options: # ZITADEL_DATABASE_POSTGRES_OPTIONS
+    Options: # ZITADEL_SOURCE_POSTGRES_OPTIONS
     User:
-      Username: # ZITADEL_DATABASE_POSTGRES_USER_USERNAME
+      Username: # ZITADEL_SOURCE_POSTGRES_USER_USERNAME
-      Password: # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD
+      Password: # ZITADEL_SOURCE_POSTGRES_USER_PASSWORD
       SSL:
-        Mode: # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE
+        Mode: # ZITADEL_SOURCE_POSTGRES_USER_SSL_MODE
-        RootCert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT
+        RootCert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_ROOTCERT
-        Cert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT
+        Cert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_CERT
-        Key: # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY
+        Key: # ZITADEL_SOURCE_POSTGRES_USER_SSL_KEY
 
 Destination:
   postgres:
-    Host: localhost # ZITADEL_DATABASE_POSTGRES_HOST
+    Host: localhost # ZITADEL_DESTINATION_POSTGRES_HOST
-    Port: 5432 # ZITADEL_DATABASE_POSTGRES_PORT
+    Port: 5432 # ZITADEL_DESTINATION_POSTGRES_PORT
-    Database: zitadel # ZITADEL_DATABASE_POSTGRES_DATABASE
+    Database: zitadel # ZITADEL_DESTINATION_POSTGRES_DATABASE
-    MaxOpenConns: 5 # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS
+    MaxOpenConns: 5 # ZITADEL_DESTINATION_POSTGRES_MAXOPENCONNS
-    MaxIdleConns: 2 # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS
+    MaxIdleConns: 2 # ZITADEL_DESTINATION_POSTGRES_MAXIDLECONNS
-    MaxConnLifetime: 30m # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME
+    MaxConnLifetime: 30m # ZITADEL_DESTINATION_POSTGRES_MAXCONNLIFETIME
-    MaxConnIdleTime: 5m # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME
+    MaxConnIdleTime: 5m # ZITADEL_DESTINATION_POSTGRES_MAXCONNIDLETIME
-    Options: "" # ZITADEL_DATABASE_POSTGRES_OPTIONS
+    Options: "" # ZITADEL_DESTINATION_POSTGRES_OPTIONS
     User:
-      Username: zitadel # ZITADEL_DATABASE_POSTGRES_USER_USERNAME
+      Username: zitadel # ZITADEL_DESTINATION_POSTGRES_USER_USERNAME
-      Password: "" # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD
+      Password: "" # ZITADEL_DESTINATION_POSTGRES_USER_PASSWORD
       SSL:
-        Mode: disable # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE
+        Mode: disable # ZITADEL_DESTINATION_POSTGRES_USER_SSL_MODE
-        RootCert: "" # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT
+        RootCert: "" # ZITADEL_DESTINATION_POSTGRES_USER_SSL_ROOTCERT
-        Cert: "" # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT
+        Cert: "" # ZITADEL_DESTINATION_POSTGRES_USER_SSL_CERT
-        Key: "" # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY
+        Key: "" # ZITADEL_DESTINATION_POSTGRES_USER_SSL_KEY
 
-EventBulkSize: 10000
+EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE
+# The maximum duration an auth request was last updated before it gets ignored.
+# Default is 30 days
+MaxAuthRequestAge: 720h # ZITADEL_MAXAUTHREQUESTAGE
 
 Projections:
   # The maximum duration a transaction remains open
@@ -64,14 +67,14 @@ Projections:
   TransactionDuration: 0s # ZITADEL_PROJECTIONS_TRANSACTIONDURATION
   # turn off scheduler during operation
   RequeueEvery: 0s
-  ConcurrentInstances: 7
+  ConcurrentInstances: 7 # ZITADEL_PROJECTIONS_CONCURRENTINSTANCES
-  EventBulkLimit: 1000
+  EventBulkLimit: 1000 # ZITADEL_PROJECTIONS_EVENTBULKLIMIT
   Customizations:
     notifications:
       MaxFailureCount: 1
 
 Eventstore:
-  MaxRetries: 3
+  MaxRetries: 3 # ZITADEL_EVENTSTORE_MAXRETRIES
 
 Auth:
   Spooler:
```
```diff
@@ -69,6 +69,7 @@ func positionQuery(db *db.DB) string {
 }
 
 func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
+	logging.Info("starting to copy events")
 	start := time.Now()
 	reader, writer := io.Pipe()
 
@@ -130,7 +131,10 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
 		if err != nil {
 			return zerrors.ThrowUnknownf(err, "MIGRA-KTuSq", "unable to copy events from source during iteration %d", i)
 		}
+		logging.WithFields("batch_count", i).Info("batch of events copied")
+
 		if tag.RowsAffected() < int64(bulkSize) {
+			logging.WithFields("batch_count", i).Info("last batch of events copied")
 			return nil
 		}
 
```
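The two new log lines instrument the existing bulk loop: events are copied in batches of `EventBulkSize`, and a batch smaller than the bulk size marks the end of the copy. A minimal sketch of that termination logic, with `fetch` as a stand-in for the real COPY round trip (names are illustrative):

```go
package main

import "fmt"

// copyBatches mimics the loop the new logs instrument: keep copying bulks
// until one comes back smaller than bulkSize, which marks the last batch.
func copyBatches(bulkSize int64, fetch func(iteration int) int64) {
	for i := 0; ; i++ {
		affected := fetch(i)
		fmt.Printf("batch %d of events copied (%d rows)\n", i, affected)
		if affected < bulkSize {
			fmt.Printf("batch %d was the last batch\n", i)
			return
		}
	}
}

func main() {
	remaining := int64(25000) // pretend source table size
	copyBatches(10000, func(int) int64 {
		n := remaining
		if n > 10000 {
			n = 10000
		}
		remaining -= n
		return n
	})
}
```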
```diff
@@ -202,6 +206,7 @@ func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, sou
 }
 
 func copyUniqueConstraints(ctx context.Context, source, dest *db.DB) {
+	logging.Info("starting to copy unique constraints")
 	start := time.Now()
 	reader, writer := io.Pipe()
 	errs := make(chan error, 1)
```
```diff
@@ -3,6 +3,7 @@ package mirror
 import (
 	"context"
 	"database/sql"
+	"fmt"
 	"net/http"
 	"sync"
 	"time"
@@ -104,6 +105,7 @@ func projections(
 	config *ProjectionsConfig,
 	masterKey string,
 ) {
+	logging.Info("starting to fill projections")
 	start := time.Now()
 
 	client, err := database.Connect(config.Destination, false)
@@ -255,8 +257,10 @@ func projections(
 		go execProjections(ctx, instances, failedInstances, &wg)
 	}
 
-	for _, instance := range queryInstanceIDs(ctx, client) {
+	existingInstances := queryInstanceIDs(ctx, client)
+	for i, instance := range existingInstances {
 		instances <- instance
+		logging.WithFields("id", instance, "index", fmt.Sprintf("%d/%d", i, len(existingInstances))).Info("instance queued for projection")
 	}
 	close(instances)
 	wg.Wait()
```
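The queueing log added above slots into the existing fan-out: instance IDs go onto a channel consumed by `ConcurrentInstances` workers, the channel is closed once everything is queued, and the wait group blocks until the workers drain it. A stripped-down sketch of that pattern, with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	instances := make(chan string)
	var wg sync.WaitGroup

	// One worker per configured ConcurrentInstances slot.
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for instance := range instances {
				fmt.Println("running projections for", instance)
			}
		}()
	}

	ids := []string{"inst-1", "inst-2", "inst-3", "inst-4"}
	for i, id := range ids {
		instances <- id
		fmt.Printf("instance %s queued for projection (%d/%d)\n", id, i, len(ids))
	}
	close(instances) // lets each worker's range loop terminate
	wg.Wait()        // blocks until all queued instances are processed
}
```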
```diff
@@ -268,7 +272,7 @@ func projections(
 
 func execProjections(ctx context.Context, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) {
 	for instance := range instances {
-		logging.WithFields("instance", instance).Info("start projections")
+		logging.WithFields("instance", instance).Info("starting projections")
 		ctx = internal_authz.WithInstanceID(ctx, instance)
 
 		err := projection.ProjectInstance(ctx)
@@ -311,7 +315,7 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc
 	wg.Done()
 }
 
-// returns the instance configured by flag
+// queryInstanceIDs returns the instance configured by flag
 // or all instances which are not removed
 func queryInstanceIDs(ctx context.Context, source *database.DB) []string {
 	if len(instanceIDs) > 0 {
```
```diff
@@ -46,6 +46,7 @@ func copySystem(ctx context.Context, config *Migration) {
 }
 
 func copyAssets(ctx context.Context, source, dest *database.DB) {
+	logging.Info("starting to copy assets")
 	start := time.Now()
 
 	sourceConn, err := source.Conn(ctx)
@@ -70,7 +71,7 @@ func copyAssets(ctx context.Context, source, dest *database.DB) {
 	logging.OnError(err).Fatal("unable to acquire dest connection")
 	defer destConn.Close()
 
-	var eventCount int64
+	var assetCount int64
 	err = destConn.Raw(func(driverConn interface{}) error {
 		conn := driverConn.(*stdlib.Conn).Conn()
 
@@ -82,16 +83,17 @@ func copyAssets(ctx context.Context, source, dest *database.DB) {
 		}
 
 		tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.assets (instance_id, asset_type, resource_owner, name, content_type, data, updated_at) FROM stdin")
-		eventCount = tag.RowsAffected()
+		assetCount = tag.RowsAffected()
 
 		return err
 	})
 	logging.OnError(err).Fatal("unable to copy assets to destination")
 	logging.OnError(<-errs).Fatal("unable to copy assets from source")
-	logging.WithFields("took", time.Since(start), "count", eventCount).Info("assets migrated")
+	logging.WithFields("took", time.Since(start), "count", assetCount).Info("assets migrated")
 }
 
 func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
+	logging.Info("starting to copy encryption keys")
 	start := time.Now()
 
 	sourceConn, err := source.Conn(ctx)
@@ -116,7 +118,7 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
 	logging.OnError(err).Fatal("unable to acquire dest connection")
 	defer destConn.Close()
 
-	var eventCount int64
+	var keyCount int64
 	err = destConn.Raw(func(driverConn interface{}) error {
 		conn := driverConn.(*stdlib.Conn).Conn()
 
@@ -128,11 +130,11 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
 		}
 
 		tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.encryption_keys FROM stdin")
-		eventCount = tag.RowsAffected()
+		keyCount = tag.RowsAffected()
 
 		return err
 	})
 	logging.OnError(err).Fatal("unable to copy encryption keys to destination")
 	logging.OnError(<-errs).Fatal("unable to copy encryption keys from source")
-	logging.WithFields("took", time.Since(start), "count", eventCount).Info("encryption keys migrated")
+	logging.WithFields("took", time.Since(start), "count", keyCount).Info("encryption keys migrated")
 }
```
```diff
@@ -158,6 +158,9 @@ Destination:
 
 # As cockroachdb first copies the data into memory this parameter is used to iterate through the events table and fetch only the given amount of events per iteration
 EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE
+# The maximum duration an auth request was last updated before it gets ignored.
+# Default is 30 days
+MaxAuthRequestAge: 720h # ZITADEL_MAXAUTHREQUESTAGE
 
 Projections:
   # Defines how many projections are allowed to run in parallel
```
```diff
@@ -2,9 +2,13 @@ package handler
 
 import (
 	"context"
+	"fmt"
 	"time"
 
+	"github.com/zitadel/logging"
+
 	"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
+	"github.com/zitadel/zitadel/internal/api/authz"
 	"github.com/zitadel/zitadel/internal/database"
 	"github.com/zitadel/zitadel/internal/eventstore"
 	"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
@@ -57,11 +61,13 @@ func Start(ctx context.Context) {
 }
 
 func ProjectInstance(ctx context.Context) error {
-	for _, projection := range projections {
+	for i, projection := range projections {
+		logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting admin projection")
 		_, err := projection.Trigger(ctx)
 		if err != nil {
 			return err
 		}
+		logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("admin projection done")
 	}
 	return nil
 }
```
```diff
@@ -2,8 +2,12 @@ package handler
 
 import (
 	"context"
+	"fmt"
 	"time"
 
+	"github.com/zitadel/logging"
+
+	"github.com/zitadel/zitadel/internal/api/authz"
 	"github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view"
 	"github.com/zitadel/zitadel/internal/database"
 	"github.com/zitadel/zitadel/internal/eventstore"
@@ -72,11 +76,13 @@ func Projections() []*handler2.Handler {
 }
 
 func ProjectInstance(ctx context.Context) error {
-	for _, projection := range projections {
+	for i, projection := range projections {
+		logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting auth projection")
 		_, err := projection.Trigger(ctx)
 		if err != nil {
 			return err
 		}
+		logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("auth projection done")
 	}
 	return nil
 }
```
```diff
@@ -2,8 +2,12 @@ package notification
 
 import (
 	"context"
+	"fmt"
 	"time"
 
+	"github.com/zitadel/logging"
+
+	"github.com/zitadel/zitadel/internal/api/authz"
 	"github.com/zitadel/zitadel/internal/command"
 	"github.com/zitadel/zitadel/internal/crypto"
 	"github.com/zitadel/zitadel/internal/eventstore"
@@ -68,11 +72,13 @@ func Start(ctx context.Context) {
 }
 
 func ProjectInstance(ctx context.Context) error {
-	for _, projection := range projections {
+	for i, projection := range projections {
+		logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting notification projection")
 		_, err := projection.Trigger(ctx)
 		if err != nil {
 			return err
 		}
+		logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("notification projection done")
 	}
 	return nil
 }
```
```diff
@@ -2,6 +2,9 @@ package projection
 
 import (
 	"context"
+	"fmt"
+
+	"github.com/zitadel/logging"
 
 	internal_authz "github.com/zitadel/zitadel/internal/api/authz"
 	"github.com/zitadel/zitadel/internal/crypto"
@@ -90,6 +93,7 @@ var (
 )
 
 type projection interface {
+	ProjectionName() string
 	Start(ctx context.Context)
 	Init(ctx context.Context) error
 	Trigger(ctx context.Context, opts ...handler.TriggerOpt) (_ context.Context, err error)
```
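`ProjectionName()` joins the `projection` interface so the generic loops below can report which projection is running. A minimal sketch of how the progress logging relies on it (the trimmed interface and type names are illustrative):

```go
package main

import "fmt"

// projection is a trimmed-down, illustrative version of the interface above;
// the new ProjectionName method is what the progress logs key on.
type projection interface {
	ProjectionName() string
}

type userProjection struct{}

func (userProjection) ProjectionName() string { return "projections.users" }

type orgProjection struct{}

func (orgProjection) ProjectionName() string { return "projections.orgs" }

func main() {
	projections := []projection{userProjection{}, orgProjection{}}
	for i, p := range projections {
		// mirrors logging.WithFields("name", ..., "index", "i/len").Info(...)
		fmt.Printf("starting projection %s (%d/%d)\n", p.ProjectionName(), i, len(projections))
	}
}
```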
```diff
@@ -206,21 +210,25 @@ func Start(ctx context.Context) {
 }
 
 func ProjectInstance(ctx context.Context) error {
-	for _, projection := range projections {
+	for i, projection := range projections {
+		logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting projection")
 		_, err := projection.Trigger(ctx)
 		if err != nil {
 			return err
 		}
+		logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).Info("projection done")
 	}
 	return nil
 }
 
 func ProjectInstanceFields(ctx context.Context) error {
-	for _, fieldProjection := range fields {
+	for i, fieldProjection := range fields {
+		logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("starting fields projection")
 		err := fieldProjection.Trigger(ctx)
 		if err != nil {
 			return err
 		}
+		logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).Info("fields projection done")
 	}
 	return nil
 }
```