fix(mirror): ensure multiple runs (#9899)

# Which Problems Are Solved

1. After second execution, mirror starts to fail because of Primary key
constraints on the events table. Because mirror always took the the
first `system.mirror.succeeded` instead of the newest one
2. Mirror panicked during migration of fields tables

# How the Problems Are Solved

1. Adjusted the database query to order descending and limit 1
2. added missing assignment

# Additional Changes

- detailed logging if the copy from statement failed.
This commit is contained in:
Silvan
2025-05-19 11:30:11 +02:00
committed by GitHub
parent e8cefe07a9
commit 59e57af52b
3 changed files with 11 additions and 1 deletions

View File

@@ -8,6 +8,7 @@ import (
"io"
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/stdlib"
"github.com/shopspring/decimal"
"github.com/spf13/cobra"
@@ -173,6 +174,10 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY eventstore.events2 FROM STDIN")
eventCount = tag.RowsAffected()
if err != nil {
pgErr := new(pgconn.PgError)
errors.As(err, &pgErr)
logging.WithError(err).WithField("pg_err_details", pgErr.Detail).Error("unable to copy events into destination")
return zerrors.ThrowUnknown(err, "MIGRA-DTHi7", "unable to copy events into destination")
}

View File

@@ -122,7 +122,11 @@ func projections(
config.Eventstore.Querier = old_es.NewCRDB(client)
esPusherDBClient, err := database.Connect(config.Destination, false, dialect.DBPurposeEventPusher)
logging.OnError(err).Fatal("unable to connect eventstore push client")
config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient)
newES := new_es.NewEventstore(esPusherDBClient)
config.Eventstore.Pusher = newES
config.Eventstore.Searcher = newES
es := eventstore.NewEventstore(config.Eventstore)
esV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(client, &es_v4_pg.Config{
MaxRetries: config.Eventstore.MaxRetries,

View File

@@ -36,6 +36,7 @@ func (p *LastSuccessfulMirror) Filter() *eventstore.Filter {
),
eventstore.FilterPagination(
eventstore.Descending(),
eventstore.Limit(1),
),
)
}