diff --git a/cmd/mirror/event.go b/cmd/mirror/event.go index af0ac25e27..2bb0d52f45 100644 --- a/cmd/mirror/event.go +++ b/cmd/mirror/event.go @@ -3,8 +3,6 @@ package mirror import ( "context" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/v2/projection" "github.com/zitadel/zitadel/internal/v2/readmodel" @@ -32,12 +30,12 @@ func queryLastSuccessfulMigration(ctx context.Context, destinationES *eventstore return lastSuccess, nil } -func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ decimal.Decimal, err error) { +func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ float64, err error) { var cmd *eventstore.Command if len(instanceIDs) > 0 { cmd, err = mirror_event.NewStartedInstancesCommand(destination, instanceIDs) if err != nil { - return decimal.Decimal{}, err + return 0, err } } else { cmd = mirror_event.NewStartedSystemCommand(destination) @@ -60,12 +58,12 @@ func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, i ), ) if err != nil { - return decimal.Decimal{}, err + return 0, err } return position.Position, nil } -func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position decimal.Decimal) error { +func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position float64) error { return destinationES.Push( ctx, eventstore.NewPushIntent( diff --git a/cmd/mirror/event_store.go b/cmd/mirror/event_store.go index 180b52543e..a845196b26 100644 --- a/cmd/mirror/event_store.go +++ b/cmd/mirror/event_store.go @@ -9,7 +9,6 @@ import ( "time" "github.com/jackc/pgx/v5/stdlib" - "github.com/shopspring/decimal" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/zitadel/logging" @@ -98,7 +97,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) { logging.WithFields("from", previousMigration.Position, "to", maxPosition).Info("start event migration") nextPos := make(chan bool, 1) - pos := make(chan decimal.Decimal, 1) + pos := make(chan float64, 1) errs := make(chan error, 3) go func() { @@ -149,7 +148,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) { go func() { defer close(pos) for range nextPos { - var position decimal.Decimal + var position float64 err := dest.QueryRowContext( ctx, func(row *sql.Row) error { @@ -184,7 +183,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) { logging.WithFields("took", time.Since(start), "count", eventCount).Info("events migrated") } -func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position decimal.Decimal, errs <-chan error) { +func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position float64, errs <-chan error) { joinedErrs := make([]error, 0, len(errs)) for err := range errs { joinedErrs = append(joinedErrs, err) diff --git a/go.mod b/go.mod index 6519f94bc1..b35bf04216 100644 --- a/go.mod +++ b/go.mod @@ -42,8 +42,7 @@ require ( github.com/h2non/gock v1.2.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/improbable-eng/grpc-web v0.15.0 - github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e - github.com/jackc/pgx/v5 v5.7.4 + github.com/jackc/pgx/v5 v5.7.0 github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52 github.com/jinzhu/gorm v1.9.16 github.com/k3a/html2text v1.2.1 @@ -60,7 +59,6 @@ require ( github.com/redis/go-redis/v9 v9.7.0 github.com/rs/cors v1.11.1 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 - github.com/shopspring/decimal v1.4.0 github.com/sony/gobreaker/v2 v2.0.0 github.com/sony/sonyflake v1.2.0 github.com/spf13/cobra v1.8.1 @@ -117,7 +115,7 @@ require ( github.com/google/go-tpm v0.9.0 // indirect github.com/google/pprof v0.0.0-20240528025155-186aa0362fba // indirect github.com/google/s2a-go v0.1.7 // indirect - github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/lib/pq v1.10.9 // indirect github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect diff --git a/go.sum b/go.sum index 2584308759..0745f5aca7 100644 --- a/go.sum +++ b/go.sum @@ -420,12 +420,10 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e h1:i3gQ/Zo7sk4LUVbsAjTNeC4gIjoPNIZVzs4EXstssV4= -github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e/go.mod h1:zUHglCZ4mpDUPgIwqEKoba6+tcUQzRdb1+DPTuYe9pI= -github.com/jackc/pgx/v5 v5.7.4 h1:9wKznZrhWa2QiHL+NjTSPP6yjl3451BX3imWDnokYlg= -github.com/jackc/pgx/v5 v5.7.4/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= -github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= -github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jackc/pgx/v5 v5.7.0 h1:FG6VLIdzvAPhnYqP14sQ2xhFLkiUQHCs6ySqO91kF4g= +github.com/jackc/pgx/v5 v5.7.0/go.mod h1:awP1KNnjylvpxHuHP63gzjhnGkI1iw+PMoIwvoleN/8= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52 h1:jny9eqYPwkG8IVy7foUoRjQmFLcArCSz+uPsL6KS0HQ= github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52/go.mod h1:RDZ+4PR3mDOtTpVbI0qBE+rdhmtIrtbssiNn38/1OWA= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -667,8 +665,6 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= -github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= diff --git a/internal/api/oidc/key.go b/internal/api/oidc/key.go index 4cb06a698e..76f78ab5ab 100644 --- a/internal/api/oidc/key.go +++ b/internal/api/oidc/key.go @@ -11,7 +11,6 @@ import ( "github.com/go-jose/go-jose/v4" "github.com/jonboulle/clockwork" "github.com/muhlemmer/gu" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/oidc/v3/pkg/op" @@ -351,14 +350,14 @@ func (o *OPStorage) getSigningKey(ctx context.Context) (op.SigningKey, error) { if len(keys.Keys) > 0 { return PrivateKeyToSigningKey(SelectSigningKey(keys.Keys), o.encAlg) } - var position decimal.Decimal + var position float64 if keys.State != nil { position = keys.State.Position } return nil, o.refreshSigningKey(ctx, position) } -func (o *OPStorage) refreshSigningKey(ctx context.Context, position decimal.Decimal) error { +func (o *OPStorage) refreshSigningKey(ctx context.Context, position float64) error { ok, err := o.ensureIsLatestKey(ctx, position) if err != nil || !ok { return zerrors.ThrowInternal(err, "OIDC-ASfh3", "cannot ensure that projection is up to date") @@ -370,12 +369,12 @@ func (o *OPStorage) refreshSigningKey(ctx context.Context, position decimal.Deci return zerrors.ThrowInternal(nil, "OIDC-Df1bh", "") } -func (o *OPStorage) ensureIsLatestKey(ctx context.Context, position decimal.Decimal) (bool, error) { - maxSequence, err := o.getMaxKeyPosition(ctx) +func (o *OPStorage) ensureIsLatestKey(ctx context.Context, position float64) (bool, error) { + maxSequence, err := o.getMaxKeySequence(ctx) if err != nil { return false, fmt.Errorf("error retrieving new events: %w", err) } - return position.GreaterThanOrEqual(maxSequence), nil + return position >= maxSequence, nil } func PrivateKeyToSigningKey(key query.PrivateKey, algorithm crypto.EncryptionAlgorithm) (_ op.SigningKey, err error) { @@ -413,9 +412,9 @@ func (o *OPStorage) lockAndGenerateSigningKeyPair(ctx context.Context) error { return o.command.GenerateSigningKeyPair(setOIDCCtx(ctx), "RS256") } -func (o *OPStorage) getMaxKeyPosition(ctx context.Context) (decimal.Decimal, error) { - return o.eventstore.LatestPosition(ctx, - eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition). +func (o *OPStorage) getMaxKeySequence(ctx context.Context) (float64, error) { + return o.eventstore.LatestSequence(ctx, + eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). ResourceOwner(authz.GetInstance(ctx).InstanceID()). AwaitOpenTransactions(). AllowTimeTravel(). diff --git a/internal/api/saml/certificate.go b/internal/api/saml/certificate.go index 14752cd5cd..ff130f7709 100644 --- a/internal/api/saml/certificate.go +++ b/internal/api/saml/certificate.go @@ -6,7 +6,6 @@ import ( "time" "github.com/go-jose/go-jose/v4" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/saml/pkg/provider/key" @@ -77,7 +76,7 @@ func (p *Storage) getCertificateAndKey(ctx context.Context, usage crypto.KeyUsag return p.certificateToCertificateAndKey(selectCertificate(certs.Certificates)) } - var position decimal.Decimal + var position float64 if certs.State != nil { position = certs.State.Position } @@ -88,7 +87,7 @@ func (p *Storage) getCertificateAndKey(ctx context.Context, usage crypto.KeyUsag func (p *Storage) refreshCertificate( ctx context.Context, usage crypto.KeyUsage, - position decimal.Decimal, + position float64, ) error { ok, err := p.ensureIsLatestCertificate(ctx, position) if err != nil { @@ -104,12 +103,12 @@ func (p *Storage) refreshCertificate( return nil } -func (p *Storage) ensureIsLatestCertificate(ctx context.Context, position decimal.Decimal) (bool, error) { - maxSequence, err := p.getMaxKeyPosition(ctx) +func (p *Storage) ensureIsLatestCertificate(ctx context.Context, position float64) (bool, error) { + maxSequence, err := p.getMaxKeySequence(ctx) if err != nil { return false, fmt.Errorf("error retrieving new events: %w", err) } - return position.GreaterThanOrEqual(maxSequence), nil + return position >= maxSequence, nil } func (p *Storage) lockAndGenerateCertificateAndKey(ctx context.Context, usage crypto.KeyUsage) error { @@ -152,9 +151,9 @@ func (p *Storage) lockAndGenerateCertificateAndKey(ctx context.Context, usage cr } } -func (p *Storage) getMaxKeyPosition(ctx context.Context) (decimal.Decimal, error) { - return p.eventstore.LatestPosition(ctx, - eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition). +func (p *Storage) getMaxKeySequence(ctx context.Context) (float64, error) { + return p.eventstore.LatestSequence(ctx, + eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). ResourceOwner(authz.GetInstance(ctx).InstanceID()). AwaitOpenTransactions(). AddQuery(). diff --git a/internal/database/cockroach/crdb.go b/internal/database/cockroach/crdb.go index 2b1006a5f2..48e912b5f5 100644 --- a/internal/database/cockroach/crdb.go +++ b/internal/database/cockroach/crdb.go @@ -73,16 +73,10 @@ func (_ *Config) Decode(configs []interface{}) (dialect.Connector, error) { } func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) { - dialect.RegisterAfterConnect(func(ctx context.Context, conn *pgx.Conn) error { + dialect.RegisterAfterConnect(func(ctx context.Context, c *pgx.Conn) error { // CockroachDB by default does not allow multiple modifications of the same table using ON CONFLICT // This is needed to fill the fields table of the eventstore during eventstore.Push. - - // This modification is only needed on crdb so we check if the connection is crdb - // postgres doesn't have parameter so we check if the parameter is empty - if conn.PgConn().ParameterStatus("crdb_version") == "" { - return nil - } - _, err := conn.Exec(ctx, "SET enable_multiple_modifications_of_table = on") + _, err := c.Exec(ctx, "SET enable_multiple_modifications_of_table = on") return err }) connConfig := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns) diff --git a/internal/database/dialect/connections.go b/internal/database/dialect/connections.go index 460c12b7c3..13a4d657c3 100644 --- a/internal/database/dialect/connections.go +++ b/internal/database/dialect/connections.go @@ -5,8 +5,6 @@ import ( "errors" "reflect" - pgxdecimal "github.com/jackc/pgx-shopspring-decimal" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" ) @@ -23,12 +21,7 @@ type ConnectionConfig struct { AfterConnect []func(ctx context.Context, c *pgx.Conn) error } -var afterConnectFuncs = []func(ctx context.Context, c *pgx.Conn) error{ - func(ctx context.Context, c *pgx.Conn) error { - pgxdecimal.Register(c.TypeMap()) - return nil - }, -} +var afterConnectFuncs []func(ctx context.Context, c *pgx.Conn) error func RegisterAfterConnect(f func(ctx context.Context, c *pgx.Conn) error) { afterConnectFuncs = append(afterConnectFuncs, f) diff --git a/internal/database/postgres/pg.go b/internal/database/postgres/pg.go index 776b8c5afc..5f4d9a6c9b 100644 --- a/internal/database/postgres/pg.go +++ b/internal/database/postgres/pg.go @@ -81,15 +81,13 @@ func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) { return nil, nil, err } - if len(connConfig.AfterConnect) > 0 { - config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { - for _, f := range connConfig.AfterConnect { - if err := f(ctx, conn); err != nil { - return err - } + config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { + for _, f := range connConfig.AfterConnect { + if err := f(ctx, conn); err != nil { + return err } - return nil } + return nil } if connConfig.MaxOpenConns != 0 { diff --git a/internal/eventstore/event.go b/internal/eventstore/event.go index 656a02f33d..3df096f069 100644 --- a/internal/eventstore/event.go +++ b/internal/eventstore/event.go @@ -5,8 +5,6 @@ import ( "reflect" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/zerrors" ) @@ -46,7 +44,7 @@ type Event interface { // CreatedAt is the time the event was created at CreatedAt() time.Time // Position is the global position of the event - Position() decimal.Decimal + Position() float64 // Unmarshal parses the payload and stores the result // in the value pointed to by ptr. If ptr is nil or not a pointer, diff --git a/internal/eventstore/event_base.go b/internal/eventstore/event_base.go index cc21864aad..45706641d8 100644 --- a/internal/eventstore/event_base.go +++ b/internal/eventstore/event_base.go @@ -7,7 +7,6 @@ import ( "strings" "time" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" @@ -26,7 +25,7 @@ type BaseEvent struct { Agg *Aggregate Seq uint64 - Pos decimal.Decimal + Pos float64 Creation time.Time previousAggregateSequence uint64 previousAggregateTypeSequence uint64 @@ -39,7 +38,7 @@ type BaseEvent struct { } // Position implements Event. -func (e *BaseEvent) Position() decimal.Decimal { +func (e *BaseEvent) Position() float64 { return e.Pos } diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index e61ca81617..4954df86c8 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -7,7 +7,6 @@ import ( "time" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" @@ -230,11 +229,11 @@ func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQu }) } -// LatestPosition filters the latest position for the given search query -func (es *Eventstore) LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error) { +// LatestSequence filters the latest sequence for the given search query +func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) { queryFactory.InstanceID(authz.GetInstance(ctx).InstanceID()) - return es.querier.LatestPosition(ctx, queryFactory) + return es.querier.LatestSequence(ctx, queryFactory) } // InstanceIDs returns the distinct instance ids found by the search query @@ -266,8 +265,8 @@ type Querier interface { Health(ctx context.Context) error // FilterToReducer calls r for every event returned from the storage FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r Reducer) error - // LatestPosition returns the latest position found by the search query - LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error) + // LatestSequence returns the latest sequence found by the search query + LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) // InstanceIDs returns the instance ids found by the search query InstanceIDs(ctx context.Context, queryFactory *SearchQueryBuilder) ([]string, error) // Client returns the underlying database connection diff --git a/internal/eventstore/eventstore_querier_test.go b/internal/eventstore/eventstore_querier_test.go index 7ef85167b9..4b7ad78b25 100644 --- a/internal/eventstore/eventstore_querier_test.go +++ b/internal/eventstore/eventstore_querier_test.go @@ -4,8 +4,6 @@ import ( "context" "testing" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/eventstore" ) @@ -133,7 +131,7 @@ func TestCRDB_Filter(t *testing.T) { } } -func TestCRDB_LatestPosition(t *testing.T) { +func TestCRDB_LatestSequence(t *testing.T) { type args struct { searchQuery *eventstore.SearchQueryBuilder } @@ -141,7 +139,7 @@ func TestCRDB_LatestPosition(t *testing.T) { existingEvents []eventstore.Command } type res struct { - position decimal.Decimal + sequence float64 } tests := []struct { name string @@ -153,7 +151,7 @@ func TestCRDB_LatestPosition(t *testing.T) { { name: "aggregate type filter no sequence", args: args{ - searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition). + searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). AddQuery(). AggregateTypes("not found"). Builder(), @@ -170,7 +168,7 @@ func TestCRDB_LatestPosition(t *testing.T) { { name: "aggregate type filter sequence", args: args{ - searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition). + searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). AddQuery(). AggregateTypes(eventstore.AggregateType(t.Name())). Builder(), @@ -204,12 +202,12 @@ func TestCRDB_LatestPosition(t *testing.T) { return } - position, err := db.LatestPosition(context.Background(), tt.args.searchQuery) + sequence, err := db.LatestSequence(context.Background(), tt.args.searchQuery) if (err != nil) != tt.wantErr { t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr) } - if tt.res.position.GreaterThan(position) { - t.Errorf("CRDB.query() expected sequence: %v got %v", tt.res.position, position) + if tt.res.sequence > sequence { + t.Errorf("CRDB.query() expected sequence: %v got %v", tt.res.sequence, sequence) } }) } diff --git a/internal/eventstore/eventstore_test.go b/internal/eventstore/eventstore_test.go index 5452572faa..9e1aa77db1 100644 --- a/internal/eventstore/eventstore_test.go +++ b/internal/eventstore/eventstore_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/service" @@ -398,7 +397,7 @@ func (repo *testPusher) Push(_ context.Context, _ database.ContextQueryExecuter, type testQuerier struct { events []Event - sequence decimal.Decimal + sequence float64 instances []string err error t *testing.T @@ -431,9 +430,9 @@ func (repo *testQuerier) FilterToReducer(ctx context.Context, searchQuery *Searc return nil } -func (repo *testQuerier) LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error) { +func (repo *testQuerier) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) { if repo.err != nil { - return decimal.Decimal{}, repo.err + return 0, repo.err } return repo.sequence, nil } @@ -1077,7 +1076,7 @@ func TestEventstore_FilterEvents(t *testing.T) { } } -func TestEventstore_LatestPosition(t *testing.T) { +func TestEventstore_LatestSequence(t *testing.T) { type args struct { query *SearchQueryBuilder } @@ -1097,7 +1096,7 @@ func TestEventstore_LatestPosition(t *testing.T) { name: "no events", args: args{ query: &SearchQueryBuilder{ - columns: ColumnsMaxPosition, + columns: ColumnsMaxSequence, queries: []*SearchQuery{ { builder: &SearchQueryBuilder{}, @@ -1120,7 +1119,7 @@ func TestEventstore_LatestPosition(t *testing.T) { name: "repo error", args: args{ query: &SearchQueryBuilder{ - columns: ColumnsMaxPosition, + columns: ColumnsMaxSequence, queries: []*SearchQuery{ { builder: &SearchQueryBuilder{}, @@ -1143,7 +1142,7 @@ func TestEventstore_LatestPosition(t *testing.T) { name: "found events", args: args{ query: &SearchQueryBuilder{ - columns: ColumnsMaxPosition, + columns: ColumnsMaxSequence, queries: []*SearchQuery{ { builder: &SearchQueryBuilder{}, @@ -1169,7 +1168,7 @@ func TestEventstore_LatestPosition(t *testing.T) { querier: tt.fields.repo, } - _, err := es.LatestPosition(context.Background(), tt.args.query) + _, err := es.LatestSequence(context.Background(), tt.args.query) if (err != nil) != tt.res.wantErr { t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr) } diff --git a/internal/eventstore/handler/v2/field_handler.go b/internal/eventstore/handler/v2/field_handler.go index f780b83cc6..ad309ac790 100644 --- a/internal/eventstore/handler/v2/field_handler.go +++ b/internal/eventstore/handler/v2/field_handler.go @@ -8,7 +8,6 @@ import ( "time" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/zitadel/internal/eventstore" ) @@ -127,15 +126,10 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig) return additionalIteration, err } // stop execution if currentState.eventTimestamp >= config.maxCreatedAt - if !config.maxPosition.IsZero() && currentState.position.GreaterThanOrEqual(config.maxPosition) { + if config.maxPosition != 0 && currentState.position >= config.maxPosition { return false, nil } - if config.minPosition.GreaterThan(decimal.NewFromInt(0)) { - currentState.position = config.minPosition - currentState.offset = 0 - } - events, additionalIteration, err := h.fetchEvents(ctx, tx, currentState) if err != nil { return additionalIteration, err @@ -165,7 +159,7 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState idx, offset := skipPreviouslyReducedEvents(events, currentState) - if currentState.position.Equal(events[len(events)-1].Position()) { + if currentState.position == events[len(events)-1].Position() { offset += currentState.offset } currentState.position = events[len(events)-1].Position() @@ -195,9 +189,9 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState } func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state) (index int, offset uint32) { - var position decimal.Decimal + var position float64 for i, event := range events { - if !event.Position().Equal(position) { + if event.Position() != position { offset = 0 position = event.Position() } diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index d6419c71ed..dddee8ea1f 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -4,13 +4,13 @@ import ( "context" "database/sql" "errors" + "math" "math/rand" "slices" "sync" "time" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" @@ -381,8 +381,7 @@ func (h *Handler) existingInstances(ctx context.Context) ([]string, error) { type triggerConfig struct { awaitRunning bool - maxPosition decimal.Decimal - minPosition decimal.Decimal + maxPosition float64 } type TriggerOpt func(conf *triggerConfig) @@ -393,18 +392,12 @@ func WithAwaitRunning() TriggerOpt { } } -func WithMaxPosition(position decimal.Decimal) TriggerOpt { +func WithMaxPosition(position float64) TriggerOpt { return func(conf *triggerConfig) { conf.maxPosition = position } } -func WithMinPosition(position decimal.Decimal) TriggerOpt { - return func(conf *triggerConfig) { - conf.minPosition = position - } -} - func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Context, err error) { config := new(triggerConfig) for _, opt := range opts { @@ -513,15 +506,10 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add return additionalIteration, err } // stop execution if currentState.position >= config.maxPosition - if !config.maxPosition.Equal(decimal.Decimal{}) && currentState.position.GreaterThanOrEqual(config.maxPosition) { + if config.maxPosition != 0 && currentState.position >= config.maxPosition { return false, nil } - if config.minPosition.GreaterThan(decimal.NewFromInt(0)) { - currentState.position = config.minPosition - currentState.offset = 0 - } - var statements []*Statement statements, additionalIteration, err = h.generateStatements(ctx, tx, currentState) if err != nil { @@ -613,7 +601,7 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta func skipPreviouslyReducedStatements(statements []*Statement, currentState *state) int { for i, statement := range statements { - if statement.Position.Equal(currentState.position) && + if statement.Position == currentState.position && statement.Aggregate.ID == currentState.aggregateID && statement.Aggregate.Type == currentState.aggregateType && statement.Sequence == currentState.sequence { @@ -677,8 +665,9 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder OrderAsc(). InstanceID(currentState.instanceID) - if currentState.position.GreaterThan(decimal.Decimal{}) { - builder = builder.PositionAtLeast(currentState.position) + if currentState.position > 0 { + // decrease position by 10 because builder.PositionAfter filters for position > and we need position >= + builder = builder.PositionAfter(math.Float64frombits(math.Float64bits(currentState.position) - 10)) if currentState.offset > 0 { builder = builder.Offset(currentState.offset) } diff --git a/internal/eventstore/handler/v2/state.go b/internal/eventstore/handler/v2/state.go index c4afaed204..d3b6953488 100644 --- a/internal/eventstore/handler/v2/state.go +++ b/internal/eventstore/handler/v2/state.go @@ -7,8 +7,6 @@ import ( "errors" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/zerrors" @@ -16,7 +14,7 @@ import ( type state struct { instanceID string - position decimal.Decimal + position float64 eventTimestamp time.Time aggregateType eventstore.AggregateType aggregateID string @@ -47,7 +45,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC aggregateType = new(sql.NullString) sequence = new(sql.NullInt64) timestamp = new(sql.NullTime) - position = new(decimal.NullDecimal) + position = new(sql.NullFloat64) offset = new(sql.NullInt64) ) @@ -77,7 +75,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC currentState.aggregateType = eventstore.AggregateType(aggregateType.String) currentState.sequence = uint64(sequence.Int64) currentState.eventTimestamp = timestamp.Time - currentState.position = position.Decimal + currentState.position = position.Float64 // psql does not provide unsigned numbers so we work around it currentState.offset = uint32(offset.Int64) return currentState, nil diff --git a/internal/eventstore/handler/v2/state_test.go b/internal/eventstore/handler/v2/state_test.go index ef91d78e55..cc5fb1fbab 100644 --- a/internal/eventstore/handler/v2/state_test.go +++ b/internal/eventstore/handler/v2/state_test.go @@ -11,7 +11,6 @@ import ( "time" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/database/mock" @@ -167,7 +166,7 @@ func TestHandler_updateLastUpdated(t *testing.T) { updatedState: &state{ instanceID: "instance", eventTimestamp: time.Now(), - position: decimal.NewFromInt(42), + position: 42, }, }, isErr: func(t *testing.T, err error) { @@ -193,7 +192,7 @@ func TestHandler_updateLastUpdated(t *testing.T) { updatedState: &state{ instanceID: "instance", eventTimestamp: time.Now(), - position: decimal.NewFromInt(42), + position: 42, }, }, isErr: func(t *testing.T, err error) { @@ -218,7 +217,7 @@ func TestHandler_updateLastUpdated(t *testing.T) { eventstore.AggregateType("aggregate type"), uint64(42), mock.AnyType[time.Time]{}, - decimal.NewFromInt(42), + float64(42), uint32(0), ), mock.WithExecRowsAffected(1), @@ -229,7 +228,7 @@ func TestHandler_updateLastUpdated(t *testing.T) { updatedState: &state{ instanceID: "instance", eventTimestamp: time.Now(), - position: decimal.NewFromInt(42), + position: 42, aggregateType: "aggregate type", aggregateID: "aggregate id", sequence: 42, @@ -398,7 +397,7 @@ func TestHandler_currentState(t *testing.T) { "aggregate type", int64(42), testTime, - decimal.NewFromInt(42).String(), + float64(42), uint16(10), }, }, @@ -413,7 +412,7 @@ func TestHandler_currentState(t *testing.T) { currentState: &state{ instanceID: "instance", eventTimestamp: testTime, - position: decimal.NewFromInt(42), + position: 42, aggregateType: "aggregate type", aggregateID: "aggregate id", sequence: 42, diff --git a/internal/eventstore/handler/v2/statement.go b/internal/eventstore/handler/v2/statement.go index b185c58221..a02e5d3580 100644 --- a/internal/eventstore/handler/v2/statement.go +++ b/internal/eventstore/handler/v2/statement.go @@ -9,7 +9,6 @@ import ( "strings" "time" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "golang.org/x/exp/constraints" @@ -83,7 +82,7 @@ func (h *Handler) reduce(event eventstore.Event) (*Statement, error) { type Statement struct { Aggregate *eventstore.Aggregate Sequence uint64 - Position decimal.Decimal + Position float64 CreationDate time.Time offset uint32 diff --git a/internal/eventstore/local_crdb_test.go b/internal/eventstore/local_crdb_test.go index b6fc368792..87c5084fe7 100644 --- a/internal/eventstore/local_crdb_test.go +++ b/internal/eventstore/local_crdb_test.go @@ -2,14 +2,13 @@ package eventstore_test import ( "context" + "database/sql" "encoding/json" "os" "testing" "time" "github.com/cockroachdb/cockroach-go/v2/testserver" - pgxdecimal "github.com/jackc/pgx-shopspring-decimal" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/stdlib" "github.com/zitadel/logging" @@ -42,15 +41,13 @@ func TestMain(m *testing.M) { testCRDBClient = &database.DB{ Database: new(testDB), } - config, err := pgxpool.ParseConfig(ts.PGURL().String()) + + connConfig, err := pgxpool.ParseConfig(ts.PGURL().String()) if err != nil { - logging.WithFields("error", err).Fatal("unable to parse db config") + logging.WithFields("error", err).Fatal("unable to parse db url") } - config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { - pgxdecimal.Register(conn.TypeMap()) - return nil - } - pool, err := pgxpool.NewWithConfig(context.Background(), config) + connConfig.AfterConnect = new_es.RegisterEventstoreTypes + pool, err := pgxpool.NewWithConfig(context.Background(), connConfig) if err != nil { logging.WithFields("error", err).Fatal("unable to create db pool") } @@ -115,15 +112,10 @@ func initDB(ctx context.Context, db *database.DB) error { } func connectLocalhost() (*database.DB, error) { - config, err := pgxpool.ParseConfig("postgresql://root@localhost:26257/defaultdb?sslmode=disable") + client, err := sql.Open("pgx", "postgresql://root@localhost:26257/defaultdb?sslmode=disable") if err != nil { return nil, err } - pool, err := pgxpool.NewWithConfig(context.Background(), config) - if err != nil { - return nil, err - } - client := stdlib.OpenDBFromPool(pool) if err = client.Ping(); err != nil { return nil, err } diff --git a/internal/eventstore/read_model.go b/internal/eventstore/read_model.go index ae77275732..d2c755cc3a 100644 --- a/internal/eventstore/read_model.go +++ b/internal/eventstore/read_model.go @@ -1,23 +1,19 @@ package eventstore -import ( - "time" - - "github.com/shopspring/decimal" -) +import "time" // ReadModel is the minimum representation of a read model. // It implements a basic reducer // it might be saved in a database or in memory type ReadModel struct { - AggregateID string `json:"-"` - ProcessedSequence uint64 `json:"-"` - CreationDate time.Time `json:"-"` - ChangeDate time.Time `json:"-"` - Events []Event `json:"-"` - ResourceOwner string `json:"-"` - InstanceID string `json:"-"` - Position decimal.Decimal `json:"-"` + AggregateID string `json:"-"` + ProcessedSequence uint64 `json:"-"` + CreationDate time.Time `json:"-"` + ChangeDate time.Time `json:"-"` + Events []Event `json:"-"` + ResourceOwner string `json:"-"` + InstanceID string `json:"-"` + Position float64 `json:"-"` } // AppendEvents adds all the events to the read model. diff --git a/internal/eventstore/repository/event.go b/internal/eventstore/repository/event.go index 1107649934..d0d2660d79 100644 --- a/internal/eventstore/repository/event.go +++ b/internal/eventstore/repository/event.go @@ -7,7 +7,6 @@ import ( "strings" "time" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/eventstore" @@ -23,7 +22,7 @@ type Event struct { // Seq is the sequence of the event Seq uint64 // Pos is the global sequence of the event multiple events can have the same sequence - Pos decimal.Decimal + Pos float64 //CreationDate is the time the event is created // it's used for human readability. @@ -98,7 +97,7 @@ func (e *Event) Sequence() uint64 { } // Position implements [eventstore.Event] -func (e *Event) Position() decimal.Decimal { +func (e *Event) Position() float64 { return e.Pos } diff --git a/internal/eventstore/repository/mock/repository.mock.go b/internal/eventstore/repository/mock/repository.mock.go index 12925bc975..8d5c0430ad 100644 --- a/internal/eventstore/repository/mock/repository.mock.go +++ b/internal/eventstore/repository/mock/repository.mock.go @@ -13,7 +13,6 @@ import ( context "context" reflect "reflect" - decimal "github.com/shopspring/decimal" database "github.com/zitadel/zitadel/internal/database" eventstore "github.com/zitadel/zitadel/internal/eventstore" gomock "go.uber.org/mock/gomock" @@ -99,19 +98,19 @@ func (mr *MockQuerierMockRecorder) InstanceIDs(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstanceIDs", reflect.TypeOf((*MockQuerier)(nil).InstanceIDs), arg0, arg1) } -// LatestPosition mocks base method. -func (m *MockQuerier) LatestPosition(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) (decimal.Decimal, error) { +// LatestSequence mocks base method. +func (m *MockQuerier) LatestSequence(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) (float64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LatestPosition", arg0, arg1) - ret0, _ := ret[0].(decimal.Decimal) + ret := m.ctrl.Call(m, "LatestSequence", arg0, arg1) + ret0, _ := ret[0].(float64) ret1, _ := ret[1].(error) return ret0, ret1 } -// LatestPosition indicates an expected call of LatestPosition. -func (mr *MockQuerierMockRecorder) LatestPosition(arg0, arg1 any) *gomock.Call { +// LatestSequence indicates an expected call of LatestSequence. +func (mr *MockQuerierMockRecorder) LatestSequence(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestPosition", reflect.TypeOf((*MockQuerier)(nil).LatestPosition), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestSequence", reflect.TypeOf((*MockQuerier)(nil).LatestSequence), arg0, arg1) } // MockPusher is a mock of Pusher interface. diff --git a/internal/eventstore/repository/mock/repository.mock.impl.go b/internal/eventstore/repository/mock/repository.mock.impl.go index 97414cbe8b..9ae0b6b1ea 100644 --- a/internal/eventstore/repository/mock/repository.mock.impl.go +++ b/internal/eventstore/repository/mock/repository.mock.impl.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" @@ -188,8 +187,8 @@ func (e *mockEvent) Sequence() uint64 { return e.sequence } -func (e *mockEvent) Position() decimal.Decimal { - return decimal.Decimal{} +func (e *mockEvent) Position() float64 { + return 0 } func (e *mockEvent) CreatedAt() time.Time { diff --git a/internal/eventstore/repository/search_query.go b/internal/eventstore/repository/search_query.go index c905e3ae01..f84c7f1201 100644 --- a/internal/eventstore/repository/search_query.go +++ b/internal/eventstore/repository/search_query.go @@ -3,8 +3,6 @@ package repository import ( "database/sql" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/zerrors" @@ -60,8 +58,6 @@ const ( //OperationNotIn checks if a stored value does not match one of the passed value list OperationNotIn - OperationGreaterOrEquals - operationCount ) @@ -256,10 +252,10 @@ func instanceIDsFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuer } func positionAfterFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter { - if builder.GetPositionAtLeast().IsZero() { + if builder.GetPositionAfter() == 0 { return nil } - query.Position = NewFilter(FieldPosition, builder.GetPositionAtLeast(), OperationGreaterOrEquals) + query.Position = NewFilter(FieldPosition, builder.GetPositionAfter(), OperationGreater) return query.Position } @@ -301,7 +297,7 @@ func eventDataFilter(query *eventstore.SearchQuery) *Filter { } func eventPositionAfterFilter(query *eventstore.SearchQuery) *Filter { - if pos := query.GetPositionAfter(); !pos.Equal(decimal.Decimal{}) { + if pos := query.GetPositionAfter(); pos != 0 { return NewFilter(FieldPosition, pos, OperationGreater) } return nil diff --git a/internal/eventstore/repository/sql/crdb.go b/internal/eventstore/repository/sql/crdb.go index ffcfb0786f..68610676c3 100644 --- a/internal/eventstore/repository/sql/crdb.go +++ b/internal/eventstore/repository/sql/crdb.go @@ -11,7 +11,6 @@ import ( "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" @@ -265,11 +264,11 @@ func (crdb *CRDB) FilterToReducer(ctx context.Context, searchQuery *eventstore.S return err } -// LatestPosition returns the latest position found by the search query -func (db *CRDB) LatestPosition(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (decimal.Decimal, error) { - var position decimal.Decimal +// LatestSequence returns the latest sequence found by the search query +func (db *CRDB) LatestSequence(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (float64, error) { + var position sql.NullFloat64 err := query(ctx, db, searchQuery, &position, false) - return position, err + return position.Float64, err } // InstanceIDs returns the instance ids found by the search query @@ -336,7 +335,7 @@ func (db *CRDB) eventQuery(useV1 bool) string { " FROM eventstore.events2" } -func (db *CRDB) maxPositionQuery(useV1 bool) string { +func (db *CRDB) maxSequenceQuery(useV1 bool) string { if useV1 { return `SELECT event_sequence FROM eventstore.events` } @@ -414,8 +413,6 @@ func (db *CRDB) operation(operation repository.Operation) string { return "=" case repository.OperationGreater: return ">" - case repository.OperationGreaterOrEquals: - return ">=" case repository.OperationLess: return "<" case repository.OperationJSONContains: diff --git a/internal/eventstore/repository/sql/crdb_test.go b/internal/eventstore/repository/sql/crdb_test.go index aae2fde78d..a3f3331a82 100644 --- a/internal/eventstore/repository/sql/crdb_test.go +++ b/internal/eventstore/repository/sql/crdb_test.go @@ -4,8 +4,6 @@ import ( "database/sql" "testing" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/repository" ) @@ -314,7 +312,7 @@ func generateEvent(t *testing.T, aggregateID string, opts ...func(*repository.Ev ResourceOwner: sql.NullString{String: "ro", Valid: true}, Typ: "test.created", Version: "v1", - Pos: decimal.NewFromInt(42), + Pos: 42, } for _, opt := range opts { diff --git a/internal/eventstore/repository/sql/local_crdb_test.go b/internal/eventstore/repository/sql/local_crdb_test.go index 596be2584c..0f8c934b47 100644 --- a/internal/eventstore/repository/sql/local_crdb_test.go +++ b/internal/eventstore/repository/sql/local_crdb_test.go @@ -8,8 +8,6 @@ import ( "time" "github.com/cockroachdb/cockroach-go/v2/testserver" - pgxdecimal "github.com/jackc/pgx-shopspring-decimal" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/stdlib" "github.com/zitadel/logging" @@ -17,6 +15,7 @@ import ( "github.com/zitadel/zitadel/cmd/initialise" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database/cockroach" + new_es "github.com/zitadel/zitadel/internal/eventstore/v3" ) var ( @@ -37,10 +36,7 @@ func TestMain(m *testing.M) { if err != nil { logging.WithFields("error", err).Fatal("unable to parse db url") } - connConfig.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { - pgxdecimal.Register(conn.TypeMap()) - return nil - } + connConfig.AfterConnect = new_es.RegisterEventstoreTypes pool, err := pgxpool.NewWithConfig(context.Background(), connConfig) if err != nil { logging.WithFields("error", err).Fatal("unable to create db pool") diff --git a/internal/eventstore/repository/sql/query.go b/internal/eventstore/repository/sql/query.go index dd405d4489..4e1cc87aff 100644 --- a/internal/eventstore/repository/sql/query.go +++ b/internal/eventstore/repository/sql/query.go @@ -9,7 +9,6 @@ import ( "strconv" "strings" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/call" @@ -26,7 +25,7 @@ type querier interface { conditionFormat(repository.Operation) string placeholder(query string) string eventQuery(useV1 bool) string - maxPositionQuery(useV1 bool) string + maxSequenceQuery(useV1 bool) string instanceIDsQuery(useV1 bool) string Client() *database.DB orderByEventSequence(desc, shouldOrderBySequence, useV1 bool) string @@ -75,7 +74,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search // instead of using the max function of the database (which doesn't work for postgres) // we select the most recent row - if q.Columns == eventstore.ColumnsMaxPosition { + if q.Columns == eventstore.ColumnsMaxSequence { q.Limit = 1 q.Desc = true } @@ -92,7 +91,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search switch q.Columns { case eventstore.ColumnsEvent, - eventstore.ColumnsMaxPosition: + eventstore.ColumnsMaxSequence: query += criteria.orderByEventSequence(q.Desc, shouldOrderBySequence, useV1) } @@ -148,8 +147,8 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search func prepareColumns(criteria querier, columns eventstore.Columns, useV1 bool) (string, func(s scan, dest interface{}) error) { switch columns { - case eventstore.ColumnsMaxPosition: - return criteria.maxPositionQuery(useV1), maxPositionScanner + case eventstore.ColumnsMaxSequence: + return criteria.maxSequenceQuery(useV1), maxSequenceScanner case eventstore.ColumnsInstanceIDs: return criteria.instanceIDsQuery(useV1), instanceIDsScanner case eventstore.ColumnsEvent: @@ -167,15 +166,13 @@ func prepareTimeTravel(ctx context.Context, criteria querier, allow bool) string return criteria.Timetravel(took) } -func maxPositionScanner(row scan, dest interface{}) (err error) { - position, ok := dest.(*decimal.Decimal) +func maxSequenceScanner(row scan, dest interface{}) (err error) { + position, ok := dest.(*sql.NullFloat64) if !ok { - return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be decimal.Decimal got: %T", dest) + return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be sql.NullInt64 got: %T", dest) } - var res decimal.NullDecimal - err = row(&res) + err = row(position) if err == nil || errors.Is(err, sql.ErrNoRows) { - *position = res.Decimal return nil } return zerrors.ThrowInternal(err, "SQL-bN5xg", "something went wrong") @@ -204,7 +201,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error) return zerrors.ThrowInvalidArgumentf(nil, "SQL-4GP6F", "events scanner: invalid type %T", dest) } event := new(repository.Event) - position := new(decimal.NullDecimal) + position := new(sql.NullFloat64) if useV1 { err = scanner( @@ -241,7 +238,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error) logging.New().WithError(err).Warn("unable to scan row") return zerrors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row") } - event.Pos = position.Decimal + event.Pos = position.Float64 return reduce(event) } } diff --git a/internal/eventstore/repository/sql/query_test.go b/internal/eventstore/repository/sql/query_test.go index fbf02f1996..abac19ead0 100644 --- a/internal/eventstore/repository/sql/query_test.go +++ b/internal/eventstore/repository/sql/query_test.go @@ -11,7 +11,6 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" - "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/zitadel/zitadel/internal/database" @@ -111,36 +110,36 @@ func Test_prepareColumns(t *testing.T) { { name: "max column", args: args{ - columns: eventstore.ColumnsMaxPosition, - dest: new(decimal.Decimal), + columns: eventstore.ColumnsMaxSequence, + dest: new(sql.NullFloat64), useV1: true, }, res: res{ query: `SELECT event_sequence FROM eventstore.events`, - expected: decimal.NewFromInt(42), + expected: sql.NullFloat64{Float64: 43, Valid: true}, }, fields: fields{ - dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))}, + dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}}, }, }, { name: "max column v2", args: args{ - columns: eventstore.ColumnsMaxPosition, - dest: new(decimal.Decimal), + columns: eventstore.ColumnsMaxSequence, + dest: new(sql.NullFloat64), }, res: res{ query: `SELECT "position" FROM eventstore.events2`, - expected: decimal.NewFromInt(42), + expected: sql.NullFloat64{Float64: 43, Valid: true}, }, fields: fields{ - dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))}, + dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}}, }, }, { name: "max sequence wrong dest type", args: args{ - columns: eventstore.ColumnsMaxPosition, + columns: eventstore.ColumnsMaxSequence, dest: new(uint64), }, res: res{ @@ -180,11 +179,11 @@ func Test_prepareColumns(t *testing.T) { res: res{ query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`, expected: []eventstore.Event{ - &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.NewFromInt(42), Data: nil, Version: "v1"}, + &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 42, Data: nil, Version: "v1"}, }, }, fields: fields{ - dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NewNullDecimal(decimal.NewFromInt(42)), sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)}, + dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 42, Valid: true}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)}, }, }, { @@ -199,11 +198,11 @@ func Test_prepareColumns(t *testing.T) { res: res{ query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`, expected: []eventstore.Event{ - &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.Decimal{}, Data: nil, Version: "v1"}, + &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 0, Data: nil, Version: "v1"}, }, }, fields: fields{ - dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NullDecimal{}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)}, + dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 0, Valid: false}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)}, }, }, { @@ -1007,7 +1006,7 @@ func Test_query_events_mocked(t *testing.T) { InstanceID("instanceID"). OrderDesc(). Limit(5). - PositionAtLeast(decimal.NewFromFloat(123.456)). + PositionAfter(123.456). AddQuery(). AggregateTypes("notify"). EventTypes("notify.foo.bar"). @@ -1023,7 +1022,7 @@ func Test_query_events_mocked(t *testing.T) { regexp.QuoteMeta( `SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" > $8) ORDER BY event_sequence DESC LIMIT $9`, ), - []driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), decimal.NewFromFloat(123.456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", 123.456, uint64(5)}, + []driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), 123.456, eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", 123.456, uint64(5)}, ), }, res: res{ @@ -1038,7 +1037,7 @@ func Test_query_events_mocked(t *testing.T) { InstanceID("instanceID"). OrderDesc(). Limit(5). - PositionAtLeast(decimal.NewFromFloat(123.456)). + PositionAfter(123.456). AddQuery(). AggregateTypes("notify"). EventTypes("notify.foo.bar"). @@ -1054,7 +1053,7 @@ func Test_query_events_mocked(t *testing.T) { regexp.QuoteMeta( `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" > $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`, ), - []driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), decimal.NewFromFloat(123.456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", 123.456, uint64(5)}, + []driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), 123.456, eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", 123.456, uint64(5)}, ), }, res: res{ diff --git a/internal/eventstore/search_query.go b/internal/eventstore/search_query.go index 9dd6cfa479..df38d15def 100644 --- a/internal/eventstore/search_query.go +++ b/internal/eventstore/search_query.go @@ -5,8 +5,6 @@ import ( "database/sql" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -28,7 +26,7 @@ type SearchQueryBuilder struct { lockRows bool lockOption LockOption allowTimeTravel bool - positionAtLeast decimal.Decimal + positionAfter float64 awaitOpenTransactions bool creationDateAfter time.Time creationDateBefore time.Time @@ -83,8 +81,8 @@ func (b *SearchQueryBuilder) GetAllowTimeTravel() bool { return b.allowTimeTravel } -func (b SearchQueryBuilder) GetPositionAtLeast() decimal.Decimal { - return b.positionAtLeast +func (b SearchQueryBuilder) GetPositionAfter() float64 { + return b.positionAfter } func (b SearchQueryBuilder) GetAwaitOpenTransactions() bool { @@ -120,7 +118,7 @@ type SearchQuery struct { aggregateIDs []string eventTypes []EventType eventData map[string]interface{} - positionAfter decimal.Decimal + positionAfter float64 } func (q SearchQuery) GetAggregateTypes() []AggregateType { @@ -139,7 +137,7 @@ func (q SearchQuery) GetEventData() map[string]interface{} { return q.eventData } -func (q SearchQuery) GetPositionAfter() decimal.Decimal { +func (q SearchQuery) GetPositionAfter() float64 { return q.positionAfter } @@ -163,8 +161,8 @@ type Columns int8 const ( //ColumnsEvent represents all fields of an event ColumnsEvent = iota + 1 - // ColumnsMaxPosition represents the latest sequence of the filtered events - ColumnsMaxPosition + // ColumnsMaxSequence represents the latest sequence of the filtered events + ColumnsMaxSequence // ColumnsInstanceIDs represents the instance ids of the filtered events ColumnsInstanceIDs @@ -298,9 +296,9 @@ func (builder *SearchQueryBuilder) AllowTimeTravel() *SearchQueryBuilder { return builder } -// PositionAtLeast filters for events which happened after the specified time -func (builder *SearchQueryBuilder) PositionAtLeast(position decimal.Decimal) *SearchQueryBuilder { - builder.positionAtLeast = position +// PositionAfter filters for events which happened after the specified time +func (builder *SearchQueryBuilder) PositionAfter(position float64) *SearchQueryBuilder { + builder.positionAfter = position return builder } @@ -407,7 +405,7 @@ func (query *SearchQuery) EventData(data map[string]interface{}) *SearchQuery { return query } -func (query *SearchQuery) PositionAfter(position decimal.Decimal) *SearchQuery { +func (query *SearchQuery) PositionAfter(position float64) *SearchQuery { query.positionAfter = position return query } diff --git a/internal/eventstore/search_query_test.go b/internal/eventstore/search_query_test.go index da8acf878d..8c654911ea 100644 --- a/internal/eventstore/search_query_test.go +++ b/internal/eventstore/search_query_test.go @@ -116,10 +116,10 @@ func TestSearchQuerybuilderSetters(t *testing.T) { { name: "set columns", args: args{ - setters: []func(*SearchQueryBuilder) *SearchQueryBuilder{testSetColumns(ColumnsMaxPosition)}, + setters: []func(*SearchQueryBuilder) *SearchQueryBuilder{testSetColumns(ColumnsMaxSequence)}, }, res: &SearchQueryBuilder{ - columns: ColumnsMaxPosition, + columns: ColumnsMaxSequence, }, }, { diff --git a/internal/eventstore/v1/models/event.go b/internal/eventstore/v1/models/event.go index ab2b608872..8c50d64da0 100644 --- a/internal/eventstore/v1/models/event.go +++ b/internal/eventstore/v1/models/event.go @@ -5,8 +5,6 @@ import ( "reflect" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -22,7 +20,7 @@ var _ eventstore.Event = (*Event)(nil) type Event struct { ID string Seq uint64 - Pos decimal.Decimal + Pos float64 CreationDate time.Time Typ eventstore.EventType PreviousSequence uint64 @@ -82,7 +80,7 @@ func (e *Event) Sequence() uint64 { } // Position implements [eventstore.Event] -func (e *Event) Position() decimal.Decimal { +func (e *Event) Position() float64 { return e.Pos } diff --git a/internal/eventstore/v3/event.go b/internal/eventstore/v3/event.go index c9ea4d2c62..1141a9eacf 100644 --- a/internal/eventstore/v3/event.go +++ b/internal/eventstore/v3/event.go @@ -6,7 +6,6 @@ import ( "strconv" "time" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" @@ -43,7 +42,7 @@ type event struct { command *command createdAt time.Time sequence uint64 - position decimal.Decimal + position float64 } // TODO: remove on v3 @@ -153,8 +152,8 @@ func (e *event) Sequence() uint64 { return e.sequence } -// Position implements [eventstore.Event] -func (e *event) Position() decimal.Decimal { +// Sequence implements [eventstore.Event] +func (e *event) Position() float64 { return e.position } diff --git a/internal/execution/execution_test.go b/internal/execution/execution_test.go index 40731a840a..5a45d96625 100644 --- a/internal/execution/execution_test.go +++ b/internal/execution/execution_test.go @@ -61,7 +61,7 @@ func Test_Call(t *testing.T) { args{ ctx: context.Background(), timeout: time.Second, - sleep: 2 * time.Second, + sleep: time.Second, method: http.MethodPost, body: []byte("{\"request\": \"values\"}"), respBody: []byte("{\"response\": \"values\"}"), diff --git a/internal/notification/projections.go b/internal/notification/projections.go index 1923ac6e15..f5c66b8171 100644 --- a/internal/notification/projections.go +++ b/internal/notification/projections.go @@ -67,26 +67,6 @@ func Start(ctx context.Context) { worker.Start(ctx) } -func SetCurrentState(ctx context.Context, es *eventstore.Eventstore) error { - if len(projections) == 0 { - return nil - } - position, err := es.LatestPosition(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).InstanceID(authz.GetInstance(ctx).InstanceID()).OrderDesc().Limit(1)) - if err != nil { - return err - } - - for i, projection := range projections { - logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("set current state of notification projection") - _, err = projection.Trigger(ctx, handler.WithMinPosition(position)) - if err != nil { - return err - } - logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("current state of notification projection set") - } - return nil -} - func ProjectInstance(ctx context.Context) error { for i, projection := range projections { logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting notification projection") diff --git a/internal/query/access_token.go b/internal/query/access_token.go index 030ddda473..0fc1bbb369 100644 --- a/internal/query/access_token.go +++ b/internal/query/access_token.go @@ -5,7 +5,6 @@ import ( "strings" "time" - "github.com/shopspring/decimal" "golang.org/x/text/language" "github.com/zitadel/zitadel/internal/domain" @@ -141,7 +140,7 @@ func (q *Queries) accessTokenByOIDCSessionAndTokenID(ctx context.Context, oidcSe // checkSessionNotTerminatedAfter checks if a [session.TerminateType] event (or user events leading to a session termination) // occurred after a certain time and will return an error if so. -func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, userID string, position decimal.Decimal, fingerprintID string) (err error) { +func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, userID string, position float64, fingerprintID string) (err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() @@ -166,7 +165,7 @@ func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, } type sessionTerminatedModel struct { - position decimal.Decimal + position float64 sessionID string userID string fingerPrintID string diff --git a/internal/query/current_state.go b/internal/query/current_state.go index 790b594c2d..29497e6eec 100644 --- a/internal/query/current_state.go +++ b/internal/query/current_state.go @@ -10,7 +10,6 @@ import ( "time" sq "github.com/Masterminds/squirrel" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/call" @@ -27,7 +26,7 @@ type Stateful interface { type State struct { LastRun time.Time - Position decimal.Decimal + Position float64 EventCreatedAt time.Time AggregateID string AggregateType eventstore.AggregateType @@ -222,7 +221,7 @@ func prepareLatestState(ctx context.Context, db prepareDatabase) (sq.SelectBuild var ( creationDate sql.NullTime lastUpdated sql.NullTime - position decimal.NullDecimal + position sql.NullFloat64 ) err := row.Scan( &creationDate, @@ -235,7 +234,7 @@ func prepareLatestState(ctx context.Context, db prepareDatabase) (sq.SelectBuild return &State{ EventCreatedAt: creationDate.Time, LastRun: lastUpdated.Time, - Position: position.Decimal, + Position: position.Float64, }, nil } } @@ -260,7 +259,7 @@ func prepareCurrentStateQuery(ctx context.Context, db prepareDatabase) (sq.Selec var ( lastRun sql.NullTime eventDate sql.NullTime - currentPosition decimal.NullDecimal + currentPosition sql.NullFloat64 aggregateType sql.NullString aggregateID sql.NullString sequence sql.NullInt64 @@ -281,7 +280,7 @@ func prepareCurrentStateQuery(ctx context.Context, db prepareDatabase) (sq.Selec } currentState.State.EventCreatedAt = eventDate.Time currentState.State.LastRun = lastRun.Time - currentState.Position = currentPosition.Decimal + currentState.Position = currentPosition.Float64 currentState.AggregateType = eventstore.AggregateType(aggregateType.String) currentState.AggregateID = aggregateID.String currentState.Sequence = uint64(sequence.Int64) diff --git a/internal/query/current_state_test.go b/internal/query/current_state_test.go index f17509aa9c..c76dae710e 100644 --- a/internal/query/current_state_test.go +++ b/internal/query/current_state_test.go @@ -7,8 +7,6 @@ import ( "fmt" "regexp" "testing" - - "github.com/shopspring/decimal" ) var ( @@ -89,7 +87,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) { State: State{ EventCreatedAt: testNow, LastRun: testNow, - Position: decimal.NewFromInt(20211108), + Position: 20211108, AggregateID: "agg-id", AggregateType: "agg-type", Sequence: 20211108, @@ -136,7 +134,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) { ProjectionName: "projection-name", State: State{ EventCreatedAt: testNow, - Position: decimal.NewFromInt(20211108), + Position: 20211108, LastRun: testNow, AggregateID: "agg-id", AggregateType: "agg-type", @@ -147,7 +145,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) { ProjectionName: "projection-name2", State: State{ EventCreatedAt: testNow, - Position: decimal.NewFromInt(20211108), + Position: 20211108, LastRun: testNow, AggregateID: "agg-id", AggregateType: "agg-type", diff --git a/internal/query/user_grant.go b/internal/query/user_grant.go index e2bbabdc72..265d8eaae1 100644 --- a/internal/query/user_grant.go +++ b/internal/query/user_grant.go @@ -281,7 +281,7 @@ func (q *Queries) UserGrants(ctx context.Context, queries *UserGrantsQueries, sh return nil, zerrors.ThrowInternal(err, "QUERY-wXnQR", "Errors.Query.SQLStatement") } - latestState, err := q.latestState(ctx, userGrantTable) + latestSequence, err := q.latestState(ctx, userGrantTable) if err != nil { return nil, err } @@ -294,7 +294,7 @@ func (q *Queries) UserGrants(ctx context.Context, queries *UserGrantsQueries, sh return nil, err } - grants.State = latestState + grants.State = latestSequence return grants, nil } diff --git a/internal/query/user_membership.go b/internal/query/user_membership.go index a08617d57b..7ba2629cfa 100644 --- a/internal/query/user_membership.go +++ b/internal/query/user_membership.go @@ -144,7 +144,7 @@ func (q *Queries) Memberships(ctx context.Context, queries *MembershipSearchQuer if err != nil { return nil, zerrors.ThrowInvalidArgument(err, "QUERY-T84X9", "Errors.Query.InvalidRequest") } - latestState, err := q.latestState(ctx, orgMemberTable, instanceMemberTable, projectMemberTable, projectGrantMemberTable) + latestSequence, err := q.latestState(ctx, orgMemberTable, instanceMemberTable, projectMemberTable, projectGrantMemberTable) if err != nil { return nil, err } @@ -157,7 +157,7 @@ func (q *Queries) Memberships(ctx context.Context, queries *MembershipSearchQuer if err != nil { return nil, err } - memberships.State = latestState + memberships.State = latestSequence return memberships, nil } diff --git a/internal/v2/database/number_filter.go b/internal/v2/database/number_filter.go index 4853806457..ce263ceeee 100644 --- a/internal/v2/database/number_filter.go +++ b/internal/v2/database/number_filter.go @@ -3,7 +3,6 @@ package database import ( "time" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "golang.org/x/exp/constraints" ) @@ -95,7 +94,7 @@ func (c numberCompare) String() string { } type number interface { - constraints.Integer | constraints.Float | time.Time | decimal.Decimal + constraints.Integer | constraints.Float | time.Time // TODO: condition must know if it's args are named parameters or not // constraints.Integer | constraints.Float | time.Time | placeholder } diff --git a/internal/v2/eventstore/event_store.go b/internal/v2/eventstore/event_store.go index e89786c657..cc447c5e15 100644 --- a/internal/v2/eventstore/event_store.go +++ b/internal/v2/eventstore/event_store.go @@ -2,8 +2,6 @@ package eventstore import ( "context" - - "github.com/shopspring/decimal" ) func NewEventstore(querier Querier, pusher Pusher) *EventStore { @@ -32,12 +30,12 @@ type healthier interface { } type GlobalPosition struct { - Position decimal.Decimal + Position float64 InPositionOrder uint32 } func (gp GlobalPosition) IsLess(other GlobalPosition) bool { - return gp.Position.LessThan(other.Position) || (gp.Position.Equal(other.Position) && gp.InPositionOrder < other.InPositionOrder) + return gp.Position < other.Position || (gp.Position == other.Position && gp.InPositionOrder < other.InPositionOrder) } type Reducer interface { diff --git a/internal/v2/eventstore/postgres/push_test.go b/internal/v2/eventstore/postgres/push_test.go index 6f8b224c41..91fdc1fcd7 100644 --- a/internal/v2/eventstore/postgres/push_test.go +++ b/internal/v2/eventstore/postgres/push_test.go @@ -8,8 +8,6 @@ import ( "testing" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/database/mock" "github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/zerrors" @@ -820,7 +818,7 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, }, ), @@ -901,11 +899,11 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, { time.Now(), - decimal.NewFromFloat(123.1).String(), + float64(123.1), }, }, ), @@ -986,11 +984,11 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, { time.Now(), - decimal.NewFromFloat(123.1).String(), + float64(123.1), }, }, ), @@ -1046,7 +1044,7 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, }, ), @@ -1101,7 +1099,7 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, }, ), @@ -1183,11 +1181,11 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, { time.Now(), - decimal.NewFromFloat(123.1).String(), + float64(123.1), }, }, ), @@ -1274,11 +1272,11 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, { time.Now(), - decimal.NewFromFloat(123.1).String(), + float64(123.1), }, }, ), diff --git a/internal/v2/eventstore/postgres/query_test.go b/internal/v2/eventstore/postgres/query_test.go index 34b73bd820..56f506ac50 100644 --- a/internal/v2/eventstore/postgres/query_test.go +++ b/internal/v2/eventstore/postgres/query_test.go @@ -8,8 +8,6 @@ import ( "testing" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/database" "github.com/zitadel/zitadel/internal/v2/database/mock" "github.com/zitadel/zitadel/internal/v2/eventstore" @@ -543,13 +541,13 @@ func Test_writeFilter(t *testing.T) { args: args{ filter: eventstore.NewFilter( eventstore.FilterPagination( - eventstore.PositionGreater(decimal.NewFromFloat(123.4), 0), + eventstore.PositionGreater(123.4, 0), ), ), }, want: wantQuery{ query: " WHERE instance_id = $1 AND position > $2 ORDER BY position, in_tx_order", - args: []any{"i1", decimal.NewFromFloat(123.4)}, + args: []any{"i1", 123.4}, }, }, { @@ -557,18 +555,18 @@ func Test_writeFilter(t *testing.T) { args: args{ filter: eventstore.NewFilter( eventstore.FilterPagination( - // eventstore.PositionGreater(decimal.NewFromFloat(123.4), 0), + // eventstore.PositionGreater(123.4, 0), // eventstore.PositionLess(125.4, 10), eventstore.PositionBetween( - &eventstore.GlobalPosition{Position: decimal.NewFromFloat(123.4)}, - &eventstore.GlobalPosition{Position: decimal.NewFromFloat(125.4), InPositionOrder: 10}, + &eventstore.GlobalPosition{Position: 123.4}, + &eventstore.GlobalPosition{Position: 125.4, InPositionOrder: 10}, ), ), ), }, want: wantQuery{ query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order < $3) OR position < $4) AND position > $5 ORDER BY position, in_tx_order", - args: []any{"i1", decimal.NewFromFloat(125.4), uint32(10), decimal.NewFromFloat(125.4), decimal.NewFromFloat(123.4)}, + args: []any{"i1", 125.4, uint32(10), 125.4, 123.4}, // TODO: (adlerhurst) would require some refactoring to reuse existing args // query: " WHERE instance_id = $1 AND position > $2 AND ((position = $3 AND in_tx_order < $4) OR position < $3) ORDER BY position, in_tx_order", // args: []any{"i1", 123.4, 125.4, uint32(10)}, @@ -579,13 +577,13 @@ func Test_writeFilter(t *testing.T) { args: args{ filter: eventstore.NewFilter( eventstore.FilterPagination( - eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12), + eventstore.PositionGreater(123.4, 12), ), ), }, want: wantQuery{ query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order > $3) OR position > $4) ORDER BY position, in_tx_order", - args: []any{"i1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4)}, + args: []any{"i1", 123.4, uint32(12), 123.4}, }, }, { @@ -595,13 +593,13 @@ func Test_writeFilter(t *testing.T) { eventstore.FilterPagination( eventstore.Limit(10), eventstore.Offset(3), - eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12), + eventstore.PositionGreater(123.4, 12), ), ), }, want: wantQuery{ query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order > $3) OR position > $4) ORDER BY position, in_tx_order LIMIT $5 OFFSET $6", - args: []any{"i1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)}, + args: []any{"i1", 123.4, uint32(12), 123.4, uint32(10), uint32(3)}, }, }, { @@ -611,14 +609,14 @@ func Test_writeFilter(t *testing.T) { eventstore.FilterPagination( eventstore.Limit(10), eventstore.Offset(3), - eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12), + eventstore.PositionGreater(123.4, 12), ), eventstore.AppendAggregateFilter("user"), ), }, want: wantQuery{ query: " WHERE instance_id = $1 AND aggregate_type = $2 AND ((position = $3 AND in_tx_order > $4) OR position > $5) ORDER BY position, in_tx_order LIMIT $6 OFFSET $7", - args: []any{"i1", "user", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)}, + args: []any{"i1", "user", 123.4, uint32(12), 123.4, uint32(10), uint32(3)}, }, }, { @@ -628,7 +626,7 @@ func Test_writeFilter(t *testing.T) { eventstore.FilterPagination( eventstore.Limit(10), eventstore.Offset(3), - eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12), + eventstore.PositionGreater(123.4, 12), ), eventstore.AppendAggregateFilter("user"), eventstore.AppendAggregateFilter( @@ -639,7 +637,7 @@ func Test_writeFilter(t *testing.T) { }, want: wantQuery{ query: " WHERE instance_id = $1 AND (aggregate_type = $2 OR (aggregate_type = $3 AND aggregate_id = $4)) AND ((position = $5 AND in_tx_order > $6) OR position > $7) ORDER BY position, in_tx_order LIMIT $8 OFFSET $9", - args: []any{"i1", "user", "org", "o1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)}, + args: []any{"i1", "user", "org", "o1", 123.4, uint32(12), 123.4, uint32(10), uint32(3)}, }, }, } @@ -958,7 +956,7 @@ func Test_writeQueryUse_examples(t *testing.T) { ), eventstore.FilterPagination( // used because we need to check for first login and an app which is not console - eventstore.PositionGreater(decimal.NewFromInt(12), 4), + eventstore.PositionGreater(12, 4), ), ), eventstore.NewFilter( @@ -1067,9 +1065,9 @@ func Test_writeQueryUse_examples(t *testing.T) { "instance", "user", "user.token.added", - decimal.NewFromInt(12), + float64(12), uint32(4), - decimal.NewFromInt(12), + float64(12), "instance", "instance", []string{"instance.idp.config.added", "instance.idp.oauth.added", "instance.idp.oidc.added", "instance.idp.jwt.added", "instance.idp.azure.added", "instance.idp.github.added", "instance.idp.github.enterprise.added", "instance.idp.gitlab.added", "instance.idp.gitlab.selfhosted.added", "instance.idp.google.added", "instance.idp.ldap.added", "instance.idp.config.apple.added", "instance.idp.saml.added"}, @@ -1203,7 +1201,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(23), - decimal.NewFromInt(123).String(), + float64(123), uint32(0), nil, "gigi", @@ -1237,7 +1235,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(23), - decimal.NewFromInt(123).String(), + float64(123), uint32(0), []byte(`{"name": "gigi"}`), "gigi", @@ -1271,7 +1269,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(23), - decimal.NewFromInt(123).String(), + float64(123), uint32(0), nil, "gigi", @@ -1285,7 +1283,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(24), - decimal.NewFromInt(124).String(), + float64(124), uint32(0), []byte(`{"name": "gigi"}`), "gigi", @@ -1319,7 +1317,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(23), - decimal.NewFromInt(123).String(), + float64(123), uint32(0), nil, "gigi", @@ -1333,7 +1331,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(24), - decimal.NewFromInt(124).String(), + float64(124), uint32(0), []byte(`{"name": "gigi"}`), "gigi", diff --git a/internal/v2/eventstore/query.go b/internal/v2/eventstore/query.go index f7a30a2139..c9b3cecd37 100644 --- a/internal/v2/eventstore/query.go +++ b/internal/v2/eventstore/query.go @@ -7,8 +7,6 @@ import ( "slices" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/database" ) @@ -725,7 +723,7 @@ func (pc *PositionCondition) Min() *GlobalPosition { // PositionGreater prepares the condition as follows // if inPositionOrder is set: position = AND in_tx_order > OR or position > // if inPositionOrder is NOT set: position > -func PositionGreater(position decimal.Decimal, inPositionOrder uint32) paginationOpt { +func PositionGreater(position float64, inPositionOrder uint32) paginationOpt { return func(p *Pagination) { p.ensurePosition() p.position.min = &GlobalPosition{ @@ -745,7 +743,7 @@ func GlobalPositionGreater(position *GlobalPosition) paginationOpt { // PositionLess prepares the condition as follows // if inPositionOrder is set: position = AND in_tx_order > OR or position > // if inPositionOrder is NOT set: position > -func PositionLess(position decimal.Decimal, inPositionOrder uint32) paginationOpt { +func PositionLess(position float64, inPositionOrder uint32) paginationOpt { return func(p *Pagination) { p.ensurePosition() p.position.max = &GlobalPosition{ diff --git a/internal/v2/eventstore/query_test.go b/internal/v2/eventstore/query_test.go index 0f313e9560..00c08914c1 100644 --- a/internal/v2/eventstore/query_test.go +++ b/internal/v2/eventstore/query_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/database" ) @@ -76,13 +74,13 @@ func TestPaginationOpt(t *testing.T) { name: "global position greater", args: args{ opts: []paginationOpt{ - GlobalPositionGreater(&GlobalPosition{Position: decimal.NewFromInt(10)}), + GlobalPositionGreater(&GlobalPosition{Position: 10}), }, }, want: &Pagination{ position: &PositionCondition{ min: &GlobalPosition{ - Position: decimal.NewFromInt(10), + Position: 10, InPositionOrder: 0, }, }, @@ -92,13 +90,13 @@ func TestPaginationOpt(t *testing.T) { name: "position greater", args: args{ opts: []paginationOpt{ - PositionGreater(decimal.NewFromInt(10), 0), + PositionGreater(10, 0), }, }, want: &Pagination{ position: &PositionCondition{ min: &GlobalPosition{ - Position: decimal.NewFromInt(10), + Position: 10, InPositionOrder: 0, }, }, @@ -109,13 +107,13 @@ func TestPaginationOpt(t *testing.T) { name: "position less", args: args{ opts: []paginationOpt{ - PositionLess(decimal.NewFromInt(10), 12), + PositionLess(10, 12), }, }, want: &Pagination{ position: &PositionCondition{ max: &GlobalPosition{ - Position: decimal.NewFromInt(10), + Position: 10, InPositionOrder: 12, }, }, @@ -125,13 +123,13 @@ func TestPaginationOpt(t *testing.T) { name: "global position less", args: args{ opts: []paginationOpt{ - GlobalPositionLess(&GlobalPosition{Position: decimal.NewFromInt(12), InPositionOrder: 24}), + GlobalPositionLess(&GlobalPosition{Position: 12, InPositionOrder: 24}), }, }, want: &Pagination{ position: &PositionCondition{ max: &GlobalPosition{ - Position: decimal.NewFromInt(12), + Position: 12, InPositionOrder: 24, }, }, @@ -142,19 +140,19 @@ func TestPaginationOpt(t *testing.T) { args: args{ opts: []paginationOpt{ PositionBetween( - &GlobalPosition{decimal.NewFromInt(10), 12}, - &GlobalPosition{decimal.NewFromInt(20), 0}, + &GlobalPosition{10, 12}, + &GlobalPosition{20, 0}, ), }, }, want: &Pagination{ position: &PositionCondition{ min: &GlobalPosition{ - Position: decimal.NewFromInt(10), + Position: 10, InPositionOrder: 12, }, max: &GlobalPosition{ - Position: decimal.NewFromInt(20), + Position: 20, InPositionOrder: 0, }, }, diff --git a/internal/v2/readmodel/last_successful_mirror.go b/internal/v2/readmodel/last_successful_mirror.go index 6635b73342..80b436b896 100644 --- a/internal/v2/readmodel/last_successful_mirror.go +++ b/internal/v2/readmodel/last_successful_mirror.go @@ -1,8 +1,6 @@ package readmodel import ( - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/v2/system" "github.com/zitadel/zitadel/internal/v2/system/mirror" @@ -10,7 +8,7 @@ import ( type LastSuccessfulMirror struct { ID string - Position decimal.Decimal + Position float64 source string } @@ -55,7 +53,7 @@ func (h *LastSuccessfulMirror) Reduce(events ...*eventstore.StorageEvent) (err e func (h *LastSuccessfulMirror) reduceSucceeded(event *eventstore.StorageEvent) error { // if position is set we skip all older events - if h.Position.GreaterThan(decimal.NewFromInt(0)) { + if h.Position > 0 { return nil } diff --git a/internal/v2/system/mirror/succeeded.go b/internal/v2/system/mirror/succeeded.go index 34d74f184f..6d0fba2c25 100644 --- a/internal/v2/system/mirror/succeeded.go +++ b/internal/v2/system/mirror/succeeded.go @@ -1,8 +1,6 @@ package mirror import ( - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -11,7 +9,7 @@ type succeededPayload struct { // Source is the name of the database data are mirrored from Source string `json:"source"` // Position until data will be mirrored - Position decimal.Decimal `json:"position"` + Position float64 `json:"position"` } const SucceededType = eventTypePrefix + "succeeded" @@ -40,7 +38,7 @@ func SucceededEventFromStorage(event *eventstore.StorageEvent) (e *SucceededEven }, nil } -func NewSucceededCommand(source string, position decimal.Decimal) *eventstore.Command { +func NewSucceededCommand(source string, position float64) *eventstore.Command { return &eventstore.Command{ Action: eventstore.Action[any]{ Creator: Creator,