From 6bd706be983a6cb98182a95389f96200c09f218a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20M=C3=B6hlmann?= Date: Tue, 24 Sep 2024 19:43:29 +0300 Subject: [PATCH] fix(eventstore): revert precise decimal (#8527) (#8679) (cherry picked from commit aeb379e7deee3d6df293d0fdb8f00157184b7ec6) --- cmd/mirror/event.go | 10 ++-- cmd/mirror/event_store.go | 3 +- go.mod | 2 - go.sum | 4 -- internal/api/oidc/key.go | 15 +++--- internal/api/saml/certificate.go | 15 +++--- internal/database/cockroach/crdb.go | 7 --- internal/database/postgres/pg.go | 6 --- internal/eventstore/event.go | 4 +- internal/eventstore/event_base.go | 6 +-- internal/eventstore/eventstore.go | 11 ++--- .../eventstore/eventstore_querier_test.go | 16 +++---- internal/eventstore/eventstore_test.go | 17 ++++--- .../eventstore/handler/v2/field_handler.go | 9 ++-- internal/eventstore/handler/v2/handler.go | 19 ++++---- internal/eventstore/handler/v2/state.go | 8 ++-- internal/eventstore/handler/v2/state_test.go | 13 +++-- internal/eventstore/handler/v2/statement.go | 3 +- internal/eventstore/local_crdb_test.go | 29 ++--------- internal/eventstore/read_model.go | 22 ++++----- internal/eventstore/repository/event.go | 6 +-- .../repository/mock/repository.mock.go | 15 +++--- .../repository/mock/repository.mock.impl.go | 5 +- .../eventstore/repository/search_query.go | 6 +-- internal/eventstore/repository/sql/crdb.go | 13 ++--- .../eventstore/repository/sql/crdb_test.go | 4 +- internal/eventstore/repository/sql/query.go | 25 +++++----- .../eventstore/repository/sql/query_test.go | 27 +++++------ internal/eventstore/search_query.go | 16 +++---- internal/eventstore/search_query_test.go | 4 +- internal/eventstore/v1/models/event.go | 6 +-- internal/eventstore/v3/event.go | 7 ++- internal/query/access_token.go | 7 ++- internal/query/current_state.go | 11 ++--- internal/query/current_state_test.go | 8 ++-- internal/query/user_grant.go | 4 +- internal/query/user_membership.go | 4 +- internal/v2/database/number_filter.go | 3 +- internal/v2/eventstore/event_store.go | 6 +-- internal/v2/eventstore/postgres/push_test.go | 24 +++++----- internal/v2/eventstore/postgres/query_test.go | 48 +++++++++---------- internal/v2/eventstore/query.go | 6 +-- internal/v2/eventstore/query_test.go | 26 +++++----- .../v2/readmodel/last_successful_mirror.go | 6 +-- internal/v2/system/mirror/succeeded.go | 6 +-- load-test/Makefile | 21 ++++---- load-test/src/use_cases/manipulate_user.ts | 1 - 47 files changed, 215 insertions(+), 319 deletions(-) diff --git a/cmd/mirror/event.go b/cmd/mirror/event.go index af0ac25e27..2bb0d52f45 100644 --- a/cmd/mirror/event.go +++ b/cmd/mirror/event.go @@ -3,8 +3,6 @@ package mirror import ( "context" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/v2/projection" "github.com/zitadel/zitadel/internal/v2/readmodel" @@ -32,12 +30,12 @@ func queryLastSuccessfulMigration(ctx context.Context, destinationES *eventstore return lastSuccess, nil } -func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ decimal.Decimal, err error) { +func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ float64, err error) { var cmd *eventstore.Command if len(instanceIDs) > 0 { cmd, err = mirror_event.NewStartedInstancesCommand(destination, instanceIDs) if err != nil { - return decimal.Decimal{}, err + return 0, err } } else { cmd = mirror_event.NewStartedSystemCommand(destination) @@ -60,12 +58,12 @@ func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, i ), ) if err != nil { - return decimal.Decimal{}, err + return 0, err } return position.Position, nil } -func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position decimal.Decimal) error { +func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position float64) error { return destinationES.Push( ctx, eventstore.NewPushIntent( diff --git a/cmd/mirror/event_store.go b/cmd/mirror/event_store.go index 2eab4eb0da..23145bdc37 100644 --- a/cmd/mirror/event_store.go +++ b/cmd/mirror/event_store.go @@ -9,7 +9,6 @@ import ( "time" "github.com/jackc/pgx/v5/stdlib" - "github.com/shopspring/decimal" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/zitadel/logging" @@ -181,7 +180,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) { logging.WithFields("took", time.Since(start), "count", eventCount).Info("events migrated") } -func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position decimal.Decimal, errs <-chan error) { +func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position float64, errs <-chan error) { joinedErrs := make([]error, 0, len(errs)) for err := range errs { joinedErrs = append(joinedErrs, err) diff --git a/go.mod b/go.mod index 0137251400..2d6c815520 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,6 @@ require ( github.com/h2non/gock v1.2.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/improbable-eng/grpc-web v0.15.0 - github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e github.com/jackc/pgx/v5 v5.6.0 github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52 github.com/jinzhu/gorm v1.9.16 @@ -55,7 +54,6 @@ require ( github.com/rakyll/statik v0.1.7 github.com/rs/cors v1.11.0 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 - github.com/shopspring/decimal v1.4.0 github.com/sony/sonyflake v1.2.0 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 diff --git a/go.sum b/go.sum index de6c80d513..606f4048cc 100644 --- a/go.sum +++ b/go.sum @@ -405,8 +405,6 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA= github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e h1:i3gQ/Zo7sk4LUVbsAjTNeC4gIjoPNIZVzs4EXstssV4= -github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e/go.mod h1:zUHglCZ4mpDUPgIwqEKoba6+tcUQzRdb1+DPTuYe9pI= github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= @@ -651,8 +649,6 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= -github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= diff --git a/internal/api/oidc/key.go b/internal/api/oidc/key.go index f7aa88409e..a7e156fe78 100644 --- a/internal/api/oidc/key.go +++ b/internal/api/oidc/key.go @@ -11,7 +11,6 @@ import ( "github.com/go-jose/go-jose/v4" "github.com/jonboulle/clockwork" "github.com/muhlemmer/gu" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/oidc/v3/pkg/op" @@ -351,14 +350,14 @@ func (o *OPStorage) getSigningKey(ctx context.Context) (op.SigningKey, error) { if len(keys.Keys) > 0 { return o.privateKeyToSigningKey(selectSigningKey(keys.Keys)) } - var position decimal.Decimal + var position float64 if keys.State != nil { position = keys.State.Position } return nil, o.refreshSigningKey(ctx, o.signingKeyAlgorithm, position) } -func (o *OPStorage) refreshSigningKey(ctx context.Context, algorithm string, position decimal.Decimal) error { +func (o *OPStorage) refreshSigningKey(ctx context.Context, algorithm string, position float64) error { ok, err := o.ensureIsLatestKey(ctx, position) if err != nil || !ok { return zerrors.ThrowInternal(err, "OIDC-ASfh3", "cannot ensure that projection is up to date") @@ -370,12 +369,12 @@ func (o *OPStorage) refreshSigningKey(ctx context.Context, algorithm string, pos return zerrors.ThrowInternal(nil, "OIDC-Df1bh", "") } -func (o *OPStorage) ensureIsLatestKey(ctx context.Context, position decimal.Decimal) (bool, error) { +func (o *OPStorage) ensureIsLatestKey(ctx context.Context, position float64) (bool, error) { maxSequence, err := o.getMaxKeySequence(ctx) if err != nil { return false, fmt.Errorf("error retrieving new events: %w", err) } - return position.GreaterThanOrEqual(maxSequence), nil + return position >= maxSequence, nil } func (o *OPStorage) privateKeyToSigningKey(key query.PrivateKey) (_ op.SigningKey, err error) { @@ -413,9 +412,9 @@ func (o *OPStorage) lockAndGenerateSigningKeyPair(ctx context.Context, algorithm return o.command.GenerateSigningKeyPair(setOIDCCtx(ctx), algorithm) } -func (o *OPStorage) getMaxKeySequence(ctx context.Context) (decimal.Decimal, error) { - return o.eventstore.LatestPosition(ctx, - eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition). +func (o *OPStorage) getMaxKeySequence(ctx context.Context) (float64, error) { + return o.eventstore.LatestSequence(ctx, + eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). ResourceOwner(authz.GetInstance(ctx).InstanceID()). AwaitOpenTransactions(). AllowTimeTravel(). diff --git a/internal/api/saml/certificate.go b/internal/api/saml/certificate.go index 079655391e..2eac0e4d36 100644 --- a/internal/api/saml/certificate.go +++ b/internal/api/saml/certificate.go @@ -6,7 +6,6 @@ import ( "time" "github.com/go-jose/go-jose/v4" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/saml/pkg/provider/key" @@ -77,7 +76,7 @@ func (p *Storage) getCertificateAndKey(ctx context.Context, usage crypto.KeyUsag return p.certificateToCertificateAndKey(selectCertificate(certs.Certificates)) } - var position decimal.Decimal + var position float64 if certs.State != nil { position = certs.State.Position } @@ -88,7 +87,7 @@ func (p *Storage) getCertificateAndKey(ctx context.Context, usage crypto.KeyUsag func (p *Storage) refreshCertificate( ctx context.Context, usage crypto.KeyUsage, - position decimal.Decimal, + position float64, ) error { ok, err := p.ensureIsLatestCertificate(ctx, position) if err != nil { @@ -104,12 +103,12 @@ func (p *Storage) refreshCertificate( return nil } -func (p *Storage) ensureIsLatestCertificate(ctx context.Context, position decimal.Decimal) (bool, error) { +func (p *Storage) ensureIsLatestCertificate(ctx context.Context, position float64) (bool, error) { maxSequence, err := p.getMaxKeySequence(ctx) if err != nil { return false, fmt.Errorf("error retrieving new events: %w", err) } - return position.GreaterThanOrEqual(maxSequence), nil + return position >= maxSequence, nil } func (p *Storage) lockAndGenerateCertificateAndKey(ctx context.Context, usage crypto.KeyUsage) error { @@ -152,9 +151,9 @@ func (p *Storage) lockAndGenerateCertificateAndKey(ctx context.Context, usage cr } } -func (p *Storage) getMaxKeySequence(ctx context.Context) (decimal.Decimal, error) { - return p.eventstore.LatestPosition(ctx, - eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition). +func (p *Storage) getMaxKeySequence(ctx context.Context) (float64, error) { + return p.eventstore.LatestSequence(ctx, + eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). ResourceOwner(authz.GetInstance(ctx).InstanceID()). AwaitOpenTransactions(). AddQuery(). diff --git a/internal/database/cockroach/crdb.go b/internal/database/cockroach/crdb.go index 988f678e16..2f685f6d92 100644 --- a/internal/database/cockroach/crdb.go +++ b/internal/database/cockroach/crdb.go @@ -7,8 +7,6 @@ import ( "strings" "time" - pgxdecimal "github.com/jackc/pgx-shopspring-decimal" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/stdlib" "github.com/mitchellh/mapstructure" @@ -84,11 +82,6 @@ func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpo return nil, err } - config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { - pgxdecimal.Register(conn.TypeMap()) - return nil - } - if connConfig.MaxOpenConns != 0 { config.MaxConns = int32(connConfig.MaxOpenConns) } diff --git a/internal/database/postgres/pg.go b/internal/database/postgres/pg.go index 24b31a55fb..ecafbe877a 100644 --- a/internal/database/postgres/pg.go +++ b/internal/database/postgres/pg.go @@ -7,8 +7,6 @@ import ( "strings" "time" - pgxdecimal "github.com/jackc/pgx-shopspring-decimal" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/stdlib" "github.com/mitchellh/mapstructure" @@ -84,10 +82,6 @@ func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpo if err != nil { return nil, err } - config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { - pgxdecimal.Register(conn.TypeMap()) - return nil - } if connConfig.MaxOpenConns != 0 { config.MaxConns = int32(connConfig.MaxOpenConns) diff --git a/internal/eventstore/event.go b/internal/eventstore/event.go index 656a02f33d..3df096f069 100644 --- a/internal/eventstore/event.go +++ b/internal/eventstore/event.go @@ -5,8 +5,6 @@ import ( "reflect" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/zerrors" ) @@ -46,7 +44,7 @@ type Event interface { // CreatedAt is the time the event was created at CreatedAt() time.Time // Position is the global position of the event - Position() decimal.Decimal + Position() float64 // Unmarshal parses the payload and stores the result // in the value pointed to by ptr. If ptr is nil or not a pointer, diff --git a/internal/eventstore/event_base.go b/internal/eventstore/event_base.go index 0cd5b6440c..c2b56128a8 100644 --- a/internal/eventstore/event_base.go +++ b/internal/eventstore/event_base.go @@ -5,8 +5,6 @@ import ( "encoding/json" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/service" ) @@ -23,7 +21,7 @@ type BaseEvent struct { Agg *Aggregate Seq uint64 - Pos decimal.Decimal + Pos float64 Creation time.Time previousAggregateSequence uint64 previousAggregateTypeSequence uint64 @@ -36,7 +34,7 @@ type BaseEvent struct { } // Position implements Event. -func (e *BaseEvent) Position() decimal.Decimal { +func (e *BaseEvent) Position() float64 { return e.Pos } diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index 066a876da3..e456135828 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -8,7 +8,6 @@ import ( "time" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" @@ -218,11 +217,11 @@ func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQu }) } -// LatestPosition filters the latest position for the given search query -func (es *Eventstore) LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error) { +// LatestSequence filters the latest sequence for the given search query +func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) { queryFactory.InstanceID(authz.GetInstance(ctx).InstanceID()) - return es.querier.LatestPosition(ctx, queryFactory) + return es.querier.LatestSequence(ctx, queryFactory) } // InstanceIDs returns the instance ids found by the search query @@ -267,8 +266,8 @@ type Querier interface { Health(ctx context.Context) error // FilterToReducer calls r for every event returned from the storage FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r Reducer) error - // LatestPosition returns the latest position found by the search query - LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error) + // LatestSequence returns the latest sequence found by the search query + LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) // InstanceIDs returns the instance ids found by the search query InstanceIDs(ctx context.Context, queryFactory *SearchQueryBuilder) ([]string, error) } diff --git a/internal/eventstore/eventstore_querier_test.go b/internal/eventstore/eventstore_querier_test.go index 6a01c6fbf0..856bb4a20e 100644 --- a/internal/eventstore/eventstore_querier_test.go +++ b/internal/eventstore/eventstore_querier_test.go @@ -4,8 +4,6 @@ import ( "context" "testing" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/eventstore" ) @@ -100,7 +98,7 @@ func TestCRDB_Filter(t *testing.T) { } } -func TestCRDB_LatestPosition(t *testing.T) { +func TestCRDB_LatestSequence(t *testing.T) { type args struct { searchQuery *eventstore.SearchQueryBuilder } @@ -108,7 +106,7 @@ func TestCRDB_LatestPosition(t *testing.T) { existingEvents []eventstore.Command } type res struct { - position decimal.Decimal + sequence float64 } tests := []struct { name string @@ -120,7 +118,7 @@ func TestCRDB_LatestPosition(t *testing.T) { { name: "aggregate type filter no sequence", args: args{ - searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition). + searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). AddQuery(). AggregateTypes("not found"). Builder(), @@ -137,7 +135,7 @@ func TestCRDB_LatestPosition(t *testing.T) { { name: "aggregate type filter sequence", args: args{ - searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition). + searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). AddQuery(). AggregateTypes(eventstore.AggregateType(t.Name())). Builder(), @@ -171,12 +169,12 @@ func TestCRDB_LatestPosition(t *testing.T) { return } - position, err := db.LatestPosition(context.Background(), tt.args.searchQuery) + sequence, err := db.LatestSequence(context.Background(), tt.args.searchQuery) if (err != nil) != tt.wantErr { t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr) } - if tt.res.position.GreaterThan(position) { - t.Errorf("CRDB.query() expected sequence: %v got %v", tt.res.position, position) + if tt.res.sequence > sequence { + t.Errorf("CRDB.query() expected sequence: %v got %v", tt.res.sequence, sequence) } }) } diff --git a/internal/eventstore/eventstore_test.go b/internal/eventstore/eventstore_test.go index 86f7809f24..53ef4e54cf 100644 --- a/internal/eventstore/eventstore_test.go +++ b/internal/eventstore/eventstore_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/service" @@ -391,7 +390,7 @@ func (repo *testPusher) Push(ctx context.Context, commands ...Command) (events [ type testQuerier struct { events []Event - sequence decimal.Decimal + sequence float64 instances []string err error t *testing.T @@ -424,9 +423,9 @@ func (repo *testQuerier) FilterToReducer(ctx context.Context, searchQuery *Searc return nil } -func (repo *testQuerier) LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error) { +func (repo *testQuerier) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) { if repo.err != nil { - return decimal.Decimal{}, repo.err + return 0, repo.err } return repo.sequence, nil } @@ -1056,7 +1055,7 @@ func TestEventstore_FilterEvents(t *testing.T) { } } -func TestEventstore_LatestPosition(t *testing.T) { +func TestEventstore_LatestSequence(t *testing.T) { type args struct { query *SearchQueryBuilder } @@ -1076,7 +1075,7 @@ func TestEventstore_LatestPosition(t *testing.T) { name: "no events", args: args{ query: &SearchQueryBuilder{ - columns: ColumnsMaxPosition, + columns: ColumnsMaxSequence, queries: []*SearchQuery{ { builder: &SearchQueryBuilder{}, @@ -1099,7 +1098,7 @@ func TestEventstore_LatestPosition(t *testing.T) { name: "repo error", args: args{ query: &SearchQueryBuilder{ - columns: ColumnsMaxPosition, + columns: ColumnsMaxSequence, queries: []*SearchQuery{ { builder: &SearchQueryBuilder{}, @@ -1122,7 +1121,7 @@ func TestEventstore_LatestPosition(t *testing.T) { name: "found events", args: args{ query: &SearchQueryBuilder{ - columns: ColumnsMaxPosition, + columns: ColumnsMaxSequence, queries: []*SearchQuery{ { builder: &SearchQueryBuilder{}, @@ -1148,7 +1147,7 @@ func TestEventstore_LatestPosition(t *testing.T) { querier: tt.fields.repo, } - _, err := es.LatestPosition(context.Background(), tt.args.query) + _, err := es.LatestSequence(context.Background(), tt.args.query) if (err != nil) != tt.res.wantErr { t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr) } diff --git a/internal/eventstore/handler/v2/field_handler.go b/internal/eventstore/handler/v2/field_handler.go index 2c371d67ec..8b71f32519 100644 --- a/internal/eventstore/handler/v2/field_handler.go +++ b/internal/eventstore/handler/v2/field_handler.go @@ -8,7 +8,6 @@ import ( "time" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/zitadel/internal/eventstore" ) @@ -124,7 +123,7 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig) return additionalIteration, err } // stop execution if currentState.eventTimestamp >= config.maxCreatedAt - if !config.maxPosition.IsZero() && currentState.position.GreaterThanOrEqual(config.maxPosition) { + if config.maxPosition != 0 && currentState.position >= config.maxPosition { return false, nil } @@ -157,7 +156,7 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState idx, offset := skipPreviouslyReducedEvents(events, currentState) - if currentState.position.Equal(events[len(events)-1].Position()) { + if currentState.position == events[len(events)-1].Position() { offset += currentState.offset } currentState.position = events[len(events)-1].Position() @@ -187,9 +186,9 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState } func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state) (index int, offset uint32) { - var position decimal.Decimal + var position float64 for i, event := range events { - if !event.Position().Equal(position) { + if event.Position() != position { offset = 0 position = event.Position() } diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index aaefec2e9b..b395035b8f 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -4,13 +4,13 @@ import ( "context" "database/sql" "errors" + "math" "math/rand" "slices" "sync" "time" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" @@ -379,7 +379,7 @@ func (h *Handler) existingInstances(ctx context.Context) ([]string, error) { type triggerConfig struct { awaitRunning bool - maxPosition decimal.Decimal + maxPosition float64 } type TriggerOpt func(conf *triggerConfig) @@ -390,7 +390,7 @@ func WithAwaitRunning() TriggerOpt { } } -func WithMaxPosition(position decimal.Decimal) TriggerOpt { +func WithMaxPosition(position float64) TriggerOpt { return func(conf *triggerConfig) { conf.maxPosition = position } @@ -500,7 +500,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add return additionalIteration, err } // stop execution if currentState.eventTimestamp >= config.maxCreatedAt - if !config.maxPosition.Equal(decimal.Decimal{}) && currentState.position.GreaterThanOrEqual(config.maxPosition) { + if config.maxPosition != 0 && currentState.position >= config.maxPosition { return false, nil } @@ -576,7 +576,7 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta func skipPreviouslyReducedStatements(statements []*Statement, currentState *state) int { for i, statement := range statements { - if statement.Position.Equal(currentState.position) && + if statement.Position == currentState.position && statement.AggregateID == currentState.aggregateID && statement.AggregateType == currentState.aggregateType && statement.Sequence == currentState.sequence { @@ -609,14 +609,14 @@ func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, currentState return nil } - _, err = tx.ExecContext(ctx, "SAVEPOINT exec") + _, err = tx.Exec("SAVEPOINT exec") if err != nil { h.log().WithError(err).Debug("create savepoint failed") return err } var shouldContinue bool defer func() { - _, errSave := tx.ExecContext(ctx, "RELEASE SAVEPOINT exec") + _, errSave := tx.Exec("RELEASE SAVEPOINT exec") if err == nil { err = errSave } @@ -644,8 +644,9 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder OrderAsc(). InstanceID(currentState.instanceID) - if currentState.position.GreaterThan(decimal.Decimal{}) { - builder = builder.PositionGreaterEqual(currentState.position) + if currentState.position > 0 { + // decrease position by 10 because builder.PositionAfter filters for position > and we need position >= + builder = builder.PositionAfter(math.Float64frombits(math.Float64bits(currentState.position) - 10)) if currentState.offset > 0 { builder = builder.Offset(currentState.offset) } diff --git a/internal/eventstore/handler/v2/state.go b/internal/eventstore/handler/v2/state.go index c4afaed204..d3b6953488 100644 --- a/internal/eventstore/handler/v2/state.go +++ b/internal/eventstore/handler/v2/state.go @@ -7,8 +7,6 @@ import ( "errors" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/zerrors" @@ -16,7 +14,7 @@ import ( type state struct { instanceID string - position decimal.Decimal + position float64 eventTimestamp time.Time aggregateType eventstore.AggregateType aggregateID string @@ -47,7 +45,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC aggregateType = new(sql.NullString) sequence = new(sql.NullInt64) timestamp = new(sql.NullTime) - position = new(decimal.NullDecimal) + position = new(sql.NullFloat64) offset = new(sql.NullInt64) ) @@ -77,7 +75,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC currentState.aggregateType = eventstore.AggregateType(aggregateType.String) currentState.sequence = uint64(sequence.Int64) currentState.eventTimestamp = timestamp.Time - currentState.position = position.Decimal + currentState.position = position.Float64 // psql does not provide unsigned numbers so we work around it currentState.offset = uint32(offset.Int64) return currentState, nil diff --git a/internal/eventstore/handler/v2/state_test.go b/internal/eventstore/handler/v2/state_test.go index ef91d78e55..cc5fb1fbab 100644 --- a/internal/eventstore/handler/v2/state_test.go +++ b/internal/eventstore/handler/v2/state_test.go @@ -11,7 +11,6 @@ import ( "time" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/database/mock" @@ -167,7 +166,7 @@ func TestHandler_updateLastUpdated(t *testing.T) { updatedState: &state{ instanceID: "instance", eventTimestamp: time.Now(), - position: decimal.NewFromInt(42), + position: 42, }, }, isErr: func(t *testing.T, err error) { @@ -193,7 +192,7 @@ func TestHandler_updateLastUpdated(t *testing.T) { updatedState: &state{ instanceID: "instance", eventTimestamp: time.Now(), - position: decimal.NewFromInt(42), + position: 42, }, }, isErr: func(t *testing.T, err error) { @@ -218,7 +217,7 @@ func TestHandler_updateLastUpdated(t *testing.T) { eventstore.AggregateType("aggregate type"), uint64(42), mock.AnyType[time.Time]{}, - decimal.NewFromInt(42), + float64(42), uint32(0), ), mock.WithExecRowsAffected(1), @@ -229,7 +228,7 @@ func TestHandler_updateLastUpdated(t *testing.T) { updatedState: &state{ instanceID: "instance", eventTimestamp: time.Now(), - position: decimal.NewFromInt(42), + position: 42, aggregateType: "aggregate type", aggregateID: "aggregate id", sequence: 42, @@ -398,7 +397,7 @@ func TestHandler_currentState(t *testing.T) { "aggregate type", int64(42), testTime, - decimal.NewFromInt(42).String(), + float64(42), uint16(10), }, }, @@ -413,7 +412,7 @@ func TestHandler_currentState(t *testing.T) { currentState: &state{ instanceID: "instance", eventTimestamp: testTime, - position: decimal.NewFromInt(42), + position: 42, aggregateType: "aggregate type", aggregateID: "aggregate id", sequence: 42, diff --git a/internal/eventstore/handler/v2/statement.go b/internal/eventstore/handler/v2/statement.go index 4bc660d9f9..207f3d0f58 100644 --- a/internal/eventstore/handler/v2/statement.go +++ b/internal/eventstore/handler/v2/statement.go @@ -9,7 +9,6 @@ import ( "strings" "time" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "golang.org/x/exp/constraints" @@ -84,7 +83,7 @@ type Statement struct { AggregateType eventstore.AggregateType AggregateID string Sequence uint64 - Position decimal.Decimal + Position float64 CreationDate time.Time InstanceID string diff --git a/internal/eventstore/local_crdb_test.go b/internal/eventstore/local_crdb_test.go index e46509ba9f..6df9e9fd29 100644 --- a/internal/eventstore/local_crdb_test.go +++ b/internal/eventstore/local_crdb_test.go @@ -2,16 +2,13 @@ package eventstore_test import ( "context" + "database/sql" "encoding/json" "os" "testing" "time" "github.com/cockroachdb/cockroach-go/v2/testserver" - pgxdecimal "github.com/jackc/pgx-shopspring-decimal" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/jackc/pgx/v5/stdlib" "github.com/zitadel/logging" "github.com/zitadel/zitadel/cmd/initialise" @@ -42,19 +39,10 @@ func TestMain(m *testing.M) { testCRDBClient = &database.DB{ Database: new(testDB), } - config, err := pgxpool.ParseConfig(ts.PGURL().String()) + testCRDBClient.DB, err = sql.Open("postgres", ts.PGURL().String()) if err != nil { - logging.WithFields("error", err).Fatal("unable to parse db config") + logging.WithFields("error", err).Fatal("unable to connect to db") } - config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { - pgxdecimal.Register(conn.TypeMap()) - return nil - } - pool, err := pgxpool.NewWithConfig(context.Background(), config) - if err != nil { - logging.WithFields("error", err).Fatal("unable to create db pool") - } - testCRDBClient.DB = stdlib.OpenDBFromPool(pool) if err = testCRDBClient.Ping(); err != nil { logging.WithFields("error", err).Fatal("unable to ping db") } @@ -115,19 +103,10 @@ func initDB(db *database.DB) error { } func connectLocalhost() (*database.DB, error) { - config, err := pgxpool.ParseConfig("postgresql://root@localhost:26257/defaultdb?sslmode=disable") + client, err := sql.Open("pgx", "postgresql://root@localhost:26257/defaultdb?sslmode=disable") if err != nil { return nil, err } - config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { - pgxdecimal.Register(conn.TypeMap()) - return nil - } - pool, err := pgxpool.NewWithConfig(context.Background(), config) - if err != nil { - return nil, err - } - client := stdlib.OpenDBFromPool(pool) if err = client.Ping(); err != nil { return nil, err } diff --git a/internal/eventstore/read_model.go b/internal/eventstore/read_model.go index ae77275732..d2c755cc3a 100644 --- a/internal/eventstore/read_model.go +++ b/internal/eventstore/read_model.go @@ -1,23 +1,19 @@ package eventstore -import ( - "time" - - "github.com/shopspring/decimal" -) +import "time" // ReadModel is the minimum representation of a read model. // It implements a basic reducer // it might be saved in a database or in memory type ReadModel struct { - AggregateID string `json:"-"` - ProcessedSequence uint64 `json:"-"` - CreationDate time.Time `json:"-"` - ChangeDate time.Time `json:"-"` - Events []Event `json:"-"` - ResourceOwner string `json:"-"` - InstanceID string `json:"-"` - Position decimal.Decimal `json:"-"` + AggregateID string `json:"-"` + ProcessedSequence uint64 `json:"-"` + CreationDate time.Time `json:"-"` + ChangeDate time.Time `json:"-"` + Events []Event `json:"-"` + ResourceOwner string `json:"-"` + InstanceID string `json:"-"` + Position float64 `json:"-"` } // AppendEvents adds all the events to the read model. diff --git a/internal/eventstore/repository/event.go b/internal/eventstore/repository/event.go index 2f4c1d8843..57b85f15ba 100644 --- a/internal/eventstore/repository/event.go +++ b/internal/eventstore/repository/event.go @@ -5,8 +5,6 @@ import ( "encoding/json" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/eventstore" ) @@ -20,7 +18,7 @@ type Event struct { // Seq is the sequence of the event Seq uint64 // Pos is the global sequence of the event multiple events can have the same sequence - Pos decimal.Decimal + Pos float64 //CreationDate is the time the event is created // it's used for human readability. @@ -93,7 +91,7 @@ func (e *Event) Sequence() uint64 { } // Position implements [eventstore.Event] -func (e *Event) Position() decimal.Decimal { +func (e *Event) Position() float64 { return e.Pos } diff --git a/internal/eventstore/repository/mock/repository.mock.go b/internal/eventstore/repository/mock/repository.mock.go index ebd5f501a2..a854de2995 100644 --- a/internal/eventstore/repository/mock/repository.mock.go +++ b/internal/eventstore/repository/mock/repository.mock.go @@ -13,7 +13,6 @@ import ( context "context" reflect "reflect" - decimal "github.com/shopspring/decimal" eventstore "github.com/zitadel/zitadel/internal/eventstore" gomock "go.uber.org/mock/gomock" ) @@ -84,19 +83,19 @@ func (mr *MockQuerierMockRecorder) InstanceIDs(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstanceIDs", reflect.TypeOf((*MockQuerier)(nil).InstanceIDs), arg0, arg1) } -// LatestPosition mocks base method. -func (m *MockQuerier) LatestPosition(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) (decimal.Decimal, error) { +// LatestSequence mocks base method. +func (m *MockQuerier) LatestSequence(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) (float64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LatestPosition", arg0, arg1) - ret0, _ := ret[0].(decimal.Decimal) + ret := m.ctrl.Call(m, "LatestSequence", arg0, arg1) + ret0, _ := ret[0].(float64) ret1, _ := ret[1].(error) return ret0, ret1 } -// LatestPosition indicates an expected call of LatestPosition. -func (mr *MockQuerierMockRecorder) LatestPosition(arg0, arg1 any) *gomock.Call { +// LatestSequence indicates an expected call of LatestSequence. +func (mr *MockQuerierMockRecorder) LatestSequence(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestPosition", reflect.TypeOf((*MockQuerier)(nil).LatestPosition), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestSequence", reflect.TypeOf((*MockQuerier)(nil).LatestSequence), arg0, arg1) } // MockPusher is a mock of Pusher interface. diff --git a/internal/eventstore/repository/mock/repository.mock.impl.go b/internal/eventstore/repository/mock/repository.mock.impl.go index 41ad4befd3..d41521ad8f 100644 --- a/internal/eventstore/repository/mock/repository.mock.impl.go +++ b/internal/eventstore/repository/mock/repository.mock.impl.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" @@ -187,8 +186,8 @@ func (e *mockEvent) Sequence() uint64 { return e.sequence } -func (e *mockEvent) Position() decimal.Decimal { - return decimal.Decimal{} +func (e *mockEvent) Position() float64 { + return 0 } func (e *mockEvent) CreatedAt() time.Time { diff --git a/internal/eventstore/repository/search_query.go b/internal/eventstore/repository/search_query.go index b28574cc84..39cca8b149 100644 --- a/internal/eventstore/repository/search_query.go +++ b/internal/eventstore/repository/search_query.go @@ -55,8 +55,6 @@ const ( //OperationNotIn checks if a stored value does not match one of the passed value list OperationNotIn - OperationGreaterEqual - operationCount ) @@ -234,10 +232,10 @@ func instanceIDsFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuer } func positionAfterFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter { - if builder.GetPositionAfter().IsZero() { + if builder.GetPositionAfter() == 0 { return nil } - query.Position = NewFilter(FieldPosition, builder.GetPositionAfter(), OperationGreaterEqual) + query.Position = NewFilter(FieldPosition, builder.GetPositionAfter(), OperationGreater) return query.Position } diff --git a/internal/eventstore/repository/sql/crdb.go b/internal/eventstore/repository/sql/crdb.go index c778015497..a60a2ef7b8 100644 --- a/internal/eventstore/repository/sql/crdb.go +++ b/internal/eventstore/repository/sql/crdb.go @@ -11,7 +11,6 @@ import ( "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" @@ -266,11 +265,11 @@ func (crdb *CRDB) FilterToReducer(ctx context.Context, searchQuery *eventstore.S return err } -// LatestPosition returns the latest position found by the search query -func (db *CRDB) LatestPosition(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (decimal.Decimal, error) { - var position decimal.Decimal +// LatestSequence returns the latest sequence found by the search query +func (db *CRDB) LatestSequence(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (float64, error) { + var position sql.NullFloat64 err := query(ctx, db, searchQuery, &position, false) - return position, err + return position.Float64, err } // InstanceIDs returns the instance ids found by the search query @@ -337,7 +336,7 @@ func (db *CRDB) eventQuery(useV1 bool) string { " FROM eventstore.events2" } -func (db *CRDB) maxPositionQuery(useV1 bool) string { +func (db *CRDB) maxSequenceQuery(useV1 bool) string { if useV1 { return `SELECT event_sequence FROM eventstore.events` } @@ -415,8 +414,6 @@ func (db *CRDB) operation(operation repository.Operation) string { return "=" case repository.OperationGreater: return ">" - case repository.OperationGreaterEqual: - return ">=" case repository.OperationLess: return "<" case repository.OperationJSONContains: diff --git a/internal/eventstore/repository/sql/crdb_test.go b/internal/eventstore/repository/sql/crdb_test.go index aae2fde78d..a3f3331a82 100644 --- a/internal/eventstore/repository/sql/crdb_test.go +++ b/internal/eventstore/repository/sql/crdb_test.go @@ -4,8 +4,6 @@ import ( "database/sql" "testing" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/repository" ) @@ -314,7 +312,7 @@ func generateEvent(t *testing.T, aggregateID string, opts ...func(*repository.Ev ResourceOwner: sql.NullString{String: "ro", Valid: true}, Typ: "test.created", Version: "v1", - Pos: decimal.NewFromInt(42), + Pos: 42, } for _, opt := range opts { diff --git a/internal/eventstore/repository/sql/query.go b/internal/eventstore/repository/sql/query.go index c20bb62275..3cddcb7924 100644 --- a/internal/eventstore/repository/sql/query.go +++ b/internal/eventstore/repository/sql/query.go @@ -9,7 +9,6 @@ import ( "strconv" "strings" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/call" @@ -26,7 +25,7 @@ type querier interface { conditionFormat(repository.Operation) string placeholder(query string) string eventQuery(useV1 bool) string - maxPositionQuery(useV1 bool) string + maxSequenceQuery(useV1 bool) string instanceIDsQuery(useV1 bool) string db() *database.DB orderByEventSequence(desc, shouldOrderBySequence, useV1 bool) string @@ -75,7 +74,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search // instead of using the max function of the database (which doesn't work for postgres) // we select the most recent row - if q.Columns == eventstore.ColumnsMaxPosition { + if q.Columns == eventstore.ColumnsMaxSequence { q.Limit = 1 q.Desc = true } @@ -92,7 +91,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search switch q.Columns { case eventstore.ColumnsEvent, - eventstore.ColumnsMaxPosition: + eventstore.ColumnsMaxSequence: query += criteria.orderByEventSequence(q.Desc, shouldOrderBySequence, useV1) } @@ -136,8 +135,8 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search func prepareColumns(criteria querier, columns eventstore.Columns, useV1 bool) (string, func(s scan, dest interface{}) error) { switch columns { - case eventstore.ColumnsMaxPosition: - return criteria.maxPositionQuery(useV1), maxPositionScanner + case eventstore.ColumnsMaxSequence: + return criteria.maxSequenceQuery(useV1), maxSequenceScanner case eventstore.ColumnsInstanceIDs: return criteria.instanceIDsQuery(useV1), instanceIDsScanner case eventstore.ColumnsEvent: @@ -155,15 +154,13 @@ func prepareTimeTravel(ctx context.Context, criteria querier, allow bool) string return criteria.Timetravel(took) } -func maxPositionScanner(row scan, dest interface{}) (err error) { - position, ok := dest.(*decimal.Decimal) +func maxSequenceScanner(row scan, dest interface{}) (err error) { + position, ok := dest.(*sql.NullFloat64) if !ok { - return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be decimal.Decimal got: %T", dest) + return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be sql.NullInt64 got: %T", dest) } - var res decimal.NullDecimal - err = row(&res) + err = row(position) if err == nil || errors.Is(err, sql.ErrNoRows) { - *position = res.Decimal return nil } return zerrors.ThrowInternal(err, "SQL-bN5xg", "something went wrong") @@ -192,7 +189,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error) return zerrors.ThrowInvalidArgumentf(nil, "SQL-4GP6F", "events scanner: invalid type %T", dest) } event := new(repository.Event) - position := new(decimal.NullDecimal) + position := new(sql.NullFloat64) if useV1 { err = scanner( @@ -229,7 +226,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error) logging.New().WithError(err).Warn("unable to scan row") return zerrors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row") } - event.Pos = position.Decimal + event.Pos = position.Float64 return reduce(event) } } diff --git a/internal/eventstore/repository/sql/query_test.go b/internal/eventstore/repository/sql/query_test.go index 654fa6d0b5..5d54b27c21 100644 --- a/internal/eventstore/repository/sql/query_test.go +++ b/internal/eventstore/repository/sql/query_test.go @@ -10,7 +10,6 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" - "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/zitadel/zitadel/internal/database" @@ -110,36 +109,36 @@ func Test_prepareColumns(t *testing.T) { { name: "max column", args: args{ - columns: eventstore.ColumnsMaxPosition, - dest: new(decimal.Decimal), + columns: eventstore.ColumnsMaxSequence, + dest: new(sql.NullFloat64), useV1: true, }, res: res{ query: `SELECT event_sequence FROM eventstore.events`, - expected: decimal.NewFromInt(42), + expected: sql.NullFloat64{Float64: 43, Valid: true}, }, fields: fields{ - dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))}, + dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}}, }, }, { name: "max column v2", args: args{ - columns: eventstore.ColumnsMaxPosition, - dest: new(decimal.Decimal), + columns: eventstore.ColumnsMaxSequence, + dest: new(sql.NullFloat64), }, res: res{ query: `SELECT "position" FROM eventstore.events2`, - expected: decimal.NewFromInt(42), + expected: sql.NullFloat64{Float64: 43, Valid: true}, }, fields: fields{ - dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))}, + dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}}, }, }, { name: "max sequence wrong dest type", args: args{ - columns: eventstore.ColumnsMaxPosition, + columns: eventstore.ColumnsMaxSequence, dest: new(uint64), }, res: res{ @@ -179,11 +178,11 @@ func Test_prepareColumns(t *testing.T) { res: res{ query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`, expected: []eventstore.Event{ - &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.NewFromInt(42), Data: nil, Version: "v1"}, + &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 42, Data: nil, Version: "v1"}, }, }, fields: fields{ - dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NewNullDecimal(decimal.NewFromInt(42)), sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)}, + dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 42, Valid: true}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)}, }, }, { @@ -198,11 +197,11 @@ func Test_prepareColumns(t *testing.T) { res: res{ query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`, expected: []eventstore.Event{ - &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.Decimal{}, Data: nil, Version: "v1"}, + &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 0, Data: nil, Version: "v1"}, }, }, fields: fields{ - dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NullDecimal{}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)}, + dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 0, Valid: false}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)}, }, }, { diff --git a/internal/eventstore/search_query.go b/internal/eventstore/search_query.go index c7f9a65da3..0c08a260eb 100644 --- a/internal/eventstore/search_query.go +++ b/internal/eventstore/search_query.go @@ -5,8 +5,6 @@ import ( "database/sql" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -25,7 +23,7 @@ type SearchQueryBuilder struct { queries []*SearchQuery tx *sql.Tx allowTimeTravel bool - positionGreaterEqual decimal.Decimal + positionAfter float64 awaitOpenTransactions bool creationDateAfter time.Time creationDateBefore time.Time @@ -76,8 +74,8 @@ func (b *SearchQueryBuilder) GetAllowTimeTravel() bool { return b.allowTimeTravel } -func (b SearchQueryBuilder) GetPositionAfter() decimal.Decimal { - return b.positionGreaterEqual +func (b SearchQueryBuilder) GetPositionAfter() float64 { + return b.positionAfter } func (b SearchQueryBuilder) GetAwaitOpenTransactions() bool { @@ -133,8 +131,8 @@ type Columns int8 const ( //ColumnsEvent represents all fields of an event ColumnsEvent = iota + 1 - // ColumnsMaxPosition represents the latest sequence of the filtered events - ColumnsMaxPosition + // ColumnsMaxSequence represents the latest sequence of the filtered events + ColumnsMaxSequence // ColumnsInstanceIDs represents the instance ids of the filtered events ColumnsInstanceIDs @@ -269,8 +267,8 @@ func (builder *SearchQueryBuilder) AllowTimeTravel() *SearchQueryBuilder { } // PositionAfter filters for events which happened after the specified time -func (builder *SearchQueryBuilder) PositionGreaterEqual(position decimal.Decimal) *SearchQueryBuilder { - builder.positionGreaterEqual = position +func (builder *SearchQueryBuilder) PositionAfter(position float64) *SearchQueryBuilder { + builder.positionAfter = position return builder } diff --git a/internal/eventstore/search_query_test.go b/internal/eventstore/search_query_test.go index da8acf878d..8c654911ea 100644 --- a/internal/eventstore/search_query_test.go +++ b/internal/eventstore/search_query_test.go @@ -116,10 +116,10 @@ func TestSearchQuerybuilderSetters(t *testing.T) { { name: "set columns", args: args{ - setters: []func(*SearchQueryBuilder) *SearchQueryBuilder{testSetColumns(ColumnsMaxPosition)}, + setters: []func(*SearchQueryBuilder) *SearchQueryBuilder{testSetColumns(ColumnsMaxSequence)}, }, res: &SearchQueryBuilder{ - columns: ColumnsMaxPosition, + columns: ColumnsMaxSequence, }, }, { diff --git a/internal/eventstore/v1/models/event.go b/internal/eventstore/v1/models/event.go index ab2b608872..8c50d64da0 100644 --- a/internal/eventstore/v1/models/event.go +++ b/internal/eventstore/v1/models/event.go @@ -5,8 +5,6 @@ import ( "reflect" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -22,7 +20,7 @@ var _ eventstore.Event = (*Event)(nil) type Event struct { ID string Seq uint64 - Pos decimal.Decimal + Pos float64 CreationDate time.Time Typ eventstore.EventType PreviousSequence uint64 @@ -82,7 +80,7 @@ func (e *Event) Sequence() uint64 { } // Position implements [eventstore.Event] -func (e *Event) Position() decimal.Decimal { +func (e *Event) Position() float64 { return e.Pos } diff --git a/internal/eventstore/v3/event.go b/internal/eventstore/v3/event.go index f489d98396..e1c95f13ff 100644 --- a/internal/eventstore/v3/event.go +++ b/internal/eventstore/v3/event.go @@ -4,7 +4,6 @@ import ( "encoding/json" "time" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/eventstore" @@ -22,7 +21,7 @@ type event struct { typ eventstore.EventType createdAt time.Time sequence uint64 - position decimal.Decimal + position float64 payload Payload } @@ -85,8 +84,8 @@ func (e *event) Sequence() uint64 { return e.sequence } -// Position implements [eventstore.Event] -func (e *event) Position() decimal.Decimal { +// Sequence implements [eventstore.Event] +func (e *event) Position() float64 { return e.position } diff --git a/internal/query/access_token.go b/internal/query/access_token.go index a5edc068cd..4180a6ad5e 100644 --- a/internal/query/access_token.go +++ b/internal/query/access_token.go @@ -5,7 +5,6 @@ import ( "strings" "time" - "github.com/shopspring/decimal" "golang.org/x/text/language" "github.com/zitadel/zitadel/internal/domain" @@ -141,7 +140,7 @@ func (q *Queries) accessTokenByOIDCSessionAndTokenID(ctx context.Context, oidcSe // checkSessionNotTerminatedAfter checks if a [session.TerminateType] event (or user events leading to a session termination) // occurred after a certain time and will return an error if so. -func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, userID string, position decimal.Decimal, fingerprintID string) (err error) { +func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, userID string, position float64, fingerprintID string) (err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() @@ -163,7 +162,7 @@ func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, } type sessionTerminatedModel struct { - position decimal.Decimal + position float64 sessionID string userID string fingerPrintID string @@ -183,7 +182,7 @@ func (s *sessionTerminatedModel) AppendEvents(events ...eventstore.Event) { func (s *sessionTerminatedModel) Query() *eventstore.SearchQueryBuilder { query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). - PositionGreaterEqual(s.position). + PositionAfter(s.position). AddQuery(). AggregateTypes(session.AggregateType). AggregateIDs(s.sessionID). diff --git a/internal/query/current_state.go b/internal/query/current_state.go index 790b594c2d..29497e6eec 100644 --- a/internal/query/current_state.go +++ b/internal/query/current_state.go @@ -10,7 +10,6 @@ import ( "time" sq "github.com/Masterminds/squirrel" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/call" @@ -27,7 +26,7 @@ type Stateful interface { type State struct { LastRun time.Time - Position decimal.Decimal + Position float64 EventCreatedAt time.Time AggregateID string AggregateType eventstore.AggregateType @@ -222,7 +221,7 @@ func prepareLatestState(ctx context.Context, db prepareDatabase) (sq.SelectBuild var ( creationDate sql.NullTime lastUpdated sql.NullTime - position decimal.NullDecimal + position sql.NullFloat64 ) err := row.Scan( &creationDate, @@ -235,7 +234,7 @@ func prepareLatestState(ctx context.Context, db prepareDatabase) (sq.SelectBuild return &State{ EventCreatedAt: creationDate.Time, LastRun: lastUpdated.Time, - Position: position.Decimal, + Position: position.Float64, }, nil } } @@ -260,7 +259,7 @@ func prepareCurrentStateQuery(ctx context.Context, db prepareDatabase) (sq.Selec var ( lastRun sql.NullTime eventDate sql.NullTime - currentPosition decimal.NullDecimal + currentPosition sql.NullFloat64 aggregateType sql.NullString aggregateID sql.NullString sequence sql.NullInt64 @@ -281,7 +280,7 @@ func prepareCurrentStateQuery(ctx context.Context, db prepareDatabase) (sq.Selec } currentState.State.EventCreatedAt = eventDate.Time currentState.State.LastRun = lastRun.Time - currentState.Position = currentPosition.Decimal + currentState.Position = currentPosition.Float64 currentState.AggregateType = eventstore.AggregateType(aggregateType.String) currentState.AggregateID = aggregateID.String currentState.Sequence = uint64(sequence.Int64) diff --git a/internal/query/current_state_test.go b/internal/query/current_state_test.go index f17509aa9c..c76dae710e 100644 --- a/internal/query/current_state_test.go +++ b/internal/query/current_state_test.go @@ -7,8 +7,6 @@ import ( "fmt" "regexp" "testing" - - "github.com/shopspring/decimal" ) var ( @@ -89,7 +87,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) { State: State{ EventCreatedAt: testNow, LastRun: testNow, - Position: decimal.NewFromInt(20211108), + Position: 20211108, AggregateID: "agg-id", AggregateType: "agg-type", Sequence: 20211108, @@ -136,7 +134,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) { ProjectionName: "projection-name", State: State{ EventCreatedAt: testNow, - Position: decimal.NewFromInt(20211108), + Position: 20211108, LastRun: testNow, AggregateID: "agg-id", AggregateType: "agg-type", @@ -147,7 +145,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) { ProjectionName: "projection-name2", State: State{ EventCreatedAt: testNow, - Position: decimal.NewFromInt(20211108), + Position: 20211108, LastRun: testNow, AggregateID: "agg-id", AggregateType: "agg-type", diff --git a/internal/query/user_grant.go b/internal/query/user_grant.go index e2bbabdc72..265d8eaae1 100644 --- a/internal/query/user_grant.go +++ b/internal/query/user_grant.go @@ -281,7 +281,7 @@ func (q *Queries) UserGrants(ctx context.Context, queries *UserGrantsQueries, sh return nil, zerrors.ThrowInternal(err, "QUERY-wXnQR", "Errors.Query.SQLStatement") } - latestState, err := q.latestState(ctx, userGrantTable) + latestSequence, err := q.latestState(ctx, userGrantTable) if err != nil { return nil, err } @@ -294,7 +294,7 @@ func (q *Queries) UserGrants(ctx context.Context, queries *UserGrantsQueries, sh return nil, err } - grants.State = latestState + grants.State = latestSequence return grants, nil } diff --git a/internal/query/user_membership.go b/internal/query/user_membership.go index a08617d57b..7ba2629cfa 100644 --- a/internal/query/user_membership.go +++ b/internal/query/user_membership.go @@ -144,7 +144,7 @@ func (q *Queries) Memberships(ctx context.Context, queries *MembershipSearchQuer if err != nil { return nil, zerrors.ThrowInvalidArgument(err, "QUERY-T84X9", "Errors.Query.InvalidRequest") } - latestState, err := q.latestState(ctx, orgMemberTable, instanceMemberTable, projectMemberTable, projectGrantMemberTable) + latestSequence, err := q.latestState(ctx, orgMemberTable, instanceMemberTable, projectMemberTable, projectGrantMemberTable) if err != nil { return nil, err } @@ -157,7 +157,7 @@ func (q *Queries) Memberships(ctx context.Context, queries *MembershipSearchQuer if err != nil { return nil, err } - memberships.State = latestState + memberships.State = latestSequence return memberships, nil } diff --git a/internal/v2/database/number_filter.go b/internal/v2/database/number_filter.go index 4853806457..ce263ceeee 100644 --- a/internal/v2/database/number_filter.go +++ b/internal/v2/database/number_filter.go @@ -3,7 +3,6 @@ package database import ( "time" - "github.com/shopspring/decimal" "github.com/zitadel/logging" "golang.org/x/exp/constraints" ) @@ -95,7 +94,7 @@ func (c numberCompare) String() string { } type number interface { - constraints.Integer | constraints.Float | time.Time | decimal.Decimal + constraints.Integer | constraints.Float | time.Time // TODO: condition must know if it's args are named parameters or not // constraints.Integer | constraints.Float | time.Time | placeholder } diff --git a/internal/v2/eventstore/event_store.go b/internal/v2/eventstore/event_store.go index e89786c657..cc447c5e15 100644 --- a/internal/v2/eventstore/event_store.go +++ b/internal/v2/eventstore/event_store.go @@ -2,8 +2,6 @@ package eventstore import ( "context" - - "github.com/shopspring/decimal" ) func NewEventstore(querier Querier, pusher Pusher) *EventStore { @@ -32,12 +30,12 @@ type healthier interface { } type GlobalPosition struct { - Position decimal.Decimal + Position float64 InPositionOrder uint32 } func (gp GlobalPosition) IsLess(other GlobalPosition) bool { - return gp.Position.LessThan(other.Position) || (gp.Position.Equal(other.Position) && gp.InPositionOrder < other.InPositionOrder) + return gp.Position < other.Position || (gp.Position == other.Position && gp.InPositionOrder < other.InPositionOrder) } type Reducer interface { diff --git a/internal/v2/eventstore/postgres/push_test.go b/internal/v2/eventstore/postgres/push_test.go index 6f8b224c41..91fdc1fcd7 100644 --- a/internal/v2/eventstore/postgres/push_test.go +++ b/internal/v2/eventstore/postgres/push_test.go @@ -8,8 +8,6 @@ import ( "testing" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/database/mock" "github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/zerrors" @@ -820,7 +818,7 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, }, ), @@ -901,11 +899,11 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, { time.Now(), - decimal.NewFromFloat(123.1).String(), + float64(123.1), }, }, ), @@ -986,11 +984,11 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, { time.Now(), - decimal.NewFromFloat(123.1).String(), + float64(123.1), }, }, ), @@ -1046,7 +1044,7 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, }, ), @@ -1101,7 +1099,7 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, }, ), @@ -1183,11 +1181,11 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, { time.Now(), - decimal.NewFromFloat(123.1).String(), + float64(123.1), }, }, ), @@ -1274,11 +1272,11 @@ func Test_push(t *testing.T) { [][]driver.Value{ { time.Now(), - decimal.NewFromFloat(123).String(), + float64(123), }, { time.Now(), - decimal.NewFromFloat(123.1).String(), + float64(123.1), }, }, ), diff --git a/internal/v2/eventstore/postgres/query_test.go b/internal/v2/eventstore/postgres/query_test.go index 34b73bd820..56f506ac50 100644 --- a/internal/v2/eventstore/postgres/query_test.go +++ b/internal/v2/eventstore/postgres/query_test.go @@ -8,8 +8,6 @@ import ( "testing" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/database" "github.com/zitadel/zitadel/internal/v2/database/mock" "github.com/zitadel/zitadel/internal/v2/eventstore" @@ -543,13 +541,13 @@ func Test_writeFilter(t *testing.T) { args: args{ filter: eventstore.NewFilter( eventstore.FilterPagination( - eventstore.PositionGreater(decimal.NewFromFloat(123.4), 0), + eventstore.PositionGreater(123.4, 0), ), ), }, want: wantQuery{ query: " WHERE instance_id = $1 AND position > $2 ORDER BY position, in_tx_order", - args: []any{"i1", decimal.NewFromFloat(123.4)}, + args: []any{"i1", 123.4}, }, }, { @@ -557,18 +555,18 @@ func Test_writeFilter(t *testing.T) { args: args{ filter: eventstore.NewFilter( eventstore.FilterPagination( - // eventstore.PositionGreater(decimal.NewFromFloat(123.4), 0), + // eventstore.PositionGreater(123.4, 0), // eventstore.PositionLess(125.4, 10), eventstore.PositionBetween( - &eventstore.GlobalPosition{Position: decimal.NewFromFloat(123.4)}, - &eventstore.GlobalPosition{Position: decimal.NewFromFloat(125.4), InPositionOrder: 10}, + &eventstore.GlobalPosition{Position: 123.4}, + &eventstore.GlobalPosition{Position: 125.4, InPositionOrder: 10}, ), ), ), }, want: wantQuery{ query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order < $3) OR position < $4) AND position > $5 ORDER BY position, in_tx_order", - args: []any{"i1", decimal.NewFromFloat(125.4), uint32(10), decimal.NewFromFloat(125.4), decimal.NewFromFloat(123.4)}, + args: []any{"i1", 125.4, uint32(10), 125.4, 123.4}, // TODO: (adlerhurst) would require some refactoring to reuse existing args // query: " WHERE instance_id = $1 AND position > $2 AND ((position = $3 AND in_tx_order < $4) OR position < $3) ORDER BY position, in_tx_order", // args: []any{"i1", 123.4, 125.4, uint32(10)}, @@ -579,13 +577,13 @@ func Test_writeFilter(t *testing.T) { args: args{ filter: eventstore.NewFilter( eventstore.FilterPagination( - eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12), + eventstore.PositionGreater(123.4, 12), ), ), }, want: wantQuery{ query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order > $3) OR position > $4) ORDER BY position, in_tx_order", - args: []any{"i1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4)}, + args: []any{"i1", 123.4, uint32(12), 123.4}, }, }, { @@ -595,13 +593,13 @@ func Test_writeFilter(t *testing.T) { eventstore.FilterPagination( eventstore.Limit(10), eventstore.Offset(3), - eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12), + eventstore.PositionGreater(123.4, 12), ), ), }, want: wantQuery{ query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order > $3) OR position > $4) ORDER BY position, in_tx_order LIMIT $5 OFFSET $6", - args: []any{"i1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)}, + args: []any{"i1", 123.4, uint32(12), 123.4, uint32(10), uint32(3)}, }, }, { @@ -611,14 +609,14 @@ func Test_writeFilter(t *testing.T) { eventstore.FilterPagination( eventstore.Limit(10), eventstore.Offset(3), - eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12), + eventstore.PositionGreater(123.4, 12), ), eventstore.AppendAggregateFilter("user"), ), }, want: wantQuery{ query: " WHERE instance_id = $1 AND aggregate_type = $2 AND ((position = $3 AND in_tx_order > $4) OR position > $5) ORDER BY position, in_tx_order LIMIT $6 OFFSET $7", - args: []any{"i1", "user", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)}, + args: []any{"i1", "user", 123.4, uint32(12), 123.4, uint32(10), uint32(3)}, }, }, { @@ -628,7 +626,7 @@ func Test_writeFilter(t *testing.T) { eventstore.FilterPagination( eventstore.Limit(10), eventstore.Offset(3), - eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12), + eventstore.PositionGreater(123.4, 12), ), eventstore.AppendAggregateFilter("user"), eventstore.AppendAggregateFilter( @@ -639,7 +637,7 @@ func Test_writeFilter(t *testing.T) { }, want: wantQuery{ query: " WHERE instance_id = $1 AND (aggregate_type = $2 OR (aggregate_type = $3 AND aggregate_id = $4)) AND ((position = $5 AND in_tx_order > $6) OR position > $7) ORDER BY position, in_tx_order LIMIT $8 OFFSET $9", - args: []any{"i1", "user", "org", "o1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)}, + args: []any{"i1", "user", "org", "o1", 123.4, uint32(12), 123.4, uint32(10), uint32(3)}, }, }, } @@ -958,7 +956,7 @@ func Test_writeQueryUse_examples(t *testing.T) { ), eventstore.FilterPagination( // used because we need to check for first login and an app which is not console - eventstore.PositionGreater(decimal.NewFromInt(12), 4), + eventstore.PositionGreater(12, 4), ), ), eventstore.NewFilter( @@ -1067,9 +1065,9 @@ func Test_writeQueryUse_examples(t *testing.T) { "instance", "user", "user.token.added", - decimal.NewFromInt(12), + float64(12), uint32(4), - decimal.NewFromInt(12), + float64(12), "instance", "instance", []string{"instance.idp.config.added", "instance.idp.oauth.added", "instance.idp.oidc.added", "instance.idp.jwt.added", "instance.idp.azure.added", "instance.idp.github.added", "instance.idp.github.enterprise.added", "instance.idp.gitlab.added", "instance.idp.gitlab.selfhosted.added", "instance.idp.google.added", "instance.idp.ldap.added", "instance.idp.config.apple.added", "instance.idp.saml.added"}, @@ -1203,7 +1201,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(23), - decimal.NewFromInt(123).String(), + float64(123), uint32(0), nil, "gigi", @@ -1237,7 +1235,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(23), - decimal.NewFromInt(123).String(), + float64(123), uint32(0), []byte(`{"name": "gigi"}`), "gigi", @@ -1271,7 +1269,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(23), - decimal.NewFromInt(123).String(), + float64(123), uint32(0), nil, "gigi", @@ -1285,7 +1283,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(24), - decimal.NewFromInt(124).String(), + float64(124), uint32(0), []byte(`{"name": "gigi"}`), "gigi", @@ -1319,7 +1317,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(23), - decimal.NewFromInt(123).String(), + float64(123), uint32(0), nil, "gigi", @@ -1333,7 +1331,7 @@ func Test_executeQuery(t *testing.T) { time.Now(), "event.type", uint32(24), - decimal.NewFromInt(124).String(), + float64(124), uint32(0), []byte(`{"name": "gigi"}`), "gigi", diff --git a/internal/v2/eventstore/query.go b/internal/v2/eventstore/query.go index f7a30a2139..c9b3cecd37 100644 --- a/internal/v2/eventstore/query.go +++ b/internal/v2/eventstore/query.go @@ -7,8 +7,6 @@ import ( "slices" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/database" ) @@ -725,7 +723,7 @@ func (pc *PositionCondition) Min() *GlobalPosition { // PositionGreater prepares the condition as follows // if inPositionOrder is set: position = AND in_tx_order > OR or position > // if inPositionOrder is NOT set: position > -func PositionGreater(position decimal.Decimal, inPositionOrder uint32) paginationOpt { +func PositionGreater(position float64, inPositionOrder uint32) paginationOpt { return func(p *Pagination) { p.ensurePosition() p.position.min = &GlobalPosition{ @@ -745,7 +743,7 @@ func GlobalPositionGreater(position *GlobalPosition) paginationOpt { // PositionLess prepares the condition as follows // if inPositionOrder is set: position = AND in_tx_order > OR or position > // if inPositionOrder is NOT set: position > -func PositionLess(position decimal.Decimal, inPositionOrder uint32) paginationOpt { +func PositionLess(position float64, inPositionOrder uint32) paginationOpt { return func(p *Pagination) { p.ensurePosition() p.position.max = &GlobalPosition{ diff --git a/internal/v2/eventstore/query_test.go b/internal/v2/eventstore/query_test.go index 0f313e9560..00c08914c1 100644 --- a/internal/v2/eventstore/query_test.go +++ b/internal/v2/eventstore/query_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/database" ) @@ -76,13 +74,13 @@ func TestPaginationOpt(t *testing.T) { name: "global position greater", args: args{ opts: []paginationOpt{ - GlobalPositionGreater(&GlobalPosition{Position: decimal.NewFromInt(10)}), + GlobalPositionGreater(&GlobalPosition{Position: 10}), }, }, want: &Pagination{ position: &PositionCondition{ min: &GlobalPosition{ - Position: decimal.NewFromInt(10), + Position: 10, InPositionOrder: 0, }, }, @@ -92,13 +90,13 @@ func TestPaginationOpt(t *testing.T) { name: "position greater", args: args{ opts: []paginationOpt{ - PositionGreater(decimal.NewFromInt(10), 0), + PositionGreater(10, 0), }, }, want: &Pagination{ position: &PositionCondition{ min: &GlobalPosition{ - Position: decimal.NewFromInt(10), + Position: 10, InPositionOrder: 0, }, }, @@ -109,13 +107,13 @@ func TestPaginationOpt(t *testing.T) { name: "position less", args: args{ opts: []paginationOpt{ - PositionLess(decimal.NewFromInt(10), 12), + PositionLess(10, 12), }, }, want: &Pagination{ position: &PositionCondition{ max: &GlobalPosition{ - Position: decimal.NewFromInt(10), + Position: 10, InPositionOrder: 12, }, }, @@ -125,13 +123,13 @@ func TestPaginationOpt(t *testing.T) { name: "global position less", args: args{ opts: []paginationOpt{ - GlobalPositionLess(&GlobalPosition{Position: decimal.NewFromInt(12), InPositionOrder: 24}), + GlobalPositionLess(&GlobalPosition{Position: 12, InPositionOrder: 24}), }, }, want: &Pagination{ position: &PositionCondition{ max: &GlobalPosition{ - Position: decimal.NewFromInt(12), + Position: 12, InPositionOrder: 24, }, }, @@ -142,19 +140,19 @@ func TestPaginationOpt(t *testing.T) { args: args{ opts: []paginationOpt{ PositionBetween( - &GlobalPosition{decimal.NewFromInt(10), 12}, - &GlobalPosition{decimal.NewFromInt(20), 0}, + &GlobalPosition{10, 12}, + &GlobalPosition{20, 0}, ), }, }, want: &Pagination{ position: &PositionCondition{ min: &GlobalPosition{ - Position: decimal.NewFromInt(10), + Position: 10, InPositionOrder: 12, }, max: &GlobalPosition{ - Position: decimal.NewFromInt(20), + Position: 20, InPositionOrder: 0, }, }, diff --git a/internal/v2/readmodel/last_successful_mirror.go b/internal/v2/readmodel/last_successful_mirror.go index 6635b73342..80b436b896 100644 --- a/internal/v2/readmodel/last_successful_mirror.go +++ b/internal/v2/readmodel/last_successful_mirror.go @@ -1,8 +1,6 @@ package readmodel import ( - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/v2/system" "github.com/zitadel/zitadel/internal/v2/system/mirror" @@ -10,7 +8,7 @@ import ( type LastSuccessfulMirror struct { ID string - Position decimal.Decimal + Position float64 source string } @@ -55,7 +53,7 @@ func (h *LastSuccessfulMirror) Reduce(events ...*eventstore.StorageEvent) (err e func (h *LastSuccessfulMirror) reduceSucceeded(event *eventstore.StorageEvent) error { // if position is set we skip all older events - if h.Position.GreaterThan(decimal.NewFromInt(0)) { + if h.Position > 0 { return nil } diff --git a/internal/v2/system/mirror/succeeded.go b/internal/v2/system/mirror/succeeded.go index 34d74f184f..6d0fba2c25 100644 --- a/internal/v2/system/mirror/succeeded.go +++ b/internal/v2/system/mirror/succeeded.go @@ -1,8 +1,6 @@ package mirror import ( - "github.com/shopspring/decimal" - "github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/zerrors" ) @@ -11,7 +9,7 @@ type succeededPayload struct { // Source is the name of the database data are mirrored from Source string `json:"source"` // Position until data will be mirrored - Position decimal.Decimal `json:"position"` + Position float64 `json:"position"` } const SucceededType = eventTypePrefix + "succeeded" @@ -40,7 +38,7 @@ func SucceededEventFromStorage(event *eventstore.StorageEvent) (e *SucceededEven }, nil } -func NewSucceededCommand(source string, position decimal.Decimal) *eventstore.Command { +func NewSucceededCommand(source string, position float64) *eventstore.Command { return &eventstore.Command{ Action: eventstore.Action[any]{ Creator: Creator, diff --git a/load-test/Makefile b/load-test/Makefile index bbd5ebf538..61494e27cb 100644 --- a/load-test/Makefile +++ b/load-test/Makefile @@ -4,43 +4,42 @@ ZITADEL_HOST ?= ADMIN_LOGIN_NAME ?= ADMIN_PASSWORD ?= -K6 := ./../../xk6-modules/k6 - .PHONY: human_password_login human_password_login: bundle - ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/human_password_login.js --vus ${VUS} --duration ${DURATION} + k6 run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/human_password_login.js --vus ${VUS} --duration ${DURATION} .PHONY: machine_pat_login machine_pat_login: bundle - ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/machine_pat_login.js --vus ${VUS} --duration ${DURATION} + k6 run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/machine_pat_login.js --vus ${VUS} --duration ${DURATION} .PHONY: machine_client_credentials_login machine_client_credentials_login: bundle - ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/machine_client_credentials_login.js --vus ${VUS} --duration ${DURATION} + k6 run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/machine_client_credentials_login.js --vus ${VUS} --duration ${DURATION} .PHONY: user_info user_info: bundle - ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/user_info.js --vus ${VUS} --duration ${DURATION} + k6 run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/user_info.js --vus ${VUS} --duration ${DURATION} .PHONY: manipulate_user manipulate_user: bundle - ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/manipulate_user.js --vus ${VUS} --duration ${DURATION} + k6 run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/manipulate_user.js --vus ${VUS} --duration ${DURATION} .PHONY: introspect introspect: ensure_modules bundle go install go.k6.io/xk6/cmd/xk6@latest cd ../../xk6-modules && xk6 build --with xk6-zitadel=. - ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/introspection.js --vus ${VUS} --duration ${DURATION} + ./../../xk6-modules/k6 run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/introspection.js --vus ${VUS} --duration ${DURATION} .PHONY: add_session add_session: bundle - ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/session.js --vus ${VUS} --duration ${DURATION} + k6 run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/session.js --vus ${VUS} --duration ${DURATION} .PHONY: machine_jwt_profile_grant machine_jwt_profile_grant: ensure_modules ensure_key_pair bundle go install go.k6.io/xk6/cmd/xk6@latest cd ../../xk6-modules && xk6 build --with xk6-zitadel=. - ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/machine_jwt_profile_grant.js --vus ${VUS} --duration ${DURATION} + ./../../xk6-modules/k6 run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/machine_jwt_profile_grant.js --iterations 1 + # --vus ${VUS} --duration ${DURATION} .PHONY: machine_jwt_profile_grant_single_user machine_jwt_profile_grant_single_user: ensure_modules ensure_key_pair bundle @@ -65,8 +64,6 @@ endif bundle: npm i npm run bundle - go install go.k6.io/xk6/cmd/xk6@latest - cd ../../xk6-modules && xk6 build --with xk6-zitadel=. .PHONY: ensure_key_pair ensure_key_pair: diff --git a/load-test/src/use_cases/manipulate_user.ts b/load-test/src/use_cases/manipulate_user.ts index 058bde05f1..2ea53bd324 100644 --- a/load-test/src/use_cases/manipulate_user.ts +++ b/load-test/src/use_cases/manipulate_user.ts @@ -45,4 +45,3 @@ export function teardown(data: any) { removeOrg(data.org, data.tokens.accessToken); console.info('teardown: org removed'); } -