mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-13 16:35:53 +00:00

# Which Problems Are Solved The `auth.auth_requests` table is not cleaned up, so long-running Zitadel installations can contain many rows. The mirror command can take long because the data is first copied into memory (or disk) on CockroachDB, and users do not get any output from mirror. This is unfortunate because people don't know whether Zitadel got stuck. # How the Problems Are Solved Enhance logging throughout the projection processes and introduce a configuration option for the maximum age of authentication requests. # Additional Changes None # Additional Context closes https://github.com/zitadel/zitadel/issues/9764 --------- Co-authored-by: Livio Spring <livio.a@gmail.com>
259 lines
8.2 KiB
Go
259 lines
8.2 KiB
Go
package mirror
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
_ "embed"
|
|
"errors"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/stdlib"
|
|
"github.com/spf13/cobra"
|
|
"github.com/spf13/viper"
|
|
"github.com/zitadel/logging"
|
|
|
|
db "github.com/zitadel/zitadel/internal/database"
|
|
"github.com/zitadel/zitadel/internal/database/dialect"
|
|
"github.com/zitadel/zitadel/internal/id"
|
|
"github.com/zitadel/zitadel/internal/v2/database"
|
|
"github.com/zitadel/zitadel/internal/v2/eventstore"
|
|
"github.com/zitadel/zitadel/internal/v2/eventstore/postgres"
|
|
"github.com/zitadel/zitadel/internal/zerrors"
|
|
)
|
|
|
|
// shouldIgnorePrevious is set by the --ignore-previous flag and causes the
// migration to disregard previously recorded migrations of the events table.
var shouldIgnorePrevious bool
|
|
|
|
func eventstoreCmd() *cobra.Command {
|
|
cmd := &cobra.Command{
|
|
Use: "eventstore",
|
|
Short: "mirrors the eventstore of an instance from one database to another",
|
|
Long: `mirrors the eventstore of an instance from one database to another
|
|
ZITADEL needs to be initialized and set up with the --for-mirror flag
|
|
Migrate only copies events2 and unique constraints`,
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
config := mustNewMigrationConfig(viper.GetViper())
|
|
copyEventstore(cmd.Context(), config)
|
|
},
|
|
}
|
|
|
|
cmd.Flags().BoolVar(&shouldReplace, "replace", false, "allow delete unique constraints of defined instances before copy")
|
|
cmd.Flags().BoolVar(&shouldIgnorePrevious, "ignore-previous", false, "ignores previous migrations of the events table")
|
|
|
|
return cmd
|
|
}
|
|
|
|
func copyEventstore(ctx context.Context, config *Migration) {
|
|
sourceClient, err := db.Connect(config.Source, false)
|
|
logging.OnError(err).Fatal("unable to connect to source database")
|
|
defer sourceClient.Close()
|
|
|
|
destClient, err := db.Connect(config.Destination, false)
|
|
logging.OnError(err).Fatal("unable to connect to destination database")
|
|
defer destClient.Close()
|
|
|
|
copyEvents(ctx, sourceClient, destClient, config.EventBulkSize)
|
|
copyUniqueConstraints(ctx, sourceClient, destClient)
|
|
}
|
|
|
|
func positionQuery(db *db.DB) string {
|
|
switch db.Type() {
|
|
case dialect.DatabaseTypePostgres:
|
|
return "SELECT EXTRACT(EPOCH FROM clock_timestamp())"
|
|
case dialect.DatabaseTypeCockroach:
|
|
return "SELECT cluster_logical_timestamp()"
|
|
default:
|
|
logging.WithFields("db_type", db.Type()).Fatal("database type not recognized")
|
|
return ""
|
|
}
|
|
}
|
|
|
|
// copyEvents streams the events2 rows of the configured instances from the
// source database into the destination eventstore. Source and destination
// are connected through an in-memory pipe: one goroutine COPYs batches out
// of the source into the writer end, while the main goroutine COPYs the
// reader end into the destination. A second goroutine generates a fresh
// position on the destination for every batch, handshaking with the copy
// goroutine via the nextPos/pos channels. The outcome (including all
// collected errors) is recorded as a migration event on the destination.
func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
	logging.Info("starting to copy events")
	start := time.Now()
	reader, writer := io.Pipe()

	// migrationID identifies this mirror run in the destination eventstore.
	migrationID, err := id.SonyFlakeGenerator().Next()
	logging.OnError(err).Fatal("unable to generate migration id")

	sourceConn, err := source.Conn(ctx)
	logging.OnError(err).Fatal("unable to acquire source connection")

	destConn, err := dest.Conn(ctx)
	logging.OnError(err).Fatal("unable to acquire dest connection")

	destinationES := eventstore.NewEventstoreFromOne(postgres.New(dest, &postgres.Config{
		MaxRetries: 3,
	}))

	// Resume after the last successful migration (its position is the lower
	// bound of the copy; honored below via NewNumberGreater).
	previousMigration, err := queryLastSuccessfulMigration(ctx, destinationES, source.DatabaseName())
	logging.OnError(err).Fatal("unable to query latest successful migration")

	// maxPosition is the upper bound: events written to the source after this
	// point are not part of this run.
	var maxPosition float64
	err = source.QueryRowContext(ctx,
		func(row *sql.Row) error {
			return row.Scan(&maxPosition)
		},
		"SELECT MAX(position) FROM eventstore.events2 "+instanceClause(),
	)
	logging.OnError(err).Fatal("unable to query max position from source")

	logging.WithFields("from", previousMigration.Position, "to", maxPosition).Info("start event migration")

	// nextPos requests a new destination position; pos delivers it.
	// errs is sized for the three writers: copy-out goroutine, position
	// goroutine, and the copy-in call below.
	nextPos := make(chan bool, 1)
	pos := make(chan float64, 1)
	errs := make(chan error, 3)

	// Copy-out goroutine: reads events from the source in batches of bulkSize
	// and streams them into the pipe, stamping each batch with the position
	// received on pos.
	go func() {
		err := sourceConn.Raw(func(driverConn interface{}) error {
			conn := driverConn.(*stdlib.Conn).Conn()
			nextPos <- true
			var i uint32
			for position := range pos {
				var stmt database.Statement
				// The regexp_replace strips NUL escape sequences from the
				// payload, which the destination would reject.
				stmt.WriteString("COPY (SELECT instance_id, aggregate_type, aggregate_id, event_type, sequence, revision, created_at, regexp_replace(payload::TEXT, '\\\\u0000', '', 'g')::JSON payload, creator, owner, ")
				stmt.WriteArg(position)
				stmt.WriteString(" position, row_number() OVER (PARTITION BY instance_id ORDER BY position, in_tx_order) AS in_tx_order FROM eventstore.events2 ")
				stmt.WriteString(instanceClause())
				stmt.WriteString(" AND ")
				database.NewNumberAtMost(maxPosition).Write(&stmt, "position")
				stmt.WriteString(" AND ")
				database.NewNumberGreater(previousMigration.Position).Write(&stmt, "position")
				stmt.WriteString(" ORDER BY instance_id, position, in_tx_order")
				stmt.WriteString(" LIMIT ")
				stmt.WriteArg(bulkSize)
				stmt.WriteString(" OFFSET ")
				stmt.WriteArg(bulkSize * i)
				stmt.WriteString(") TO STDOUT")

				// COPY does not support bind parameters, so the args are
				// rendered into the statement text via Debug().
				tag, err := conn.PgConn().CopyTo(ctx, writer, stmt.Debug())
				if err != nil {
					return zerrors.ThrowUnknownf(err, "MIGRA-KTuSq", "unable to copy events from source during iteration %d", i)
				}
				logging.WithFields("batch_count", i).Info("batch of events copied")

				// A short batch means the source is exhausted.
				if tag.RowsAffected() < int64(bulkSize) {
					logging.WithFields("batch_count", i).Info("last batch of events copied")
					return nil
				}

				nextPos <- true
				i++
			}
			return nil
		})
		// Closing the writer unblocks the destination CopyFrom; closing
		// nextPos stops the position generator.
		writer.Close()
		close(nextPos)
		errs <- err
	}()

	// Position generator: for each request on nextPos, query a fresh position
	// on the destination and hand it to the copy-out goroutine.
	go func() {
		defer close(pos)
		for range nextPos {
			var position float64
			err := dest.QueryRowContext(
				ctx,
				func(row *sql.Row) error {
					return row.Scan(&position)
				},
				positionQuery(dest),
			)
			if err != nil {
				errs <- zerrors.ThrowUnknown(err, "MIGRA-kMyPH", "unable to query next position")
				return
			}
			pos <- position
		}
	}()

	// Copy-in (blocking): stream the pipe into the destination events2 table.
	var eventCount int64
	errs <- destConn.Raw(func(driverConn interface{}) error {
		conn := driverConn.(*stdlib.Conn).Conn()

		tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY eventstore.events2 FROM STDIN")
		eventCount = tag.RowsAffected()
		if err != nil {
			return zerrors.ThrowUnknown(err, "MIGRA-DTHi7", "unable to copy events into destination")
		}

		return nil
	})

	// All writers are done at this point; record the outcome.
	close(errs)
	writeCopyEventsDone(ctx, destinationES, migrationID, source.DatabaseName(), maxPosition, errs)

	logging.WithFields("took", time.Since(start), "count", eventCount).Info("events migrated")
}
|
|
|
|
func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position float64, errs <-chan error) {
|
|
joinedErrs := make([]error, 0, len(errs))
|
|
for err := range errs {
|
|
joinedErrs = append(joinedErrs, err)
|
|
}
|
|
err := errors.Join(joinedErrs...)
|
|
|
|
if err != nil {
|
|
logging.WithError(err).Error("unable to mirror events")
|
|
err := writeMigrationFailed(ctx, es, id, source, err)
|
|
logging.OnError(err).Fatal("unable to write failed event")
|
|
return
|
|
}
|
|
|
|
err = writeMigrationSucceeded(ctx, es, id, source, position)
|
|
logging.OnError(err).Fatal("unable to write failed event")
|
|
}
|
|
|
|
func copyUniqueConstraints(ctx context.Context, source, dest *db.DB) {
|
|
logging.Info("starting to copy unique constraints")
|
|
start := time.Now()
|
|
reader, writer := io.Pipe()
|
|
errs := make(chan error, 1)
|
|
|
|
sourceConn, err := source.Conn(ctx)
|
|
logging.OnError(err).Fatal("unable to acquire source connection")
|
|
|
|
go func() {
|
|
err := sourceConn.Raw(func(driverConn interface{}) error {
|
|
conn := driverConn.(*stdlib.Conn).Conn()
|
|
var stmt database.Statement
|
|
stmt.WriteString("COPY (SELECT instance_id, unique_type, unique_field FROM eventstore.unique_constraints ")
|
|
stmt.WriteString(instanceClause())
|
|
stmt.WriteString(") TO stdout")
|
|
|
|
_, err := conn.PgConn().CopyTo(ctx, writer, stmt.String())
|
|
writer.Close()
|
|
return err
|
|
})
|
|
errs <- err
|
|
}()
|
|
|
|
destConn, err := dest.Conn(ctx)
|
|
logging.OnError(err).Fatal("unable to acquire dest connection")
|
|
|
|
var eventCount int64
|
|
err = destConn.Raw(func(driverConn interface{}) error {
|
|
conn := driverConn.(*stdlib.Conn).Conn()
|
|
|
|
if shouldReplace {
|
|
var stmt database.Statement
|
|
stmt.WriteString("DELETE FROM eventstore.unique_constraints ")
|
|
stmt.WriteString(instanceClause())
|
|
|
|
_, err := conn.Exec(ctx, stmt.String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY eventstore.unique_constraints FROM stdin")
|
|
eventCount = tag.RowsAffected()
|
|
|
|
return err
|
|
})
|
|
logging.OnError(err).Fatal("unable to copy unique constraints to destination")
|
|
logging.OnError(<-errs).Fatal("unable to copy unique constraints from source")
|
|
logging.WithFields("took", time.Since(start), "count", eventCount).Info("unique constraints migrated")
|
|
}
|