fix(eventstore): use decimal, correct mirror (#9906)

back port #9812, #9878, #9881, #9884

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
This commit is contained in:
Silvan
2025-05-20 13:58:32 +02:00
committed by GitHub
parent eeb7cd6910
commit 6d33f9e75a
58 changed files with 540 additions and 322 deletions

View File

@@ -72,7 +72,7 @@ jobs:
with: with:
node_version: "18" node_version: "18"
buf_version: "latest" buf_version: "latest"
go_lint_version: "v1.62.2" go_lint_version: "v1.64.8"
core_cache_key: ${{ needs.core.outputs.cache_key }} core_cache_key: ${{ needs.core.outputs.cache_key }}
core_cache_path: ${{ needs.core.outputs.cache_path }} core_cache_path: ${{ needs.core.outputs.cache_path }}

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
_ "embed" _ "embed"
"io" "io"
"strconv"
"time" "time"
"github.com/jackc/pgx/v5/stdlib" "github.com/jackc/pgx/v5/stdlib"
@@ -41,12 +42,16 @@ func copyAuth(ctx context.Context, config *Migration) {
logging.OnError(err).Fatal("unable to connect to destination database") logging.OnError(err).Fatal("unable to connect to destination database")
defer destClient.Close() defer destClient.Close()
copyAuthRequests(ctx, sourceClient, destClient) copyAuthRequests(ctx, sourceClient, destClient, config.MaxAuthRequestAge)
} }
func copyAuthRequests(ctx context.Context, source, dest *database.DB) { func copyAuthRequests(ctx context.Context, source, dest *database.DB, maxAuthRequestAge time.Duration) {
start := time.Now() start := time.Now()
logging.Info("creating index on auth.auth_requests.change_date to speed up copy in source database")
_, err := source.ExecContext(ctx, "CREATE INDEX CONCURRENTLY IF NOT EXISTS auth_requests_change_date ON auth.auth_requests (change_date)")
logging.OnError(err).Fatal("unable to create index on auth.auth_requests.change_date")
sourceConn, err := source.Conn(ctx) sourceConn, err := source.Conn(ctx)
logging.OnError(err).Fatal("unable to acquire connection") logging.OnError(err).Fatal("unable to acquire connection")
defer sourceConn.Close() defer sourceConn.Close()
@@ -55,9 +60,9 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
errs := make(chan error, 1) errs := make(chan error, 1)
go func() { go func() {
err = sourceConn.Raw(func(driverConn interface{}) error { err = sourceConn.Raw(func(driverConn any) error {
conn := driverConn.(*stdlib.Conn).Conn() conn := driverConn.(*stdlib.Conn).Conn()
_, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+") TO STDOUT") _, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+" AND change_date > NOW() - INTERVAL '"+strconv.FormatFloat(maxAuthRequestAge.Seconds(), 'f', -1, 64)+" seconds') TO STDOUT")
w.Close() w.Close()
return err return err
}) })
@@ -69,7 +74,7 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
defer destConn.Close() defer destConn.Close()
var affected int64 var affected int64
err = destConn.Raw(func(driverConn interface{}) error { err = destConn.Raw(func(driverConn any) error {
conn := driverConn.(*stdlib.Conn).Conn() conn := driverConn.(*stdlib.Conn).Conn()
if shouldReplace { if shouldReplace {

View File

@@ -24,6 +24,7 @@ type Migration struct {
Destination database.Config Destination database.Config
EventBulkSize uint32 EventBulkSize uint32
MaxAuthRequestAge time.Duration
Log *logging.Config Log *logging.Config
Machine *id.Config Machine *id.Config

View File

@@ -1,84 +1,88 @@
Source: Source:
cockroach: cockroach:
Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST Host: localhost # ZITADEL_SOURCE_COCKROACH_HOST
Port: 26257 # ZITADEL_DATABASE_COCKROACH_PORT Port: 26257 # ZITADEL_SOURCE_COCKROACH_PORT
Database: zitadel # ZITADEL_DATABASE_COCKROACH_DATABASE Database: zitadel # ZITADEL_SOURCE_COCKROACH_DATABASE
MaxOpenConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS MaxOpenConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXOPENCONNS
MaxIdleConns: 6 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS MaxIdleConns: 6 # ZITADEL_SOURCE_COCKROACH_MAXIDLECONNS
EventPushConnRatio: 0.33 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO EventPushConnRatio: 0.33 # ZITADEL_SOURCE_COCKROACH_EVENTPUSHCONNRATIO
ProjectionSpoolerConnRatio: 0.33 # ZITADEL_DATABASE_COCKROACH_PROJECTIONSPOOLERCONNRATIO ProjectionSpoolerConnRatio: 0.33 # ZITADEL_SOURCE_COCKROACH_PROJECTIONSPOOLERCONNRATIO
MaxConnLifetime: 30m # ZITADEL_DATABASE_COCKROACH_MAXCONNLIFETIME MaxConnLifetime: 30m # ZITADEL_SOURCE_COCKROACH_MAXCONNLIFETIME
MaxConnIdleTime: 5m # ZITADEL_DATABASE_COCKROACH_MAXCONNIDLETIME MaxConnIdleTime: 5m # ZITADEL_SOURCE_COCKROACH_MAXCONNIDLETIME
Options: "" # ZITADEL_DATABASE_COCKROACH_OPTIONS Options: "" # ZITADEL_SOURCE_COCKROACH_OPTIONS
User: User:
Username: zitadel # ZITADEL_DATABASE_COCKROACH_USER_USERNAME Username: zitadel # ZITADEL_SOURCE_COCKROACH_USER_USERNAME
Password: "" # ZITADEL_DATABASE_COCKROACH_USER_PASSWORD Password: "" # ZITADEL_SOURCE_COCKROACH_USER_PASSWORD
SSL: SSL:
Mode: disable # ZITADEL_DATABASE_COCKROACH_USER_SSL_MODE Mode: disable # ZITADEL_SOURCE_COCKROACH_USER_SSL_MODE
RootCert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_ROOTCERT RootCert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_ROOTCERT
Cert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_CERT Cert: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_CERT
Key: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_KEY Key: "" # ZITADEL_SOURCE_COCKROACH_USER_SSL_KEY
# Postgres is used as soon as a value is set # Postgres is used as soon as a value is set
# The values describe the possible fields to set values # The values describe the possible fields to set values
postgres: postgres:
Host: # ZITADEL_DATABASE_POSTGRES_HOST Host: # ZITADEL_SOURCE_POSTGRES_HOST
Port: # ZITADEL_DATABASE_POSTGRES_PORT Port: # ZITADEL_SOURCE_POSTGRES_PORT
Database: # ZITADEL_DATABASE_POSTGRES_DATABASE Database: # ZITADEL_SOURCE_POSTGRES_DATABASE
MaxOpenConns: # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS MaxOpenConns: # ZITADEL_SOURCE_POSTGRES_MAXOPENCONNS
MaxIdleConns: # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS MaxIdleConns: # ZITADEL_SOURCE_POSTGRES_MAXIDLECONNS
MaxConnLifetime: # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME MaxConnLifetime: # ZITADEL_SOURCE_POSTGRES_MAXCONNLIFETIME
MaxConnIdleTime: # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME MaxConnIdleTime: # ZITADEL_SOURCE_POSTGRES_MAXCONNIDLETIME
Options: # ZITADEL_DATABASE_POSTGRES_OPTIONS Options: # ZITADEL_SOURCE_POSTGRES_OPTIONS
User: User:
Username: # ZITADEL_DATABASE_POSTGRES_USER_USERNAME Username: # ZITADEL_SOURCE_POSTGRES_USER_USERNAME
Password: # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD Password: # ZITADEL_SOURCE_POSTGRES_USER_PASSWORD
SSL: SSL:
Mode: # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE Mode: # ZITADEL_SOURCE_POSTGRES_USER_SSL_MODE
RootCert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT RootCert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_ROOTCERT
Cert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT Cert: # ZITADEL_SOURCE_POSTGRES_USER_SSL_CERT
Key: # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY Key: # ZITADEL_SOURCE_POSTGRES_USER_SSL_KEY
Destination: Destination:
cockroach: cockroach:
Host: localhost # ZITADEL_DATABASE_COCKROACH_HOST Host: localhost # ZITADEL_DESTINATION_COCKROACH_HOST
Port: 26257 # ZITADEL_DATABASE_COCKROACH_PORT Port: 26257 # ZITADEL_DESTINATION_COCKROACH_PORT
Database: zitadel # ZITADEL_DATABASE_COCKROACH_DATABASE Database: zitadel # ZITADEL_DESTINATION_COCKROACH_DATABASE
MaxOpenConns: 0 # ZITADEL_DATABASE_COCKROACH_MAXOPENCONNS MaxOpenConns: 0 # ZITADEL_DESTINATION_COCKROACH_MAXOPENCONNS
MaxIdleConns: 0 # ZITADEL_DATABASE_COCKROACH_MAXIDLECONNS MaxIdleConns: 0 # ZITADEL_DESTINATION_COCKROACH_MAXIDLECONNS
MaxConnLifetime: 30m # ZITADEL_DATABASE_COCKROACH_MAXCONNLIFETIME MaxConnLifetime: 30m # ZITADEL_DESTINATION_COCKROACH_MAXCONNLIFETIME
MaxConnIdleTime: 5m # ZITADEL_DATABASE_COCKROACH_MAXCONNIDLETIME MaxConnIdleTime: 5m # ZITADEL_DESTINATION_COCKROACH_MAXCONNIDLETIME
EventPushConnRatio: 0.01 # ZITADEL_DATABASE_COCKROACH_EVENTPUSHCONNRATIO EventPushConnRatio: 0.01 # ZITADEL_DESTINATION_COCKROACH_EVENTPUSHCONNRATIO
ProjectionSpoolerConnRatio: 0.5 # ZITADEL_DATABASE_COCKROACH_PROJECTIONSPOOLERCONNRATIO ProjectionSpoolerConnRatio: 0.5 # ZITADEL_DESTINATION_COCKROACH_PROJECTIONSPOOLERCONNRATIO
Options: "" # ZITADEL_DATABASE_COCKROACH_OPTIONS Options: "" # ZITADEL_DESTINATION_COCKROACH_OPTIONS
User: User:
Username: zitadel # ZITADEL_DATABASE_COCKROACH_USER_USERNAME Username: zitadel # ZITADEL_DESTINATION_COCKROACH_USER_USERNAME
Password: "" # ZITADEL_DATABASE_COCKROACH_USER_PASSWORD Password: "" # ZITADEL_DESTINATION_COCKROACH_USER_PASSWORD
SSL: SSL:
Mode: disable # ZITADEL_DATABASE_COCKROACH_USER_SSL_MODE Mode: disable # ZITADEL_DESTINATION_COCKROACH_USER_SSL_MODE
RootCert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_ROOTCERT RootCert: "" # ZITADEL_DESTINATION_COCKROACH_USER_SSL_ROOTCERT
Cert: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_CERT Cert: "" # ZITADEL_DESTINATION_COCKROACH_USER_SSL_CERT
Key: "" # ZITADEL_DATABASE_COCKROACH_USER_SSL_KEY Key: "" # ZITADEL_DESTINATION_COCKROACH_USER_SSL_KEY
# Postgres is used as soon as a value is set # Postgres is used as soon as a value is set
# The values describe the possible fields to set values # The values describe the possible fields to set values
postgres: postgres:
Host: # ZITADEL_DATABASE_POSTGRES_HOST Host: # ZITADEL_DESTINATION_POSTGRES_HOST
Port: # ZITADEL_DATABASE_POSTGRES_PORT Port: # ZITADEL_DESTINATION_POSTGRES_PORT
Database: # ZITADEL_DATABASE_POSTGRES_DATABASE Database: # ZITADEL_DESTINATION_POSTGRES_DATABASE
MaxOpenConns: # ZITADEL_DATABASE_POSTGRES_MAXOPENCONNS MaxOpenConns: # ZITADEL_DESTINATION_POSTGRES_MAXOPENCONNS
MaxIdleConns: # ZITADEL_DATABASE_POSTGRES_MAXIDLECONNS MaxIdleConns: # ZITADEL_DESTINATION_POSTGRES_MAXIDLECONNS
MaxConnLifetime: # ZITADEL_DATABASE_POSTGRES_MAXCONNLIFETIME MaxConnLifetime: # ZITADEL_DESTINATION_POSTGRES_MAXCONNLIFETIME
MaxConnIdleTime: # ZITADEL_DATABASE_POSTGRES_MAXCONNIDLETIME MaxConnIdleTime: # ZITADEL_DESTINATION_POSTGRES_MAXCONNIDLETIME
Options: # ZITADEL_DATABASE_POSTGRES_OPTIONS Options: # ZITADEL_DESTINATION_POSTGRES_OPTIONS
User: User:
Username: # ZITADEL_DATABASE_POSTGRES_USER_USERNAME Username: # ZITADEL_DESTINATION_POSTGRES_USER_USERNAME
Password: # ZITADEL_DATABASE_POSTGRES_USER_PASSWORD Password: # ZITADEL_DESTINATION_POSTGRES_USER_PASSWORD
SSL: SSL:
Mode: # ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE Mode: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_MODE
RootCert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_ROOTCERT RootCert: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_ROOTCERT
Cert: # ZITADEL_DATABASE_POSTGRES_USER_SSL_CERT Cert: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_CERT
Key: # ZITADEL_DATABASE_POSTGRES_USER_SSL_KEY Key: # ZITADEL_DESTINATION_POSTGRES_USER_SSL_KEY
EventBulkSize: 10000
EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE
# The maximum duration an auth request was last updated before it gets ignored.
# Default is 30 days
MaxAuthRequestAge: 720h # ZITADEL_MAXAUTHREQUESTAGE
Projections: Projections:
# The maximum duration a transaction remains open # The maximum duration a transaction remains open
@@ -87,14 +91,14 @@ Projections:
TransactionDuration: 0s # ZITADEL_PROJECTIONS_TRANSACTIONDURATION TransactionDuration: 0s # ZITADEL_PROJECTIONS_TRANSACTIONDURATION
# turn off scheduler during operation # turn off scheduler during operation
RequeueEvery: 0s RequeueEvery: 0s
ConcurrentInstances: 7 ConcurrentInstances: 7 # ZITADEL_PROJECTIONS_CONCURRENTINSTANCES
EventBulkLimit: 1000 EventBulkLimit: 1000 # ZITADEL_PROJECTIONS_EVENTBULKLIMIT
Customizations: Customizations:
notifications: notifications:
MaxFailureCount: 1 MaxFailureCount: 1
Eventstore: Eventstore:
MaxRetries: 3 MaxRetries: 3 # ZITADEL_EVENTSTORE_MAXRETRIES
Auth: Auth:
Spooler: Spooler:

View File

@@ -3,6 +3,8 @@ package mirror
import ( import (
"context" "context"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/v2/projection" "github.com/zitadel/zitadel/internal/v2/projection"
"github.com/zitadel/zitadel/internal/v2/readmodel" "github.com/zitadel/zitadel/internal/v2/readmodel"
@@ -30,12 +32,12 @@ func queryLastSuccessfulMigration(ctx context.Context, destinationES *eventstore
return lastSuccess, nil return lastSuccess, nil
} }
func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ float64, err error) { func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, id string, destination string) (_ decimal.Decimal, err error) {
var cmd *eventstore.Command var cmd *eventstore.Command
if len(instanceIDs) > 0 { if len(instanceIDs) > 0 {
cmd, err = mirror_event.NewStartedInstancesCommand(destination, instanceIDs) cmd, err = mirror_event.NewStartedInstancesCommand(destination, instanceIDs)
if err != nil { if err != nil {
return 0, err return decimal.Decimal{}, err
} }
} else { } else {
cmd = mirror_event.NewStartedSystemCommand(destination) cmd = mirror_event.NewStartedSystemCommand(destination)
@@ -58,12 +60,12 @@ func writeMigrationStart(ctx context.Context, sourceES *eventstore.EventStore, i
), ),
) )
if err != nil { if err != nil {
return 0, err return decimal.Decimal{}, err
} }
return position.Position, nil return position.Position, nil
} }
func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position float64) error { func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position decimal.Decimal) error {
return destinationES.Push( return destinationES.Push(
ctx, ctx,
eventstore.NewPushIntent( eventstore.NewPushIntent(

View File

@@ -8,7 +8,9 @@ import (
"io" "io"
"time" "time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/stdlib" "github.com/jackc/pgx/v5/stdlib"
"github.com/shopspring/decimal"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/zitadel/logging" "github.com/zitadel/logging"
@@ -68,6 +70,7 @@ func positionQuery(db *db.DB) string {
} }
func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) { func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
logging.Info("starting to copy events")
start := time.Now() start := time.Now()
reader, writer := io.Pipe() reader, writer := io.Pipe()
@@ -96,7 +99,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
logging.WithFields("from", previousMigration.Position, "to", maxPosition).Info("start event migration") logging.WithFields("from", previousMigration.Position, "to", maxPosition).Info("start event migration")
nextPos := make(chan bool, 1) nextPos := make(chan bool, 1)
pos := make(chan float64, 1) pos := make(chan decimal.Decimal, 1)
errs := make(chan error, 3) errs := make(chan error, 3)
go func() { go func() {
@@ -126,7 +129,10 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
if err != nil { if err != nil {
return zerrors.ThrowUnknownf(err, "MIGRA-KTuSq", "unable to copy events from source during iteration %d", i) return zerrors.ThrowUnknownf(err, "MIGRA-KTuSq", "unable to copy events from source during iteration %d", i)
} }
logging.WithFields("batch_count", i).Info("batch of events copied")
if tag.RowsAffected() < int64(bulkSize) { if tag.RowsAffected() < int64(bulkSize) {
logging.WithFields("batch_count", i).Info("last batch of events copied")
return nil return nil
} }
@@ -144,7 +150,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
go func() { go func() {
defer close(pos) defer close(pos)
for range nextPos { for range nextPos {
var position float64 var position decimal.Decimal
err := dest.QueryRowContext( err := dest.QueryRowContext(
ctx, ctx,
func(row *sql.Row) error { func(row *sql.Row) error {
@@ -167,6 +173,10 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY eventstore.events2 FROM STDIN") tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY eventstore.events2 FROM STDIN")
eventCount = tag.RowsAffected() eventCount = tag.RowsAffected()
if err != nil { if err != nil {
pgErr := new(pgconn.PgError)
errors.As(err, &pgErr)
logging.WithError(err).WithField("pg_err_details", pgErr.Detail).Error("unable to copy events into destination")
return zerrors.ThrowUnknown(err, "MIGRA-DTHi7", "unable to copy events into destination") return zerrors.ThrowUnknown(err, "MIGRA-DTHi7", "unable to copy events into destination")
} }
@@ -179,7 +189,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
logging.WithFields("took", time.Since(start), "count", eventCount).Info("events migrated") logging.WithFields("took", time.Since(start), "count", eventCount).Info("events migrated")
} }
func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position float64, errs <-chan error) { func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position decimal.Decimal, errs <-chan error) {
joinedErrs := make([]error, 0, len(errs)) joinedErrs := make([]error, 0, len(errs))
for err := range errs { for err := range errs {
joinedErrs = append(joinedErrs, err) joinedErrs = append(joinedErrs, err)
@@ -198,6 +208,7 @@ func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, sou
} }
func copyUniqueConstraints(ctx context.Context, source, dest *db.DB) { func copyUniqueConstraints(ctx context.Context, source, dest *db.DB) {
logging.Info("starting to copy unique constraints")
start := time.Now() start := time.Now()
reader, writer := io.Pipe() reader, writer := io.Pipe()
errs := make(chan error, 1) errs := make(chan error, 1)

View File

@@ -3,6 +3,7 @@ package mirror
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@@ -103,6 +104,7 @@ func projections(
config *ProjectionsConfig, config *ProjectionsConfig,
masterKey string, masterKey string,
) { ) {
logging.Info("starting to fill projections")
start := time.Now() start := time.Now()
client, err := database.Connect(config.Destination, false) client, err := database.Connect(config.Destination, false)
@@ -118,7 +120,11 @@ func projections(
logging.OnError(err).Fatal("unable create static storage") logging.OnError(err).Fatal("unable create static storage")
config.Eventstore.Querier = old_es.NewCRDB(client) config.Eventstore.Querier = old_es.NewCRDB(client)
config.Eventstore.Pusher = new_es.NewEventstore(client)
newES := new_es.NewEventstore(client)
config.Eventstore.Pusher = newES
config.Eventstore.Searcher = newES
es := eventstore.NewEventstore(config.Eventstore) es := eventstore.NewEventstore(config.Eventstore)
esV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(client, &es_v4_pg.Config{ esV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(client, &es_v4_pg.Config{
MaxRetries: config.Eventstore.MaxRetries, MaxRetries: config.Eventstore.MaxRetries,
@@ -249,11 +255,13 @@ func projections(
}() }()
for i := 0; i < int(config.Projections.ConcurrentInstances); i++ { for i := 0; i < int(config.Projections.ConcurrentInstances); i++ {
go execProjections(ctx, instances, failedInstances, &wg) go execProjections(ctx, es, instances, failedInstances, &wg)
} }
for _, instance := range queryInstanceIDs(ctx, client) { existingInstances := queryInstanceIDs(ctx, client)
for i, instance := range existingInstances {
instances <- instance instances <- instance
logging.WithFields("id", instance, "index", fmt.Sprintf("%d/%d", i, len(existingInstances))).Info("instance queued for projection")
} }
close(instances) close(instances)
wg.Wait() wg.Wait()
@@ -263,9 +271,9 @@ func projections(
logging.WithFields("took", time.Since(start)).Info("projections executed") logging.WithFields("took", time.Since(start)).Info("projections executed")
} }
func execProjections(ctx context.Context, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) { func execProjections(ctx context.Context, es *eventstore.Eventstore, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) {
for instance := range instances { for instance := range instances {
logging.WithFields("instance", instance).Info("start projections") logging.WithFields("instance", instance).Info("starting projections")
ctx = internal_authz.WithInstanceID(ctx, instance) ctx = internal_authz.WithInstanceID(ctx, instance)
err := projection.ProjectInstance(ctx) err := projection.ProjectInstance(ctx)
@@ -282,6 +290,13 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc
continue continue
} }
err = projection.ProjectInstanceFields(ctx)
if err != nil {
logging.WithFields("instance", instance).WithError(err).Info("trigger fields failed")
failedInstances <- instance
continue
}
err = auth_handler.ProjectInstance(ctx) err = auth_handler.ProjectInstance(ctx)
if err != nil { if err != nil {
logging.WithFields("instance", instance).OnError(err).Info("trigger auth handler failed") logging.WithFields("instance", instance).OnError(err).Info("trigger auth handler failed")
@@ -289,7 +304,7 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc
continue continue
} }
err = notification.ProjectInstance(ctx) err = notification.SetCurrentState(ctx, es)
if err != nil { if err != nil {
logging.WithFields("instance", instance).OnError(err).Info("trigger notification failed") logging.WithFields("instance", instance).OnError(err).Info("trigger notification failed")
failedInstances <- instance failedInstances <- instance
@@ -300,7 +315,7 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc
wg.Done() wg.Done()
} }
// returns the instance configured by flag // queryInstanceIDs returns the instance configured by flag
// or all instances which are not removed // or all instances which are not removed
func queryInstanceIDs(ctx context.Context, source *database.DB) []string { func queryInstanceIDs(ctx context.Context, source *database.DB) []string {
if len(instanceIDs) > 0 { if len(instanceIDs) > 0 {

View File

@@ -46,6 +46,7 @@ func copySystem(ctx context.Context, config *Migration) {
} }
func copyAssets(ctx context.Context, source, dest *database.DB) { func copyAssets(ctx context.Context, source, dest *database.DB) {
logging.Info("starting to copy assets")
start := time.Now() start := time.Now()
sourceConn, err := source.Conn(ctx) sourceConn, err := source.Conn(ctx)
@@ -70,7 +71,7 @@ func copyAssets(ctx context.Context, source, dest *database.DB) {
logging.OnError(err).Fatal("unable to acquire dest connection") logging.OnError(err).Fatal("unable to acquire dest connection")
defer destConn.Close() defer destConn.Close()
var eventCount int64 var assetCount int64
err = destConn.Raw(func(driverConn interface{}) error { err = destConn.Raw(func(driverConn interface{}) error {
conn := driverConn.(*stdlib.Conn).Conn() conn := driverConn.(*stdlib.Conn).Conn()
@@ -82,16 +83,17 @@ func copyAssets(ctx context.Context, source, dest *database.DB) {
} }
tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.assets (instance_id, asset_type, resource_owner, name, content_type, data, updated_at) FROM stdin") tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.assets (instance_id, asset_type, resource_owner, name, content_type, data, updated_at) FROM stdin")
eventCount = tag.RowsAffected() assetCount = tag.RowsAffected()
return err return err
}) })
logging.OnError(err).Fatal("unable to copy assets to destination") logging.OnError(err).Fatal("unable to copy assets to destination")
logging.OnError(<-errs).Fatal("unable to copy assets from source") logging.OnError(<-errs).Fatal("unable to copy assets from source")
logging.WithFields("took", time.Since(start), "count", eventCount).Info("assets migrated") logging.WithFields("took", time.Since(start), "count", assetCount).Info("assets migrated")
} }
func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) { func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
logging.Info("starting to copy encryption keys")
start := time.Now() start := time.Now()
sourceConn, err := source.Conn(ctx) sourceConn, err := source.Conn(ctx)
@@ -116,7 +118,7 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
logging.OnError(err).Fatal("unable to acquire dest connection") logging.OnError(err).Fatal("unable to acquire dest connection")
defer destConn.Close() defer destConn.Close()
var eventCount int64 var keyCount int64
err = destConn.Raw(func(driverConn interface{}) error { err = destConn.Raw(func(driverConn interface{}) error {
conn := driverConn.(*stdlib.Conn).Conn() conn := driverConn.(*stdlib.Conn).Conn()
@@ -128,11 +130,11 @@ func copyEncryptionKeys(ctx context.Context, source, dest *database.DB) {
} }
tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.encryption_keys FROM stdin") tag, err := conn.PgConn().CopyFrom(ctx, r, "COPY system.encryption_keys FROM stdin")
eventCount = tag.RowsAffected() keyCount = tag.RowsAffected()
return err return err
}) })
logging.OnError(err).Fatal("unable to copy encryption keys to destination") logging.OnError(err).Fatal("unable to copy encryption keys to destination")
logging.OnError(<-errs).Fatal("unable to copy encryption keys from source") logging.OnError(<-errs).Fatal("unable to copy encryption keys from source")
logging.WithFields("took", time.Since(start), "count", eventCount).Info("encryption keys migrated") logging.WithFields("took", time.Since(start), "count", keyCount).Info("encryption keys migrated")
} }

View File

@@ -163,6 +163,9 @@ Destination:
# As cockroachdb first copies the data into memory this parameter is used to iterate through the events table and fetch only the given amount of events per iteration # As cockroachdb first copies the data into memory this parameter is used to iterate through the events table and fetch only the given amount of events per iteration
EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE EventBulkSize: 10000 # ZITADEL_EVENTBULKSIZE
# The maximum duration an auth request was last updated before it gets ignored.
# Default is 30 days
MaxAuthRequestAge: 720h # ZITADEL_MAXAUTHREQUESTAGE
Projections: Projections:
# Defines how many projections are allowed to run in parallel # Defines how many projections are allowed to run in parallel

4
go.mod
View File

@@ -44,7 +44,8 @@ require (
github.com/h2non/gock v1.2.0 github.com/h2non/gock v1.2.0
github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/improbable-eng/grpc-web v0.15.0 github.com/improbable-eng/grpc-web v0.15.0
github.com/jackc/pgx/v5 v5.7.3 github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e
github.com/jackc/pgx/v5 v5.7.4
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52 github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52
github.com/jinzhu/gorm v1.9.16 github.com/jinzhu/gorm v1.9.16
github.com/k3a/html2text v1.2.1 github.com/k3a/html2text v1.2.1
@@ -64,6 +65,7 @@ require (
github.com/riverqueue/river/rivertype v0.19.0 github.com/riverqueue/river/rivertype v0.19.0
github.com/rs/cors v1.11.1 github.com/rs/cors v1.11.1
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/shopspring/decimal v1.4.0
github.com/sony/gobreaker/v2 v2.1.0 github.com/sony/gobreaker/v2 v2.1.0
github.com/sony/sonyflake v1.2.0 github.com/sony/sonyflake v1.2.0
github.com/spf13/cobra v1.9.1 github.com/spf13/cobra v1.9.1

8
go.sum
View File

@@ -445,8 +445,10 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.7.3 h1:PO1wNKj/bTAwxSJnO1Z4Ai8j4magtqg2SLNjEDzcXQo= github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e h1:i3gQ/Zo7sk4LUVbsAjTNeC4gIjoPNIZVzs4EXstssV4=
github.com/jackc/pgx/v5 v5.7.3/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e/go.mod h1:zUHglCZ4mpDUPgIwqEKoba6+tcUQzRdb1+DPTuYe9pI=
github.com/jackc/pgx/v5 v5.7.4 h1:9wKznZrhWa2QiHL+NjTSPP6yjl3451BX3imWDnokYlg=
github.com/jackc/pgx/v5 v5.7.4/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52 h1:jny9eqYPwkG8IVy7foUoRjQmFLcArCSz+uPsL6KS0HQ= github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52 h1:jny9eqYPwkG8IVy7foUoRjQmFLcArCSz+uPsL6KS0HQ=
@@ -708,6 +710,8 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY= github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=

View File

@@ -2,9 +2,13 @@ package handler
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2" "github.com/zitadel/zitadel/internal/eventstore/handler/v2"
@@ -57,11 +61,13 @@ func Start(ctx context.Context) {
} }
func ProjectInstance(ctx context.Context) error { func ProjectInstance(ctx context.Context) error {
for _, projection := range projections { for i, projection := range projections {
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting admin projection")
_, err := projection.Trigger(ctx) _, err := projection.Trigger(ctx)
if err != nil { if err != nil {
return err return err
} }
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("admin projection done")
} }
return nil return nil
} }

View File

@@ -11,6 +11,7 @@ import (
"github.com/go-jose/go-jose/v4" "github.com/go-jose/go-jose/v4"
"github.com/jonboulle/clockwork" "github.com/jonboulle/clockwork"
"github.com/muhlemmer/gu" "github.com/muhlemmer/gu"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/oidc/v3/pkg/op" "github.com/zitadel/oidc/v3/pkg/op"
@@ -350,14 +351,14 @@ func (o *OPStorage) getSigningKey(ctx context.Context) (op.SigningKey, error) {
if len(keys.Keys) > 0 { if len(keys.Keys) > 0 {
return PrivateKeyToSigningKey(SelectSigningKey(keys.Keys), o.encAlg) return PrivateKeyToSigningKey(SelectSigningKey(keys.Keys), o.encAlg)
} }
var position float64 var position decimal.Decimal
if keys.State != nil { if keys.State != nil {
position = keys.State.Position position = keys.State.Position
} }
return nil, o.refreshSigningKey(ctx, position) return nil, o.refreshSigningKey(ctx, position)
} }
func (o *OPStorage) refreshSigningKey(ctx context.Context, position float64) error { func (o *OPStorage) refreshSigningKey(ctx context.Context, position decimal.Decimal) error {
ok, err := o.ensureIsLatestKey(ctx, position) ok, err := o.ensureIsLatestKey(ctx, position)
if err != nil || !ok { if err != nil || !ok {
return zerrors.ThrowInternal(err, "OIDC-ASfh3", "cannot ensure that projection is up to date") return zerrors.ThrowInternal(err, "OIDC-ASfh3", "cannot ensure that projection is up to date")
@@ -369,12 +370,12 @@ func (o *OPStorage) refreshSigningKey(ctx context.Context, position float64) err
return zerrors.ThrowInternal(nil, "OIDC-Df1bh", "") return zerrors.ThrowInternal(nil, "OIDC-Df1bh", "")
} }
func (o *OPStorage) ensureIsLatestKey(ctx context.Context, position float64) (bool, error) { func (o *OPStorage) ensureIsLatestKey(ctx context.Context, position decimal.Decimal) (bool, error) {
maxSequence, err := o.getMaxKeySequence(ctx) maxSequence, err := o.getMaxKeyPosition(ctx)
if err != nil { if err != nil {
return false, fmt.Errorf("error retrieving new events: %w", err) return false, fmt.Errorf("error retrieving new events: %w", err)
} }
return position >= maxSequence, nil return position.GreaterThanOrEqual(maxSequence), nil
} }
func PrivateKeyToSigningKey(key query.PrivateKey, algorithm crypto.EncryptionAlgorithm) (_ op.SigningKey, err error) { func PrivateKeyToSigningKey(key query.PrivateKey, algorithm crypto.EncryptionAlgorithm) (_ op.SigningKey, err error) {
@@ -412,9 +413,9 @@ func (o *OPStorage) lockAndGenerateSigningKeyPair(ctx context.Context) error {
return o.command.GenerateSigningKeyPair(setOIDCCtx(ctx), "RS256") return o.command.GenerateSigningKeyPair(setOIDCCtx(ctx), "RS256")
} }
func (o *OPStorage) getMaxKeySequence(ctx context.Context) (float64, error) { func (o *OPStorage) getMaxKeyPosition(ctx context.Context) (decimal.Decimal, error) {
return o.eventstore.LatestSequence(ctx, return o.eventstore.LatestPosition(ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).
ResourceOwner(authz.GetInstance(ctx).InstanceID()). ResourceOwner(authz.GetInstance(ctx).InstanceID()).
AwaitOpenTransactions(). AwaitOpenTransactions().
AllowTimeTravel(). AllowTimeTravel().

View File

@@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/go-jose/go-jose/v4" "github.com/go-jose/go-jose/v4"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/saml/pkg/provider/key" "github.com/zitadel/saml/pkg/provider/key"
@@ -76,7 +77,7 @@ func (p *Storage) getCertificateAndKey(ctx context.Context, usage crypto.KeyUsag
return p.certificateToCertificateAndKey(selectCertificate(certs.Certificates)) return p.certificateToCertificateAndKey(selectCertificate(certs.Certificates))
} }
var position float64 var position decimal.Decimal
if certs.State != nil { if certs.State != nil {
position = certs.State.Position position = certs.State.Position
} }
@@ -87,7 +88,7 @@ func (p *Storage) getCertificateAndKey(ctx context.Context, usage crypto.KeyUsag
func (p *Storage) refreshCertificate( func (p *Storage) refreshCertificate(
ctx context.Context, ctx context.Context,
usage crypto.KeyUsage, usage crypto.KeyUsage,
position float64, position decimal.Decimal,
) error { ) error {
ok, err := p.ensureIsLatestCertificate(ctx, position) ok, err := p.ensureIsLatestCertificate(ctx, position)
if err != nil { if err != nil {
@@ -103,12 +104,12 @@ func (p *Storage) refreshCertificate(
return nil return nil
} }
func (p *Storage) ensureIsLatestCertificate(ctx context.Context, position float64) (bool, error) { func (p *Storage) ensureIsLatestCertificate(ctx context.Context, position decimal.Decimal) (bool, error) {
maxSequence, err := p.getMaxKeySequence(ctx) maxSequence, err := p.getMaxKeyPosition(ctx)
if err != nil { if err != nil {
return false, fmt.Errorf("error retrieving new events: %w", err) return false, fmt.Errorf("error retrieving new events: %w", err)
} }
return position >= maxSequence, nil return position.GreaterThanOrEqual(maxSequence), nil
} }
func (p *Storage) lockAndGenerateCertificateAndKey(ctx context.Context, usage crypto.KeyUsage) error { func (p *Storage) lockAndGenerateCertificateAndKey(ctx context.Context, usage crypto.KeyUsage) error {
@@ -151,9 +152,9 @@ func (p *Storage) lockAndGenerateCertificateAndKey(ctx context.Context, usage cr
} }
} }
func (p *Storage) getMaxKeySequence(ctx context.Context) (float64, error) { func (p *Storage) getMaxKeyPosition(ctx context.Context) (decimal.Decimal, error) {
return p.eventstore.LatestSequence(ctx, return p.eventstore.LatestPosition(ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).
ResourceOwner(authz.GetInstance(ctx).InstanceID()). ResourceOwner(authz.GetInstance(ctx).InstanceID()).
AwaitOpenTransactions(). AwaitOpenTransactions().
AddQuery(). AddQuery().

View File

@@ -2,8 +2,12 @@ package handler
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view" "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
@@ -72,11 +76,13 @@ func Projections() []*handler2.Handler {
} }
func ProjectInstance(ctx context.Context) error { func ProjectInstance(ctx context.Context) error {
for _, projection := range projections { for i, projection := range projections {
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting auth projection")
_, err := projection.Trigger(ctx) _, err := projection.Trigger(ctx)
if err != nil { if err != nil {
return err return err
} }
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("auth projection done")
} }
return nil return nil
} }

View File

@@ -73,10 +73,16 @@ func (_ *Config) Decode(configs []interface{}) (dialect.Connector, error) {
} }
func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) { func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) {
dialect.RegisterAfterConnect(func(ctx context.Context, c *pgx.Conn) error { dialect.RegisterAfterConnect(func(ctx context.Context, conn *pgx.Conn) error {
// CockroachDB by default does not allow multiple modifications of the same table using ON CONFLICT // CockroachDB by default does not allow multiple modifications of the same table using ON CONFLICT
// This is needed to fill the fields table of the eventstore during eventstore.Push. // This is needed to fill the fields table of the eventstore during eventstore.Push.
_, err := c.Exec(ctx, "SET enable_multiple_modifications_of_table = on")
// This modification is only needed on crdb so we check if the connection is crdb
// postgres doesn't have parameter so we check if the parameter is empty
if conn.PgConn().ParameterStatus("crdb_version") == "" {
return nil
}
_, err := conn.Exec(ctx, "SET enable_multiple_modifications_of_table = on")
return err return err
}) })
connConfig := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns) connConfig := dialect.NewConnectionConfig(c.MaxOpenConns, c.MaxIdleConns)

View File

@@ -5,6 +5,7 @@ import (
"errors" "errors"
"reflect" "reflect"
pgxdecimal "github.com/jackc/pgx-shopspring-decimal"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgtype"
) )
@@ -23,7 +24,12 @@ type ConnectionConfig struct {
AfterRelease []func(c *pgx.Conn) error AfterRelease []func(c *pgx.Conn) error
} }
var afterConnectFuncs []func(ctx context.Context, c *pgx.Conn) error var afterConnectFuncs = []func(ctx context.Context, c *pgx.Conn) error{
func(ctx context.Context, c *pgx.Conn) error {
pgxdecimal.Register(c.TypeMap())
return nil
},
}
func RegisterAfterConnect(f func(ctx context.Context, c *pgx.Conn) error) { func RegisterAfterConnect(f func(ctx context.Context, c *pgx.Conn) error) {
afterConnectFuncs = append(afterConnectFuncs, f) afterConnectFuncs = append(afterConnectFuncs, f)

View File

@@ -5,6 +5,8 @@ import (
"reflect" "reflect"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/zerrors" "github.com/zitadel/zitadel/internal/zerrors"
) )
@@ -44,7 +46,7 @@ type Event interface {
// CreatedAt is the time the event was created at // CreatedAt is the time the event was created at
CreatedAt() time.Time CreatedAt() time.Time
// Position is the global position of the event // Position is the global position of the event
Position() float64 Position() decimal.Decimal
// Unmarshal parses the payload and stores the result // Unmarshal parses the payload and stores the result
// in the value pointed to by ptr. If ptr is nil or not a pointer, // in the value pointed to by ptr. If ptr is nil or not a pointer,

View File

@@ -7,6 +7,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/authz"
@@ -25,7 +26,7 @@ type BaseEvent struct {
Agg *Aggregate `json:"-"` Agg *Aggregate `json:"-"`
Seq uint64 Seq uint64
Pos float64 Pos decimal.Decimal
Creation time.Time Creation time.Time
previousAggregateSequence uint64 previousAggregateSequence uint64
previousAggregateTypeSequence uint64 previousAggregateTypeSequence uint64
@@ -38,7 +39,7 @@ type BaseEvent struct {
} }
// Position implements Event. // Position implements Event.
func (e *BaseEvent) Position() float64 { func (e *BaseEvent) Position() decimal.Decimal {
return e.Pos return e.Pos
} }

View File

@@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/authz"
@@ -229,11 +230,11 @@ func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQu
}) })
} }
// LatestSequence filters the latest sequence for the given search query // LatestPosition filters the latest position for the given search query
func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) { func (es *Eventstore) LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error) {
queryFactory.InstanceID(authz.GetInstance(ctx).InstanceID()) queryFactory.InstanceID(authz.GetInstance(ctx).InstanceID())
return es.querier.LatestSequence(ctx, queryFactory) return es.querier.LatestPosition(ctx, queryFactory)
} }
// InstanceIDs returns the distinct instance ids found by the search query // InstanceIDs returns the distinct instance ids found by the search query
@@ -265,8 +266,8 @@ type Querier interface {
Health(ctx context.Context) error Health(ctx context.Context) error
// FilterToReducer calls r for every event returned from the storage // FilterToReducer calls r for every event returned from the storage
FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r Reducer) error FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r Reducer) error
// LatestSequence returns the latest sequence found by the search query // LatestPosition returns the latest position found by the search query
LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error)
// InstanceIDs returns the instance ids found by the search query // InstanceIDs returns the instance ids found by the search query
InstanceIDs(ctx context.Context, queryFactory *SearchQueryBuilder) ([]string, error) InstanceIDs(ctx context.Context, queryFactory *SearchQueryBuilder) ([]string, error)
// Client returns the underlying database connection // Client returns the underlying database connection

View File

@@ -4,6 +4,8 @@ import (
"context" "context"
"testing" "testing"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
) )
@@ -131,7 +133,7 @@ func TestCRDB_Filter(t *testing.T) {
} }
} }
func TestCRDB_LatestSequence(t *testing.T) { func TestCRDB_LatestPosition(t *testing.T) {
type args struct { type args struct {
searchQuery *eventstore.SearchQueryBuilder searchQuery *eventstore.SearchQueryBuilder
} }
@@ -139,7 +141,7 @@ func TestCRDB_LatestSequence(t *testing.T) {
existingEvents []eventstore.Command existingEvents []eventstore.Command
} }
type res struct { type res struct {
sequence float64 position decimal.Decimal
} }
tests := []struct { tests := []struct {
name string name string
@@ -151,7 +153,7 @@ func TestCRDB_LatestSequence(t *testing.T) {
{ {
name: "aggregate type filter no sequence", name: "aggregate type filter no sequence",
args: args{ args: args{
searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).
AddQuery(). AddQuery().
AggregateTypes("not found"). AggregateTypes("not found").
Builder(), Builder(),
@@ -168,7 +170,7 @@ func TestCRDB_LatestSequence(t *testing.T) {
{ {
name: "aggregate type filter sequence", name: "aggregate type filter sequence",
args: args{ args: args{
searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence). searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).
AddQuery(). AddQuery().
AggregateTypes(eventstore.AggregateType(t.Name())). AggregateTypes(eventstore.AggregateType(t.Name())).
Builder(), Builder(),
@@ -202,12 +204,12 @@ func TestCRDB_LatestSequence(t *testing.T) {
return return
} }
sequence, err := db.LatestSequence(context.Background(), tt.args.searchQuery) position, err := db.LatestPosition(context.Background(), tt.args.searchQuery)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr)
} }
if tt.res.sequence > sequence { if tt.res.position.GreaterThan(position) {
t.Errorf("CRDB.query() expected sequence: %v got %v", tt.res.sequence, sequence) t.Errorf("CRDB.query() expected sequence: %v got %v", tt.res.position, position)
} }
}) })
} }

View File

@@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/api/service" "github.com/zitadel/zitadel/internal/api/service"
@@ -397,7 +398,7 @@ func (repo *testPusher) Push(_ context.Context, _ database.ContextQueryExecuter,
type testQuerier struct { type testQuerier struct {
events []Event events []Event
sequence float64 sequence decimal.Decimal
instances []string instances []string
err error err error
t *testing.T t *testing.T
@@ -430,9 +431,9 @@ func (repo *testQuerier) FilterToReducer(ctx context.Context, searchQuery *Searc
return nil return nil
} }
func (repo *testQuerier) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) { func (repo *testQuerier) LatestPosition(ctx context.Context, queryFactory *SearchQueryBuilder) (decimal.Decimal, error) {
if repo.err != nil { if repo.err != nil {
return 0, repo.err return decimal.Decimal{}, repo.err
} }
return repo.sequence, nil return repo.sequence, nil
} }
@@ -1076,7 +1077,7 @@ func TestEventstore_FilterEvents(t *testing.T) {
} }
} }
func TestEventstore_LatestSequence(t *testing.T) { func TestEventstore_LatestPosition(t *testing.T) {
type args struct { type args struct {
query *SearchQueryBuilder query *SearchQueryBuilder
} }
@@ -1096,7 +1097,7 @@ func TestEventstore_LatestSequence(t *testing.T) {
name: "no events", name: "no events",
args: args{ args: args{
query: &SearchQueryBuilder{ query: &SearchQueryBuilder{
columns: ColumnsMaxSequence, columns: ColumnsMaxPosition,
queries: []*SearchQuery{ queries: []*SearchQuery{
{ {
builder: &SearchQueryBuilder{}, builder: &SearchQueryBuilder{},
@@ -1119,7 +1120,7 @@ func TestEventstore_LatestSequence(t *testing.T) {
name: "repo error", name: "repo error",
args: args{ args: args{
query: &SearchQueryBuilder{ query: &SearchQueryBuilder{
columns: ColumnsMaxSequence, columns: ColumnsMaxPosition,
queries: []*SearchQuery{ queries: []*SearchQuery{
{ {
builder: &SearchQueryBuilder{}, builder: &SearchQueryBuilder{},
@@ -1142,7 +1143,7 @@ func TestEventstore_LatestSequence(t *testing.T) {
name: "found events", name: "found events",
args: args{ args: args{
query: &SearchQueryBuilder{ query: &SearchQueryBuilder{
columns: ColumnsMaxSequence, columns: ColumnsMaxPosition,
queries: []*SearchQuery{ queries: []*SearchQuery{
{ {
builder: &SearchQueryBuilder{}, builder: &SearchQueryBuilder{},
@@ -1168,7 +1169,7 @@ func TestEventstore_LatestSequence(t *testing.T) {
querier: tt.fields.repo, querier: tt.fields.repo,
} }
_, err := es.LatestSequence(context.Background(), tt.args.query) _, err := es.LatestPosition(context.Background(), tt.args.query)
if (err != nil) != tt.res.wantErr { if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr) t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
} }

View File

@@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
) )
@@ -126,10 +127,15 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig)
return additionalIteration, err return additionalIteration, err
} }
// stop execution if currentState.eventTimestamp >= config.maxCreatedAt // stop execution if currentState.eventTimestamp >= config.maxCreatedAt
if config.maxPosition != 0 && currentState.position >= config.maxPosition { if !config.maxPosition.IsZero() && currentState.position.GreaterThanOrEqual(config.maxPosition) {
return false, nil return false, nil
} }
if config.minPosition.GreaterThan(decimal.NewFromInt(0)) {
currentState.position = config.minPosition
currentState.offset = 0
}
events, additionalIteration, err := h.fetchEvents(ctx, tx, currentState) events, additionalIteration, err := h.fetchEvents(ctx, tx, currentState)
if err != nil { if err != nil {
return additionalIteration, err return additionalIteration, err
@@ -150,7 +156,7 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig)
} }
func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState *state) (_ []eventstore.FillFieldsEvent, additionalIteration bool, err error) { func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState *state) (_ []eventstore.FillFieldsEvent, additionalIteration bool, err error) {
events, err := h.es.Filter(ctx, h.eventQuery(currentState).SetTx(tx)) events, err := h.es.Filter(ctx, h.EventQuery(currentState).SetTx(tx))
if err != nil || len(events) == 0 { if err != nil || len(events) == 0 {
h.log().OnError(err).Debug("filter eventstore failed") h.log().OnError(err).Debug("filter eventstore failed")
return nil, false, err return nil, false, err
@@ -159,7 +165,7 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState
idx, offset := skipPreviouslyReducedEvents(events, currentState) idx, offset := skipPreviouslyReducedEvents(events, currentState)
if currentState.position == events[len(events)-1].Position() { if currentState.position.Equal(events[len(events)-1].Position()) {
offset += currentState.offset offset += currentState.offset
} }
currentState.position = events[len(events)-1].Position() currentState.position = events[len(events)-1].Position()
@@ -189,9 +195,9 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState
} }
func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state) (index int, offset uint32) { func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state) (index int, offset uint32) {
var position float64 var position decimal.Decimal
for i, event := range events { for i, event := range events {
if event.Position() != position { if !event.Position().Equal(position) {
offset = 0 offset = 0
position = event.Position() position = event.Position()
} }

View File

@@ -4,13 +4,13 @@ import (
"context" "context"
"database/sql" "database/sql"
"errors" "errors"
"math"
"math/rand" "math/rand"
"slices" "slices"
"sync" "sync"
"time" "time"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/authz"
@@ -385,7 +385,8 @@ func (h *Handler) existingInstances(ctx context.Context) ([]string, error) {
type triggerConfig struct { type triggerConfig struct {
awaitRunning bool awaitRunning bool
maxPosition float64 maxPosition decimal.Decimal
minPosition decimal.Decimal
} }
type TriggerOpt func(conf *triggerConfig) type TriggerOpt func(conf *triggerConfig)
@@ -396,12 +397,18 @@ func WithAwaitRunning() TriggerOpt {
} }
} }
func WithMaxPosition(position float64) TriggerOpt { func WithMaxPosition(position decimal.Decimal) TriggerOpt {
return func(conf *triggerConfig) { return func(conf *triggerConfig) {
conf.maxPosition = position conf.maxPosition = position
} }
} }
func WithMinPosition(position decimal.Decimal) TriggerOpt {
return func(conf *triggerConfig) {
conf.minPosition = position
}
}
func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Context, err error) { func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Context, err error) {
config := new(triggerConfig) config := new(triggerConfig)
for _, opt := range opts { for _, opt := range opts {
@@ -510,10 +517,15 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
return additionalIteration, err return additionalIteration, err
} }
// stop execution if currentState.position >= config.maxPosition // stop execution if currentState.position >= config.maxPosition
if config.maxPosition != 0 && currentState.position >= config.maxPosition { if !config.maxPosition.Equal(decimal.Decimal{}) && currentState.position.GreaterThanOrEqual(config.maxPosition) {
return false, nil return false, nil
} }
if config.minPosition.GreaterThan(decimal.NewFromInt(0)) {
currentState.position = config.minPosition
currentState.offset = 0
}
var statements []*Statement var statements []*Statement
statements, additionalIteration, err = h.generateStatements(ctx, tx, currentState) statements, additionalIteration, err = h.generateStatements(ctx, tx, currentState)
if err != nil { if err != nil {
@@ -569,7 +581,7 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta
return []*Statement{stmt}, false, nil return []*Statement{stmt}, false, nil
} }
events, err := h.es.Filter(ctx, h.eventQuery(currentState).SetTx(tx)) events, err := h.es.Filter(ctx, h.EventQuery(currentState).SetTx(tx))
if err != nil { if err != nil {
h.log().WithError(err).Debug("filter eventstore failed") h.log().WithError(err).Debug("filter eventstore failed")
return nil, false, err return nil, false, err
@@ -605,7 +617,7 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta
func skipPreviouslyReducedStatements(statements []*Statement, currentState *state) int { func skipPreviouslyReducedStatements(statements []*Statement, currentState *state) int {
for i, statement := range statements { for i, statement := range statements {
if statement.Position == currentState.position && if statement.Position.Equal(currentState.position) &&
statement.Aggregate.ID == currentState.aggregateID && statement.Aggregate.ID == currentState.aggregateID &&
statement.Aggregate.Type == currentState.aggregateType && statement.Aggregate.Type == currentState.aggregateType &&
statement.Sequence == currentState.sequence { statement.Sequence == currentState.sequence {
@@ -661,7 +673,7 @@ func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, statement *S
return nil return nil
} }
func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder { func (h *Handler) EventQuery(currentState *state) *eventstore.SearchQueryBuilder {
builder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). builder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AwaitOpenTransactions(). AwaitOpenTransactions().
Limit(uint64(h.bulkLimit)). Limit(uint64(h.bulkLimit)).
@@ -669,9 +681,8 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder
OrderAsc(). OrderAsc().
InstanceID(currentState.instanceID) InstanceID(currentState.instanceID)
if currentState.position > 0 { if currentState.position.GreaterThan(decimal.Decimal{}) {
// decrease position by 10 because builder.PositionAfter filters for position > and we need position >= builder = builder.PositionAtLeast(currentState.position)
builder = builder.PositionAfter(math.Float64frombits(math.Float64bits(currentState.position) - 10))
if currentState.offset > 0 { if currentState.offset > 0 {
builder = builder.Offset(currentState.offset) builder = builder.Offset(currentState.offset)
} }

View File

@@ -7,6 +7,8 @@ import (
"errors" "errors"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/zerrors" "github.com/zitadel/zitadel/internal/zerrors"
@@ -14,7 +16,7 @@ import (
type state struct { type state struct {
instanceID string instanceID string
position float64 position decimal.Decimal
eventTimestamp time.Time eventTimestamp time.Time
aggregateType eventstore.AggregateType aggregateType eventstore.AggregateType
aggregateID string aggregateID string
@@ -45,7 +47,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
aggregateType = new(sql.NullString) aggregateType = new(sql.NullString)
sequence = new(sql.NullInt64) sequence = new(sql.NullInt64)
timestamp = new(sql.NullTime) timestamp = new(sql.NullTime)
position = new(sql.NullFloat64) position = new(decimal.NullDecimal)
offset = new(sql.NullInt64) offset = new(sql.NullInt64)
) )
@@ -75,7 +77,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
currentState.aggregateType = eventstore.AggregateType(aggregateType.String) currentState.aggregateType = eventstore.AggregateType(aggregateType.String)
currentState.sequence = uint64(sequence.Int64) currentState.sequence = uint64(sequence.Int64)
currentState.eventTimestamp = timestamp.Time currentState.eventTimestamp = timestamp.Time
currentState.position = position.Float64 currentState.position = position.Decimal
// psql does not provide unsigned numbers so we work around it // psql does not provide unsigned numbers so we work around it
currentState.offset = uint32(offset.Int64) currentState.offset = uint32(offset.Int64)
return currentState, nil return currentState, nil

View File

@@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database/mock" "github.com/zitadel/zitadel/internal/database/mock"
@@ -166,7 +167,7 @@ func TestHandler_updateLastUpdated(t *testing.T) {
updatedState: &state{ updatedState: &state{
instanceID: "instance", instanceID: "instance",
eventTimestamp: time.Now(), eventTimestamp: time.Now(),
position: 42, position: decimal.NewFromInt(42),
}, },
}, },
isErr: func(t *testing.T, err error) { isErr: func(t *testing.T, err error) {
@@ -192,7 +193,7 @@ func TestHandler_updateLastUpdated(t *testing.T) {
updatedState: &state{ updatedState: &state{
instanceID: "instance", instanceID: "instance",
eventTimestamp: time.Now(), eventTimestamp: time.Now(),
position: 42, position: decimal.NewFromInt(42),
}, },
}, },
isErr: func(t *testing.T, err error) { isErr: func(t *testing.T, err error) {
@@ -217,7 +218,7 @@ func TestHandler_updateLastUpdated(t *testing.T) {
eventstore.AggregateType("aggregate type"), eventstore.AggregateType("aggregate type"),
uint64(42), uint64(42),
mock.AnyType[time.Time]{}, mock.AnyType[time.Time]{},
float64(42), decimal.NewFromInt(42),
uint32(0), uint32(0),
), ),
mock.WithExecRowsAffected(1), mock.WithExecRowsAffected(1),
@@ -228,7 +229,7 @@ func TestHandler_updateLastUpdated(t *testing.T) {
updatedState: &state{ updatedState: &state{
instanceID: "instance", instanceID: "instance",
eventTimestamp: time.Now(), eventTimestamp: time.Now(),
position: 42, position: decimal.NewFromInt(42),
aggregateType: "aggregate type", aggregateType: "aggregate type",
aggregateID: "aggregate id", aggregateID: "aggregate id",
sequence: 42, sequence: 42,
@@ -397,7 +398,7 @@ func TestHandler_currentState(t *testing.T) {
"aggregate type", "aggregate type",
int64(42), int64(42),
testTime, testTime,
float64(42), decimal.NewFromInt(42).String(),
uint16(10), uint16(10),
}, },
}, },
@@ -412,7 +413,7 @@ func TestHandler_currentState(t *testing.T) {
currentState: &state{ currentState: &state{
instanceID: "instance", instanceID: "instance",
eventTimestamp: testTime, eventTimestamp: testTime,
position: 42, position: decimal.NewFromInt(42),
aggregateType: "aggregate type", aggregateType: "aggregate type",
aggregateID: "aggregate id", aggregateID: "aggregate id",
sequence: 42, sequence: 42,

View File

@@ -9,6 +9,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"golang.org/x/exp/constraints" "golang.org/x/exp/constraints"
@@ -82,7 +83,7 @@ func (h *Handler) reduce(event eventstore.Event) (*Statement, error) {
type Statement struct { type Statement struct {
Aggregate *eventstore.Aggregate Aggregate *eventstore.Aggregate
Sequence uint64 Sequence uint64
Position float64 Position decimal.Decimal
CreationDate time.Time CreationDate time.Time
offset uint32 offset uint32

View File

@@ -2,13 +2,14 @@ package eventstore_test
import ( import (
"context" "context"
"database/sql"
"encoding/json" "encoding/json"
"os" "os"
"testing" "testing"
"time" "time"
"github.com/cockroachdb/cockroach-go/v2/testserver" "github.com/cockroachdb/cockroach-go/v2/testserver"
pgxdecimal "github.com/jackc/pgx-shopspring-decimal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib" "github.com/jackc/pgx/v5/stdlib"
"github.com/zitadel/logging" "github.com/zitadel/logging"
@@ -41,13 +42,15 @@ func TestMain(m *testing.M) {
testCRDBClient = &database.DB{ testCRDBClient = &database.DB{
Database: new(testDB), Database: new(testDB),
} }
config, err := pgxpool.ParseConfig(ts.PGURL().String())
connConfig, err := pgxpool.ParseConfig(ts.PGURL().String())
if err != nil { if err != nil {
logging.WithFields("error", err).Fatal("unable to parse db url") logging.WithFields("error", err).Fatal("unable to parse db config")
} }
connConfig.AfterConnect = new_es.RegisterEventstoreTypes config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
pool, err := pgxpool.NewWithConfig(context.Background(), connConfig) pgxdecimal.Register(conn.TypeMap())
return new_es.RegisterEventstoreTypes(ctx, conn)
}
pool, err := pgxpool.NewWithConfig(context.Background(), config)
if err != nil { if err != nil {
logging.WithFields("error", err).Fatal("unable to create db pool") logging.WithFields("error", err).Fatal("unable to create db pool")
} }
@@ -112,10 +115,15 @@ func initDB(ctx context.Context, db *database.DB) error {
} }
func connectLocalhost() (*database.DB, error) { func connectLocalhost() (*database.DB, error) {
client, err := sql.Open("pgx", "postgresql://root@localhost:26257/defaultdb?sslmode=disable") config, err := pgxpool.ParseConfig("postgresql://root@localhost:26257/defaultdb?sslmode=disable")
if err != nil { if err != nil {
return nil, err return nil, err
} }
pool, err := pgxpool.NewWithConfig(context.Background(), config)
if err != nil {
return nil, err
}
client := stdlib.OpenDBFromPool(pool)
if err = client.Ping(); err != nil { if err = client.Ping(); err != nil {
return nil, err return nil, err
} }

View File

@@ -1,6 +1,10 @@
package eventstore package eventstore
import "time" import (
"time"
"github.com/shopspring/decimal"
)
// ReadModel is the minimum representation of a read model. // ReadModel is the minimum representation of a read model.
// It implements a basic reducer // It implements a basic reducer
@@ -13,7 +17,7 @@ type ReadModel struct {
Events []Event `json:"-"` Events []Event `json:"-"`
ResourceOwner string `json:"-"` ResourceOwner string `json:"-"`
InstanceID string `json:"-"` InstanceID string `json:"-"`
Position float64 `json:"-"` Position decimal.Decimal `json:"-"`
} }
// AppendEvents adds all the events to the read model. // AppendEvents adds all the events to the read model.

View File

@@ -7,6 +7,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
@@ -22,7 +23,7 @@ type Event struct {
// Seq is the sequence of the event // Seq is the sequence of the event
Seq uint64 Seq uint64
// Pos is the global sequence of the event multiple events can have the same sequence // Pos is the global sequence of the event multiple events can have the same sequence
Pos float64 Pos decimal.Decimal
//CreationDate is the time the event is created //CreationDate is the time the event is created
// it's used for human readability. // it's used for human readability.
@@ -97,7 +98,7 @@ func (e *Event) Sequence() uint64 {
} }
// Position implements [eventstore.Event] // Position implements [eventstore.Event]
func (e *Event) Position() float64 { func (e *Event) Position() decimal.Decimal {
return e.Pos return e.Pos
} }

View File

@@ -13,6 +13,7 @@ import (
context "context" context "context"
reflect "reflect" reflect "reflect"
decimal "github.com/shopspring/decimal"
database "github.com/zitadel/zitadel/internal/database" database "github.com/zitadel/zitadel/internal/database"
eventstore "github.com/zitadel/zitadel/internal/eventstore" eventstore "github.com/zitadel/zitadel/internal/eventstore"
gomock "go.uber.org/mock/gomock" gomock "go.uber.org/mock/gomock"
@@ -98,19 +99,19 @@ func (mr *MockQuerierMockRecorder) InstanceIDs(arg0, arg1 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstanceIDs", reflect.TypeOf((*MockQuerier)(nil).InstanceIDs), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstanceIDs", reflect.TypeOf((*MockQuerier)(nil).InstanceIDs), arg0, arg1)
} }
// LatestSequence mocks base method. // LatestPosition mocks base method.
func (m *MockQuerier) LatestSequence(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) (float64, error) { func (m *MockQuerier) LatestPosition(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) (decimal.Decimal, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LatestSequence", arg0, arg1) ret := m.ctrl.Call(m, "LatestPosition", arg0, arg1)
ret0, _ := ret[0].(float64) ret0, _ := ret[0].(decimal.Decimal)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
// LatestSequence indicates an expected call of LatestSequence. // LatestPosition indicates an expected call of LatestPosition.
func (mr *MockQuerierMockRecorder) LatestSequence(arg0, arg1 any) *gomock.Call { func (mr *MockQuerierMockRecorder) LatestPosition(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestSequence", reflect.TypeOf((*MockQuerier)(nil).LatestSequence), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestPosition", reflect.TypeOf((*MockQuerier)(nil).LatestPosition), arg0, arg1)
} }
// MockPusher is a mock of Pusher interface. // MockPusher is a mock of Pusher interface.

View File

@@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock" "go.uber.org/mock/gomock"
@@ -197,8 +198,8 @@ func (e *mockEvent) Sequence() uint64 {
return e.sequence return e.sequence
} }
func (e *mockEvent) Position() float64 { func (e *mockEvent) Position() decimal.Decimal {
return 0 return decimal.Decimal{}
} }
func (e *mockEvent) CreatedAt() time.Time { func (e *mockEvent) CreatedAt() time.Time {

View File

@@ -3,6 +3,8 @@ package repository
import ( import (
"database/sql" "database/sql"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/zerrors" "github.com/zitadel/zitadel/internal/zerrors"
@@ -58,6 +60,8 @@ const (
//OperationNotIn checks if a stored value does not match one of the passed value list //OperationNotIn checks if a stored value does not match one of the passed value list
OperationNotIn OperationNotIn
OperationGreaterOrEquals
operationCount operationCount
) )
@@ -252,10 +256,10 @@ func instanceIDsFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuer
} }
func positionAfterFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter { func positionAfterFilter(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter {
if builder.GetPositionAfter() == 0 { if builder.GetPositionAtLeast().IsZero() {
return nil return nil
} }
query.Position = NewFilter(FieldPosition, builder.GetPositionAfter(), OperationGreater) query.Position = NewFilter(FieldPosition, builder.GetPositionAtLeast(), OperationGreaterOrEquals)
return query.Position return query.Position
} }
@@ -297,7 +301,7 @@ func eventDataFilter(query *eventstore.SearchQuery) *Filter {
} }
func eventPositionAfterFilter(query *eventstore.SearchQuery) *Filter { func eventPositionAfterFilter(query *eventstore.SearchQuery) *Filter {
if pos := query.GetPositionAfter(); pos != 0 { if pos := query.GetPositionAfter(); !pos.Equal(decimal.Decimal{}) {
return NewFilter(FieldPosition, pos, OperationGreater) return NewFilter(FieldPosition, pos, OperationGreater)
} }
return nil return nil

View File

@@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/authz"
@@ -264,11 +265,11 @@ func (crdb *CRDB) FilterToReducer(ctx context.Context, searchQuery *eventstore.S
return err return err
} }
// LatestSequence returns the latest sequence found by the search query // LatestPosition returns the latest position found by the search query
func (db *CRDB) LatestSequence(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (float64, error) { func (db *CRDB) LatestPosition(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (decimal.Decimal, error) {
var position sql.NullFloat64 var position decimal.Decimal
err := query(ctx, db, searchQuery, &position, false) err := query(ctx, db, searchQuery, &position, false)
return position.Float64, err return position, err
} }
// InstanceIDs returns the instance ids found by the search query // InstanceIDs returns the instance ids found by the search query
@@ -335,7 +336,7 @@ func (db *CRDB) eventQuery(useV1 bool) string {
" FROM eventstore.events2" " FROM eventstore.events2"
} }
func (db *CRDB) maxSequenceQuery(useV1 bool) string { func (db *CRDB) maxPositionQuery(useV1 bool) string {
if useV1 { if useV1 {
return `SELECT event_sequence FROM eventstore.events` return `SELECT event_sequence FROM eventstore.events`
} }
@@ -413,6 +414,8 @@ func (db *CRDB) operation(operation repository.Operation) string {
return "=" return "="
case repository.OperationGreater: case repository.OperationGreater:
return ">" return ">"
case repository.OperationGreaterOrEquals:
return ">="
case repository.OperationLess: case repository.OperationLess:
return "<" return "<"
case repository.OperationJSONContains: case repository.OperationJSONContains:

View File

@@ -4,6 +4,8 @@ import (
"database/sql" "database/sql"
"testing" "testing"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository" "github.com/zitadel/zitadel/internal/eventstore/repository"
) )
@@ -312,7 +314,7 @@ func generateEvent(t *testing.T, aggregateID string, opts ...func(*repository.Ev
ResourceOwner: sql.NullString{String: "ro", Valid: true}, ResourceOwner: sql.NullString{String: "ro", Valid: true},
Typ: "test.created", Typ: "test.created",
Version: "v1", Version: "v1",
Pos: 42, Pos: decimal.NewFromInt(42),
} }
for _, opt := range opts { for _, opt := range opts {

View File

@@ -8,6 +8,8 @@ import (
"time" "time"
"github.com/cockroachdb/cockroach-go/v2/testserver" "github.com/cockroachdb/cockroach-go/v2/testserver"
pgxdecimal "github.com/jackc/pgx-shopspring-decimal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib" "github.com/jackc/pgx/v5/stdlib"
"github.com/zitadel/logging" "github.com/zitadel/logging"
@@ -15,7 +17,6 @@ import (
"github.com/zitadel/zitadel/cmd/initialise" "github.com/zitadel/zitadel/cmd/initialise"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/cockroach" "github.com/zitadel/zitadel/internal/database/cockroach"
new_es "github.com/zitadel/zitadel/internal/eventstore/v3"
) )
var ( var (
@@ -36,7 +37,10 @@ func TestMain(m *testing.M) {
if err != nil { if err != nil {
logging.WithFields("error", err).Fatal("unable to parse db url") logging.WithFields("error", err).Fatal("unable to parse db url")
} }
connConfig.AfterConnect = new_es.RegisterEventstoreTypes connConfig.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
pgxdecimal.Register(conn.TypeMap())
return nil
}
pool, err := pgxpool.NewWithConfig(context.Background(), connConfig) pool, err := pgxpool.NewWithConfig(context.Background(), connConfig)
if err != nil { if err != nil {
logging.WithFields("error", err).Fatal("unable to create db pool") logging.WithFields("error", err).Fatal("unable to create db pool")

View File

@@ -9,6 +9,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/call" "github.com/zitadel/zitadel/internal/api/call"
@@ -25,7 +26,7 @@ type querier interface {
conditionFormat(repository.Operation) string conditionFormat(repository.Operation) string
placeholder(query string) string placeholder(query string) string
eventQuery(useV1 bool) string eventQuery(useV1 bool) string
maxSequenceQuery(useV1 bool) string maxPositionQuery(useV1 bool) string
instanceIDsQuery(useV1 bool) string instanceIDsQuery(useV1 bool) string
Client() *database.DB Client() *database.DB
orderByEventSequence(desc, shouldOrderBySequence, useV1 bool) string orderByEventSequence(desc, shouldOrderBySequence, useV1 bool) string
@@ -74,7 +75,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
// instead of using the max function of the database (which doesn't work for postgres) // instead of using the max function of the database (which doesn't work for postgres)
// we select the most recent row // we select the most recent row
if q.Columns == eventstore.ColumnsMaxSequence { if q.Columns == eventstore.ColumnsMaxPosition {
q.Limit = 1 q.Limit = 1
q.Desc = true q.Desc = true
} }
@@ -91,7 +92,7 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
switch q.Columns { switch q.Columns {
case eventstore.ColumnsEvent, case eventstore.ColumnsEvent,
eventstore.ColumnsMaxSequence: eventstore.ColumnsMaxPosition:
query += criteria.orderByEventSequence(q.Desc, shouldOrderBySequence, useV1) query += criteria.orderByEventSequence(q.Desc, shouldOrderBySequence, useV1)
} }
@@ -147,8 +148,8 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
func prepareColumns(criteria querier, columns eventstore.Columns, useV1 bool) (string, func(s scan, dest interface{}) error) { func prepareColumns(criteria querier, columns eventstore.Columns, useV1 bool) (string, func(s scan, dest interface{}) error) {
switch columns { switch columns {
case eventstore.ColumnsMaxSequence: case eventstore.ColumnsMaxPosition:
return criteria.maxSequenceQuery(useV1), maxSequenceScanner return criteria.maxPositionQuery(useV1), maxPositionScanner
case eventstore.ColumnsInstanceIDs: case eventstore.ColumnsInstanceIDs:
return criteria.instanceIDsQuery(useV1), instanceIDsScanner return criteria.instanceIDsQuery(useV1), instanceIDsScanner
case eventstore.ColumnsEvent: case eventstore.ColumnsEvent:
@@ -166,13 +167,15 @@ func prepareTimeTravel(ctx context.Context, criteria querier, allow bool) string
return criteria.Timetravel(took) return criteria.Timetravel(took)
} }
func maxSequenceScanner(row scan, dest interface{}) (err error) { func maxPositionScanner(row scan, dest interface{}) (err error) {
position, ok := dest.(*sql.NullFloat64) position, ok := dest.(*decimal.Decimal)
if !ok { if !ok {
return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be sql.NullInt64 got: %T", dest) return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be decimal.Decimal got: %T", dest)
} }
err = row(position) var res decimal.NullDecimal
err = row(&res)
if err == nil || errors.Is(err, sql.ErrNoRows) { if err == nil || errors.Is(err, sql.ErrNoRows) {
*position = res.Decimal
return nil return nil
} }
return zerrors.ThrowInternal(err, "SQL-bN5xg", "something went wrong") return zerrors.ThrowInternal(err, "SQL-bN5xg", "something went wrong")
@@ -201,7 +204,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
return zerrors.ThrowInvalidArgumentf(nil, "SQL-4GP6F", "events scanner: invalid type %T", dest) return zerrors.ThrowInvalidArgumentf(nil, "SQL-4GP6F", "events scanner: invalid type %T", dest)
} }
event := new(repository.Event) event := new(repository.Event)
position := new(sql.NullFloat64) position := new(decimal.NullDecimal)
if useV1 { if useV1 {
err = scanner( err = scanner(
@@ -238,7 +241,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
logging.New().WithError(err).Warn("unable to scan row") logging.New().WithError(err).Warn("unable to scan row")
return zerrors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row") return zerrors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row")
} }
event.Pos = position.Float64 event.Pos = position.Decimal
return reduce(event) return reduce(event)
} }
} }

View File

@@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/DATA-DOG/go-sqlmock" "github.com/DATA-DOG/go-sqlmock"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
@@ -110,36 +111,36 @@ func Test_prepareColumns(t *testing.T) {
{ {
name: "max column", name: "max column",
args: args{ args: args{
columns: eventstore.ColumnsMaxSequence, columns: eventstore.ColumnsMaxPosition,
dest: new(sql.NullFloat64), dest: new(decimal.Decimal),
useV1: true, useV1: true,
}, },
res: res{ res: res{
query: `SELECT event_sequence FROM eventstore.events`, query: `SELECT event_sequence FROM eventstore.events`,
expected: sql.NullFloat64{Float64: 43, Valid: true}, expected: decimal.NewFromInt(42),
}, },
fields: fields{ fields: fields{
dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}}, dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))},
}, },
}, },
{ {
name: "max column v2", name: "max column v2",
args: args{ args: args{
columns: eventstore.ColumnsMaxSequence, columns: eventstore.ColumnsMaxPosition,
dest: new(sql.NullFloat64), dest: new(decimal.Decimal),
}, },
res: res{ res: res{
query: `SELECT "position" FROM eventstore.events2`, query: `SELECT "position" FROM eventstore.events2`,
expected: sql.NullFloat64{Float64: 43, Valid: true}, expected: decimal.NewFromInt(42),
}, },
fields: fields{ fields: fields{
dbRow: []interface{}{sql.NullFloat64{Float64: 43, Valid: true}}, dbRow: []interface{}{decimal.NewNullDecimal(decimal.NewFromInt(42))},
}, },
}, },
{ {
name: "max sequence wrong dest type", name: "max sequence wrong dest type",
args: args{ args: args{
columns: eventstore.ColumnsMaxSequence, columns: eventstore.ColumnsMaxPosition,
dest: new(uint64), dest: new(uint64),
}, },
res: res{ res: res{
@@ -179,11 +180,11 @@ func Test_prepareColumns(t *testing.T) {
res: res{ res: res{
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`, query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
expected: []eventstore.Event{ expected: []eventstore.Event{
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 42, Data: nil, Version: "v1"}, &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.NewFromInt(42), Data: nil, Version: "v1"},
}, },
}, },
fields: fields{ fields: fields{
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 42, Valid: true}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)}, dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NewNullDecimal(decimal.NewFromInt(42)), sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
}, },
}, },
{ {
@@ -198,11 +199,11 @@ func Test_prepareColumns(t *testing.T) {
res: res{ res: res{
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`, query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
expected: []eventstore.Event{ expected: []eventstore.Event{
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 0, Data: nil, Version: "v1"}, &repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: decimal.Decimal{}, Data: nil, Version: "v1"},
}, },
}, },
fields: fields{ fields: fields{
dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), sql.NullFloat64{Float64: 0, Valid: false}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)}, dbRow: []interface{}{time.Time{}, eventstore.EventType(""), uint64(5), decimal.NullDecimal{}, sql.RawBytes(nil), "", sql.NullString{}, "", eventstore.AggregateType("user"), "hodor", uint8(1)},
}, },
}, },
{ {
@@ -1006,7 +1007,7 @@ func Test_query_events_mocked(t *testing.T) {
InstanceID("instanceID"). InstanceID("instanceID").
OrderDesc(). OrderDesc().
Limit(5). Limit(5).
PositionAfter(123.456). PositionAtLeast(decimal.NewFromFloat(123.456)).
AddQuery(). AddQuery().
AggregateTypes("notify"). AggregateTypes("notify").
EventTypes("notify.foo.bar"). EventTypes("notify.foo.bar").
@@ -1020,9 +1021,9 @@ func Test_query_events_mocked(t *testing.T) {
fields: fields{ fields: fields{
mock: newMockClient(t).expectQuery(t, mock: newMockClient(t).expectQuery(t,
regexp.QuoteMeta( regexp.QuoteMeta(
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" > $8) ORDER BY event_sequence DESC LIMIT $9`, `SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" >= $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" >= $8) ORDER BY event_sequence DESC LIMIT $9`,
), ),
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), 123.456, eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", 123.456, uint64(5)}, []driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), decimal.NewFromFloat(123.456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", decimal.NewFromFloat(123.456), uint64(5)},
), ),
}, },
res: res{ res: res{
@@ -1037,7 +1038,7 @@ func Test_query_events_mocked(t *testing.T) {
InstanceID("instanceID"). InstanceID("instanceID").
OrderDesc(). OrderDesc().
Limit(5). Limit(5).
PositionAfter(123.456). PositionAtLeast(decimal.NewFromFloat(123.456)).
AddQuery(). AddQuery().
AggregateTypes("notify"). AggregateTypes("notify").
EventTypes("notify.foo.bar"). EventTypes("notify.foo.bar").
@@ -1051,9 +1052,9 @@ func Test_query_events_mocked(t *testing.T) {
fields: fields{ fields: fields{
mock: newMockClient(t).expectQuery(t, mock: newMockClient(t).expectQuery(t,
regexp.QuoteMeta( regexp.QuoteMeta(
`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" > $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`, `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" >= $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" >= $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`,
), ),
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), 123.456, eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", 123.456, uint64(5)}, []driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), decimal.NewFromFloat(123.456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", decimal.NewFromFloat(123.456), uint64(5)},
), ),
}, },
res: res{ res: res{

View File

@@ -5,6 +5,8 @@ import (
"database/sql" "database/sql"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/zerrors" "github.com/zitadel/zitadel/internal/zerrors"
) )
@@ -26,7 +28,7 @@ type SearchQueryBuilder struct {
lockRows bool lockRows bool
lockOption LockOption lockOption LockOption
allowTimeTravel bool allowTimeTravel bool
positionAfter float64 positionAtLeast decimal.Decimal
awaitOpenTransactions bool awaitOpenTransactions bool
creationDateAfter time.Time creationDateAfter time.Time
creationDateBefore time.Time creationDateBefore time.Time
@@ -81,8 +83,8 @@ func (b *SearchQueryBuilder) GetAllowTimeTravel() bool {
return b.allowTimeTravel return b.allowTimeTravel
} }
func (b SearchQueryBuilder) GetPositionAfter() float64 { func (b SearchQueryBuilder) GetPositionAtLeast() decimal.Decimal {
return b.positionAfter return b.positionAtLeast
} }
func (b SearchQueryBuilder) GetAwaitOpenTransactions() bool { func (b SearchQueryBuilder) GetAwaitOpenTransactions() bool {
@@ -118,7 +120,7 @@ type SearchQuery struct {
aggregateIDs []string aggregateIDs []string
eventTypes []EventType eventTypes []EventType
eventData map[string]interface{} eventData map[string]interface{}
positionAfter float64 positionAfter decimal.Decimal
} }
func (q SearchQuery) GetAggregateTypes() []AggregateType { func (q SearchQuery) GetAggregateTypes() []AggregateType {
@@ -137,7 +139,7 @@ func (q SearchQuery) GetEventData() map[string]interface{} {
return q.eventData return q.eventData
} }
func (q SearchQuery) GetPositionAfter() float64 { func (q SearchQuery) GetPositionAfter() decimal.Decimal {
return q.positionAfter return q.positionAfter
} }
@@ -161,8 +163,8 @@ type Columns int8
const ( const (
//ColumnsEvent represents all fields of an event //ColumnsEvent represents all fields of an event
ColumnsEvent = iota + 1 ColumnsEvent = iota + 1
// ColumnsMaxSequence represents the latest sequence of the filtered events // ColumnsMaxPosition represents the latest sequence of the filtered events
ColumnsMaxSequence ColumnsMaxPosition
// ColumnsInstanceIDs represents the instance ids of the filtered events // ColumnsInstanceIDs represents the instance ids of the filtered events
ColumnsInstanceIDs ColumnsInstanceIDs
@@ -296,9 +298,9 @@ func (builder *SearchQueryBuilder) AllowTimeTravel() *SearchQueryBuilder {
return builder return builder
} }
// PositionAfter filters for events which happened after the specified time // PositionAtLeast filters for events which happened after the specified time
func (builder *SearchQueryBuilder) PositionAfter(position float64) *SearchQueryBuilder { func (builder *SearchQueryBuilder) PositionAtLeast(position decimal.Decimal) *SearchQueryBuilder {
builder.positionAfter = position builder.positionAtLeast = position
return builder return builder
} }
@@ -405,7 +407,7 @@ func (query *SearchQuery) EventData(data map[string]interface{}) *SearchQuery {
return query return query
} }
func (query *SearchQuery) PositionAfter(position float64) *SearchQuery { func (query *SearchQuery) PositionAfter(position decimal.Decimal) *SearchQuery {
query.positionAfter = position query.positionAfter = position
return query return query
} }

View File

@@ -116,10 +116,10 @@ func TestSearchQuerybuilderSetters(t *testing.T) {
{ {
name: "set columns", name: "set columns",
args: args{ args: args{
setters: []func(*SearchQueryBuilder) *SearchQueryBuilder{testSetColumns(ColumnsMaxSequence)}, setters: []func(*SearchQueryBuilder) *SearchQueryBuilder{testSetColumns(ColumnsMaxPosition)},
}, },
res: &SearchQueryBuilder{ res: &SearchQueryBuilder{
columns: ColumnsMaxSequence, columns: ColumnsMaxPosition,
}, },
}, },
{ {

View File

@@ -5,6 +5,8 @@ import (
"reflect" "reflect"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/zerrors" "github.com/zitadel/zitadel/internal/zerrors"
) )
@@ -20,7 +22,7 @@ var _ eventstore.Event = (*Event)(nil)
type Event struct { type Event struct {
ID string ID string
Seq uint64 Seq uint64
Pos float64 Pos decimal.Decimal
CreationDate time.Time CreationDate time.Time
Typ eventstore.EventType Typ eventstore.EventType
PreviousSequence uint64 PreviousSequence uint64
@@ -80,7 +82,7 @@ func (e *Event) Sequence() uint64 {
} }
// Position implements [eventstore.Event] // Position implements [eventstore.Event]
func (e *Event) Position() float64 { func (e *Event) Position() decimal.Decimal {
return e.Pos return e.Pos
} }

View File

@@ -6,6 +6,7 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/authz"
@@ -42,7 +43,7 @@ type event struct {
command *command command *command
createdAt time.Time createdAt time.Time
sequence uint64 sequence uint64
position float64 position decimal.Decimal
} }
// TODO: remove on v3 // TODO: remove on v3
@@ -152,8 +153,8 @@ func (e *event) Sequence() uint64 {
return e.sequence return e.sequence
} }
// Sequence implements [eventstore.Event] // Position implements [eventstore.Event]
func (e *event) Position() float64 { func (e *event) Position() decimal.Decimal {
return e.position return e.position
} }

View File

@@ -61,7 +61,7 @@ func Test_Call(t *testing.T) {
args{ args{
ctx: context.Background(), ctx: context.Background(),
timeout: time.Second, timeout: time.Second,
sleep: time.Second, sleep: 2 * time.Second,
method: http.MethodPost, method: http.MethodPost,
body: []byte("{\"request\": \"values\"}"), body: []byte("{\"request\": \"values\"}"),
respBody: []byte("{\"response\": \"values\"}"), respBody: []byte("{\"response\": \"values\"}"),

View File

@@ -2,8 +2,12 @@ package notification
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/crypto"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
@@ -68,12 +72,34 @@ func Start(ctx context.Context) {
} }
} }
func SetCurrentState(ctx context.Context, es *eventstore.Eventstore) error {
if len(projections) == 0 {
return nil
}
position, err := es.LatestPosition(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).InstanceID(authz.GetInstance(ctx).InstanceID()).OrderDesc().Limit(1))
if err != nil {
return err
}
for i, projection := range projections {
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("set current state of notification projection")
_, err = projection.Trigger(ctx, handler.WithMinPosition(position))
if err != nil {
return err
}
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("current state of notification projection set")
}
return nil
}
func ProjectInstance(ctx context.Context) error { func ProjectInstance(ctx context.Context) error {
for _, projection := range projections { for i, projection := range projections {
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting notification projection")
_, err := projection.Trigger(ctx) _, err := projection.Trigger(ctx)
if err != nil { if err != nil {
return err return err
} }
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("notification projection done")
} }
return nil return nil
} }

View File

@@ -5,6 +5,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/shopspring/decimal"
"golang.org/x/text/language" "golang.org/x/text/language"
"github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/domain"
@@ -140,7 +141,7 @@ func (q *Queries) accessTokenByOIDCSessionAndTokenID(ctx context.Context, oidcSe
// checkSessionNotTerminatedAfter checks if a [session.TerminateType] event (or user events leading to a session termination) // checkSessionNotTerminatedAfter checks if a [session.TerminateType] event (or user events leading to a session termination)
// occurred after a certain time and will return an error if so. // occurred after a certain time and will return an error if so.
func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, userID string, position float64, fingerprintID string) (err error) { func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, userID string, position decimal.Decimal, fingerprintID string) (err error) {
ctx, span := tracing.NewSpan(ctx) ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }() defer func() { span.EndWithError(err) }()
@@ -165,7 +166,7 @@ func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID,
} }
type sessionTerminatedModel struct { type sessionTerminatedModel struct {
position float64 position decimal.Decimal
sessionID string sessionID string
userID string userID string
fingerPrintID string fingerPrintID string

View File

@@ -10,6 +10,7 @@ import (
"time" "time"
sq "github.com/Masterminds/squirrel" sq "github.com/Masterminds/squirrel"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/call" "github.com/zitadel/zitadel/internal/api/call"
@@ -26,7 +27,7 @@ type Stateful interface {
type State struct { type State struct {
LastRun time.Time LastRun time.Time
Position float64 Position decimal.Decimal
EventCreatedAt time.Time EventCreatedAt time.Time
AggregateID string AggregateID string
AggregateType eventstore.AggregateType AggregateType eventstore.AggregateType
@@ -221,7 +222,7 @@ func prepareLatestState(ctx context.Context, db prepareDatabase) (sq.SelectBuild
var ( var (
creationDate sql.NullTime creationDate sql.NullTime
lastUpdated sql.NullTime lastUpdated sql.NullTime
position sql.NullFloat64 position decimal.NullDecimal
) )
err := row.Scan( err := row.Scan(
&creationDate, &creationDate,
@@ -234,7 +235,7 @@ func prepareLatestState(ctx context.Context, db prepareDatabase) (sq.SelectBuild
return &State{ return &State{
EventCreatedAt: creationDate.Time, EventCreatedAt: creationDate.Time,
LastRun: lastUpdated.Time, LastRun: lastUpdated.Time,
Position: position.Float64, Position: position.Decimal,
}, nil }, nil
} }
} }
@@ -259,7 +260,7 @@ func prepareCurrentStateQuery(ctx context.Context, db prepareDatabase) (sq.Selec
var ( var (
lastRun sql.NullTime lastRun sql.NullTime
eventDate sql.NullTime eventDate sql.NullTime
currentPosition sql.NullFloat64 currentPosition decimal.NullDecimal
aggregateType sql.NullString aggregateType sql.NullString
aggregateID sql.NullString aggregateID sql.NullString
sequence sql.NullInt64 sequence sql.NullInt64
@@ -280,7 +281,7 @@ func prepareCurrentStateQuery(ctx context.Context, db prepareDatabase) (sq.Selec
} }
currentState.State.EventCreatedAt = eventDate.Time currentState.State.EventCreatedAt = eventDate.Time
currentState.State.LastRun = lastRun.Time currentState.State.LastRun = lastRun.Time
currentState.Position = currentPosition.Float64 currentState.Position = currentPosition.Decimal
currentState.AggregateType = eventstore.AggregateType(aggregateType.String) currentState.AggregateType = eventstore.AggregateType(aggregateType.String)
currentState.AggregateID = aggregateID.String currentState.AggregateID = aggregateID.String
currentState.Sequence = uint64(sequence.Int64) currentState.Sequence = uint64(sequence.Int64)

View File

@@ -7,6 +7,8 @@ import (
"fmt" "fmt"
"regexp" "regexp"
"testing" "testing"
"github.com/shopspring/decimal"
) )
var ( var (
@@ -87,7 +89,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) {
State: State{ State: State{
EventCreatedAt: testNow, EventCreatedAt: testNow,
LastRun: testNow, LastRun: testNow,
Position: 20211108, Position: decimal.NewFromInt(20211108),
AggregateID: "agg-id", AggregateID: "agg-id",
AggregateType: "agg-type", AggregateType: "agg-type",
Sequence: 20211108, Sequence: 20211108,
@@ -134,7 +136,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) {
ProjectionName: "projection-name", ProjectionName: "projection-name",
State: State{ State: State{
EventCreatedAt: testNow, EventCreatedAt: testNow,
Position: 20211108, Position: decimal.NewFromInt(20211108),
LastRun: testNow, LastRun: testNow,
AggregateID: "agg-id", AggregateID: "agg-id",
AggregateType: "agg-type", AggregateType: "agg-type",
@@ -145,7 +147,7 @@ func Test_CurrentSequencesPrepares(t *testing.T) {
ProjectionName: "projection-name2", ProjectionName: "projection-name2",
State: State{ State: State{
EventCreatedAt: testNow, EventCreatedAt: testNow,
Position: 20211108, Position: decimal.NewFromInt(20211108),
LastRun: testNow, LastRun: testNow,
AggregateID: "agg-id", AggregateID: "agg-id",
AggregateType: "agg-type", AggregateType: "agg-type",

View File

@@ -2,6 +2,9 @@ package projection
import ( import (
"context" "context"
"fmt"
"github.com/zitadel/logging"
internal_authz "github.com/zitadel/zitadel/internal/api/authz" internal_authz "github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/crypto"
@@ -89,6 +92,7 @@ var (
) )
type projection interface { type projection interface {
ProjectionName() string
Start(ctx context.Context) Start(ctx context.Context)
Init(ctx context.Context) error Init(ctx context.Context) error
Trigger(ctx context.Context, opts ...handler.TriggerOpt) (_ context.Context, err error) Trigger(ctx context.Context, opts ...handler.TriggerOpt) (_ context.Context, err error)
@@ -97,6 +101,7 @@ type projection interface {
var ( var (
projections []projection projections []projection
fields []*handler.FieldHandler
) )
func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore, config Config, keyEncryptionAlgorithm crypto.EncryptionAlgorithm, certEncryptionAlgorithm crypto.EncryptionAlgorithm, systemUsers map[string]*internal_authz.SystemAPIUser) error { func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore, config Config, keyEncryptionAlgorithm crypto.EncryptionAlgorithm, certEncryptionAlgorithm crypto.EncryptionAlgorithm, systemUsers map[string]*internal_authz.SystemAPIUser) error {
@@ -178,6 +183,7 @@ func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore,
MembershipFields = newFillMembershipFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsMemberships])) MembershipFields = newFillMembershipFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsMemberships]))
newProjectionsList() newProjectionsList()
newFieldsList()
return nil return nil
} }
@@ -201,7 +207,8 @@ func Start(ctx context.Context) {
} }
func ProjectInstance(ctx context.Context) error { func ProjectInstance(ctx context.Context) error {
for _, projection := range projections { for i, projection := range projections {
logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting projection")
_, err := projection.Trigger(ctx) _, err := projection.Trigger(ctx)
if err != nil { if err != nil {
return err return err
@@ -210,6 +217,18 @@ func ProjectInstance(ctx context.Context) error {
return nil return nil
} }
func ProjectInstanceFields(ctx context.Context) error {
for i, fieldProjection := range fields {
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("starting fields projection")
err := fieldProjection.Trigger(ctx)
if err != nil {
return err
}
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).Info("fields projection done")
}
return nil
}
func ApplyCustomConfig(customConfig CustomConfig) handler.Config { func ApplyCustomConfig(customConfig CustomConfig) handler.Config {
return applyCustomConfig(projectionConfig, customConfig) return applyCustomConfig(projectionConfig, customConfig)
} }
@@ -234,6 +253,19 @@ func applyCustomConfig(config handler.Config, customConfig CustomConfig) handler
return config return config
} }
// we know this is ugly, but we need to have a singleton slice of all projections
// and are only able to initialize it after all projections are created
// as setup and start currently create them individually, we make sure we get the right one
// will be refactored when changing to new id based projections
func newFieldsList() {
fields = []*handler.FieldHandler{
ProjectGrantFields,
OrgDomainVerifiedFields,
InstanceDomainFields,
MembershipFields,
}
}
// we know this is ugly, but we need to have a singleton slice of all projections // we know this is ugly, but we need to have a singleton slice of all projections
// and are only able to initialize it after all projections are created // and are only able to initialize it after all projections are created
// as setup and start currently create them individually, we make sure we get the right one // as setup and start currently create them individually, we make sure we get the right one

View File

@@ -281,7 +281,7 @@ func (q *Queries) UserGrants(ctx context.Context, queries *UserGrantsQueries, sh
return nil, zerrors.ThrowInternal(err, "QUERY-wXnQR", "Errors.Query.SQLStatement") return nil, zerrors.ThrowInternal(err, "QUERY-wXnQR", "Errors.Query.SQLStatement")
} }
latestSequence, err := q.latestState(ctx, userGrantTable) latestState, err := q.latestState(ctx, userGrantTable)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -294,7 +294,7 @@ func (q *Queries) UserGrants(ctx context.Context, queries *UserGrantsQueries, sh
return nil, err return nil, err
} }
grants.State = latestSequence grants.State = latestState
return grants, nil return grants, nil
} }

View File

@@ -144,7 +144,7 @@ func (q *Queries) Memberships(ctx context.Context, queries *MembershipSearchQuer
if err != nil { if err != nil {
return nil, zerrors.ThrowInvalidArgument(err, "QUERY-T84X9", "Errors.Query.InvalidRequest") return nil, zerrors.ThrowInvalidArgument(err, "QUERY-T84X9", "Errors.Query.InvalidRequest")
} }
latestSequence, err := q.latestState(ctx, orgMemberTable, instanceMemberTable, projectMemberTable, projectGrantMemberTable) latestState, err := q.latestState(ctx, orgMemberTable, instanceMemberTable, projectMemberTable, projectGrantMemberTable)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -157,7 +157,7 @@ func (q *Queries) Memberships(ctx context.Context, queries *MembershipSearchQuer
if err != nil { if err != nil {
return nil, err return nil, err
} }
memberships.State = latestSequence memberships.State = latestState
return memberships, nil return memberships, nil
} }

View File

@@ -3,6 +3,7 @@ package database
import ( import (
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"golang.org/x/exp/constraints" "golang.org/x/exp/constraints"
) )
@@ -94,7 +95,7 @@ func (c numberCompare) String() string {
} }
type number interface { type number interface {
constraints.Integer | constraints.Float | time.Time constraints.Integer | constraints.Float | time.Time | decimal.Decimal
// TODO: condition must know if it's args are named parameters or not // TODO: condition must know if it's args are named parameters or not
// constraints.Integer | constraints.Float | time.Time | placeholder // constraints.Integer | constraints.Float | time.Time | placeholder
} }

View File

@@ -2,6 +2,8 @@ package eventstore
import ( import (
"context" "context"
"github.com/shopspring/decimal"
) )
func NewEventstore(querier Querier, pusher Pusher) *EventStore { func NewEventstore(querier Querier, pusher Pusher) *EventStore {
@@ -30,12 +32,12 @@ type healthier interface {
} }
type GlobalPosition struct { type GlobalPosition struct {
Position float64 Position decimal.Decimal
InPositionOrder uint32 InPositionOrder uint32
} }
func (gp GlobalPosition) IsLess(other GlobalPosition) bool { func (gp GlobalPosition) IsLess(other GlobalPosition) bool {
return gp.Position < other.Position || (gp.Position == other.Position && gp.InPositionOrder < other.InPositionOrder) return gp.Position.LessThan(other.Position) || (gp.Position.Equal(other.Position) && gp.InPositionOrder < other.InPositionOrder)
} }
type Reducer interface { type Reducer interface {

View File

@@ -8,6 +8,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/database/mock" "github.com/zitadel/zitadel/internal/v2/database/mock"
"github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/zerrors" "github.com/zitadel/zitadel/internal/zerrors"
@@ -818,7 +820,7 @@ func Test_push(t *testing.T) {
[][]driver.Value{ [][]driver.Value{
{ {
time.Now(), time.Now(),
float64(123), decimal.NewFromFloat(123).String(),
}, },
}, },
), ),
@@ -899,11 +901,11 @@ func Test_push(t *testing.T) {
[][]driver.Value{ [][]driver.Value{
{ {
time.Now(), time.Now(),
float64(123), decimal.NewFromFloat(123).String(),
}, },
{ {
time.Now(), time.Now(),
float64(123.1), decimal.NewFromFloat(123.1).String(),
}, },
}, },
), ),
@@ -984,11 +986,11 @@ func Test_push(t *testing.T) {
[][]driver.Value{ [][]driver.Value{
{ {
time.Now(), time.Now(),
float64(123), decimal.NewFromFloat(123).String(),
}, },
{ {
time.Now(), time.Now(),
float64(123.1), decimal.NewFromFloat(123.1).String(),
}, },
}, },
), ),
@@ -1044,7 +1046,7 @@ func Test_push(t *testing.T) {
[][]driver.Value{ [][]driver.Value{
{ {
time.Now(), time.Now(),
float64(123), decimal.NewFromFloat(123).String(),
}, },
}, },
), ),
@@ -1099,7 +1101,7 @@ func Test_push(t *testing.T) {
[][]driver.Value{ [][]driver.Value{
{ {
time.Now(), time.Now(),
float64(123), decimal.NewFromFloat(123).String(),
}, },
}, },
), ),
@@ -1181,11 +1183,11 @@ func Test_push(t *testing.T) {
[][]driver.Value{ [][]driver.Value{
{ {
time.Now(), time.Now(),
float64(123), decimal.NewFromFloat(123).String(),
}, },
{ {
time.Now(), time.Now(),
float64(123.1), decimal.NewFromFloat(123.1).String(),
}, },
}, },
), ),
@@ -1272,11 +1274,11 @@ func Test_push(t *testing.T) {
[][]driver.Value{ [][]driver.Value{
{ {
time.Now(), time.Now(),
float64(123), decimal.NewFromFloat(123).String(),
}, },
{ {
time.Now(), time.Now(),
float64(123.1), decimal.NewFromFloat(123.1).String(),
}, },
}, },
), ),

View File

@@ -8,6 +8,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/database" "github.com/zitadel/zitadel/internal/v2/database"
"github.com/zitadel/zitadel/internal/v2/database/mock" "github.com/zitadel/zitadel/internal/v2/database/mock"
"github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/v2/eventstore"
@@ -541,13 +543,13 @@ func Test_writeFilter(t *testing.T) {
args: args{ args: args{
filter: eventstore.NewFilter( filter: eventstore.NewFilter(
eventstore.FilterPagination( eventstore.FilterPagination(
eventstore.PositionGreater(123.4, 0), eventstore.PositionGreater(decimal.NewFromFloat(123.4), 0),
), ),
), ),
}, },
want: wantQuery{ want: wantQuery{
query: " WHERE instance_id = $1 AND position > $2 ORDER BY position, in_tx_order", query: " WHERE instance_id = $1 AND position > $2 ORDER BY position, in_tx_order",
args: []any{"i1", 123.4}, args: []any{"i1", decimal.NewFromFloat(123.4)},
}, },
}, },
{ {
@@ -555,18 +557,18 @@ func Test_writeFilter(t *testing.T) {
args: args{ args: args{
filter: eventstore.NewFilter( filter: eventstore.NewFilter(
eventstore.FilterPagination( eventstore.FilterPagination(
// eventstore.PositionGreater(123.4, 0), // eventstore.PositionGreater(decimal.NewFromFloat(123.4), 0),
// eventstore.PositionLess(125.4, 10), // eventstore.PositionLess(125.4, 10),
eventstore.PositionBetween( eventstore.PositionBetween(
&eventstore.GlobalPosition{Position: 123.4}, &eventstore.GlobalPosition{Position: decimal.NewFromFloat(123.4)},
&eventstore.GlobalPosition{Position: 125.4, InPositionOrder: 10}, &eventstore.GlobalPosition{Position: decimal.NewFromFloat(125.4), InPositionOrder: 10},
), ),
), ),
), ),
}, },
want: wantQuery{ want: wantQuery{
query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order < $3) OR position < $4) AND position > $5 ORDER BY position, in_tx_order", query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order < $3) OR position < $4) AND position > $5 ORDER BY position, in_tx_order",
args: []any{"i1", 125.4, uint32(10), 125.4, 123.4}, args: []any{"i1", decimal.NewFromFloat(125.4), uint32(10), decimal.NewFromFloat(125.4), decimal.NewFromFloat(123.4)},
// TODO: (adlerhurst) would require some refactoring to reuse existing args // TODO: (adlerhurst) would require some refactoring to reuse existing args
// query: " WHERE instance_id = $1 AND position > $2 AND ((position = $3 AND in_tx_order < $4) OR position < $3) ORDER BY position, in_tx_order", // query: " WHERE instance_id = $1 AND position > $2 AND ((position = $3 AND in_tx_order < $4) OR position < $3) ORDER BY position, in_tx_order",
// args: []any{"i1", 123.4, 125.4, uint32(10)}, // args: []any{"i1", 123.4, 125.4, uint32(10)},
@@ -577,13 +579,13 @@ func Test_writeFilter(t *testing.T) {
args: args{ args: args{
filter: eventstore.NewFilter( filter: eventstore.NewFilter(
eventstore.FilterPagination( eventstore.FilterPagination(
eventstore.PositionGreater(123.4, 12), eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12),
), ),
), ),
}, },
want: wantQuery{ want: wantQuery{
query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order > $3) OR position > $4) ORDER BY position, in_tx_order", query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order > $3) OR position > $4) ORDER BY position, in_tx_order",
args: []any{"i1", 123.4, uint32(12), 123.4}, args: []any{"i1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4)},
}, },
}, },
{ {
@@ -593,13 +595,13 @@ func Test_writeFilter(t *testing.T) {
eventstore.FilterPagination( eventstore.FilterPagination(
eventstore.Limit(10), eventstore.Limit(10),
eventstore.Offset(3), eventstore.Offset(3),
eventstore.PositionGreater(123.4, 12), eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12),
), ),
), ),
}, },
want: wantQuery{ want: wantQuery{
query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order > $3) OR position > $4) ORDER BY position, in_tx_order LIMIT $5 OFFSET $6", query: " WHERE instance_id = $1 AND ((position = $2 AND in_tx_order > $3) OR position > $4) ORDER BY position, in_tx_order LIMIT $5 OFFSET $6",
args: []any{"i1", 123.4, uint32(12), 123.4, uint32(10), uint32(3)}, args: []any{"i1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)},
}, },
}, },
{ {
@@ -609,14 +611,14 @@ func Test_writeFilter(t *testing.T) {
eventstore.FilterPagination( eventstore.FilterPagination(
eventstore.Limit(10), eventstore.Limit(10),
eventstore.Offset(3), eventstore.Offset(3),
eventstore.PositionGreater(123.4, 12), eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12),
), ),
eventstore.AppendAggregateFilter("user"), eventstore.AppendAggregateFilter("user"),
), ),
}, },
want: wantQuery{ want: wantQuery{
query: " WHERE instance_id = $1 AND aggregate_type = $2 AND ((position = $3 AND in_tx_order > $4) OR position > $5) ORDER BY position, in_tx_order LIMIT $6 OFFSET $7", query: " WHERE instance_id = $1 AND aggregate_type = $2 AND ((position = $3 AND in_tx_order > $4) OR position > $5) ORDER BY position, in_tx_order LIMIT $6 OFFSET $7",
args: []any{"i1", "user", 123.4, uint32(12), 123.4, uint32(10), uint32(3)}, args: []any{"i1", "user", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)},
}, },
}, },
{ {
@@ -626,7 +628,7 @@ func Test_writeFilter(t *testing.T) {
eventstore.FilterPagination( eventstore.FilterPagination(
eventstore.Limit(10), eventstore.Limit(10),
eventstore.Offset(3), eventstore.Offset(3),
eventstore.PositionGreater(123.4, 12), eventstore.PositionGreater(decimal.NewFromFloat(123.4), 12),
), ),
eventstore.AppendAggregateFilter("user"), eventstore.AppendAggregateFilter("user"),
eventstore.AppendAggregateFilter( eventstore.AppendAggregateFilter(
@@ -637,7 +639,7 @@ func Test_writeFilter(t *testing.T) {
}, },
want: wantQuery{ want: wantQuery{
query: " WHERE instance_id = $1 AND (aggregate_type = $2 OR (aggregate_type = $3 AND aggregate_id = $4)) AND ((position = $5 AND in_tx_order > $6) OR position > $7) ORDER BY position, in_tx_order LIMIT $8 OFFSET $9", query: " WHERE instance_id = $1 AND (aggregate_type = $2 OR (aggregate_type = $3 AND aggregate_id = $4)) AND ((position = $5 AND in_tx_order > $6) OR position > $7) ORDER BY position, in_tx_order LIMIT $8 OFFSET $9",
args: []any{"i1", "user", "org", "o1", 123.4, uint32(12), 123.4, uint32(10), uint32(3)}, args: []any{"i1", "user", "org", "o1", decimal.NewFromFloat(123.4), uint32(12), decimal.NewFromFloat(123.4), uint32(10), uint32(3)},
}, },
}, },
} }
@@ -956,7 +958,7 @@ func Test_writeQueryUse_examples(t *testing.T) {
), ),
eventstore.FilterPagination( eventstore.FilterPagination(
// used because we need to check for first login and an app which is not console // used because we need to check for first login and an app which is not console
eventstore.PositionGreater(12, 4), eventstore.PositionGreater(decimal.NewFromInt(12), 4),
), ),
), ),
eventstore.NewFilter( eventstore.NewFilter(
@@ -1065,9 +1067,9 @@ func Test_writeQueryUse_examples(t *testing.T) {
"instance", "instance",
"user", "user",
"user.token.added", "user.token.added",
float64(12), decimal.NewFromInt(12),
uint32(4), uint32(4),
float64(12), decimal.NewFromInt(12),
"instance", "instance",
"instance", "instance",
[]string{"instance.idp.config.added", "instance.idp.oauth.added", "instance.idp.oidc.added", "instance.idp.jwt.added", "instance.idp.azure.added", "instance.idp.github.added", "instance.idp.github.enterprise.added", "instance.idp.gitlab.added", "instance.idp.gitlab.selfhosted.added", "instance.idp.google.added", "instance.idp.ldap.added", "instance.idp.config.apple.added", "instance.idp.saml.added"}, []string{"instance.idp.config.added", "instance.idp.oauth.added", "instance.idp.oidc.added", "instance.idp.jwt.added", "instance.idp.azure.added", "instance.idp.github.added", "instance.idp.github.enterprise.added", "instance.idp.gitlab.added", "instance.idp.gitlab.selfhosted.added", "instance.idp.google.added", "instance.idp.ldap.added", "instance.idp.config.apple.added", "instance.idp.saml.added"},
@@ -1201,7 +1203,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(), time.Now(),
"event.type", "event.type",
uint32(23), uint32(23),
float64(123), decimal.NewFromInt(123).String(),
uint32(0), uint32(0),
nil, nil,
"gigi", "gigi",
@@ -1235,7 +1237,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(), time.Now(),
"event.type", "event.type",
uint32(23), uint32(23),
float64(123), decimal.NewFromInt(123).String(),
uint32(0), uint32(0),
[]byte(`{"name": "gigi"}`), []byte(`{"name": "gigi"}`),
"gigi", "gigi",
@@ -1269,7 +1271,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(), time.Now(),
"event.type", "event.type",
uint32(23), uint32(23),
float64(123), decimal.NewFromInt(123).String(),
uint32(0), uint32(0),
nil, nil,
"gigi", "gigi",
@@ -1283,7 +1285,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(), time.Now(),
"event.type", "event.type",
uint32(24), uint32(24),
float64(124), decimal.NewFromInt(124).String(),
uint32(0), uint32(0),
[]byte(`{"name": "gigi"}`), []byte(`{"name": "gigi"}`),
"gigi", "gigi",
@@ -1317,7 +1319,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(), time.Now(),
"event.type", "event.type",
uint32(23), uint32(23),
float64(123), decimal.NewFromInt(123).String(),
uint32(0), uint32(0),
nil, nil,
"gigi", "gigi",
@@ -1331,7 +1333,7 @@ func Test_executeQuery(t *testing.T) {
time.Now(), time.Now(),
"event.type", "event.type",
uint32(24), uint32(24),
float64(124), decimal.NewFromInt(124).String(),
uint32(0), uint32(0),
[]byte(`{"name": "gigi"}`), []byte(`{"name": "gigi"}`),
"gigi", "gigi",

View File

@@ -7,6 +7,8 @@ import (
"slices" "slices"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/database" "github.com/zitadel/zitadel/internal/v2/database"
) )
@@ -723,7 +725,7 @@ func (pc *PositionCondition) Min() *GlobalPosition {
// PositionGreater prepares the condition as follows // PositionGreater prepares the condition as follows
// if inPositionOrder is set: position = AND in_tx_order > OR or position > // if inPositionOrder is set: position = AND in_tx_order > OR or position >
// if inPositionOrder is NOT set: position > // if inPositionOrder is NOT set: position >
func PositionGreater(position float64, inPositionOrder uint32) paginationOpt { func PositionGreater(position decimal.Decimal, inPositionOrder uint32) paginationOpt {
return func(p *Pagination) { return func(p *Pagination) {
p.ensurePosition() p.ensurePosition()
p.position.min = &GlobalPosition{ p.position.min = &GlobalPosition{
@@ -743,7 +745,7 @@ func GlobalPositionGreater(position *GlobalPosition) paginationOpt {
// PositionLess prepares the condition as follows // PositionLess prepares the condition as follows
// if inPositionOrder is set: position = AND in_tx_order > OR or position > // if inPositionOrder is set: position = AND in_tx_order > OR or position >
// if inPositionOrder is NOT set: position > // if inPositionOrder is NOT set: position >
func PositionLess(position float64, inPositionOrder uint32) paginationOpt { func PositionLess(position decimal.Decimal, inPositionOrder uint32) paginationOpt {
return func(p *Pagination) { return func(p *Pagination) {
p.ensurePosition() p.ensurePosition()
p.position.max = &GlobalPosition{ p.position.max = &GlobalPosition{

View File

@@ -6,6 +6,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/database" "github.com/zitadel/zitadel/internal/v2/database"
) )
@@ -74,13 +76,13 @@ func TestPaginationOpt(t *testing.T) {
name: "global position greater", name: "global position greater",
args: args{ args: args{
opts: []paginationOpt{ opts: []paginationOpt{
GlobalPositionGreater(&GlobalPosition{Position: 10}), GlobalPositionGreater(&GlobalPosition{Position: decimal.NewFromInt(10)}),
}, },
}, },
want: &Pagination{ want: &Pagination{
position: &PositionCondition{ position: &PositionCondition{
min: &GlobalPosition{ min: &GlobalPosition{
Position: 10, Position: decimal.NewFromInt(10),
InPositionOrder: 0, InPositionOrder: 0,
}, },
}, },
@@ -90,13 +92,13 @@ func TestPaginationOpt(t *testing.T) {
name: "position greater", name: "position greater",
args: args{ args: args{
opts: []paginationOpt{ opts: []paginationOpt{
PositionGreater(10, 0), PositionGreater(decimal.NewFromInt(10), 0),
}, },
}, },
want: &Pagination{ want: &Pagination{
position: &PositionCondition{ position: &PositionCondition{
min: &GlobalPosition{ min: &GlobalPosition{
Position: 10, Position: decimal.NewFromInt(10),
InPositionOrder: 0, InPositionOrder: 0,
}, },
}, },
@@ -107,13 +109,13 @@ func TestPaginationOpt(t *testing.T) {
name: "position less", name: "position less",
args: args{ args: args{
opts: []paginationOpt{ opts: []paginationOpt{
PositionLess(10, 12), PositionLess(decimal.NewFromInt(10), 12),
}, },
}, },
want: &Pagination{ want: &Pagination{
position: &PositionCondition{ position: &PositionCondition{
max: &GlobalPosition{ max: &GlobalPosition{
Position: 10, Position: decimal.NewFromInt(10),
InPositionOrder: 12, InPositionOrder: 12,
}, },
}, },
@@ -123,13 +125,13 @@ func TestPaginationOpt(t *testing.T) {
name: "global position less", name: "global position less",
args: args{ args: args{
opts: []paginationOpt{ opts: []paginationOpt{
GlobalPositionLess(&GlobalPosition{Position: 12, InPositionOrder: 24}), GlobalPositionLess(&GlobalPosition{Position: decimal.NewFromInt(12), InPositionOrder: 24}),
}, },
}, },
want: &Pagination{ want: &Pagination{
position: &PositionCondition{ position: &PositionCondition{
max: &GlobalPosition{ max: &GlobalPosition{
Position: 12, Position: decimal.NewFromInt(12),
InPositionOrder: 24, InPositionOrder: 24,
}, },
}, },
@@ -140,19 +142,19 @@ func TestPaginationOpt(t *testing.T) {
args: args{ args: args{
opts: []paginationOpt{ opts: []paginationOpt{
PositionBetween( PositionBetween(
&GlobalPosition{10, 12}, &GlobalPosition{decimal.NewFromInt(10), 12},
&GlobalPosition{20, 0}, &GlobalPosition{decimal.NewFromInt(20), 0},
), ),
}, },
}, },
want: &Pagination{ want: &Pagination{
position: &PositionCondition{ position: &PositionCondition{
min: &GlobalPosition{ min: &GlobalPosition{
Position: 10, Position: decimal.NewFromInt(10),
InPositionOrder: 12, InPositionOrder: 12,
}, },
max: &GlobalPosition{ max: &GlobalPosition{
Position: 20, Position: decimal.NewFromInt(20),
InPositionOrder: 0, InPositionOrder: 0,
}, },
}, },

View File

@@ -1,6 +1,8 @@
package readmodel package readmodel
import ( import (
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/v2/system" "github.com/zitadel/zitadel/internal/v2/system"
"github.com/zitadel/zitadel/internal/v2/system/mirror" "github.com/zitadel/zitadel/internal/v2/system/mirror"
@@ -8,7 +10,7 @@ import (
type LastSuccessfulMirror struct { type LastSuccessfulMirror struct {
ID string ID string
Position float64 Position decimal.Decimal
source string source string
} }
@@ -34,6 +36,7 @@ func (p *LastSuccessfulMirror) Filter() *eventstore.Filter {
), ),
eventstore.FilterPagination( eventstore.FilterPagination(
eventstore.Descending(), eventstore.Descending(),
eventstore.Limit(1),
), ),
) )
} }
@@ -53,7 +56,7 @@ func (h *LastSuccessfulMirror) Reduce(events ...*eventstore.StorageEvent) (err e
func (h *LastSuccessfulMirror) reduceSucceeded(event *eventstore.StorageEvent) error { func (h *LastSuccessfulMirror) reduceSucceeded(event *eventstore.StorageEvent) error {
// if position is set we skip all older events // if position is set we skip all older events
if h.Position > 0 { if h.Position.GreaterThan(decimal.NewFromInt(0)) {
return nil return nil
} }

View File

@@ -1,6 +1,8 @@
package mirror package mirror
import ( import (
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/v2/eventstore" "github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/zerrors" "github.com/zitadel/zitadel/internal/zerrors"
) )
@@ -9,7 +11,7 @@ type succeededPayload struct {
// Source is the name of the database data are mirrored from // Source is the name of the database data are mirrored from
Source string `json:"source"` Source string `json:"source"`
// Position until data will be mirrored // Position until data will be mirrored
Position float64 `json:"position"` Position decimal.Decimal `json:"position"`
} }
const SucceededType = eventTypePrefix + "succeeded" const SucceededType = eventTypePrefix + "succeeded"
@@ -38,7 +40,7 @@ func SucceededEventFromStorage(event *eventstore.StorageEvent) (e *SucceededEven
}, nil }, nil
} }
func NewSucceededCommand(source string, position float64) *eventstore.Command { func NewSucceededCommand(source string, position decimal.Decimal) *eventstore.Command {
return &eventstore.Command{ return &eventstore.Command{
Action: eventstore.Action[any]{ Action: eventstore.Action[any]{
Creator: Creator, Creator: Creator,