fix(eventstore): revert precise decimal (#8527) (#8679)

This commit is contained in:
Tim Möhlmann
2024-09-24 19:43:29 +03:00
committed by GitHub
parent eb97be6fdf
commit aeb379e7de
47 changed files with 215 additions and 319 deletions

View File

@@ -5,8 +5,6 @@ import (
"encoding/json"
"time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore"
)
@@ -20,7 +18,7 @@ type Event struct {
// Seq is the sequence of the event
Seq uint64
// Pos is the global sequence of the event multiple events can have the same sequence
Pos decimal.Decimal
Pos float64
//CreationDate is the time the event is created
// it's used for human readability.
@@ -93,7 +91,7 @@ func (e *Event) Sequence() uint64 {
}
// Position implements [eventstore.Event]
func (e *Event) Position() decimal.Decimal {
func (e *Event) Position() float64 {
return e.Pos
}

View File

@@ -13,7 +13,6 @@ import (
context "context"
reflect "reflect"
decimal "github.com/shopspring/decimal"
eventstore "github.com/zitadel/zitadel/internal/eventstore"
gomock "go.uber.org/mock/gomock"
)
@@ -84,19 +83,19 @@ func (mr *MockQuerierMockRecorder) InstanceIDs(arg0, arg1 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstanceIDs", reflect.TypeOf((*MockQuerier)(nil).InstanceIDs), arg0, arg1)
}
// LatestPosition mocks base method.
func (m *MockQuerier) LatestPosition(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) (decimal.Decimal, error) {
// LatestSequence mocks base method.
func (m *MockQuerier) LatestSequence(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) (float64, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LatestPosition", arg0, arg1)
ret0, _ := ret[0].(decimal.Decimal)
ret := m.ctrl.Call(m, "LatestSequence", arg0, arg1)
ret0, _ := ret[0].(float64)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// LatestPosition indicates an expected call of LatestPosition.
func (mr *MockQuerierMockRecorder) LatestPosition(arg0, arg1 any) *gomock.Call {
// LatestSequence indicates an expected call of LatestSequence.
func (mr *MockQuerierMockRecorder) LatestSequence(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestPosition", reflect.TypeOf((*MockQuerier)(nil).LatestPosition), arg0, arg1)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestSequence", reflect.TypeOf((*MockQuerier)(nil).LatestSequence), arg0, arg1)
}
// MockPusher is a mock of Pusher interface.

View File

@@ -7,7 +7,6 @@ import (
"testing"
"time"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
@@ -187,8 +186,8 @@ func (e *mockEvent) Sequence() uint64 {
return e.sequence
}
func (e *mockEvent) Position() decimal.Decimal {
return decimal.Decimal{}
func (e *mockEvent) Position() float64 {
return 0
}
func (e *mockEvent) CreatedAt() time.Time {

View File

@@ -55,8 +55,6 @@ const (
//OperationNotIn checks if a stored value does not match one of the passed value list
OperationNotIn
OperationGreaterEqual
operationCount
)
@@ -234,10 +232,10 @@ func instanceIDsFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuer
}
func positionAfterFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
if builder.GetPositionAfter().IsZero() {
if builder.GetPositionAfter() == 0 {
return nil
}
query.Position = NewFilter(FieldPosition, builder.GetPositionAfter(), OperationGreaterEqual)
query.Position = NewFilter(FieldPosition, builder.GetPositionAfter(), OperationGreater)
return query.Position
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
@@ -266,11 +265,11 @@ func (crdb *CRDB) FilterToReducer(ctx context.Context, searchQuery *eventstore.S
return err
}
// LatestPosition returns the latest position found by the search query
func (db *CRDB) LatestPosition(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (decimal.Decimal, error) {
var position decimal.Decimal
// LatestSequence returns the latest sequence found by the search query
func (db *CRDB) LatestSequence(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (float64, error) {
var position sql.NullFloat64
err := query(ctx, db, searchQuery, &position, false)
return position, err
return position.Float64, err
}
// InstanceIDs returns the instance ids found by the search query
@@ -337,7 +336,7 @@ func (db *CRDB) eventQuery(useV1 bool) string {
" FROM eventstore.events2"
}
func (db *CRDB) maxPositionQuery(useV1 bool) string {
func (db *CRDB) maxSequenceQuery(useV1 bool) string {
if useV1 {
return `SELECT event_sequence FROM eventstore.events`
}
@@ -415,8 +414,6 @@ func (db *CRDB) operation(operation repository.Operation) string {
return "="
case repository.OperationGreater:
return ">"
case repository.OperationGreaterEqual:
return ">="
case repository.OperationLess:
return "<"
case repository.OperationJSONContains:

View File

@@ -4,8 +4,6 @@ import (
"database/sql"
"testing"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository"
)
@@ -314,7 +312,7 @@ func generateEvent(t *testing.T, aggregateID string, opts ...func(*repository.Ev
ResourceOwner: sql.NullString{String: "ro", Valid: true},
Typ: "test.created",
Version: "v1",
Pos: decimal.NewFromInt(42),
Pos: 42,
}
for _, opt := range opts {

View File

@@ -9,7 +9,6 @@ import (
"strconv"
"strings"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/call"
@@ -26,7 +25,7 @@ type querier interface {
conditionFormat(repository.Operation) string
placeholder(query string) string
eventQuery(useV1 bool) string
maxPositionQuery(useV1 bool) string
maxSequenceQuery(useV1 bool) string
instanceIDsQuery(useV1 bool) string
db() *database.DB
orderByEventSequence(desc, shouldOrderBySequence, useV1 bool) string
@@ -75,7 +74,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
// instead of using the max function of the database (which doesn't work for postgres)
// we select the most recent row
if q.Columns == eventstore.ColumnsMaxPosition {
if q.Columns == eventstore.ColumnsMaxSequence {
q.Limit = 1
q.Desc = true
}
@@ -92,7 +91,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
switch q.Columns {
case eventstore.ColumnsEvent,
eventstore.ColumnsMaxPosition:
eventstore.ColumnsMaxSequence:
query += criteria.orderByEventSequence(q.Desc, shouldOrderBySequence, useV1)
}
@@ -136,8 +135,8 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
func prepareColumns(criteria querier, columns eventstore.Columns, useV1 bool) (string, func(s scan, dest interface{}) error) {
switch columns {
case eventstore.ColumnsMaxPosition:
return criteria.maxPositionQuery(useV1), maxPositionScanner
case eventstore.ColumnsMaxSequence:
return criteria.maxSequenceQuery(useV1), maxSequenceScanner
case eventstore.ColumnsInstanceIDs:
return criteria.instanceIDsQuery(useV1), instanceIDsScanner
case eventstore.ColumnsEvent:
@@ -155,15 +154,13 @@ func prepareTimeTravel(ctx context.Context, criteria querier, allow bool) string
return criteria.Timetravel(took)
}
func maxPositionScanner(row scan, dest interface{}) (err error) {
position, ok := dest.(*decimal.Decimal)
func maxSequenceScanner(row scan, dest interface{}) (err error) {
position, ok := dest.(*sql.NullFloat64)
if !ok {
return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be decimal.Decimal got: %T", dest)
return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be sql.NullInt64 got: %T", dest)
}
var res decimal.NullDecimal
err = row(&res)
err = row(position)
if err == nil || errors.Is(err, sql.ErrNoRows) {
*position = res.Decimal
return nil
}
return zerrors.ThrowInternal(err, "SQL-bN5xg", "something went wrong")
@@ -192,7 +189,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
return zerrors.ThrowInvalidArgumentf(nil, "SQL-4GP6F", "events scanner: invalid type %T", dest)
}
event := new(repository.Event)
position := new(decimal.NullDecimal)
position := new(sql.NullFloat64)
if useV1 {
err = scanner(
@@ -229,7 +226,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
logging.New().WithError(err).Warn("unable to scan row")
return zerrors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row")
}
event.Pos = position.Decimal
event.Pos = position.Float64
return reduce(event)
}
}

View File

@@ -10,7 +10,6 @@ import (
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/zitadel/zitadel/internal/database"
@@ -110,36 +109,36 @@ func Test_prepareColumns(t *testing.T) {
{
name: "max column",
args: args{
columns: eventstore.ColumnsMaxPosition,
dest: new(decimal.Decimal),
columns: eventstore.ColumnsMaxSequence,
dest: new(sql.NullFloat64),
useV1: true,
},
res: res{
query: `SELECT event_sequence FROM eventstore.events`,
expected: decimal.NewFromInt(42),
expected: sql.NullFloat64{Float64: 43, Valid: true},
},
fields: fields{
dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))},
dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}},
},
},
{
name: "max column v2",
args: args{
columns: eventstore.ColumnsMaxPosition,
dest: new(decimal.Decimal),
columns: eventstore.ColumnsMaxSequence,
dest: new(sql.NullFloat64),
},
res: res{
query: `SELECT "position" FROM eventstore.events2`,
expected: decimal.NewFromInt(42),
expected: sql.NullFloat64{Float64: 43, Valid: true},
},
fields: fields{
dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))},
dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}},
},
},
{
name: "max sequence wrong dest type",
args: args{
columns: eventstore.ColumnsMaxPosition,
columns: eventstore.ColumnsMaxSequence,
dest: new(uint64),
},
res: res{
@@ -179,11 +178,11 @@ func Test_prepareColumns(t *testing.T) {
res: res{
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
expected: []eventstore.Event{
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.NewFromInt(42), Data: nil, Version: "v1"},
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 42, Data: nil, Version: "v1"},
},
},
fields: fields{
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NewNullDecimal(decimal.NewFromInt(42)), sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 42, Valid: true}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
},
},
{
@@ -198,11 +197,11 @@ func Test_prepareColumns(t *testing.T) {
res: res{
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
expected: []eventstore.Event{
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.Decimal{}, Data: nil, Version: "v1"},
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 0, Data: nil, Version: "v1"},
},
},
fields: fields{
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NullDecimal{}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 0, Valid: false}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
},
},
{