fix(eventstore): use decimal for position (#9881)

# Which Problems Are Solved

Float64 which was used for the event.Position field is [not precise in
go and gets rounded](https://github.com/golang/go/issues/47300). This
can lead to unprecies position tracking of events and therefore
projections especially on cockcoachdb as the position used there is a
big number.

example of a unprecies position:
exact: 1725257931223002628
float64: 1725257931223002624.000000

# How the Problems Are Solved

The float64 was replaced by
[github.com/jackc/pgx-shopspring-decimal](https://github.com/jackc/pgx-shopspring-decimal).

# Additional Changes

Rename `latestSequence`-queries to `latestPosition`

# Additional Context

closes https://github.com/zitadel/zitadel/issues/8863
This commit is contained in:
Silvan
2025-05-14 12:14:08 +02:00
committed by GitHub
parent 5331841675
commit dafde7468d
47 changed files with 338 additions and 236 deletions

View File

@@ -3,6 +3,8 @@ package mirror
import (
"context"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/v2/projection"
"github.com/zitadel/zitadel/internal/v2/readmodel"
@@ -30,12 +32,12 @@ func queryLastSuccessfulMigration(ctx context.Context, destinationES *eventstore
return lastSuccess, nil
}
func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ float64, err error) {
func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ decimal.Decimal, err error) {
var cmd *eventstore.Command
if len(instanceIDs) > 0 {
cmd, err = mirror_event.NewStartedInstancesCommand(destination, instanceIDs)
if err != nil {
return 0, err
return decimal.Decimal{}, err
}
} else {
cmd = mirror_event.NewStartedSystemCommand(destination)
@@ -58,12 +60,12 @@ func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, i
),
)
if err != nil {
return 0, err
return decimal.Decimal{}, err
}
return position.Position, nil
}
func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position float64) error {
func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position decimal.Decimal) error {
return destinationES.Push(
ctx,
eventstore.NewPushIntent(

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/jackc/pgx/v5/stdlib"
"github.com/shopspring/decimal"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/zitadel/logging"
@@ -98,7 +99,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
logging.WithFields("from", previousMigration.Position, "to", maxPosition).Info("start event migration")
nextPos := make(chan bool, 1)
pos := make(chan float64, 1)
pos := make(chan decimal.Decimal, 1)
errs := make(chan error, 3)
go func() {
@@ -149,7 +150,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
go func() {
defer close(pos)
for range nextPos {
var position float64
var position decimal.Decimal
err := dest.QueryRowContext(
ctx,
func(row *sql.Row) error {
@@ -184,7 +185,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
logging.WithFields("took", time.Since(start), "count", eventCount).Info("events migrated")
}
func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position float64, errs <-chan error) {
func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position decimal.Decimal, errs <-chan error) {
joinedErrs := make([]error, 0, len(errs))
for err := range errs {
joinedErrs = append(joinedErrs, err)

14
go.mod
View File

@@ -39,7 +39,8 @@ require (
github.com/h2non/gock v1.2.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/improbable-eng/grpc-web v0.15.0
github.com/jackc/pgx/v5 v5.7.0
github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e
github.com/jackc/pgx/v5 v5.7.4
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52
github.com/jinzhu/gorm v1.9.16
github.com/k3a/html2text v1.2.1
@@ -56,6 +57,7 @@ require (
github.com/redis/go-redis/v9 v9.7.0
github.com/rs/cors v1.11.1
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/shopspring/decimal v1.4.0
github.com/sony/sonyflake v1.2.0
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
@@ -79,12 +81,12 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.29.0
go.opentelemetry.io/otel/trace v1.29.0
go.uber.org/mock v0.4.0
golang.org/x/crypto v0.27.0
golang.org/x/crypto v0.31.0
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
golang.org/x/net v0.28.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
golang.org/x/text v0.19.0
golang.org/x/sync v0.10.0
golang.org/x/text v0.21.0
google.golang.org/api v0.187.0
google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd
google.golang.org/grpc v1.65.0
@@ -113,7 +115,7 @@ require (
github.com/google/go-tpm v0.9.0 // indirect
github.com/google/pprof v0.0.0-20240528025155-186aa0362fba // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect
@@ -204,7 +206,7 @@ require (
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
golang.org/x/sys v0.25.0
golang.org/x/sys v0.28.0
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

28
go.sum
View File

@@ -412,10 +412,12 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.7.0 h1:FG6VLIdzvAPhnYqP14sQ2xhFLkiUQHCs6ySqO91kF4g=
github.com/jackc/pgx/v5 v5.7.0/go.mod h1:awP1KNnjylvpxHuHP63gzjhnGkI1iw+PMoIwvoleN/8=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e h1:i3gQ/Zo7sk4LUVbsAjTNeC4gIjoPNIZVzs4EXstssV4=
github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e/go.mod h1:zUHglCZ4mpDUPgIwqEKoba6+tcUQzRdb1+DPTuYe9pI=
github.com/jackc/pgx/v5 v5.7.4 h1:9wKznZrhWa2QiHL+NjTSPP6yjl3451BX3imWDnokYlg=
github.com/jackc/pgx/v5 v5.7.4/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52 h1:jny9eqYPwkG8IVy7foUoRjQmFLcArCSz+uPsL6KS0HQ=
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52/go.mod h1:RDZ+4PR3mDOtTpVbI0qBE+rdhmtIrtbssiNn38/1OWA=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
@@ -657,6 +659,8 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@@ -807,8 +811,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20200331195152-e8c3332aa8e5/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw=
@@ -888,8 +892,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -932,8 +936,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -948,8 +952,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=

View File

@@ -11,6 +11,7 @@ import (
"github.com/go-jose/go-jose/v4"
"github.com/jonboulle/clockwork"
"github.com/muhlemmer/gu"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/oidc/v3/pkg/op"
@@ -350,14 +351,14 @@ func (o *OPStorage) getSigningKey(ctx context.Context) (op.SigningKey, error) {
if len(keys.Keys) > 0 {
return PrivateKeyToSigningKey(SelectSigningKey(keys.Keys), o.encAlg)
}
var position float64
var position decimal.Decimal
if keys.State != nil {
position = keys.State.Position
}
return nil, o.refreshSigningKey(ctx, o.signingKeyAlgorithm, position)
}
func (o *OPStorage) refreshSigningKey(ctx context.Context, algorithm string, position float64) error {
func (o *OPStorage) refreshSigningKey(ctx context.Context, algorithm string, position decimal.Decimal) error {
ok, err := o.ensureIsLatestKey(ctx, position)
if err != nil || !ok {
return zerrors.ThrowInternal(err, "OIDC-ASfh3", "cannot ensure that projection is up to date")
@@ -369,12 +370,12 @@ func (o *OPStorage) refreshSigningKey(ctx context.Context, algorithm string, pos
return zerrors.ThrowInternal(nil, "OIDC-Df1bh", "")
}
func (o *OPStorage) ensureIsLatestKey(ctx context.Context, position float64) (bool, error) {
maxSequence, err := o.getMaxKeySequence(ctx)
func (o *OPStorage) ensureIsLatestKey(ctx context.Context, position decimal.Decimal) (bool, error) {
maxSequence, err := o.getMaxKeyPosition(ctx)
if err != nil {
return false, fmt.Errorf("error retrieving new events: %w", err)
}
return position >= maxSequence, nil
return position.GreaterThanOrEqual(maxSequence), nil
}
func PrivateKeyToSigningKey(key query.PrivateKey, algorithm crypto.EncryptionAlgorithm) (_ op.SigningKey, err error) {
@@ -412,9 +413,9 @@ func (o *OPStorage) lockAndGenerateSigningKeyPair(ctx context.Context, algorithm
return o.command.GenerateSigningKeyPair(setOIDCCtx(ctx), algorithm)
}
func (o *OPStorage) getMaxKeySequence(ctx context.Context) (float64, error) {
return o.eventstore.LatestSequence(ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence).
func (o *OPStorage) getMaxKeyPosition(ctx context.Context) (decimal.Decimal, error) {
return o.eventstore.LatestPosition(ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).
ResourceOwner(authz.GetInstance(ctx).InstanceID()).
AwaitOpenTransactions().
AllowTimeTravel().

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/go-jose/go-jose/v4"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/saml/pkg/provider/key"
@@ -76,7 +77,7 @@ func (p *Storage) getCertificateAndKey(ctx context.Context, usage crypto.KeyUsag
return p.certificateToCertificateAndKey(selectCertificate(certs.Certificates))
}
var position float64
var position decimal.Decimal
if certs.State != nil {
position = certs.State.Position
}
@@ -87,7 +88,7 @@ func (p *Storage) getCertificateAndKey(ctx context.Context, usage crypto.KeyUsag
func (p *Storage) refreshCertificate(
ctx context.Context,
usage crypto.KeyUsage,
position float64,
position decimal.Decimal,
) error {
ok, err := p.ensureIsLatestCertificate(ctx, position)
if err != nil {
@@ -103,12 +104,12 @@ func (p *Storage) refreshCertificate(
return nil
}
func (p *Storage) ensureIsLatestCertificate(ctx context.Context, position float64) (bool, error) {
maxSequence, err := p.getMaxKeySequence(ctx)
func (p *Storage) ensureIsLatestCertificate(ctx context.Context, position decimal.Decimal) (bool, error) {
maxSequence, err := p.getMaxKeyPosition(ctx)
if err != nil {
return false, fmt.Errorf("error retrieving new events: %w", err)
}
return position >= maxSequence, nil
return position.GreaterThanOrEqual(maxSequence), nil
}
func (p *Storage) lockAndGenerateCertificateAndKey(ctx context.Context, usage crypto.KeyUsage) error {
@@ -151,9 +152,9 @@ func (p *Storage) lockAndGenerateCertificateAndKey(ctx context.Context, usage cr
}
}
func (p *Storage) getMaxKeySequence(ctx context.Context) (float64, error) {
return p.eventstore.LatestSequence(ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence).
func (p *Storage) getMaxKeyPosition(ctx context.Context) (decimal.Decimal, error) {
return p.eventstore.LatestPosition(ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).
ResourceOwner(authz.GetInstance(ctx).InstanceID()).
AwaitOpenTransactions().
AddQuery().

View File

@@ -7,6 +7,8 @@ import (
"strings"
"time"
pgxdecimal "github.com/jackc/pgx-shopspring-decimal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/mitchellh/mapstructure"
@@ -82,6 +84,11 @@ func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpo
return nil, nil, err
}
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
pgxdecimal.Register(conn.TypeMap())
return nil
}
if connConfig.MaxOpenConns != 0 {
config.MaxConns = int32(connConfig.MaxOpenConns)
}

View File

@@ -7,6 +7,8 @@ import (
"strings"
"time"
pgxdecimal "github.com/jackc/pgx-shopspring-decimal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/mitchellh/mapstructure"
@@ -82,6 +84,10 @@ func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpo
if err != nil {
return nil, nil, err
}
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
pgxdecimal.Register(conn.TypeMap())
return nil
}
if connConfig.MaxOpenConns != 0 {
config.MaxConns = int32(connConfig.MaxOpenConns)

View File

@@ -5,6 +5,8 @@ import (
"reflect"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/zerrors"
)
@@ -44,7 +46,7 @@ type Event interface {
// CreatedAt is the time the event was created at
CreatedAt() time.Time
// Position is the global position of the event
Position() float64
Position() decimal.Decimal
// Unmarshal parses the payload and stores the result
// in the value pointed to by ptr. If ptr is nil or not a pointer,

View File

@@ -5,6 +5,8 @@ import (
"encoding/json"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/api/service"
)
@@ -21,7 +23,7 @@ type BaseEvent struct {
Agg *Aggregate
Seq uint64
Pos float64
Pos decimal.Decimal
Creation time.Time
previousAggregateSequence uint64
previousAggregateTypeSequence uint64
@@ -34,7 +36,7 @@ type BaseEvent struct {
}
// Position implements Event.
func (e *BaseEvent) Position() float64 {
func (e *BaseEvent) Position() decimal.Decimal {
return e.Pos
}

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
@@ -218,11 +219,11 @@ func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQu
})
}
// LatestSequence filters the latest sequence for the given search query
func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) {
// LatestPosition filters the latest position for the given search query
func (es *Eventstore) LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error) {
queryFactory.InstanceID(authz.GetInstance(ctx).InstanceID())
return es.querier.LatestSequence(ctx, queryFactory)
return es.querier.LatestPosition(ctx, queryFactory)
}
// InstanceIDs returns the instance ids found by the search query
@@ -271,8 +272,8 @@ type Querier interface {
Health(ctx context.Context) error
// FilterToReducer calls r for every event returned from the storage
FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r Reducer) error
// LatestSequence returns the latest sequence found by the search query
LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error)
// LatestPosition returns the latest position found by the search query
LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error)
// InstanceIDs returns the instance ids found by the search query
InstanceIDs(ctx context.Context, queryFactory *SearchQueryBuilder) ([]string, error)
// Client returns the underlying database connection

View File

@@ -4,6 +4,8 @@ import (
"context"
"testing"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore"
)
@@ -98,7 +100,7 @@ func TestCRDB_Filter(t *testing.T) {
}
}
func TestCRDB_LatestSequence(t *testing.T) {
func TestCRDB_LatestPosition(t *testing.T) {
type args struct {
searchQuery *eventstore.SearchQueryBuilder
}
@@ -106,7 +108,7 @@ func TestCRDB_LatestSequence(t *testing.T) {
existingEvents []eventstore.Command
}
type res struct {
sequence float64
position decimal.Decimal
}
tests := []struct {
name string
@@ -118,7 +120,7 @@ func TestCRDB_LatestSequence(t *testing.T) {
{
name: "aggregate type filter no sequence",
args: args{
searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence).
searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).
AddQuery().
AggregateTypes("not found").
Builder(),
@@ -135,7 +137,7 @@ func TestCRDB_LatestSequence(t *testing.T) {
{
name: "aggregate type filter sequence",
args: args{
searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence).
searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).
AddQuery().
AggregateTypes(eventstore.AggregateType(t.Name())).
Builder(),
@@ -169,12 +171,12 @@ func TestCRDB_LatestSequence(t *testing.T) {
return
}
sequence, err := db.LatestSequence(context.Background(), tt.args.searchQuery)
position, err := db.LatestPosition(context.Background(), tt.args.searchQuery)
if (err != nil) != tt.wantErr {
t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.res.sequence > sequence {
t.Errorf("CRDB.query() expected sequence: %v got %v", tt.res.sequence, sequence)
if tt.res.position.GreaterThan(position) {
t.Errorf("CRDB.query() expected sequence: %v got %v", tt.res.position, position)
}
})
}

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/api/service"
@@ -391,7 +392,7 @@ func (repo *testPusher) Push(ctx context.Context, commands ...Command) (events [
type testQuerier struct {
events []Event
sequence float64
sequence decimal.Decimal
instances []string
err error
t *testing.T
@@ -424,9 +425,9 @@ func (repo *testQuerier) FilterToReducer(ctx context.Context, searchQuery *Searc
return nil
}
func (repo *testQuerier) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) {
func (repo *testQuerier) LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error) {
if repo.err != nil {
return 0, repo.err
return decimal.Decimal{}, repo.err
}
return repo.sequence, nil
}
@@ -1060,7 +1061,7 @@ func TestEventstore_FilterEvents(t *testing.T) {
}
}
func TestEventstore_LatestSequence(t *testing.T) {
func TestEventstore_LatestPosition(t *testing.T) {
type args struct {
query *SearchQueryBuilder
}
@@ -1080,7 +1081,7 @@ func TestEventstore_LatestSequence(t *testing.T) {
name: "no events",
args: args{
query: &SearchQueryBuilder{
columns: ColumnsMaxSequence,
columns: ColumnsMaxPosition,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
@@ -1103,7 +1104,7 @@ func TestEventstore_LatestSequence(t *testing.T) {
name: "repo error",
args: args{
query: &SearchQueryBuilder{
columns: ColumnsMaxSequence,
columns: ColumnsMaxPosition,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
@@ -1126,7 +1127,7 @@ func TestEventstore_LatestSequence(t *testing.T) {
name: "found events",
args: args{
query: &SearchQueryBuilder{
columns: ColumnsMaxSequence,
columns: ColumnsMaxPosition,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
@@ -1152,7 +1153,7 @@ func TestEventstore_LatestSequence(t *testing.T) {
querier: tt.fields.repo,
}
_, err := es.LatestSequence(context.Background(), tt.args.query)
_, err := es.LatestPosition(context.Background(), tt.args.query)
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
}

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore"
)
@@ -123,11 +124,11 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig)
return additionalIteration, err
}
// stop execution if currentState.eventTimestamp >= config.maxCreatedAt
if config.maxPosition != 0 && currentState.position >= config.maxPosition {
if !config.maxPosition.IsZero() && currentState.position.GreaterThanOrEqual(config.maxPosition) {
return false, nil
}
if config.minPosition > 0 {
if config.minPosition.GreaterThan(decimal.NewFromInt(0)) {
currentState.position = config.minPosition
currentState.offset = 0
}
@@ -161,7 +162,7 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState
idx, offset := skipPreviouslyReducedEvents(events, currentState)
if currentState.position == events[len(events)-1].Position() {
if currentState.position.Equal(events[len(events)-1].Position()) {
offset += currentState.offset
}
currentState.position = events[len(events)-1].Position()
@@ -191,9 +192,9 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState
}
func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state) (index int, offset uint32) {
var position float64
var position decimal.Decimal
for i, event := range events {
if event.Position() != position {
if !event.Position().Equal(position) {
offset = 0
position = event.Position()
}

View File

@@ -4,13 +4,13 @@ import (
"context"
"database/sql"
"errors"
"math"
"math/rand"
"slices"
"sync"
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
@@ -380,8 +380,8 @@ func (h *Handler) existingInstances(ctx context.Context) ([]string, error) {
type triggerConfig struct {
awaitRunning bool
maxPosition float64
minPosition float64
maxPosition decimal.Decimal
minPosition decimal.Decimal
}
type TriggerOpt func(conf *triggerConfig)
@@ -392,13 +392,13 @@ func WithAwaitRunning() TriggerOpt {
}
}
func WithMaxPosition(position float64) TriggerOpt {
func WithMaxPosition(position decimal.Decimal) TriggerOpt {
return func(conf *triggerConfig) {
conf.maxPosition = position
}
}
func WithMinPosition(position float64) TriggerOpt {
func WithMinPosition(position decimal.Decimal) TriggerOpt {
return func(conf *triggerConfig) {
conf.minPosition = position
}
@@ -510,11 +510,11 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
return additionalIteration, err
}
// stop execution if currentState.eventTimestamp >= config.maxCreatedAt
if config.maxPosition != 0 && currentState.position >= config.maxPosition {
if !config.maxPosition.Equal(decimal.Decimal{}) && currentState.position.GreaterThanOrEqual(config.maxPosition) {
return false, nil
}
if config.minPosition > 0 {
if config.minPosition.GreaterThan(decimal.NewFromInt(0)) {
currentState.position = config.minPosition
currentState.offset = 0
}
@@ -602,7 +602,7 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta
func skipPreviouslyReducedStatements(statements []*Statement, currentState *state) int {
for i, statement := range statements {
if statement.Position == currentState.position &&
if statement.Position.Equal(currentState.position) &&
statement.Aggregate.ID == currentState.aggregateID &&
statement.Aggregate.Type == currentState.aggregateType &&
statement.Sequence == currentState.sequence {
@@ -666,9 +666,8 @@ func (h *Handler) EventQuery(currentState *state) *eventstore.SearchQueryBuilder
OrderAsc().
InstanceID(currentState.instanceID)
if currentState.position > 0 {
// decrease position by 10 because builder.PositionAfter filters for position > and we need position >=
builder = builder.PositionAfter(math.Float64frombits(math.Float64bits(currentState.position) - 10))
if currentState.position.GreaterThan(decimal.Decimal{}) {
builder = builder.PositionAtLeast(currentState.position)
if currentState.offset > 0 {
builder = builder.Offset(currentState.offset)
}

View File

@@ -7,6 +7,8 @@ import (
"errors"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -14,7 +16,7 @@ import (
type state struct {
instanceID string
position float64
position decimal.Decimal
eventTimestamp time.Time
aggregateType eventstore.AggregateType
aggregateID string
@@ -45,7 +47,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
aggregateType = new(sql.NullString)
sequence = new(sql.NullInt64)
timestamp = new(sql.NullTime)
position = new(sql.NullFloat64)
position = new(decimal.NullDecimal)
offset = new(sql.NullInt64)
)
@@ -75,7 +77,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
currentState.aggregateType = eventstore.AggregateType(aggregateType.String)
currentState.sequence = uint64(sequence.Int64)
currentState.eventTimestamp = timestamp.Time
currentState.position = position.Float64
currentState.position = position.Decimal
// psql does not provide unsigned numbers so we work around it
currentState.offset = uint32(offset.Int64)
return currentState, nil

View File

@@ -11,6 +11,7 @@ import (
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database/mock"
@@ -166,7 +167,7 @@ func TestHandler_updateLastUpdated(t *testing.T) {
updatedState: &state{
instanceID: "instance",
eventTimestamp: time.Now(),
position: 42,
position: decimal.NewFromInt(42),
},
},
isErr: func(t *testing.T, err error) {
@@ -192,7 +193,7 @@ func TestHandler_updateLastUpdated(t *testing.T) {
updatedState: &state{
instanceID: "instance",
eventTimestamp: time.Now(),
position: 42,
position: decimal.NewFromInt(42),
},
},
isErr: func(t *testing.T, err error) {
@@ -217,7 +218,7 @@ func TestHandler_updateLastUpdated(t *testing.T) {
eventstore.AggregateType("aggregate type"),
uint64(42),
mock.AnyType[time.Time]{},
float64(42),
decimal.NewFromInt(42),
uint32(0),
),
mock.WithExecRowsAffected(1),
@@ -228,7 +229,7 @@ func TestHandler_updateLastUpdated(t *testing.T) {
updatedState: &state{
instanceID: "instance",
eventTimestamp: time.Now(),
position: 42,
position: decimal.NewFromInt(42),
aggregateType: "aggregate type",
aggregateID: "aggregate id",
sequence: 42,
@@ -397,7 +398,7 @@ func TestHandler_currentState(t *testing.T) {
"aggregate type",
int64(42),
testTime,
float64(42),
decimal.NewFromInt(42).String(),
uint16(10),
},
},
@@ -412,7 +413,7 @@ func TestHandler_currentState(t *testing.T) {
currentState: &state{
instanceID: "instance",
eventTimestamp: testTime,
position: 42,
position: decimal.NewFromInt(42),
aggregateType: "aggregate type",
aggregateID: "aggregate id",
sequence: 42,

View File

@@ -9,6 +9,7 @@ import (
"strings"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"golang.org/x/exp/constraints"
@@ -82,7 +83,7 @@ func (h *Handler) reduce(event eventstore.Event) (*Statement, error) {
type Statement struct {
Aggregate *eventstore.Aggregate
Sequence uint64
Position float64
Position decimal.Decimal
CreationDate time.Time
offset uint32

View File

@@ -2,13 +2,16 @@ package eventstore_test
import (
"context"
"database/sql"
"encoding/json"
"os"
"testing"
"time"
"github.com/cockroachdb/cockroach-go/v2/testserver"
pgxdecimal "github.com/jackc/pgx-shopspring-decimal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/cmd/initialise"
@@ -39,10 +42,19 @@ func TestMain(m *testing.M) {
testCRDBClient = &database.DB{
Database: new(testDB),
}
testCRDBClient.DB, err = sql.Open("postgres", ts.PGURL().String())
config, err := pgxpool.ParseConfig(ts.PGURL().String())
if err != nil {
logging.WithFields("error", err).Fatal("unable to connect to db")
logging.WithFields("error", err).Fatal("unable to parse db config")
}
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
pgxdecimal.Register(conn.TypeMap())
return nil
}
pool, err := pgxpool.NewWithConfig(context.Background(), config)
if err != nil {
logging.WithFields("error", err).Fatal("unable to create db pool")
}
testCRDBClient.DB = stdlib.OpenDBFromPool(pool)
if err = testCRDBClient.Ping(); err != nil {
logging.WithFields("error", err).Fatal("unable to ping db")
}
@@ -103,10 +115,19 @@ func initDB(db *database.DB) error {
}
func connectLocalhost() (*database.DB, error) {
client, err := sql.Open("pgx", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
config, err := pgxpool.ParseConfig("postgresql://root@localhost:26257/defaultdb?sslmode=disable")
if err != nil {
return nil, err
}
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
pgxdecimal.Register(conn.TypeMap())
return nil
}
pool, err := pgxpool.NewWithConfig(context.Background(), config)
if err != nil {
return nil, err
}
client := stdlib.OpenDBFromPool(pool)
if err = client.Ping(); err != nil {
return nil, err
}

View File

@@ -1,19 +1,23 @@
package eventstore
import "time"
import (
"time"
"github.com/shopspring/decimal"
)
// ReadModel is the minimum representation of a read model.
// It implements a basic reducer
// it might be saved in a database or in memory
type ReadModel struct {
AggregateID string `json:"-"`
ProcessedSequence uint64 `json:"-"`
CreationDate time.Time `json:"-"`
ChangeDate time.Time `json:"-"`
Events []Event `json:"-"`
ResourceOwner string `json:"-"`
InstanceID string `json:"-"`
Position float64 `json:"-"`
AggregateID string `json:"-"`
ProcessedSequence uint64 `json:"-"`
CreationDate time.Time `json:"-"`
ChangeDate time.Time `json:"-"`
Events []Event `json:"-"`
ResourceOwner string `json:"-"`
InstanceID string `json:"-"`
Position decimal.Decimal `json:"-"`
}
// AppendEvents adds all the events to the read model.

View File

@@ -5,6 +5,8 @@ import (
"encoding/json"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore"
)
@@ -18,7 +20,7 @@ type Event struct {
// Seq is the sequence of the event
Seq uint64
// Pos is the global sequence of the event multiple events can have the same sequence
Pos float64
Pos decimal.Decimal
//CreationDate is the time the event is created
// it's used for human readability.
@@ -91,7 +93,7 @@ func (e *Event) Sequence() uint64 {
}
// Position implements [eventstore.Event]
func (e *Event) Position() float64 {
func (e *Event) Position() decimal.Decimal {
return e.Pos
}

View File

@@ -13,6 +13,7 @@ import (
context "context"
reflect "reflect"
decimal "github.com/shopspring/decimal"
database "github.com/zitadel/zitadel/internal/database"
eventstore "github.com/zitadel/zitadel/internal/eventstore"
gomock "go.uber.org/mock/gomock"
@@ -98,19 +99,19 @@ func (mr *MockQuerierMockRecorder) InstanceIDs(arg0, arg1 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstanceIDs", reflect.TypeOf((*MockQuerier)(nil).InstanceIDs), arg0, arg1)
}
// LatestSequence mocks base method.
func (m *MockQuerier) LatestSequence(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) (float64, error) {
// LatestPosition mocks base method.
func (m *MockQuerier) LatestPosition(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) (decimal.Decimal, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LatestSequence", arg0, arg1)
ret0, _ := ret[0].(float64)
ret := m.ctrl.Call(m, "LatestPosition", arg0, arg1)
ret0, _ := ret[0].(decimal.Decimal)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// LatestSequence indicates an expected call of LatestSequence.
func (mr *MockQuerierMockRecorder) LatestSequence(arg0, arg1 any) *gomock.Call {
// LatestPosition indicates an expected call of LatestPosition.
func (mr *MockQuerierMockRecorder) LatestPosition(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestSequence", reflect.TypeOf((*MockQuerier)(nil).LatestSequence), arg0, arg1)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestPosition", reflect.TypeOf((*MockQuerier)(nil).LatestPosition), arg0, arg1)
}
// MockPusher is a mock of Pusher interface.

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
@@ -186,8 +187,8 @@ func (e *mockEvent) Sequence() uint64 {
return e.sequence
}
func (e *mockEvent) Position() float64 {
return 0
func (e *mockEvent) Position() decimal.Decimal {
return decimal.Decimal{}
}
func (e *mockEvent) CreatedAt() time.Time {

View File

@@ -3,6 +3,8 @@ package repository
import (
"database/sql"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -55,6 +57,8 @@ const (
//OperationNotIn checks if a stored value does not match one of the passed value list
OperationNotIn
OperationGreaterOrEquals
operationCount
)
@@ -233,10 +237,10 @@ func instanceIDsFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuer
}
func positionAfterFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
if builder.GetPositionAfter() == 0 {
if builder.GetPositionAtLeast().IsZero() {
return nil
}
query.Position = NewFilter(FieldPosition, builder.GetPositionAfter(), OperationGreater)
query.Position = NewFilter(FieldPosition, builder.GetPositionAtLeast(), OperationGreaterOrEquals)
return query.Position
}
@@ -278,7 +282,7 @@ func eventDataFilter(query *eventstore.SearchQuery) *Filter {
}
func eventPositionAfterFilter(query *eventstore.SearchQuery) *Filter {
if pos := query.GetPositionAfter(); pos != 0 {
if pos := query.GetPositionAfter(); !pos.Equal(decimal.Decimal{}) {
return NewFilter(FieldPosition, pos, OperationGreater)
}
return nil

View File

@@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
@@ -264,11 +265,11 @@ func (crdb *CRDB) FilterToReducer(ctx context.Context, searchQuery *eventstore.S
return err
}
// LatestSequence returns the latest sequence found by the search query
func (db *CRDB) LatestSequence(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (float64, error) {
var position sql.NullFloat64
// LatestPosition returns the latest position found by the search query
func (db *CRDB) LatestPosition(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (decimal.Decimal, error) {
var position decimal.Decimal
err := query(ctx, db, searchQuery, &position, false)
return position.Float64, err
return position, err
}
// InstanceIDs returns the instance ids found by the search query
@@ -335,7 +336,7 @@ func (db *CRDB) eventQuery(useV1 bool) string {
" FROM eventstore.events2"
}
func (db *CRDB) maxSequenceQuery(useV1 bool) string {
func (db *CRDB) maxPositionQuery(useV1 bool) string {
if useV1 {
return `SELECT event_sequence FROM eventstore.events`
}
@@ -413,6 +414,8 @@ func (db *CRDB) operation(operation repository.Operation) string {
return "="
case repository.OperationGreater:
return ">"
case repository.OperationGreaterOrEquals:
return ">="
case repository.OperationLess:
return "<"
case repository.OperationJSONContains:

View File

@@ -4,6 +4,8 @@ import (
"database/sql"
"testing"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository"
)
@@ -312,7 +314,7 @@ func generateEvent(t *testing.T, aggregateID string, opts ...func(*repository.Ev
ResourceOwner: sql.NullString{String: "ro", Valid: true},
Typ: "test.created",
Version: "v1",
Pos: 42,
Pos: decimal.NewFromInt(42),
}
for _, opt := range opts {

View File

@@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/call"
@@ -25,7 +26,7 @@ type querier interface {
conditionFormat(repository.Operation) string
placeholder(query string) string
eventQuery(useV1 bool) string
maxSequenceQuery(useV1 bool) string
maxPositionQuery(useV1 bool) string
instanceIDsQuery(useV1 bool) string
Client() *database.DB
orderByEventSequence(desc, shouldOrderBySequence, useV1 bool) string
@@ -74,7 +75,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
// instead of using the max function of the database (which doesn't work for postgres)
// we select the most recent row
if q.Columns == eventstore.ColumnsMaxSequence {
if q.Columns == eventstore.ColumnsMaxPosition {
q.Limit = 1
q.Desc = true
}
@@ -91,7 +92,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
switch q.Columns {
case eventstore.ColumnsEvent,
eventstore.ColumnsMaxSequence:
eventstore.ColumnsMaxPosition:
query += criteria.orderByEventSequence(q.Desc, shouldOrderBySequence, useV1)
}
@@ -135,8 +136,8 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
func prepareColumns(criteria querier, columns eventstore.Columns, useV1 bool) (string, func(s scan, dest interface{}) error) {
switch columns {
case eventstore.ColumnsMaxSequence:
return criteria.maxSequenceQuery(useV1), maxSequenceScanner
case eventstore.ColumnsMaxPosition:
return criteria.maxPositionQuery(useV1), maxPositionScanner
case eventstore.ColumnsInstanceIDs:
return criteria.instanceIDsQuery(useV1), instanceIDsScanner
case eventstore.ColumnsEvent:
@@ -154,13 +155,15 @@ func prepareTimeTravel(ctx context.Context, criteria querier, allow bool) string
return criteria.Timetravel(took)
}
func maxSequenceScanner(row scan, dest interface{}) (err error) {
position, ok := dest.(*sql.NullFloat64)
func maxPositionScanner(row scan, dest interface{}) (err error) {
position, ok := dest.(*decimal.Decimal)
if !ok {
return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be sql.NullInt64 got: %T", dest)
return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be decimal.Decimal got: %T", dest)
}
err = row(position)
var res decimal.NullDecimal
err = row(&res)
if err == nil || errors.Is(err, sql.ErrNoRows) {
*position = res.Decimal
return nil
}
return zerrors.ThrowInternal(err, "SQL-bN5xg", "something went wrong")
@@ -189,7 +192,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
return zerrors.ThrowInvalidArgumentf(nil, "SQL-4GP6F", "events scanner: invalid type %T", dest)
}
event := new(repository.Event)
position := new(sql.NullFloat64)
position := new(decimal.NullDecimal)
if useV1 {
err = scanner(
@@ -226,7 +229,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
logging.New().WithError(err).Warn("unable to scan row")
return zerrors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row")
}
event.Pos = position.Float64
event.Pos = position.Decimal
return reduce(event)
}
}

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/zitadel/zitadel/internal/database"
@@ -109,36 +110,36 @@ func Test_prepareColumns(t *testing.T) {
{
name: "max column",
args: args{
columns: eventstore.ColumnsMaxSequence,
dest: new(sql.NullFloat64),
columns: eventstore.ColumnsMaxPosition,
dest: new(decimal.Decimal),
useV1: true,
},
res: res{
query: `SELECT event_sequence FROM eventstore.events`,
expected: sql.NullFloat64{Float64: 43, Valid: true},
expected: decimal.NewFromInt(42),
},
fields: fields{
dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}},
dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))},
},
},
{
name: "max column v2",
args: args{
columns: eventstore.ColumnsMaxSequence,
dest: new(sql.NullFloat64),
columns: eventstore.ColumnsMaxPosition,
dest: new(decimal.Decimal),
},
res: res{
query: `SELECT "position" FROM eventstore.events2`,
expected: sql.NullFloat64{Float64: 43, Valid: true},
expected: decimal.NewFromInt(42),
},
fields: fields{
dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}},
dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))},
},
},
{
name: "max sequence wrong dest type",
args: args{
columns: eventstore.ColumnsMaxSequence,
columns: eventstore.ColumnsMaxPosition,
dest: new(uint64),
},
res: res{
@@ -178,11 +179,11 @@ func Test_prepareColumns(t *testing.T) {
res: res{
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
expected: []eventstore.Event{
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 42, Data: nil, Version: "v1"},
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.NewFromInt(42), Data: nil, Version: "v1"},
},
},
fields: fields{
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 42, Valid: true}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NewNullDecimal(decimal.NewFromInt(42)), sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
},
},
{
@@ -197,11 +198,11 @@ func Test_prepareColumns(t *testing.T) {
res: res{
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
expected: []eventstore.Event{
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 0, Data: nil, Version: "v1"},
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.Decimal{}, Data: nil, Version: "v1"},
},
},
fields: fields{
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 0, Valid: false}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NullDecimal{}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
},
},
{

View File

@@ -5,6 +5,8 @@ import (
"database/sql"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/zerrors"
)
@@ -23,7 +25,7 @@ type SearchQueryBuilder struct {
queries []*SearchQuery
tx *sql.Tx
allowTimeTravel bool
positionAfter float64
positionAtLeast decimal.Decimal
awaitOpenTransactions bool
creationDateAfter time.Time
creationDateBefore time.Time
@@ -74,8 +76,8 @@ func (b *SearchQueryBuilder) GetAllowTimeTravel() bool {
return b.allowTimeTravel
}
func (b SearchQueryBuilder) GetPositionAfter() float64 {
return b.positionAfter
func (b SearchQueryBuilder) GetPositionAtLeast() decimal.Decimal {
return b.positionAtLeast
}
func (b SearchQueryBuilder) GetAwaitOpenTransactions() bool {
@@ -107,7 +109,7 @@ type SearchQuery struct {
aggregateIDs []string
eventTypes []EventType
eventData map[string]interface{}
positionAfter float64
positionAfter decimal.Decimal
}
func (q SearchQuery) GetAggregateTypes() []AggregateType {
@@ -126,7 +128,7 @@ func (q SearchQuery) GetEventData() map[string]interface{} {
return q.eventData
}
func (q SearchQuery) GetPositionAfter() float64 {
func (q SearchQuery) GetPositionAfter() decimal.Decimal {
return q.positionAfter
}
@@ -136,8 +138,8 @@ type Columns int8
const (
//ColumnsEvent represents all fields of an event
ColumnsEvent = iota + 1
// ColumnsMaxSequence represents the latest sequence of the filtered events
ColumnsMaxSequence
// ColumnsMaxPosition represents the latest sequence of the filtered events
ColumnsMaxPosition
// ColumnsInstanceIDs represents the instance ids of the filtered events
ColumnsInstanceIDs
@@ -271,9 +273,9 @@ func (builder *SearchQueryBuilder) AllowTimeTravel() *SearchQueryBuilder {
return builder
}
// PositionAfter filters for events which happened after the specified time
func (builder *SearchQueryBuilder) PositionAfter(position float64) *SearchQueryBuilder {
builder.positionAfter = position
// PositionAtLeast filters for events which happened after the specified time
func (builder *SearchQueryBuilder) PositionAtLeast(position decimal.Decimal) *SearchQueryBuilder {
builder.positionAtLeast = position
return builder
}
@@ -349,7 +351,7 @@ func (query *SearchQuery) EventData(data map[string]interface{}) *SearchQuery {
return query
}
func (query *SearchQuery) PositionAfter(position float64) *SearchQuery {
func (query *SearchQuery) PositionAfter(position decimal.Decimal) *SearchQuery {
query.positionAfter = position
return query
}

View File

@@ -116,10 +116,10 @@ func TestSearchQuerybuilderSetters(t *testing.T) {
{
name: "set columns",
args: args{
setters: []func(*SearchQueryBuilder) *SearchQueryBuilder{testSetColumns(ColumnsMaxSequence)},
setters: []func(*SearchQueryBuilder) *SearchQueryBuilder{testSetColumns(ColumnsMaxPosition)},
},
res: &SearchQueryBuilder{
columns: ColumnsMaxSequence,
columns: ColumnsMaxPosition,
},
},
{

View File

@@ -5,6 +5,8 @@ import (
"reflect"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
@@ -20,7 +22,7 @@ var _ eventstore.Event = (*Event)(nil)
type Event struct {
ID string
Seq uint64
Pos float64
Pos decimal.Decimal
CreationDate time.Time
Typ eventstore.EventType
PreviousSequence uint64
@@ -80,7 +82,7 @@ func (e *Event) Sequence() uint64 {
}
// Position implements [eventstore.Event]
func (e *Event) Position() float64 {
func (e *Event) Position() decimal.Decimal {
return e.Pos
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore"
@@ -21,7 +22,7 @@ type event struct {
typ eventstore.EventType
createdAt time.Time
sequence uint64
position float64
position decimal.Decimal
payload Payload
}
@@ -84,8 +85,8 @@ func (e *event) Sequence() uint64 {
return e.sequence
}
// Sequence implements [eventstore.Event]
func (e *event) Position() float64 {
// Position implements [eventstore.Event]
func (e *event) Position() decimal.Decimal {
return e.position
}

View File

@@ -59,7 +59,7 @@ func Test_Call(t *testing.T) {
args{
ctx: context.Background(),
timeout: time.Second,
sleep: time.Second,
sleep: 2 * time.Second,
method: http.MethodPost,
body: []byte("{\"request\": \"values\"}"),
respBody: []byte("{\"response\": \"values\"}"),

View File

@@ -63,7 +63,7 @@ func SetCurrentState(ctx context.Context, es *eventstore.Eventstore) error {
if len(projections) == 0 {
return nil
}
position, err := es.LatestSequence(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence).InstanceID(authz.GetInstance(ctx).InstanceID()).OrderDesc().Limit(1))
position, err := es.LatestPosition(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).InstanceID(authz.GetInstance(ctx).InstanceID()).OrderDesc().Limit(1))
if err != nil {
return err
}

View File

@@ -5,6 +5,7 @@ import (
"strings"
"time"
"github.com/shopspring/decimal"
"golang.org/x/text/language"
"github.com/zitadel/zitadel/internal/domain"
@@ -140,7 +141,7 @@ func (q *Queries) accessTokenByOIDCSessionAndTokenID(ctx context.Context, oidcSe
// checkSessionNotTerminatedAfter checks if a [session.TerminateType] event (or user events leading to a session termination)
// occurred after a certain time and will return an error if so.
func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, userID string, position float64, fingerprintID string) (err error) {
func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, userID string, position decimal.Decimal, fingerprintID string) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
@@ -165,7 +166,7 @@ func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID,
}
type sessionTerminatedModel struct {
position float64
position decimal.Decimal
sessionID string
userID string
fingerPrintID string

View File

@@ -10,6 +10,7 @@ import (
"time"
sq "github.com/Masterminds/squirrel"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/call"
@@ -26,7 +27,7 @@ type Stateful interface {
type State struct {
LastRun time.Time
Position float64
Position decimal.Decimal
EventCreatedAt time.Time
AggregateID string
AggregateType eventstore.AggregateType
@@ -221,7 +222,7 @@ func prepareLatestState(ctx context.Context, db prepareDatabase) (sq.SelectBuild
var (
creationDate sql.NullTime
lastUpdated sql.NullTime
position sql.NullFloat64
position decimal.NullDecimal
)
err := row.Scan(
&creationDate,
@@ -234,7 +235,7 @@ func prepareLatestState(ctx context.Context, db prepareDatabase) (sq.SelectBuild
return &State{
EventCreatedAt: creationDate.Time,
LastRun: lastUpdated.Time,
Position: position.Float64,
Position: position.Decimal,
}, nil
}
}
@@ -259,7 +260,7 @@ func prepareCurrentStateQuery(ctx context.Context, db prepareDatabase) (sq.Selec
var (
lastRun sql.NullTime
eventDate sql.NullTime
currentPosition sql.NullFloat64
currentPosition decimal.NullDecimal
aggregateType sql.NullString
aggregateID sql.NullString
sequence sql.NullInt64
@@ -280,7 +281,7 @@ func prepareCurrentStateQuery(ctx context.Context, db prepareDatabase) (sq.Selec
}
currentState.State.EventCreatedAt = eventDate.Time
currentState.State.LastRun = lastRun.Time
currentState.Position = currentPosition.Float64
currentState.Position = currentPosition.Decimal
currentState.AggregateType = eventstore.AggregateType(aggregateType.String)
currentState.AggregateID = aggregateID.String
currentState.Sequence = uint64(sequence.Int64)

View File

@@ -7,6 +7,8 @@ import (
"fmt"
"regexp"
"testing"
"github.com/shopspring/decimal"
)
var (
@@ -87,7 +89,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) {
State: State{
EventCreatedAt: testNow,
LastRun: testNow,
Position: 20211108,
Position: decimal.NewFromInt(20211108),
AggregateID: "agg-id",
AggregateType: "agg-type",
Sequence: 20211108,
@@ -134,7 +136,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) {
ProjectionName: "projection-name",
State: State{
EventCreatedAt: testNow,
Position: 20211108,
Position: decimal.NewFromInt(20211108),
LastRun: testNow,
AggregateID: "agg-id",
AggregateType: "agg-type",
@@ -145,7 +147,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) {
ProjectionName: "projection-name2",
State: State{
EventCreatedAt: testNow,
Position: 20211108,
Position: decimal.NewFromInt(20211108),
LastRun: testNow,
AggregateID: "agg-id",
AggregateType: "agg-type",

View File

@@ -281,7 +281,7 @@ func (q *Queries) UserGrants(ctx context.Context, queries *UserGrantsQueries, sh
return nil, zerrors.ThrowInternal(err, "QUERY-wXnQR", "Errors.Query.SQLStatement")
}
latestSequence, err := q.latestState(ctx, userGrantTable)
latestState, err := q.latestState(ctx, userGrantTable)
if err != nil {
return nil, err
}
@@ -294,7 +294,7 @@ func (q *Queries) UserGrants(ctx context.Context, queries *UserGrantsQueries, sh
return nil, err
}
grants.State = latestSequence
grants.State = latestState
return grants, nil
}

View File

@@ -144,7 +144,7 @@ func (q *Queries) Memberships(ctx context.Context, queries *MembershipSearchQuer
if err != nil {
return nil, zerrors.ThrowInvalidArgument(err, "QUERY-T84X9", "Errors.Query.InvalidRequest")
}
latestSequence, err := q.latestState(ctx, orgMemberTable, instanceMemberTable, projectMemberTable, projectGrantMemberTable)
latestState, err := q.latestState(ctx, orgMemberTable, instanceMemberTable, projectMemberTable, projectGrantMemberTable)
if err != nil {
return nil, err
}
@@ -157,7 +157,7 @@ func (q *Queries) Memberships(ctx context.Context, queries *MembershipSearchQuer
if err != nil {
return nil, err
}
memberships.State = latestSequence
memberships.State = latestState
return memberships, nil
}

View File

@@ -3,6 +3,7 @@ package database
import (
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"golang.org/x/exp/constraints"
)
@@ -94,7 +95,7 @@ func (c numberCompare) String() string {
}
type number interface {
constraints.Integer | constraints.Float | time.Time
constraints.Integer | constraints.Float | time.Time | decimal.Decimal
// TODO: condition must know if it's args are named parameters or not
// constraints.Integer | constraints.Float | time.Time | placeholder
}

View File

@@ -2,6 +2,8 @@ package eventstore
import (
"context"
"github.com/shopspring/decimal"
)
func NewEventstore(querier Querier, pusher Pusher) *EventStore {
@@ -30,12 +32,12 @@ type healthier interface {
}
type GlobalPosition struct {
Position float64
Position decimal.Decimal
InPositionOrder uint32
}
func (gp GlobalPosition) IsLess(other GlobalPosition) bool {
return gp.Position < other.Position || (gp.Position == other.Position && gp.InPositionOrder < other.InPositionOrder)
return gp.Position.LessThan(other.Position) || (gp.Position.Equal(other.Position) && gp.InPositionOrder < other.InPositionOrder)
}
type Reducer interface {

View File

@@ -8,6 +8,8 @@ import (
"testing"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/database/mock"
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -818,7 +820,7 @@ func Test_push(t *testing.T) {
[][]driver.Value{
{
time.Now(),
float64(123),
decimal.NewFromFloat(123).String(),
},
},
),
@@ -899,11 +901,11 @@ func Test_push(t *testing.T) {
[][]driver.Value{
{
time.Now(),
float64(123),
decimal.NewFromFloat(123).String(),
},
{
time.Now(),
float64(123.1),
decimal.NewFromFloat(123.1).String(),
},
},
),
@@ -984,11 +986,11 @@ func Test_push(t *testing.T) {
[][]driver.Value{
{
time.Now(),
float64(123),
decimal.NewFromFloat(123).String(),
},
{
time.Now(),
float64(123.1),
decimal.NewFromFloat(123.1).String(),
},
},
),
@@ -1044,7 +1046,7 @@ func Test_push(t *testing.T) {
[][]driver.Value{
{
time.Now(),
float64(123),
decimal.NewFromFloat(123).String(),
},
},
),
@@ -1099,7 +1101,7 @@ func Test_push(t *testing.T) {
[][]driver.Value{
{
time.Now(),
float64(123),
decimal.NewFromFloat(123).String(),
},
},
),
@@ -1181,11 +1183,11 @@ func Test_push(t *testing.T) {
[][]driver.Value{
{
time.Now(),
float64(123),
decimal.NewFromFloat(123).String(),
},
{
time.Now(),
float64(123.1),
decimal.NewFromFloat(123.1).String(),
},
},
),
@@ -1272,11 +1274,11 @@ func Test_push(t *testing.T) {
[][]driver.Value{
{
time.Now(),
float64(123),
decimal.NewFromFloat(123).String(),
},
{
time.Now(),
float64(123.1),
decimal.NewFromFloat(123.1).String(),
},
},
),

View File

@@ -8,6 +8,8 @@ import (
"testing"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/database"
"github.com/zitadel/zitadel/internal/v2/database/mock"
"github.com/zitadel/zitadel/internal/v2/eventstore"
@@ -541,13 +543,13 @@ func Test_writeFilter(t *testing.T) {
args: args{
filter: eventstore.NewFilter(
eventstore.FilterPagination(
eventstore.PositionGreater(123.4, 0),
eventstore.PositionGreater(decimal.NewFromFloat(123.4), 0),
),
),
},
want: wantQuery{
query: " WHERE instance_id = $1 AND position > $2 ORDER BY position, in_tx_order",
args: []any{"i1", 123.4},
args: []any{"i1", decimal.NewFromFloat(123.4)},
},
},
{
@@ -555,18 +557,18 @@ func Test_writeFilter(t *testing.T) {
args: args{
filter: eventstore.NewFilter(
eventstore.FilterPagination(
// eventstore.PositionGreater(123.4, 0),
// eventstore.PositionGreater(decimal.NewFromFloat(123.4), 0),
// eventstore.PositionLess(125.4, 10),
eventstore.PositionBetween(
&eventstore.GlobalPosition{Position: 123.4},
&eventstore.GlobalPosition{Position: 125.4, InPositionOrder: 10},
&eventstore.GlobalPosition{Position: decimal.NewFromFloat(123.4)},
&eventstore.GlobalPosition{Position: decimal.NewFromFloat(125.4), InPositionOrder: 10},
),
),
),
},
want: wantQuery{
query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order < $3) OR position < $4) AND position > $5 ORDER BY position, in_tx_order",
args: []any{"i1", 125.4, uint32(10), 125.4, 123.4},
args: []any{"i1", decimal.NewFromFloat(125.4), uint32(10), decimal.NewFromFloat(125.4), decimal.NewFromFloat(123.4)},
// TODO: (adlerhurst) would require some refactoring to reuse existing args
// query: " WHERE instance_id = $1 AND position > $2 AND ((position = $3 AND in_tx_order < $4) OR position < $3) ORDER BY position, in_tx_order",
// args: []any{"i1", 123.4, 125.4, uint32(10)},
@@ -577,13 +579,13 @@ func Test_writeFilter(t *testing.T) {
args: args{
filter: eventstore.NewFilter(
eventstore.FilterPagination(
eventstore.PositionGreater(123.4, 12),
eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12),
),
),
},
want: wantQuery{
query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order > $3) OR position > $4) ORDER BY position, in_tx_order",
args: []any{"i1", 123.4, uint32(12), 123.4},
args: []any{"i1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4)},
},
},
{
@@ -593,13 +595,13 @@ func Test_writeFilter(t *testing.T) {
eventstore.FilterPagination(
eventstore.Limit(10),
eventstore.Offset(3),
eventstore.PositionGreater(123.4, 12),
eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12),
),
),
},
want: wantQuery{
query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order > $3) OR position > $4) ORDER BY position, in_tx_order LIMIT $5 OFFSET $6",
args: []any{"i1", 123.4, uint32(12), 123.4, uint32(10), uint32(3)},
args: []any{"i1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)},
},
},
{
@@ -609,14 +611,14 @@ func Test_writeFilter(t *testing.T) {
eventstore.FilterPagination(
eventstore.Limit(10),
eventstore.Offset(3),
eventstore.PositionGreater(123.4, 12),
eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12),
),
eventstore.AppendAggregateFilter("user"),
),
},
want: wantQuery{
query: " WHERE instance_id = $1 AND aggregate_type = $2 AND ((position = $3 AND in_tx_order > $4) OR position > $5) ORDER BY position, in_tx_order LIMIT $6 OFFSET $7",
args: []any{"i1", "user", 123.4, uint32(12), 123.4, uint32(10), uint32(3)},
args: []any{"i1", "user", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)},
},
},
{
@@ -626,7 +628,7 @@ func Test_writeFilter(t *testing.T) {
eventstore.FilterPagination(
eventstore.Limit(10),
eventstore.Offset(3),
eventstore.PositionGreater(123.4, 12),
eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12),
),
eventstore.AppendAggregateFilter("user"),
eventstore.AppendAggregateFilter(
@@ -637,7 +639,7 @@ func Test_writeFilter(t *testing.T) {
},
want: wantQuery{
query: " WHERE instance_id = $1 AND (aggregate_type = $2 OR (aggregate_type = $3 AND aggregate_id = $4)) AND ((position = $5 AND in_tx_order > $6) OR position > $7) ORDER BY position, in_tx_order LIMIT $8 OFFSET $9",
args: []any{"i1", "user", "org", "o1", 123.4, uint32(12), 123.4, uint32(10), uint32(3)},
args: []any{"i1", "user", "org", "o1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)},
},
},
}
@@ -956,7 +958,7 @@ func Test_writeQueryUse_examples(t *testing.T) {
),
eventstore.FilterPagination(
// used because we need to check for first login and an app which is not console
eventstore.PositionGreater(12, 4),
eventstore.PositionGreater(decimal.NewFromInt(12), 4),
),
),
eventstore.NewFilter(
@@ -1065,9 +1067,9 @@ func Test_writeQueryUse_examples(t *testing.T) {
"instance",
"user",
"user.token.added",
float64(12),
decimal.NewFromInt(12),
uint32(4),
float64(12),
decimal.NewFromInt(12),
"instance",
"instance",
[]string{"instance.idp.config.added", "instance.idp.oauth.added", "instance.idp.oidc.added", "instance.idp.jwt.added", "instance.idp.azure.added", "instance.idp.github.added", "instance.idp.github.enterprise.added", "instance.idp.gitlab.added", "instance.idp.gitlab.selfhosted.added", "instance.idp.google.added", "instance.idp.ldap.added", "instance.idp.config.apple.added", "instance.idp.saml.added"},
@@ -1201,7 +1203,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(),
"event.type",
uint32(23),
float64(123),
decimal.NewFromInt(123).String(),
uint32(0),
nil,
"gigi",
@@ -1235,7 +1237,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(),
"event.type",
uint32(23),
float64(123),
decimal.NewFromInt(123).String(),
uint32(0),
[]byte(`{"name": "gigi"}`),
"gigi",
@@ -1269,7 +1271,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(),
"event.type",
uint32(23),
float64(123),
decimal.NewFromInt(123).String(),
uint32(0),
nil,
"gigi",
@@ -1283,7 +1285,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(),
"event.type",
uint32(24),
float64(124),
decimal.NewFromInt(124).String(),
uint32(0),
[]byte(`{"name": "gigi"}`),
"gigi",
@@ -1317,7 +1319,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(),
"event.type",
uint32(23),
float64(123),
decimal.NewFromInt(123).String(),
uint32(0),
nil,
"gigi",
@@ -1331,7 +1333,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(),
"event.type",
uint32(24),
float64(124),
decimal.NewFromInt(124).String(),
uint32(0),
[]byte(`{"name": "gigi"}`),
"gigi",

View File

@@ -7,6 +7,8 @@ import (
"slices"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/database"
)
@@ -723,7 +725,7 @@ func (pc *PositionCondition) Min() *GlobalPosition {
// PositionGreater prepares the condition as follows
// if inPositionOrder is set: position = AND in_tx_order > OR or position >
// if inPositionOrder is NOT set: position >
func PositionGreater(position float64, inPositionOrder uint32) paginationOpt {
func PositionGreater(position decimal.Decimal, inPositionOrder uint32) paginationOpt {
return func(p *Pagination) {
p.ensurePosition()
p.position.min = &GlobalPosition{
@@ -743,7 +745,7 @@ func GlobalPositionGreater(position *GlobalPosition) paginationOpt {
// PositionLess prepares the condition as follows
// if inPositionOrder is set: position = AND in_tx_order > OR or position >
// if inPositionOrder is NOT set: position >
func PositionLess(position float64, inPositionOrder uint32) paginationOpt {
func PositionLess(position decimal.Decimal, inPositionOrder uint32) paginationOpt {
return func(p *Pagination) {
p.ensurePosition()
p.position.max = &GlobalPosition{

View File

@@ -6,6 +6,8 @@ import (
"testing"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/database"
)
@@ -74,13 +76,13 @@ func TestPaginationOpt(t *testing.T) {
name: "global position greater",
args: args{
opts: []paginationOpt{
GlobalPositionGreater(&GlobalPosition{Position: 10}),
GlobalPositionGreater(&GlobalPosition{Position: decimal.NewFromInt(10)}),
},
},
want: &Pagination{
position: &PositionCondition{
min: &GlobalPosition{
Position: 10,
Position: decimal.NewFromInt(10),
InPositionOrder: 0,
},
},
@@ -90,13 +92,13 @@ func TestPaginationOpt(t *testing.T) {
name: "position greater",
args: args{
opts: []paginationOpt{
PositionGreater(10, 0),
PositionGreater(decimal.NewFromInt(10), 0),
},
},
want: &Pagination{
position: &PositionCondition{
min: &GlobalPosition{
Position: 10,
Position: decimal.NewFromInt(10),
InPositionOrder: 0,
},
},
@@ -107,13 +109,13 @@ func TestPaginationOpt(t *testing.T) {
name: "position less",
args: args{
opts: []paginationOpt{
PositionLess(10, 12),
PositionLess(decimal.NewFromInt(10), 12),
},
},
want: &Pagination{
position: &PositionCondition{
max: &GlobalPosition{
Position: 10,
Position: decimal.NewFromInt(10),
InPositionOrder: 12,
},
},
@@ -123,13 +125,13 @@ func TestPaginationOpt(t *testing.T) {
name: "global position less",
args: args{
opts: []paginationOpt{
GlobalPositionLess(&GlobalPosition{Position: 12, InPositionOrder: 24}),
GlobalPositionLess(&GlobalPosition{Position: decimal.NewFromInt(12), InPositionOrder: 24}),
},
},
want: &Pagination{
position: &PositionCondition{
max: &GlobalPosition{
Position: 12,
Position: decimal.NewFromInt(12),
InPositionOrder: 24,
},
},
@@ -140,19 +142,19 @@ func TestPaginationOpt(t *testing.T) {
args: args{
opts: []paginationOpt{
PositionBetween(
&GlobalPosition{10, 12},
&GlobalPosition{20, 0},
&GlobalPosition{decimal.NewFromInt(10), 12},
&GlobalPosition{decimal.NewFromInt(20), 0},
),
},
},
want: &Pagination{
position: &PositionCondition{
min: &GlobalPosition{
Position: 10,
Position: decimal.NewFromInt(10),
InPositionOrder: 12,
},
max: &GlobalPosition{
Position: 20,
Position: decimal.NewFromInt(20),
InPositionOrder: 0,
},
},

View File

@@ -1,6 +1,8 @@
package readmodel
import (
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/v2/system"
"github.com/zitadel/zitadel/internal/v2/system/mirror"
@@ -8,7 +10,7 @@ import (
type LastSuccessfulMirror struct {
ID string
Position float64
Position decimal.Decimal
source string
}
@@ -53,7 +55,7 @@ func (h *LastSuccessfulMirror) Reduce(events ...*eventstore.StorageEvent) (err e
func (h *LastSuccessfulMirror) reduceSucceeded(event *eventstore.StorageEvent) error {
// if position is set we skip all older events
if h.Position > 0 {
if h.Position.GreaterThan(decimal.NewFromInt(0)) {
return nil
}

View File

@@ -1,6 +1,8 @@
package mirror
import (
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
@@ -9,7 +11,7 @@ type succeededPayload struct {
// Source is the name of the database data are mirrored from
Source string `json:"source"`
// Position until data will be mirrored
Position float64 `json:"position"`
Position decimal.Decimal `json:"position"`
}
const SucceededType = eventTypePrefix + "succeeded"
@@ -38,7 +40,7 @@ func SucceededEventFromStorage(event *eventstore.StorageEvent) (e *SucceededEven
}, nil
}
func NewSucceededCommand(source string, position float64) *eventstore.Command {
func NewSucceededCommand(source string, position decimal.Decimal) *eventstore.Command {
return &eventstore.Command{
Action: eventstore.Action[any]{
Creator: Creator,