fix(mirror): use correct statements on push (#8414)

# Which Problems Are Solved

The mirror command used the wrong position to filter for events if
different database technologies for source and destination were used.

# How the Problems Are Solved

The statements which diverge are stored on the client so that different
technologies can use different statements.

# Additional Context

- https://discord.com/channels/927474939156643850/1256396896243552347
This commit is contained in:
Silvan
2024-08-12 12:33:45 +02:00
committed by GitHub
parent 3f25e36fbd
commit cd3ffbd3eb
4 changed files with 18 additions and 14 deletions

View File

@@ -44,7 +44,7 @@ Migrate only copies events2 and unique constraints`,
} }
func copyEventstore(ctx context.Context, config *Migration) { func copyEventstore(ctx context.Context, config *Migration) {
sourceClient, err := db.Connect(config.Source, false, dialect.DBPurposeQuery) sourceClient, err := db.Connect(config.Source, false, dialect.DBPurposeEventPusher)
logging.OnError(err).Fatal("unable to connect to source database") logging.OnError(err).Fatal("unable to connect to source database")
defer sourceClient.Close() defer sourceClient.Close()

View File

@@ -71,7 +71,7 @@ func (s *Storage) Push(ctx context.Context, intent *eventstore.PushIntent) (err
return err return err
} }
return push(ctx, tx, intent, commands) return s.push(ctx, tx, intent, commands)
}) })
} }
@@ -144,7 +144,7 @@ func lockAggregates(ctx context.Context, tx *sql.Tx, intent *eventstore.PushInte
return res, nil return res, nil
} }
func push(ctx context.Context, tx *sql.Tx, reducer eventstore.Reducer, commands []*command) (err error) { func (s *Storage) push(ctx context.Context, tx *sql.Tx, reducer eventstore.Reducer, commands []*command) (err error) {
ctx, span := tracing.NewSpan(ctx) ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }() defer func() { span.EndWithError(err) }()
@@ -171,7 +171,7 @@ func push(ctx context.Context, tx *sql.Tx, reducer eventstore.Reducer, commands
cmd.position.InPositionOrder, cmd.position.InPositionOrder,
) )
stmt.WriteString(pushPositionStmt) stmt.WriteString(s.pushPositionStmt)
stmt.WriteString(`)`) stmt.WriteString(`)`)
} }
stmt.WriteString(` RETURNING created_at, "position"`) stmt.WriteString(` RETURNING created_at, "position"`)

View File

@@ -1297,7 +1297,10 @@ func Test_push(t *testing.T) {
t.Errorf("unexpected error in begin: %v", err) t.Errorf("unexpected error in begin: %v", err)
t.FailNow() t.FailNow()
} }
err = push(context.Background(), tx, tt.args.reducer, tt.args.commands) s := Storage{
pushPositionStmt: initPushStmt("postgres"),
}
err = s.push(context.Background(), tx, tt.args.reducer, tt.args.commands)
tt.want.assertErr(t, err) tt.want.assertErr(t, err)
dbMock.Assert(t) dbMock.Assert(t)
if tt.args.reducer != nil { if tt.args.reducer != nil {

View File

@@ -12,13 +12,12 @@ import (
var ( var (
_ eventstore.Pusher = (*Storage)(nil) _ eventstore.Pusher = (*Storage)(nil)
_ eventstore.Querier = (*Storage)(nil) _ eventstore.Querier = (*Storage)(nil)
pushPositionStmt string
) )
type Storage struct { type Storage struct {
client *database.DB client *database.DB
config *Config config *Config
pushPositionStmt string
} }
type Config struct { type Config struct {
@@ -30,17 +29,19 @@ func New(client *database.DB, config *Config) *Storage {
return &Storage{ return &Storage{
client: client, client: client,
config: config, config: config,
pushPositionStmt: initPushStmt(client.Type()),
} }
} }
func initPushStmt(typ string) { func initPushStmt(typ string) string {
switch typ { switch typ {
case "cockroach": case "cockroach":
pushPositionStmt = ", hlc_to_timestamp(cluster_logical_timestamp()), cluster_logical_timestamp()" return ", hlc_to_timestamp(cluster_logical_timestamp()), cluster_logical_timestamp()"
case "postgres": case "postgres":
pushPositionStmt = ", statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp())" return ", statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp())"
default: default:
logging.WithFields("database_type", typ).Panic("position statement for type not implemented") logging.WithFields("database_type", typ).Panic("position statement for type not implemented")
return ""
} }
} }