fix(eventstore): use decimal for position (#9881)

Float64 which was used for the event.Position field is [not precise in
go and gets rounded](https://github.com/golang/go/issues/47300). This
can lead to unprecies position tracking of events and therefore
projections especially on cockcoachdb as the position used there is a
big number.

example of a unprecies position:
exact: 1725257931223002628
float64: 1725257931223002624.000000

The float64 was replaced by
[github.com/jackc/pgx-shopspring-decimal](https://github.com/jackc/pgx-shopspring-decimal).

Rename `latestSequence`-queries to `latestPosition`

closes https://github.com/zitadel/zitadel/issues/8863
This commit is contained in:
Silvan
2025-05-14 12:14:08 +02:00
committed by adlerhurst
parent ed7eee8a77
commit e14639c0ad
49 changed files with 366 additions and 236 deletions

View File

@@ -3,6 +3,8 @@ package mirror
import (
"context"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/v2/projection"
"github.com/zitadel/zitadel/internal/v2/readmodel"
@@ -30,12 +32,12 @@ func queryLastSuccessfulMigration(ctx context.Context, destinationES *eventstore
return lastSuccess, nil
}
func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ float64, err error) {
func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ decimal.Decimal, err error) {
var cmd *eventstore.Command
if len(instanceIDs) > 0 {
cmd, err = mirror_event.NewStartedInstancesCommand(destination, instanceIDs)
if err != nil {
return 0, err
return decimal.Decimal{}, err
}
} else {
cmd = mirror_event.NewStartedSystemCommand(destination)
@@ -58,12 +60,12 @@ func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, i
),
)
if err != nil {
return 0, err
return decimal.Decimal{}, err
}
return position.Position, nil
}
func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position float64) error {
func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position decimal.Decimal) error {
return destinationES.Push(
ctx,
eventstore.NewPushIntent(

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/jackc/pgx/v5/stdlib"
"github.com/shopspring/decimal"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/zitadel/logging"
@@ -97,7 +98,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
logging.WithFields("from", previousMigration.Position, "to", maxPosition).Info("start event migration")
nextPos := make(chan bool, 1)
pos := make(chan float64, 1)
pos := make(chan decimal.Decimal, 1)
errs := make(chan error, 3)
go func() {
@@ -148,7 +149,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
go func() {
defer close(pos)
for range nextPos {
var position float64
var position decimal.Decimal
err := dest.QueryRowContext(
ctx,
func(row *sql.Row) error {
@@ -183,7 +184,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
logging.WithFields("took", time.Since(start), "count", eventCount).Info("events migrated")
}
func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position float64, errs <-chan error) {
func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position decimal.Decimal, errs <-chan error) {
joinedErrs := make([]error, 0, len(errs))
for err := range errs {
joinedErrs = append(joinedErrs, err)