fix(eventstore): use decimal, correct mirror (#9905)

back port #9812, #9878, #9881, #9884

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com>
This commit is contained in:
Silvan
2025-05-20 12:58:12 +02:00
committed by GitHub
parent 1df6638847
commit f3338cdb9a
58 changed files with 545 additions and 331 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
_ "embed"
"io"
"strconv"
"time"
"github.com/jackc/pgx/v5/stdlib"
@@ -41,12 +42,16 @@ func copyAuth(ctx context.Context, config *Migration) {
logging.OnError(err).Fatal("unable to connect to destination database")
defer destClient.Close()
copyAuthRequests(ctx, sourceClient, destClient)
copyAuthRequests(ctx, sourceClient, destClient, config.MaxAuthRequestAge)
}
func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
func copyAuthRequests(ctx context.Context, source, dest *database.DB, maxAuthRequestAge time.Duration) {
start := time.Now()
logging.Info("creating index on auth.auth_requests.change_date to speed up copy in source database")
_, err := source.ExecContext(ctx, "CREATE INDEX CONCURRENTLY IF NOT EXISTS auth_requests_change_date ON auth.auth_requests (change_date)")
logging.OnError(err).Fatal("unable to create index on auth.auth_requests.change_date")
sourceConn, err := source.Conn(ctx)
logging.OnError(err).Fatal("unable to acquire connection")
defer sourceConn.Close()
@@ -55,9 +60,9 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
errs := make(chan error, 1)
go func() {
err = sourceConn.Raw(func(driverConn interface{}) error {
err = sourceConn.Raw(func(driverConn any) error {
conn := driverConn.(*stdlib.Conn).Conn()
_, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+") TO STDOUT")
_, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+" AND change_date > NOW() - INTERVAL '"+strconv.FormatFloat(maxAuthRequestAge.Seconds(), 'f', -1, 64)+" seconds') TO STDOUT")
w.Close()
return err
})
@@ -69,7 +74,7 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
defer destConn.Close()
var affected int64
err = destConn.Raw(func(driverConn interface{}) error {
err = destConn.Raw(func(driverConn any) error {
conn := driverConn.(*stdlib.Conn).Conn()
if shouldReplace {

View File

@@ -23,7 +23,8 @@ type Migration struct {
Source database.Config
Destination database.Config
EventBulkSize uint32
EventBulkSize uint32
MaxAuthRequestAge time.Duration
Log *logging.Config
Machine *id.Config

View File

@@ -1,84 +1,88 @@
Source:
cockroach:
Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST
Port: 26257 # ZITADEL_DATABASE_COCKROACH_PORT
Database: zitadel # ZITADEL_DATABASE_COCKROACH_DATABASE
MaxOpenConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS
MaxIdleConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS
EventPushConnRatio: 0.33 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO
ProjectionSpoolerConnRatio: 0.33 # ZITADEL_DATABASE_COCKROACH_PROJECTIONSPOOLERCONNRATIO
MaxConnLifetime: 30m # ZITADEL_DATABASE_COCKROACH_MAXCONNLIFETIME
MaxConnIdleTime: 5m # ZITADEL_DATABASE_COCKROACH_MAXCONNIDLETIME
Options: "" # ZITADEL_DATABASE_COCKROACH_OPTIONS
Host: localhost # ZITADEL_SOURCE_COCKROACH_HOST
Port: 26257 # ZITADEL_SOURCE_COCKROACH_PORT
Database: zitadel # ZITADEL_SOURCE_COCKROACH_DATABASE
MaxOpenConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXOPENCONNS
MaxIdleConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXIDLECONNS
EventPushConnRatio: 0.33 # ZITADEL_SOURCE_COCKROACH_EVENTPUSHCONNRATIO
ProjectionSpoolerConnRatio: 0.33 # ZITADEL_SOURCE_COCKROACH_PROJECTIONSPOOLERCONNRATIO
MaxConnLifetime: 30m # ZITADEL_SOURCE_COCKROACH_MAXCONNLIFETIME
MaxConnIdleTime: 5m # ZITADEL_SOURCE_COCKROACH_MAXCONNIDLETIME
Options: "" # ZITADEL_SOURCE_COCKROACH_OPTIONS
User:
Username: zitadel # ZITADEL_DATABASE_COCKROACH_USER_USERNAME
Password: "" # ZITADEL_DATABASE_COCKROACH_USER_PASSWORD
Username: zitadel # ZITADEL_SOURCE_COCKROACH_USER_USERNAME
Password: "" # ZITADEL_SOURCE_COCKROACH_USER_PASSWORD
SSL:
Mode: disable # ZITADEL_DATABASE_COCKROACH_USER_SSL_MODE
RootCert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_ROOTCERT
Cert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_CERT
Key: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_KEY
Mode: disable # ZITADEL_SOURCE_COCKROACH_USER_SSL_MODE
RootCert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_ROOTCERT
Cert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_CERT
Key: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_KEY
# Postgres is used as soon as a value is set
# The values describe the possible fields to set values
postgres:
Host: # ZITADEL_DATABASE_POSTGRES_HOST
Port: # ZITADEL_DATABASE_POSTGRES_PORT
Database: # ZITADEL_DATABASE_POSTGRES_DATABASE
MaxOpenConns: # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS
MaxIdleConns: # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS
MaxConnLifetime: # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME
MaxConnIdleTime: # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME
Options: # ZITADEL_DATABASE_POSTGRES_OPTIONS
Host: # ZITADEL_SOURCE_POSTGRES_HOST
Port: # ZITADEL_SOURCE_POSTGRES_PORT
Database: # ZITADEL_SOURCE_POSTGRES_DATABASE
MaxOpenConns: # ZITADEL_SOURCE_POSTGRES_MAXOPENCONNS
MaxIdleConns: # ZITADEL_SOURCE_POSTGRES_MAXIDLECONNS
MaxConnLifetime: # ZITADEL_SOURCE_POSTGRES_MAXCONNLIFETIME
MaxConnIdleTime: # ZITADEL_SOURCE_POSTGRES_MAXCONNIDLETIME
Options: # ZITADEL_SOURCE_POSTGRES_OPTIONS
User:
Username: # ZITADEL_DATABASE_POSTGRES_USER_USERNAME
Password: # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD
Username: # ZITADEL_SOURCE_POSTGRES_USER_USERNAME
Password: # ZITADEL_SOURCE_POSTGRES_USER_PASSWORD
SSL:
Mode: # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE
RootCert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT
Cert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT
Key: # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY
Mode: # ZITADEL_SOURCE_POSTGRES_USER_SSL_MODE
RootCert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_ROOTCERT
Cert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_CERT
Key: # ZITADEL_SOURCE_POSTGRES_USER_SSL_KEY
Destination:
cockroach:
Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST
Port: 26257 # ZITADEL_DATABASE_COCKROACH_PORT
Database: zitadel # ZITADEL_DATABASE_COCKROACH_DATABASE
MaxOpenConns: 0 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS
MaxIdleConns: 0 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS
MaxConnLifetime: 30m # ZITADEL_DATABASE_COCKROACH_MAXCONNLIFETIME
MaxConnIdleTime: 5m # ZITADEL_DATABASE_COCKROACH_MAXCONNIDLETIME
EventPushConnRatio: 0.01 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO
ProjectionSpoolerConnRatio: 0.5 # ZITADEL_DATABASE_COCKROACH_PROJECTIONSPOOLERCONNRATIO
Options: "" # ZITADEL_DATABASE_COCKROACH_OPTIONS
Host: localhost # ZITADEL_DESTINATION_COCKROACH_HOST
Port: 26257 # ZITADEL_DESTINATION_COCKROACH_PORT
Database: zitadel # ZITADEL_DESTINATION_COCKROACH_DATABASE
MaxOpenConns: 0 # ZITADEL_DESTINATION_COCKROACH_MAXOPENCONNS
MaxIdleConns: 0 # ZITADEL_DESTINATION_COCKROACH_MAXIDLECONNS
MaxConnLifetime: 30m # ZITADEL_DESTINATION_COCKROACH_MAXCONNLIFETIME
MaxConnIdleTime: 5m # ZITADEL_DESTINATION_COCKROACH_MAXCONNIDLETIME
EventPushConnRatio: 0.01 # ZITADEL_DESTINATION_COCKROACH_EVENTPUSHCONNRATIO
ProjectionSpoolerConnRatio: 0.5 # ZITADEL_DESTINATION_COCKROACH_PROJECTIONSPOOLERCONNRATIO
Options: "" # ZITADEL_DESTINATION_COCKROACH_OPTIONS
User:
Username: zitadel # ZITADEL_DATABASE_COCKROACH_USER_USERNAME
Password: "" # ZITADEL_DATABASE_COCKROACH_USER_PASSWORD
Username: zitadel # ZITADEL_DESTINATION_COCKROACH_USER_USERNAME
Password: "" # ZITADEL_DESTINATION_COCKROACH_USER_PASSWORD
SSL:
Mode: disable # ZITADEL_DATABASE_COCKROACH_USER_SSL_MODE
RootCert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_ROOTCERT
Cert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_CERT
Key: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_KEY
Mode: disable # ZITADEL_DESTINATION_COCKROACH_USER_SSL_MODE
RootCert: "" # ZITADEL_DESTINATION_COCKROACH_USER_SSL_ROOTCERT
Cert: "" # ZITADEL_DESTINATION_COCKROACH_USER_SSL_CERT
Key: "" # ZITADEL_DESTINATION_COCKROACH_USER_SSL_KEY
# Postgres is used as soon as a value is set
# The values describe the possible fields to set values
postgres:
Host: # ZITADEL_DATABASE_POSTGRES_HOST
Port: # ZITADEL_DATABASE_POSTGRES_PORT
Database: # ZITADEL_DATABASE_POSTGRES_DATABASE
MaxOpenConns: # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS
MaxIdleConns: # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS
MaxConnLifetime: # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME
MaxConnIdleTime: # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME
Options: # ZITADEL_DATABASE_POSTGRES_OPTIONS
Host: # ZITADEL_DESTINATION_POSTGRES_HOST
Port: # ZITADEL_DESTINATION_POSTGRES_PORT
Database: # ZITADEL_DESTINATION_POSTGRES_DATABASE
MaxOpenConns: # ZITADEL_DESTINATION_POSTGRES_MAXOPENCONNS
MaxIdleConns: # ZITADEL_DESTINATION_POSTGRES_MAXIDLECONNS
MaxConnLifetime: # ZITADEL_DESTINATION_POSTGRES_MAXCONNLIFETIME
MaxConnIdleTime: # ZITADEL_DESTINATION_POSTGRES_MAXCONNIDLETIME
Options: # ZITADEL_DESTINATION_POSTGRES_OPTIONS
User:
Username: # ZITADEL_DATABASE_POSTGRES_USER_USERNAME
Password: # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD
Username: # ZITADEL_DESTINATION_POSTGRES_USER_USERNAME
Password: # ZITADEL_DESTINATION_POSTGRES_USER_PASSWORD
SSL:
Mode: # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE
RootCert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT
Cert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT
Key: # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY
Mode: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_MODE
RootCert: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_ROOTCERT
Cert: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_CERT
Key: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_KEY
EventBulkSize: 10000
EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE
# The maximum duration an auth request was last updated before it gets ignored.
# Default is 30 days
MaxAuthRequestAge: 720h # ZITADEL_MAXAUTHREQUESTAGE
Projections:
# The maximum duration a transaction remains open
@@ -87,14 +91,14 @@ Projections:
TransactionDuration: 0s # ZITADEL_PROJECTIONS_TRANSACTIONDURATION
# turn off scheduler during operation
RequeueEvery: 0s
ConcurrentInstances: 7
EventBulkLimit: 1000
Customizations:
ConcurrentInstances: 7 # ZITADEL_PROJECTIONS_CONCURRENTINSTANCES
EventBulkLimit: 1000 # ZITADEL_PROJECTIONS_EVENTBULKLIMIT
Customizations:
notifications:
MaxFailureCount: 1
Eventstore:
MaxRetries: 3
MaxRetries: 3 # ZITADEL_EVENTSTORE_MAXRETRIES
Auth:
Spooler:

View File

@@ -3,6 +3,8 @@ package mirror
import (
"context"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/v2/projection"
"github.com/zitadel/zitadel/internal/v2/readmodel"
@@ -30,12 +32,12 @@ func queryLastSuccessfulMigration(ctx context.Context, destinationES *eventstore
return lastSuccess, nil
}
func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ float64, err error) {
func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ decimal.Decimal, err error) {
var cmd *eventstore.Command
if len(instanceIDs) > 0 {
cmd, err = mirror_event.NewStartedInstancesCommand(destination, instanceIDs)
if err != nil {
return 0, err
return decimal.Decimal{}, err
}
} else {
cmd = mirror_event.NewStartedSystemCommand(destination)
@@ -58,12 +60,12 @@ func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, i
),
)
if err != nil {
return 0, err
return decimal.Decimal{}, err
}
return position.Position, nil
}
func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position float64) error {
func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position decimal.Decimal) error {
return destinationES.Push(
ctx,
eventstore.NewPushIntent(

View File

@@ -8,7 +8,9 @@ import (
"io"
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/stdlib"
"github.com/shopspring/decimal"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/zitadel/logging"
@@ -68,6 +70,7 @@ func positionQuery(db *db.DB) string {
}
func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
logging.Info("starting to copy events")
start := time.Now()
reader, writer := io.Pipe()
@@ -96,7 +99,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
logging.WithFields("from", previousMigration.Position, "to", maxPosition).Info("start event migration")
nextPos := make(chan bool, 1)
pos := make(chan float64, 1)
pos := make(chan decimal.Decimal, 1)
errs := make(chan error, 3)
go func() {
@@ -126,7 +129,10 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
if err != nil {
return zerrors.ThrowUnknownf(err, "MIGRA-KTuSq", "unable to copy events from source during iteration %d", i)
}
logging.WithFields("batch_count", i).Info("batch of events copied")
if tag.RowsAffected() < int64(bulkSize) {
logging.WithFields("batch_count", i).Info("last batch of events copied")
return nil
}
@@ -144,7 +150,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
go func() {
defer close(pos)
for range nextPos {
var position float64
var position decimal.Decimal
err := dest.QueryRowContext(
ctx,
func(row *sql.Row) error {
@@ -167,6 +173,10 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY eventstore.events2 FROM STDIN")
eventCount = tag.RowsAffected()
if err != nil {
pgErr := new(pgconn.PgError)
errors.As(err, &pgErr)
logging.WithError(err).WithField("pg_err_details", pgErr.Detail).Error("unable to copy events into destination")
return zerrors.ThrowUnknown(err, "MIGRA-DTHi7", "unable to copy events into destination")
}
@@ -179,7 +189,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
logging.WithFields("took", time.Since(start), "count", eventCount).Info("events migrated")
}
func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position float64, errs <-chan error) {
func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position decimal.Decimal, errs <-chan error) {
joinedErrs := make([]error, 0, len(errs))
for err := range errs {
joinedErrs = append(joinedErrs, err)
@@ -198,6 +208,7 @@ func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, sou
}
func copyUniqueConstraints(ctx context.Context, source, dest *db.DB) {
logging.Info("starting to copy unique constraints")
start := time.Now()
reader, writer := io.Pipe()
errs := make(chan error, 1)

View File

@@ -3,6 +3,7 @@ package mirror
import (
"context"
"database/sql"
"fmt"
"net/http"
"sync"
"time"
@@ -103,6 +104,7 @@ func projections(
config *ProjectionsConfig,
masterKey string,
) {
logging.Info("starting to fill projections")
start := time.Now()
client, err := database.Connect(config.Destination, false)
@@ -118,7 +120,11 @@ func projections(
logging.OnError(err).Fatal("unable create static storage")
config.Eventstore.Querier = old_es.NewCRDB(client)
config.Eventstore.Pusher = new_es.NewEventstore(client)
newES := new_es.NewEventstore(client)
config.Eventstore.Pusher = newES
config.Eventstore.Searcher = newES
es := eventstore.NewEventstore(config.Eventstore)
esV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(client, &es_v4_pg.Config{
MaxRetries: config.Eventstore.MaxRetries,
@@ -251,8 +257,10 @@ func projections(
go execProjections(ctx, instances, failedInstances, &wg)
}
for _, instance := range queryInstanceIDs(ctx, client) {
existingInstances := queryInstanceIDs(ctx, client)
for i, instance := range existingInstances {
instances <- instance
logging.WithFields("id", instance, "index", fmt.Sprintf("%d/%d", i, len(existingInstances))).Info("instance queued for projection")
}
close(instances)
wg.Wait()
@@ -264,7 +272,7 @@ func projections(
func execProjections(ctx context.Context, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) {
for instance := range instances {
logging.WithFields("instance", instance).Info("start projections")
logging.WithFields("instance", instance).Info("starting projections")
ctx = internal_authz.WithInstanceID(ctx, instance)
err := projection.ProjectInstance(ctx)
@@ -281,6 +289,13 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc
continue
}
err = projection.ProjectInstanceFields(ctx)
if err != nil {
logging.WithFields("instance", instance).WithError(err).Info("trigger fields failed")
failedInstances <- instance
continue
}
err = auth_handler.ProjectInstance(ctx)
if err != nil {
logging.WithFields("instance", instance).OnError(err).Info("trigger auth handler failed")
@@ -299,7 +314,7 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc
wg.Done()
}
// returns the instance configured by flag
// queryInstanceIDs returns the instance configured by flag
// or all instances which are not removed
func queryInstanceIDs(ctx context.Context, source *database.DB) []string {
if len(instanceIDs) > 0 {

View File

@@ -46,6 +46,7 @@ func copySystem(ctx context.Context, config *Migration) {
}
func copyAssets(ctx context.Context, source, dest *database.DB) {
logging.Info("starting to copy assets")
start := time.Now()
sourceConn, err := source.Conn(ctx)
@@ -70,7 +71,7 @@ func copyAssets(ctx context.Context, source, dest *database.DB) {
logging.OnError(err).Fatal("unable to acquire dest connection")
defer destConn.Close()
var eventCount int64
var assetCount int64
err = destConn.Raw(func(driverConn interface{}) error {
conn := driverConn.(*stdlib.Conn).Conn()
@@ -82,16 +83,17 @@ func copyAssets(ctx context.Context, source, dest *database.DB) {
}
tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.assets (instance_id, asset_type, resource_owner, name, content_type, data, updated_at) FROM stdin")
eventCount = tag.RowsAffected()
assetCount = tag.RowsAffected()
return err
})
logging.OnError(err).Fatal("unable to copy assets to destination")
logging.OnError(<-errs).Fatal("unable to copy assets from source")
logging.WithFields("took", time.Since(start), "count", eventCount).Info("assets migrated")
logging.WithFields("took", time.Since(start), "count", assetCount).Info("assets migrated")
}
func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
logging.Info("starting to copy encryption keys")
start := time.Now()
sourceConn, err := source.Conn(ctx)
@@ -116,7 +118,7 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
logging.OnError(err).Fatal("unable to acquire dest connection")
defer destConn.Close()
var eventCount int64
var keyCount int64
err = destConn.Raw(func(driverConn interface{}) error {
conn := driverConn.(*stdlib.Conn).Conn()
@@ -128,11 +130,11 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
}
tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.encryption_keys FROM stdin")
eventCount = tag.RowsAffected()
keyCount = tag.RowsAffected()
return err
})
logging.OnError(err).Fatal("unable to copy encryption keys to destination")
logging.OnError(<-errs).Fatal("unable to copy encryption keys from source")
logging.WithFields("took", time.Since(start), "count", eventCount).Info("encryption keys migrated")
logging.WithFields("took", time.Since(start), "count", keyCount).Info("encryption keys migrated")
}