fix(eventstore): use decimal, correct mirror (#9914)

# Eventstore fixes

- `event.Position` used float64 before which can lead to [precision
loss](https://github.com/golang/go/issues/47300). The type got replaced
by [a type without precision
loss](https://github.com/jackc/pgx-shopspring-decimal)
- the handler reported the wrong error if the current state was updated
and therefore took longer to retry failed events.

# Mirror fixes

- max age of auth requests can be configured to speed up copying data
from `auth.auth_requests` table. Auth requests last updated before the
set age will be ignored. Default is 1 month
- notification projections are skipped because notifications should be
sent by the source system. The projections are set to the latest
position
- ensure that mirror can be executed multiple times

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
This commit is contained in:
Silvan
2025-05-28 23:54:18 +02:00
committed by GitHub
parent 046b165db8
commit 131f70db34
51 changed files with 436 additions and 236 deletions

View File

@@ -7,6 +7,8 @@ import (
"testing"
"time"
pgxdecimal "github.com/jackc/pgx-shopspring-decimal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/zitadel/logging"
@@ -30,7 +32,11 @@ func TestMain(m *testing.M) {
connConfig, err := pgxpool.ParseConfig(config.GetConnectionURL())
logging.OnError(err).Fatal("unable to parse db url")
connConfig.AfterConnect = new_es.RegisterEventstoreTypes
connConfig.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
pgxdecimal.Register(conn.TypeMap())
return new_es.RegisterEventstoreTypes(ctx, conn)
}
pool, err := pgxpool.NewWithConfig(context.Background(), connConfig)
logging.OnError(err).Fatal("unable to create db pool")

View File

@@ -2,12 +2,12 @@ package sql
import (
"context"
"database/sql"
"errors"
"regexp"
"strconv"
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
@@ -55,11 +55,11 @@ func (psql *Postgres) FilterToReducer(ctx context.Context, searchQuery *eventsto
return err
}
// LatestSequence returns the latest sequence found by the search query
func (db *Postgres) LatestSequence(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (float64, error) {
var position sql.NullFloat64
// LatestPosition returns the latest position found by the search query
func (db *Postgres) LatestPosition(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (decimal.Decimal, error) {
var position decimal.Decimal
err := query(ctx, db, searchQuery, &position, false)
return position.Float64, err
return position, err
}
// InstanceIDs returns the instance ids found by the search query
@@ -126,7 +126,7 @@ func (db *Postgres) eventQuery(useV1 bool) string {
" FROM eventstore.events2"
}
func (db *Postgres) maxSequenceQuery(useV1 bool) string {
func (db *Postgres) maxPositionQuery(useV1 bool) string {
if useV1 {
return `SELECT event_sequence FROM eventstore.events`
}
@@ -207,6 +207,8 @@ func (db *Postgres) operation(operation repository.Operation) string {
return "="
case repository.OperationGreater:
return ">"
case repository.OperationGreaterOrEquals:
return ">="
case repository.OperationLess:
return "<"
case repository.OperationJSONContains:

View File

@@ -4,6 +4,8 @@ import (
"database/sql"
"testing"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository"
)
@@ -312,7 +314,7 @@ func generateEvent(t *testing.T, aggregateID string, opts ...func(*repository.Ev
ResourceOwner: sql.NullString{String: "ro", Valid: true},
Typ: "test.created",
Version: "v1",
Pos: 42,
Pos: decimal.NewFromInt(42),
}
for _, opt := range opts {

View File

@@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
@@ -24,7 +25,7 @@ type querier interface {
conditionFormat(repository.Operation) string
placeholder(query string) string
eventQuery(useV1 bool) string
maxSequenceQuery(useV1 bool) string
maxPositionQuery(useV1 bool) string
instanceIDsQuery(useV1 bool) string
Client() *database.DB
orderByEventSequence(desc, shouldOrderBySequence, useV1 bool) string
@@ -68,7 +69,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
// instead of using the max function of the database (which doesn't work for postgres)
// we select the most recent row
if q.Columns == eventstore.ColumnsMaxSequence {
if q.Columns == eventstore.ColumnsMaxPosition {
q.Limit = 1
q.Desc = true
}
@@ -85,7 +86,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
switch q.Columns {
case eventstore.ColumnsEvent,
eventstore.ColumnsMaxSequence:
eventstore.ColumnsMaxPosition:
query += criteria.orderByEventSequence(q.Desc, shouldOrderBySequence, useV1)
}
@@ -141,8 +142,8 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
func prepareColumns(criteria querier, columns eventstore.Columns, useV1 bool) (string, func(s scan, dest interface{}) error) {
switch columns {
case eventstore.ColumnsMaxSequence:
return criteria.maxSequenceQuery(useV1), maxSequenceScanner
case eventstore.ColumnsMaxPosition:
return criteria.maxPositionQuery(useV1), maxPositionScanner
case eventstore.ColumnsInstanceIDs:
return criteria.instanceIDsQuery(useV1), instanceIDsScanner
case eventstore.ColumnsEvent:
@@ -152,13 +153,15 @@ func prepareColumns(criteria querier, columns eventstore.Columns, useV1 bool) (s
}
}
func maxSequenceScanner(row scan, dest any) (err error) {
position, ok := dest.(*sql.NullFloat64)
func maxPositionScanner(row scan, dest interface{}) (err error) {
position, ok := dest.(*decimal.Decimal)
if !ok {
return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be sql.NullInt64 got: %T", dest)
return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be pointer to decimal.Decimal got: %T", dest)
}
err = row(position)
var res decimal.NullDecimal
err = row(&res)
if err == nil || errors.Is(err, sql.ErrNoRows) {
*position = res.Decimal
return nil
}
return zerrors.ThrowInternal(err, "SQL-bN5xg", "something went wrong")
@@ -187,7 +190,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
return zerrors.ThrowInvalidArgumentf(nil, "SQL-4GP6F", "events scanner: invalid type %T", dest)
}
event := new(repository.Event)
position := new(sql.NullFloat64)
position := new(decimal.NullDecimal)
if useV1 {
err = scanner(
@@ -224,7 +227,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
logging.New().WithError(err).Warn("unable to scan row")
return zerrors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row")
}
event.Pos = position.Float64
event.Pos = position.Decimal
return reduce(event)
}
}

View File

@@ -7,10 +7,12 @@ import (
"reflect"
"regexp"
"strconv"
"strings"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/zitadel/zitadel/internal/database"
@@ -111,36 +113,36 @@ func Test_prepareColumns(t *testing.T) {
{
name: "max column",
args: args{
columns: eventstore.ColumnsMaxSequence,
dest: new(sql.NullFloat64),
columns: eventstore.ColumnsMaxPosition,
dest: new(decimal.Decimal),
useV1: true,
},
res: res{
query: `SELECT event_sequence FROM eventstore.events`,
expected: sql.NullFloat64{Float64: 43, Valid: true},
expected: decimal.NewFromInt(42),
},
fields: fields{
dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}},
dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))},
},
},
{
name: "max column v2",
args: args{
columns: eventstore.ColumnsMaxSequence,
dest: new(sql.NullFloat64),
columns: eventstore.ColumnsMaxPosition,
dest: new(decimal.Decimal),
},
res: res{
query: `SELECT "position" FROM eventstore.events2`,
expected: sql.NullFloat64{Float64: 43, Valid: true},
expected: decimal.NewFromInt(42),
},
fields: fields{
dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}},
dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))},
},
},
{
name: "max sequence wrong dest type",
args: args{
columns: eventstore.ColumnsMaxSequence,
columns: eventstore.ColumnsMaxPosition,
dest: new(uint64),
},
res: res{
@@ -180,11 +182,11 @@ func Test_prepareColumns(t *testing.T) {
res: res{
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
expected: []eventstore.Event{
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 42, Data: nil, Version: "v1"},
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.NewFromInt(42), Data: nil, Version: "v1"},
},
},
fields: fields{
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 42, Valid: true}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NewNullDecimal(decimal.NewFromInt(42)), sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
},
},
{
@@ -199,11 +201,11 @@ func Test_prepareColumns(t *testing.T) {
res: res{
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
expected: []eventstore.Event{
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 0, Data: nil, Version: "v1"},
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.Decimal{}, Data: nil, Version: "v1"},
},
},
fields: fields{
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 0, Valid: false}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NullDecimal{}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
},
},
{
@@ -901,7 +903,7 @@ func Test_query_events_mocked(t *testing.T) {
InstanceID("instanceID").
OrderDesc().
Limit(5).
PositionAfter(123.456).
PositionAtLeast(decimal.NewFromFloat(123.456)).
AddQuery().
AggregateTypes("notify").
EventTypes("notify.foo.bar").
@@ -914,8 +916,8 @@ func Test_query_events_mocked(t *testing.T) {
},
fields: fields{
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" > $8) ORDER BY event_sequence DESC LIMIT $9`),
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), 123.456, eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", 123.456, uint64(5)},
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" >= $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" >= $8) ORDER BY event_sequence DESC LIMIT $9`),
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), decimal.NewFromFloat(123.456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", decimal.NewFromFloat(123.456), uint64(5)},
),
},
res: res{
@@ -930,7 +932,7 @@ func Test_query_events_mocked(t *testing.T) {
InstanceID("instanceID").
OrderDesc().
Limit(5).
PositionAfter(123.456).
PositionAtLeast(decimal.NewFromFloat(123.456)).
AddQuery().
AggregateTypes("notify").
EventTypes("notify.foo.bar").
@@ -943,8 +945,8 @@ func Test_query_events_mocked(t *testing.T) {
},
fields: fields{
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" > $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`),
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), 123.456, eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", 123.456, uint64(5)},
regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" >= $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" >= $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`),
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), decimal.NewFromFloat(123.456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", decimal.NewFromFloat(123.456), uint64(5)},
),
},
res: res{
@@ -988,6 +990,10 @@ func Test_query_events_mocked(t *testing.T) {
client.DB.DB = tt.fields.mock.client
}
if strings.HasPrefix(tt.name, "aggregate / event type, position and exclusion") {
t.Log("hodor")
}
err := query(context.Background(), client, tt.args.query, tt.args.dest, tt.args.useV1)
if (err != nil) != tt.res.wantErr {
t.Errorf("query() error = %v, wantErr %v", err, tt.res.wantErr)