mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-13 14:30:00 +00:00
Revert "fix(mirror): add max auth request age configuration (#9812)"
This reverts commit ba87ac7dc7
.
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
_ "embed"
|
_ "embed"
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5/stdlib"
|
"github.com/jackc/pgx/v5/stdlib"
|
||||||
@@ -42,16 +41,12 @@ func copyAuth(ctx context.Context, config *Migration) {
|
|||||||
logging.OnError(err).Fatal("unable to connect to destination database")
|
logging.OnError(err).Fatal("unable to connect to destination database")
|
||||||
defer destClient.Close()
|
defer destClient.Close()
|
||||||
|
|
||||||
copyAuthRequests(ctx, sourceClient, destClient, config.MaxAuthRequestAge)
|
copyAuthRequests(ctx, sourceClient, destClient)
|
||||||
}
|
}
|
||||||
|
|
||||||
func copyAuthRequests(ctx context.Context, source, dest *database.DB, maxAuthRequestAge time.Duration) {
|
func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
logging.Info("creating index on auth.auth_requests.change_date to speed up copy in source database")
|
|
||||||
_, err := source.ExecContext(ctx, "CREATE INDEX CONCURRENTLY IF NOT EXISTS auth_requests_change_date ON auth.auth_requests (change_date)")
|
|
||||||
logging.OnError(err).Fatal("unable to create index on auth.auth_requests.change_date")
|
|
||||||
|
|
||||||
sourceConn, err := source.Conn(ctx)
|
sourceConn, err := source.Conn(ctx)
|
||||||
logging.OnError(err).Fatal("unable to acquire connection")
|
logging.OnError(err).Fatal("unable to acquire connection")
|
||||||
defer sourceConn.Close()
|
defer sourceConn.Close()
|
||||||
@@ -60,9 +55,9 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB, maxAuthReq
|
|||||||
errs := make(chan error, 1)
|
errs := make(chan error, 1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err = sourceConn.Raw(func(driverConn any) error {
|
err = sourceConn.Raw(func(driverConn interface{}) error {
|
||||||
conn := driverConn.(*stdlib.Conn).Conn()
|
conn := driverConn.(*stdlib.Conn).Conn()
|
||||||
_, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+" AND change_date > NOW() - INTERVAL '"+strconv.FormatFloat(maxAuthRequestAge.Seconds(), 'f', -1, 64)+" seconds') TO STDOUT")
|
_, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+") TO STDOUT")
|
||||||
w.Close()
|
w.Close()
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@@ -74,7 +69,7 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB, maxAuthReq
|
|||||||
defer destConn.Close()
|
defer destConn.Close()
|
||||||
|
|
||||||
var affected int64
|
var affected int64
|
||||||
err = destConn.Raw(func(driverConn any) error {
|
err = destConn.Raw(func(driverConn interface{}) error {
|
||||||
conn := driverConn.(*stdlib.Conn).Conn()
|
conn := driverConn.(*stdlib.Conn).Conn()
|
||||||
|
|
||||||
if shouldReplace {
|
if shouldReplace {
|
||||||
|
@@ -23,8 +23,7 @@ type Migration struct {
|
|||||||
Source database.Config
|
Source database.Config
|
||||||
Destination database.Config
|
Destination database.Config
|
||||||
|
|
||||||
EventBulkSize uint32
|
EventBulkSize uint32
|
||||||
MaxAuthRequestAge time.Duration
|
|
||||||
|
|
||||||
Log *logging.Config
|
Log *logging.Config
|
||||||
Machine *id.Config
|
Machine *id.Config
|
||||||
|
@@ -1,88 +1,84 @@
|
|||||||
Source:
|
Source:
|
||||||
cockroach:
|
cockroach:
|
||||||
Host: localhost # ZITADEL_SOURCE_COCKROACH_HOST
|
Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST
|
||||||
Port: 26257 # ZITADEL_SOURCE_COCKROACH_PORT
|
Port: 26257 # ZITADEL_DATABASE_COCKROACH_PORT
|
||||||
Database: zitadel # ZITADEL_SOURCE_COCKROACH_DATABASE
|
Database: zitadel # ZITADEL_DATABASE_COCKROACH_DATABASE
|
||||||
MaxOpenConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXOPENCONNS
|
MaxOpenConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS
|
||||||
MaxIdleConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXIDLECONNS
|
MaxIdleConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS
|
||||||
EventPushConnRatio: 0.33 # ZITADEL_SOURCE_COCKROACH_EVENTPUSHCONNRATIO
|
EventPushConnRatio: 0.33 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO
|
||||||
ProjectionSpoolerConnRatio: 0.33 # ZITADEL_SOURCE_COCKROACH_PROJECTIONSPOOLERCONNRATIO
|
ProjectionSpoolerConnRatio: 0.33 # ZITADEL_DATABASE_COCKROACH_PROJECTIONSPOOLERCONNRATIO
|
||||||
MaxConnLifetime: 30m # ZITADEL_SOURCE_COCKROACH_MAXCONNLIFETIME
|
MaxConnLifetime: 30m # ZITADEL_DATABASE_COCKROACH_MAXCONNLIFETIME
|
||||||
MaxConnIdleTime: 5m # ZITADEL_SOURCE_COCKROACH_MAXCONNIDLETIME
|
MaxConnIdleTime: 5m # ZITADEL_DATABASE_COCKROACH_MAXCONNIDLETIME
|
||||||
Options: "" # ZITADEL_SOURCE_COCKROACH_OPTIONS
|
Options: "" # ZITADEL_DATABASE_COCKROACH_OPTIONS
|
||||||
User:
|
User:
|
||||||
Username: zitadel # ZITADEL_SOURCE_COCKROACH_USER_USERNAME
|
Username: zitadel # ZITADEL_DATABASE_COCKROACH_USER_USERNAME
|
||||||
Password: "" # ZITADEL_SOURCE_COCKROACH_USER_PASSWORD
|
Password: "" # ZITADEL_DATABASE_COCKROACH_USER_PASSWORD
|
||||||
SSL:
|
SSL:
|
||||||
Mode: disable # ZITADEL_SOURCE_COCKROACH_USER_SSL_MODE
|
Mode: disable # ZITADEL_DATABASE_COCKROACH_USER_SSL_MODE
|
||||||
RootCert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_ROOTCERT
|
RootCert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_ROOTCERT
|
||||||
Cert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_CERT
|
Cert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_CERT
|
||||||
Key: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_KEY
|
Key: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_KEY
|
||||||
# Postgres is used as soon as a value is set
|
# Postgres is used as soon as a value is set
|
||||||
# The values describe the possible fields to set values
|
# The values describe the possible fields to set values
|
||||||
postgres:
|
postgres:
|
||||||
Host: # ZITADEL_SOURCE_POSTGRES_HOST
|
Host: # ZITADEL_DATABASE_POSTGRES_HOST
|
||||||
Port: # ZITADEL_SOURCE_POSTGRES_PORT
|
Port: # ZITADEL_DATABASE_POSTGRES_PORT
|
||||||
Database: # ZITADEL_SOURCE_POSTGRES_DATABASE
|
Database: # ZITADEL_DATABASE_POSTGRES_DATABASE
|
||||||
MaxOpenConns: # ZITADEL_SOURCE_POSTGRES_MAXOPENCONNS
|
MaxOpenConns: # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS
|
||||||
MaxIdleConns: # ZITADEL_SOURCE_POSTGRES_MAXIDLECONNS
|
MaxIdleConns: # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS
|
||||||
MaxConnLifetime: # ZITADEL_SOURCE_POSTGRES_MAXCONNLIFETIME
|
MaxConnLifetime: # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME
|
||||||
MaxConnIdleTime: # ZITADEL_SOURCE_POSTGRES_MAXCONNIDLETIME
|
MaxConnIdleTime: # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME
|
||||||
Options: # ZITADEL_SOURCE_POSTGRES_OPTIONS
|
Options: # ZITADEL_DATABASE_POSTGRES_OPTIONS
|
||||||
User:
|
User:
|
||||||
Username: # ZITADEL_SOURCE_POSTGRES_USER_USERNAME
|
Username: # ZITADEL_DATABASE_POSTGRES_USER_USERNAME
|
||||||
Password: # ZITADEL_SOURCE_POSTGRES_USER_PASSWORD
|
Password: # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD
|
||||||
SSL:
|
SSL:
|
||||||
Mode: # ZITADEL_SOURCE_POSTGRES_USER_SSL_MODE
|
Mode: # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE
|
||||||
RootCert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_ROOTCERT
|
RootCert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT
|
||||||
Cert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_CERT
|
Cert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT
|
||||||
Key: # ZITADEL_SOURCE_POSTGRES_USER_SSL_KEY
|
Key: # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY
|
||||||
|
|
||||||
Destination:
|
Destination:
|
||||||
cockroach:
|
cockroach:
|
||||||
Host: localhost # ZITADEL_DESTINATION_COCKROACH_HOST
|
Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST
|
||||||
Port: 26257 # ZITADEL_DESTINATION_COCKROACH_PORT
|
Port: 26257 # ZITADEL_DATABASE_COCKROACH_PORT
|
||||||
Database: zitadel # ZITADEL_DESTINATION_COCKROACH_DATABASE
|
Database: zitadel # ZITADEL_DATABASE_COCKROACH_DATABASE
|
||||||
MaxOpenConns: 0 # ZITADEL_DESTINATION_COCKROACH_MAXOPENCONNS
|
MaxOpenConns: 0 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS
|
||||||
MaxIdleConns: 0 # ZITADEL_DESTINATION_COCKROACH_MAXIDLECONNS
|
MaxIdleConns: 0 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS
|
||||||
MaxConnLifetime: 30m # ZITADEL_DESTINATION_COCKROACH_MAXCONNLIFETIME
|
MaxConnLifetime: 30m # ZITADEL_DATABASE_COCKROACH_MAXCONNLIFETIME
|
||||||
MaxConnIdleTime: 5m # ZITADEL_DESTINATION_COCKROACH_MAXCONNIDLETIME
|
MaxConnIdleTime: 5m # ZITADEL_DATABASE_COCKROACH_MAXCONNIDLETIME
|
||||||
EventPushConnRatio: 0.01 # ZITADEL_DESTINATION_COCKROACH_EVENTPUSHCONNRATIO
|
EventPushConnRatio: 0.01 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO
|
||||||
ProjectionSpoolerConnRatio: 0.5 # ZITADEL_DESTINATION_COCKROACH_PROJECTIONSPOOLERCONNRATIO
|
ProjectionSpoolerConnRatio: 0.5 # ZITADEL_DATABASE_COCKROACH_PROJECTIONSPOOLERCONNRATIO
|
||||||
Options: "" # ZITADEL_DESTINATION_COCKROACH_OPTIONS
|
Options: "" # ZITADEL_DATABASE_COCKROACH_OPTIONS
|
||||||
User:
|
User:
|
||||||
Username: zitadel # ZITADEL_DESTINATION_COCKROACH_USER_USERNAME
|
Username: zitadel # ZITADEL_DATABASE_COCKROACH_USER_USERNAME
|
||||||
Password: "" # ZITADEL_DESTINATION_COCKROACH_USER_PASSWORD
|
Password: "" # ZITADEL_DATABASE_COCKROACH_USER_PASSWORD
|
||||||
SSL:
|
SSL:
|
||||||
Mode: disable # ZITADEL_DESTINATION_COCKROACH_USER_SSL_MODE
|
Mode: disable # ZITADEL_DATABASE_COCKROACH_USER_SSL_MODE
|
||||||
RootCert: "" # ZITADEL_DESTINATION_COCKROACH_USER_SSL_ROOTCERT
|
RootCert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_ROOTCERT
|
||||||
Cert: "" # ZITADEL_DESTINATION_COCKROACH_USER_SSL_CERT
|
Cert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_CERT
|
||||||
Key: "" # ZITADEL_DESTINATION_COCKROACH_USER_SSL_KEY
|
Key: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_KEY
|
||||||
# Postgres is used as soon as a value is set
|
# Postgres is used as soon as a value is set
|
||||||
# The values describe the possible fields to set values
|
# The values describe the possible fields to set values
|
||||||
postgres:
|
postgres:
|
||||||
Host: # ZITADEL_DESTINATION_POSTGRES_HOST
|
Host: # ZITADEL_DATABASE_POSTGRES_HOST
|
||||||
Port: # ZITADEL_DESTINATION_POSTGRES_PORT
|
Port: # ZITADEL_DATABASE_POSTGRES_PORT
|
||||||
Database: # ZITADEL_DESTINATION_POSTGRES_DATABASE
|
Database: # ZITADEL_DATABASE_POSTGRES_DATABASE
|
||||||
MaxOpenConns: # ZITADEL_DESTINATION_POSTGRES_MAXOPENCONNS
|
MaxOpenConns: # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS
|
||||||
MaxIdleConns: # ZITADEL_DESTINATION_POSTGRES_MAXIDLECONNS
|
MaxIdleConns: # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS
|
||||||
MaxConnLifetime: # ZITADEL_DESTINATION_POSTGRES_MAXCONNLIFETIME
|
MaxConnLifetime: # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME
|
||||||
MaxConnIdleTime: # ZITADEL_DESTINATION_POSTGRES_MAXCONNIDLETIME
|
MaxConnIdleTime: # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME
|
||||||
Options: # ZITADEL_DESTINATION_POSTGRES_OPTIONS
|
Options: # ZITADEL_DATABASE_POSTGRES_OPTIONS
|
||||||
User:
|
User:
|
||||||
Username: # ZITADEL_DESTINATION_POSTGRES_USER_USERNAME
|
Username: # ZITADEL_DATABASE_POSTGRES_USER_USERNAME
|
||||||
Password: # ZITADEL_DESTINATION_POSTGRES_USER_PASSWORD
|
Password: # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD
|
||||||
SSL:
|
SSL:
|
||||||
Mode: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_MODE
|
Mode: # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE
|
||||||
RootCert: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_ROOTCERT
|
RootCert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT
|
||||||
Cert: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_CERT
|
Cert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT
|
||||||
Key: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_KEY
|
Key: # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY
|
||||||
|
|
||||||
|
EventBulkSize: 10000
|
||||||
EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE
|
|
||||||
# The maximum duration an auth request was last updated before it gets ignored.
|
|
||||||
# Default is 30 days
|
|
||||||
MaxAuthRequestAge: 720h # ZITADEL_MAXAUTHREQUESTAGE
|
|
||||||
|
|
||||||
Projections:
|
Projections:
|
||||||
# The maximum duration a transaction remains open
|
# The maximum duration a transaction remains open
|
||||||
@@ -91,14 +87,14 @@ Projections:
|
|||||||
TransactionDuration: 0s # ZITADEL_PROJECTIONS_TRANSACTIONDURATION
|
TransactionDuration: 0s # ZITADEL_PROJECTIONS_TRANSACTIONDURATION
|
||||||
# turn off scheduler during operation
|
# turn off scheduler during operation
|
||||||
RequeueEvery: 0s
|
RequeueEvery: 0s
|
||||||
ConcurrentInstances: 7 # ZITADEL_PROJECTIONS_CONCURRENTINSTANCES
|
ConcurrentInstances: 7
|
||||||
EventBulkLimit: 1000 # ZITADEL_PROJECTIONS_EVENTBULKLIMIT
|
EventBulkLimit: 1000
|
||||||
Customizations:
|
Customizations:
|
||||||
notifications:
|
notifications:
|
||||||
MaxFailureCount: 1
|
MaxFailureCount: 1
|
||||||
|
|
||||||
Eventstore:
|
Eventstore:
|
||||||
MaxRetries: 3 # ZITADEL_EVENTSTORE_MAXRETRIES
|
MaxRetries: 3
|
||||||
|
|
||||||
Auth:
|
Auth:
|
||||||
Spooler:
|
Spooler:
|
||||||
|
@@ -68,7 +68,6 @@ func positionQuery(db *db.DB) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
|
func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
|
||||||
logging.Info("starting to copy events")
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
reader, writer := io.Pipe()
|
reader, writer := io.Pipe()
|
||||||
|
|
||||||
@@ -127,10 +126,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return zerrors.ThrowUnknownf(err, "MIGRA-KTuSq", "unable to copy events from source during iteration %d", i)
|
return zerrors.ThrowUnknownf(err, "MIGRA-KTuSq", "unable to copy events from source during iteration %d", i)
|
||||||
}
|
}
|
||||||
logging.WithFields("batch_count", i).Info("batch of events copied")
|
|
||||||
|
|
||||||
if tag.RowsAffected() < int64(bulkSize) {
|
if tag.RowsAffected() < int64(bulkSize) {
|
||||||
logging.WithFields("batch_count", i).Info("last batch of events copied")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -202,7 +198,6 @@ func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, sou
|
|||||||
}
|
}
|
||||||
|
|
||||||
func copyUniqueConstraints(ctx context.Context, source, dest *db.DB) {
|
func copyUniqueConstraints(ctx context.Context, source, dest *db.DB) {
|
||||||
logging.Info("starting to copy unique constraints")
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
reader, writer := io.Pipe()
|
reader, writer := io.Pipe()
|
||||||
errs := make(chan error, 1)
|
errs := make(chan error, 1)
|
||||||
|
@@ -3,7 +3,6 @@ package mirror
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -104,7 +103,6 @@ func projections(
|
|||||||
config *ProjectionsConfig,
|
config *ProjectionsConfig,
|
||||||
masterKey string,
|
masterKey string,
|
||||||
) {
|
) {
|
||||||
logging.Info("starting to fill projections")
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
client, err := database.Connect(config.Destination, false)
|
client, err := database.Connect(config.Destination, false)
|
||||||
@@ -253,10 +251,8 @@ func projections(
|
|||||||
go execProjections(ctx, instances, failedInstances, &wg)
|
go execProjections(ctx, instances, failedInstances, &wg)
|
||||||
}
|
}
|
||||||
|
|
||||||
existingInstances := queryInstanceIDs(ctx, client)
|
for _, instance := range queryInstanceIDs(ctx, client) {
|
||||||
for i, instance := range existingInstances {
|
|
||||||
instances <- instance
|
instances <- instance
|
||||||
logging.WithFields("id", instance, "index", fmt.Sprintf("%d/%d", i, len(existingInstances))).Info("instance queued for projection")
|
|
||||||
}
|
}
|
||||||
close(instances)
|
close(instances)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
@@ -268,7 +264,7 @@ func projections(
|
|||||||
|
|
||||||
func execProjections(ctx context.Context, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) {
|
func execProjections(ctx context.Context, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) {
|
||||||
for instance := range instances {
|
for instance := range instances {
|
||||||
logging.WithFields("instance", instance).Info("starting projections")
|
logging.WithFields("instance", instance).Info("start projections")
|
||||||
ctx = internal_authz.WithInstanceID(ctx, instance)
|
ctx = internal_authz.WithInstanceID(ctx, instance)
|
||||||
|
|
||||||
err := projection.ProjectInstance(ctx)
|
err := projection.ProjectInstance(ctx)
|
||||||
@@ -303,7 +299,7 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc
|
|||||||
wg.Done()
|
wg.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
// queryInstanceIDs returns the instance configured by flag
|
// returns the instance configured by flag
|
||||||
// or all instances which are not removed
|
// or all instances which are not removed
|
||||||
func queryInstanceIDs(ctx context.Context, source *database.DB) []string {
|
func queryInstanceIDs(ctx context.Context, source *database.DB) []string {
|
||||||
if len(instanceIDs) > 0 {
|
if len(instanceIDs) > 0 {
|
||||||
|
@@ -46,7 +46,6 @@ func copySystem(ctx context.Context, config *Migration) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func copyAssets(ctx context.Context, source, dest *database.DB) {
|
func copyAssets(ctx context.Context, source, dest *database.DB) {
|
||||||
logging.Info("starting to copy assets")
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
sourceConn, err := source.Conn(ctx)
|
sourceConn, err := source.Conn(ctx)
|
||||||
@@ -71,7 +70,7 @@ func copyAssets(ctx context.Context, source, dest *database.DB) {
|
|||||||
logging.OnError(err).Fatal("unable to acquire dest connection")
|
logging.OnError(err).Fatal("unable to acquire dest connection")
|
||||||
defer destConn.Close()
|
defer destConn.Close()
|
||||||
|
|
||||||
var assetCount int64
|
var eventCount int64
|
||||||
err = destConn.Raw(func(driverConn interface{}) error {
|
err = destConn.Raw(func(driverConn interface{}) error {
|
||||||
conn := driverConn.(*stdlib.Conn).Conn()
|
conn := driverConn.(*stdlib.Conn).Conn()
|
||||||
|
|
||||||
@@ -83,17 +82,16 @@ func copyAssets(ctx context.Context, source, dest *database.DB) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.assets (instance_id, asset_type, resource_owner, name, content_type, data, updated_at) FROM stdin")
|
tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.assets (instance_id, asset_type, resource_owner, name, content_type, data, updated_at) FROM stdin")
|
||||||
assetCount = tag.RowsAffected()
|
eventCount = tag.RowsAffected()
|
||||||
|
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
logging.OnError(err).Fatal("unable to copy assets to destination")
|
logging.OnError(err).Fatal("unable to copy assets to destination")
|
||||||
logging.OnError(<-errs).Fatal("unable to copy assets from source")
|
logging.OnError(<-errs).Fatal("unable to copy assets from source")
|
||||||
logging.WithFields("took", time.Since(start), "count", assetCount).Info("assets migrated")
|
logging.WithFields("took", time.Since(start), "count", eventCount).Info("assets migrated")
|
||||||
}
|
}
|
||||||
|
|
||||||
func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
|
func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
|
||||||
logging.Info("starting to copy encryption keys")
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
sourceConn, err := source.Conn(ctx)
|
sourceConn, err := source.Conn(ctx)
|
||||||
@@ -118,7 +116,7 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
|
|||||||
logging.OnError(err).Fatal("unable to acquire dest connection")
|
logging.OnError(err).Fatal("unable to acquire dest connection")
|
||||||
defer destConn.Close()
|
defer destConn.Close()
|
||||||
|
|
||||||
var keyCount int64
|
var eventCount int64
|
||||||
err = destConn.Raw(func(driverConn interface{}) error {
|
err = destConn.Raw(func(driverConn interface{}) error {
|
||||||
conn := driverConn.(*stdlib.Conn).Conn()
|
conn := driverConn.(*stdlib.Conn).Conn()
|
||||||
|
|
||||||
@@ -130,11 +128,11 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.encryption_keys FROM stdin")
|
tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.encryption_keys FROM stdin")
|
||||||
keyCount = tag.RowsAffected()
|
eventCount = tag.RowsAffected()
|
||||||
|
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
logging.OnError(err).Fatal("unable to copy encryption keys to destination")
|
logging.OnError(err).Fatal("unable to copy encryption keys to destination")
|
||||||
logging.OnError(<-errs).Fatal("unable to copy encryption keys from source")
|
logging.OnError(<-errs).Fatal("unable to copy encryption keys from source")
|
||||||
logging.WithFields("took", time.Since(start), "count", keyCount).Info("encryption keys migrated")
|
logging.WithFields("took", time.Since(start), "count", eventCount).Info("encryption keys migrated")
|
||||||
}
|
}
|
||||||
|
@@ -163,9 +163,6 @@ Destination:
|
|||||||
|
|
||||||
# As cockroachdb first copies the data into memory this parameter is used to iterate through the events table and fetch only the given amount of events per iteration
|
# As cockroachdb first copies the data into memory this parameter is used to iterate through the events table and fetch only the given amount of events per iteration
|
||||||
EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE
|
EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE
|
||||||
# The maximum duration an auth request was last updated before it gets ignored.
|
|
||||||
# Default is 30 days
|
|
||||||
MaxAuthRequestAge: 720h # ZITADEL_MAXAUTHREQUESTAGE
|
|
||||||
|
|
||||||
Projections:
|
Projections:
|
||||||
# Defines how many projections are allowed to run in parallel
|
# Defines how many projections are allowed to run in parallel
|
||||||
|
@@ -2,13 +2,9 @@ package handler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/zitadel/logging"
|
|
||||||
|
|
||||||
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
|
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
|
||||||
"github.com/zitadel/zitadel/internal/api/authz"
|
|
||||||
"github.com/zitadel/zitadel/internal/database"
|
"github.com/zitadel/zitadel/internal/database"
|
||||||
"github.com/zitadel/zitadel/internal/eventstore"
|
"github.com/zitadel/zitadel/internal/eventstore"
|
||||||
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
|
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
|
||||||
@@ -61,13 +57,11 @@ func Start(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ProjectInstance(ctx context.Context) error {
|
func ProjectInstance(ctx context.Context) error {
|
||||||
for i, projection := range projections {
|
for _, projection := range projections {
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting admin projection")
|
|
||||||
_, err := projection.Trigger(ctx)
|
_, err := projection.Trigger(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("admin projection done")
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@@ -2,12 +2,8 @@ package handler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/zitadel/logging"
|
|
||||||
|
|
||||||
"github.com/zitadel/zitadel/internal/api/authz"
|
|
||||||
"github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view"
|
"github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view"
|
||||||
"github.com/zitadel/zitadel/internal/database"
|
"github.com/zitadel/zitadel/internal/database"
|
||||||
"github.com/zitadel/zitadel/internal/eventstore"
|
"github.com/zitadel/zitadel/internal/eventstore"
|
||||||
@@ -76,13 +72,11 @@ func Projections() []*handler2.Handler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ProjectInstance(ctx context.Context) error {
|
func ProjectInstance(ctx context.Context) error {
|
||||||
for i, projection := range projections {
|
for _, projection := range projections {
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting auth projection")
|
|
||||||
_, err := projection.Trigger(ctx)
|
_, err := projection.Trigger(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("auth projection done")
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@@ -2,12 +2,8 @@ package notification
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/zitadel/logging"
|
|
||||||
|
|
||||||
"github.com/zitadel/zitadel/internal/api/authz"
|
|
||||||
"github.com/zitadel/zitadel/internal/command"
|
"github.com/zitadel/zitadel/internal/command"
|
||||||
"github.com/zitadel/zitadel/internal/crypto"
|
"github.com/zitadel/zitadel/internal/crypto"
|
||||||
"github.com/zitadel/zitadel/internal/database"
|
"github.com/zitadel/zitadel/internal/database"
|
||||||
@@ -68,13 +64,11 @@ func Start(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ProjectInstance(ctx context.Context) error {
|
func ProjectInstance(ctx context.Context) error {
|
||||||
for i, projection := range projections {
|
for _, projection := range projections {
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting notification projection")
|
|
||||||
_, err := projection.Trigger(ctx)
|
_, err := projection.Trigger(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("notification projection done")
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@@ -2,9 +2,6 @@ package projection
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/zitadel/logging"
|
|
||||||
|
|
||||||
internal_authz "github.com/zitadel/zitadel/internal/api/authz"
|
internal_authz "github.com/zitadel/zitadel/internal/api/authz"
|
||||||
"github.com/zitadel/zitadel/internal/crypto"
|
"github.com/zitadel/zitadel/internal/crypto"
|
||||||
@@ -92,7 +89,6 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type projection interface {
|
type projection interface {
|
||||||
ProjectionName() string
|
|
||||||
Start(ctx context.Context)
|
Start(ctx context.Context)
|
||||||
Init(ctx context.Context) error
|
Init(ctx context.Context) error
|
||||||
Trigger(ctx context.Context, opts ...handler.TriggerOpt) (_ context.Context, err error)
|
Trigger(ctx context.Context, opts ...handler.TriggerOpt) (_ context.Context, err error)
|
||||||
@@ -205,8 +201,7 @@ func Start(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ProjectInstance(ctx context.Context) error {
|
func ProjectInstance(ctx context.Context) error {
|
||||||
for i, projection := range projections {
|
for _, projection := range projections {
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting projection")
|
|
||||||
_, err := projection.Trigger(ctx)
|
_, err := projection.Trigger(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
Reference in New Issue
Block a user