fix(eventstore): Make Eventstore Compatible with Relational Table Package (#10687)

Improves compatibility of eventstore and related database components
with the new relational table package.

## Which problems are solved

1. **Incompatible Database Interfaces**: The existing eventstore was
tightly coupled to the database package, which is incompatible with the
new, more abstract relational table package in v3. This prevented the
new command-side logic from pushing events to the legacy eventstore.
2. **Missing Health Checks**: The database interfaces in the new package
lacked a Ping method, making it impossible to perform health checks on
database connections.
3. **Event Publishing Logic**: The command handling logic in domain
needed a way to collect and push events to the legacy eventstore after a
command was successfully executed.

## How the problems are solved

1. **`LegacyEventstore` Interface**:
* A new `LegacyEventstore` interface is introduced in the new
`database/eventstore` . This interface exposes a `PushWithNewClient`
method that accepts the new `database.QueryExecutor` interface,
decoupling the v3 domain from the legacy implementation.
* The `internal/eventstore.Eventstore` now implements this interface. A
wrapper, PushWithClient, is added to convert the old database client
types (`*sql.DB`, `*sql.Tx`) into the new `QueryExecutor` types before
calling `PushWithNewClient`.
2. **Database Interface Updates**:
* The `database.Pool` and `database.Client` interfaces in
`storage/eventstore` have been updated to include a Ping method,
allowing for consistent health checks across different database
dialects.
* The `postgres` and `sql` dialect implementations have been updated to
support this new method.
3. **Command and Invoker Refactoring**:
* The `Commander` interface in domain now includes an `Events()
[]legacy_es.Command` method. This allows commands to declare which
events they will generate.
* The `eventCollector` in the invoker logic has been redesigned. It now
ensures a database transaction is started before executing a command.
After successful execution, it calls the `Events()` method on the
command to collect the generated events and appends them to a list.
* The `eventStoreInvoker` then pushes all collected events to the legacy
eventstore using the new `LegacyEventstore` interface, ensuring that
events are only pushed if the entire command (and any sub-commands)
executes successfully within the transaction.
4. **Testing**:
* New unit tests have been added for the invoker to verify that events
are correctly collected from single commands, batched commands, and
nested commands.

These changes create a clean bridge between the new v3 command-side
logic and the existing v1 eventstore, allowing for incremental adoption
of the new architecture while maintaining full functionality.

## Additional Information

closes https://github.com/zitadel/zitadel/issues/10442
This commit is contained in:
Silvan
2025-09-16 18:58:49 +02:00
committed by GitHub
parent 41d04ffa65
commit 22ef817d5c
28 changed files with 705 additions and 644 deletions

View File

@@ -17,7 +17,6 @@ on:
env:
cache_path: |
backend
internal/statik/statik.go
internal/notification/statik/statik.go
internal/api/ui/login/static/resources/themes/zitadel/css/zitadel.css*
@@ -44,7 +43,7 @@ jobs:
continue-on-error: true
id: cache
with:
key: core-${{ hashFiles( 'go.*', 'openapi', 'cmd', 'pkg/grpc/**/*.go', 'proto', 'internal', 'backend/**') }}
key: core-${{ hashFiles( 'go.*', 'openapi', 'cmd', 'pkg/grpc/**/*.go', 'proto', 'internal', 'backend') }}
restore-keys: |
core-
path: ${{ env.cache_path }}

View File

@@ -1,131 +1,146 @@
package domain
// import (
// "context"
// "fmt"
import (
"context"
"fmt"
// "github.com/zitadel/zitadel/backend/v3/storage/database"
// )
"github.com/zitadel/zitadel/backend/v3/storage/database"
legacy_es "github.com/zitadel/zitadel/internal/eventstore"
)
// // Commander is the all it needs to implement the command pattern.
// // It is the interface all manipulations need to implement.
// // If possible it should also be used for queries. We will find out if this is possible in the future.
// type Commander interface {
// Execute(ctx context.Context, opts *CommandOpts) (err error)
// fmt.Stringer
// }
// Commander is all that is needed to implement the command pattern.
// It is the interface all manipulations need to implement.
// If possible it should also be used for queries. We will find out if this is possible in the future.
type Commander interface {
Execute(ctx context.Context, opts *CommandOpts) (err error)
// Events returns the events that should be pushed to the event store after the command is executed.
// If the command does not produce events, it should return nil or an empty slice.
Events(ctx context.Context) []legacy_es.Command
fmt.Stringer
}
// // Invoker is part of the command pattern.
// // It is the interface that is used to execute commands.
// type Invoker interface {
// Invoke(ctx context.Context, command Commander, opts *CommandOpts) error
// }
// Invoker is part of the command pattern.
// It is the interface that is used to execute commands.
type Invoker interface {
Invoke(ctx context.Context, command Commander, opts *CommandOpts) error
}
// // CommandOpts are passed to each command
// // the provide common fields used by commands like the database client.
// type CommandOpts struct {
// DB database.QueryExecutor
// Invoker Invoker
// }
// CommandOpts are passed to each command
// it provides common fields used by commands like the database client.
type CommandOpts struct {
DB database.QueryExecutor
Invoker Invoker
}
// type ensureTxOpts struct {
// *database.TransactionOptions
// }
type ensureTxOpts struct {
*database.TransactionOptions
}
// type EnsureTransactionOpt func(*ensureTxOpts)
type EnsureTransactionOpt func(*ensureTxOpts)
// // EnsureTx ensures that the DB is a transaction. If it is not, it will start a new transaction.
// // The returned close function will end the transaction. If the DB is already a transaction, the close function
// // will do nothing because another [Commander] is already responsible for ending the transaction.
// func (o *CommandOpts) EnsureTx(ctx context.Context, opts ...EnsureTransactionOpt) (close func(context.Context, error) error, err error) {
// beginner, ok := o.DB.(database.Beginner)
// if !ok {
// // db is already a transaction
// return func(_ context.Context, err error) error {
// return err
// }, nil
// }
// EnsureTx ensures that the DB is a transaction. If it is not, it will start a new transaction.
// The returned close function will end the transaction. If the DB is already a transaction, the close function
// will do nothing because another [Commander] is already responsible for ending the transaction.
func (o *CommandOpts) EnsureTx(ctx context.Context, opts ...EnsureTransactionOpt) (close func(context.Context, error) error, err error) {
beginner, ok := o.DB.(database.Beginner)
if !ok {
// db is already a transaction
return func(_ context.Context, err error) error {
return err
}, nil
}
// txOpts := &ensureTxOpts{
// TransactionOptions: new(database.TransactionOptions),
// }
// for _, opt := range opts {
// opt(txOpts)
// }
txOpts := &ensureTxOpts{
TransactionOptions: new(database.TransactionOptions),
}
for _, opt := range opts {
opt(txOpts)
}
// tx, err := beginner.Begin(ctx, txOpts.TransactionOptions)
// if err != nil {
// return nil, err
// }
// o.DB = tx
tx, err := beginner.Begin(ctx, txOpts.TransactionOptions)
if err != nil {
return nil, err
}
o.DB = tx
// return func(ctx context.Context, err error) error {
// return tx.End(ctx, err)
// }, nil
// }
return func(ctx context.Context, err error) error {
return tx.End(ctx, err)
}, nil
}
// // EnsureClient ensures that the o.DB is a client. If it is not, it will get a new client from the [database.Pool].
// // The returned close function will release the client. If the o.DB is already a client or transaction, the close function
// // will do nothing because another [Commander] is already responsible for releasing the client.
// func (o *CommandOpts) EnsureClient(ctx context.Context) (close func(_ context.Context) error, err error) {
// pool, ok := o.DB.(database.Pool)
// if !ok {
// // o.DB is already a client
// return func(_ context.Context) error {
// return nil
// }, nil
// }
// client, err := pool.Acquire(ctx)
// if err != nil {
// return nil, err
// }
// o.DB = client
// return func(ctx context.Context) error {
// return client.Release(ctx)
// }, nil
// }
// EnsureClient ensures that the o.DB is a client. If it is not, it will get a new client from the [database.Pool].
// The returned close function will release the client. If the o.DB is already a client or transaction, the close function
// will do nothing because another [Commander] is already responsible for releasing the client.
func (o *CommandOpts) EnsureClient(ctx context.Context) (close func(_ context.Context) error, err error) {
pool, ok := o.DB.(database.Pool)
if !ok {
// o.DB is already a client
return func(_ context.Context) error {
return nil
}, nil
}
client, err := pool.Acquire(ctx)
if err != nil {
return nil, err
}
o.DB = client
return func(ctx context.Context) error {
return client.Release(ctx)
}, nil
}
// func (o *CommandOpts) Invoke(ctx context.Context, command Commander) error {
// if o.Invoker == nil {
// return command.Execute(ctx, o)
// }
// return o.Invoker.Invoke(ctx, command, o)
// }
func (o *CommandOpts) Invoke(ctx context.Context, command Commander) error {
if o.Invoker == nil {
return command.Execute(ctx, o)
}
return o.Invoker.Invoke(ctx, command, o)
}
// func DefaultOpts(invoker Invoker) *CommandOpts {
// if invoker == nil {
// invoker = &noopInvoker{}
// }
// return &CommandOpts{
// DB: pool,
// Invoker: invoker,
// }
// }
func DefaultOpts(invoker Invoker) *CommandOpts {
if invoker == nil {
invoker = &noopInvoker{}
}
return &CommandOpts{
DB: pool,
Invoker: invoker,
}
}
// // commandBatch is a batch of commands.
// // It uses the [Invoker] provided by the opts to execute each command.
// type commandBatch struct {
// Commands []Commander
// }
// commandBatch is a batch of commands.
// It uses the [Invoker] provided by the opts to execute each command.
type commandBatch struct {
Commands []Commander
}
// func BatchCommands(cmds ...Commander) *commandBatch {
// return &commandBatch{
// Commands: cmds,
// }
// }
// Events implements Commander.
func (cmd *commandBatch) Events(ctx context.Context) []legacy_es.Command {
commands := make([]legacy_es.Command, 0, len(cmd.Commands))
for _, c := range cmd.Commands {
if e := c.Events(ctx); len(e) > 0 {
commands = append(commands, e...)
}
}
return commands
}
// // String implements [Commander].
// func (cmd *commandBatch) String() string {
// return "commandBatch"
// }
func BatchCommands(cmds ...Commander) *commandBatch {
return &commandBatch{
Commands: cmds,
}
}
// func (b *commandBatch) Execute(ctx context.Context, opts *CommandOpts) (err error) {
// for _, cmd := range b.Commands {
// if err = opts.Invoke(ctx, cmd); err != nil {
// return err
// }
// }
// return nil
// }
// String implements [Commander].
func (cmd *commandBatch) String() string {
return "commandBatch"
}
// var _ Commander = (*commandBatch)(nil)
func (b *commandBatch) Execute(ctx context.Context, opts *CommandOpts) (err error) {
for _, cmd := range b.Commands {
if err = opts.Invoke(ctx, cmd); err != nil {
return err
}
}
return nil
}
var _ Commander = (*commandBatch)(nil)

View File

@@ -0,0 +1,68 @@
package domain
import (
"log/slog"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/backend/v3/storage/eventstore"
"github.com/zitadel/zitadel/backend/v3/telemetry/logging"
"github.com/zitadel/zitadel/backend/v3/telemetry/tracing"
)
// The variables could also be moved to a struct.
// I just started with the singleton pattern and kept it like this.
var (
pool database.Pool
// userCodeAlgorithm crypto.EncryptionAlgorithm
tracer tracing.Tracer
logger logging.Logger = *logging.NewLogger(slog.Default())
legacyEventstore eventstore.LegacyEventstore
// userRepo func(database.QueryExecutor) UserRepository
// instanceRepo func(database.QueryExecutor) InstanceRepository
// cryptoRepo func(database.QueryExecutor) CryptoRepository
// orgRepo func(database.QueryExecutor) OrgRepository
// instanceCache cache.Cache[instanceCacheIndex, string, *Instance]
// orgCache cache.Cache[orgCacheIndex, string, *Org]
// generateID func() (string, error) = func() (string, error) {
// return strconv.FormatUint(rand.Uint64(), 10), nil
// }
)
func SetPool(p database.Pool) {
pool = p
}
// func SetUserCodeAlgorithm(algorithm crypto.EncryptionAlgorithm) {
// userCodeAlgorithm = algorithm
// }
func SetTracer(t tracing.Tracer) {
tracer = t
}
func SetLogger(l logging.Logger) {
logger = l
}
func SetLegacyEventstore(es eventstore.LegacyEventstore) {
legacyEventstore = es
}
// func SetUserRepository(repo func(database.QueryExecutor) UserRepository) {
// userRepo = repo
// }
// func SetOrgRepository(repo func(database.QueryExecutor) OrgRepository) {
// orgRepo = repo
// }
// func SetInstanceRepository(repo func(database.QueryExecutor) InstanceRepository) {
// instanceRepo = repo
// }
// func SetCryptoRepository(repo func(database.QueryExecutor) CryptoRepository) {
// cryptoRepo = repo
// }

View File

@@ -60,68 +60,3 @@ type domainChanges interface {
// This is used for reducing events.
SetUpdatedAt(t time.Time) database.Change
}
// import (
// "math/rand/v2"
// "strconv"
// "github.com/zitadel/zitadel/backend/v3/storage/cache"
// "github.com/zitadel/zitadel/backend/v3/storage/database"
// // "github.com/zitadel/zitadel/backend/v3/telemetry/logging"
// "github.com/zitadel/zitadel/backend/v3/telemetry/tracing"
// "github.com/zitadel/zitadel/internal/crypto"
// )
// // The variables could also be moved to a struct.
// // I just started with the singleton pattern and kept it like this.
// var (
// pool database.Pool
// userCodeAlgorithm crypto.EncryptionAlgorithm
// tracer tracing.Tracer
// // logger logging.Logger
// userRepo func(database.QueryExecutor) UserRepository
// // instanceRepo func(database.QueryExecutor) InstanceRepository
// cryptoRepo func(database.QueryExecutor) CryptoRepository
// orgRepo func(database.QueryExecutor) OrgRepository
// // instanceCache cache.Cache[instanceCacheIndex, string, *Instance]
// orgCache cache.Cache[orgCacheIndex, string, *Org]
// generateID func() (string, error) = func() (string, error) {
// return strconv.FormatUint(rand.Uint64(), 10), nil
// }
// )
// func SetPool(p database.Pool) {
// pool = p
// }
// func SetUserCodeAlgorithm(algorithm crypto.EncryptionAlgorithm) {
// userCodeAlgorithm = algorithm
// }
// func SetTracer(t tracing.Tracer) {
// tracer = t
// }
// // func SetLogger(l logging.Logger) {
// // logger = l
// // }
// func SetUserRepository(repo func(database.QueryExecutor) UserRepository) {
// userRepo = repo
// }
// func SetOrgRepository(repo func(database.QueryExecutor) OrgRepository) {
// orgRepo = repo
// }
// // func SetInstanceRepository(repo func(database.QueryExecutor) InstanceRepository) {
// // instanceRepo = repo
// // }
// func SetCryptoRepository(repo func(database.QueryExecutor) CryptoRepository) {
// cryptoRepo = repo
// }

View File

@@ -1,142 +1,143 @@
package domain
// import (
// "context"
// "fmt"
import (
"context"
"fmt"
// "github.com/zitadel/zitadel/backend/v3/storage/eventstore"
// )
"github.com/zitadel/zitadel/backend/v3/storage/eventstore"
legacy_es "github.com/zitadel/zitadel/internal/eventstore"
)
// // Invoke provides a way to execute commands within the domain package.
// // It uses a chain of responsibility pattern to handle the command execution.
// // The default chain includes logging, tracing, and event publishing.
// // If you want to invoke multiple commands in a single transaction, you can use the [commandBatch].
// func Invoke(ctx context.Context, cmd Commander) error {
// invoker := newEventStoreInvoker(newLoggingInvoker(newTraceInvoker(nil)))
// opts := &CommandOpts{
// Invoker: invoker.collector,
// DB: pool,
// }
// return invoker.Invoke(ctx, cmd, opts)
// }
// Invoke provides a way to execute commands within the domain package.
// It uses a chain of responsibility pattern to handle the command execution.
// The default chain includes logging, tracing, and event publishing.
// If you want to invoke multiple commands in a single transaction, you can use the [commandBatch].
func Invoke(ctx context.Context, cmd Commander) error {
invoker := newEventStoreInvoker(newLoggingInvoker(newTraceInvoker(nil)))
opts := &CommandOpts{
Invoker: invoker.collector,
DB: pool,
}
return invoker.Invoke(ctx, cmd, opts)
}
// // eventStoreInvoker checks if the command implements the [eventer] interface.
// // If it does, it collects the events and publishes them to the event store.
// type eventStoreInvoker struct {
// collector *eventCollector
// }
// eventStoreInvoker checks if the [Commander].Events function returns any events.
// If it does, it collects the events and publishes them to the event store.
type eventStoreInvoker struct {
collector *eventCollector
}
// func newEventStoreInvoker(next Invoker) *eventStoreInvoker {
// return &eventStoreInvoker{collector: &eventCollector{next: next}}
// }
func newEventStoreInvoker(next Invoker) *eventStoreInvoker {
return &eventStoreInvoker{collector: &eventCollector{next: next}}
}
// func (i *eventStoreInvoker) Invoke(ctx context.Context, command Commander, opts *CommandOpts) (err error) {
// err = i.collector.Invoke(ctx, command, opts)
// if err != nil {
// return err
// }
// if len(i.collector.events) > 0 {
// err = eventstore.Publish(ctx, i.collector.events, opts.DB)
// if err != nil {
// return err
// }
// }
// return nil
// }
func (i *eventStoreInvoker) Invoke(ctx context.Context, command Commander, opts *CommandOpts) (err error) {
err = i.collector.Invoke(ctx, command, opts)
if err != nil {
return err
}
if len(i.collector.events) > 0 {
err = eventstore.Publish(ctx, legacyEventstore, opts.DB, i.collector.events...)
if err != nil {
return err
}
}
return nil
}
// // eventCollector collects events from all commands. The [eventStoreInvoker] pushes the collected events after all commands are executed.
// type eventCollector struct {
// next Invoker
// events []*eventstore.Event
// }
// eventCollector collects events from all commands. The [eventStoreInvoker] pushes the collected events after all commands are executed.
// The events are collected after the command got executed, the collector ensures that the command is executed in the same transaction as writing the events.
type eventCollector struct {
next Invoker
events []legacy_es.Command
}
// type eventer interface {
// Events() []*eventstore.Event
// }
func (i *eventCollector) Invoke(ctx context.Context, command Commander, opts *CommandOpts) (err error) {
close, err := opts.EnsureTx(ctx)
if err != nil {
return err
}
defer func() { err = close(ctx, err) }()
// func (i *eventCollector) Invoke(ctx context.Context, command Commander, opts *CommandOpts) (err error) {
// if e, ok := command.(eventer); ok && len(e.Events()) > 0 {
// // we need to ensure all commands are executed in the same transaction
// close, err := opts.EnsureTx(ctx)
// if err != nil {
// return err
// }
// defer func() { err = close(ctx, err) }()
if i.next != nil {
err = i.next.Invoke(ctx, command, opts)
} else {
err = command.Execute(ctx, opts)
}
if err != nil {
return err
}
i.events = append(command.Events(ctx), i.events...)
// i.events = append(i.events, e.Events()...)
// }
// if i.next != nil {
// return i.next.Invoke(ctx, command, opts)
// }
// return command.Execute(ctx, opts)
// }
return
}
// // traceInvoker decorates each command with tracing.
// type traceInvoker struct {
// next Invoker
// }
// traceInvoker decorates each command with tracing.
type traceInvoker struct {
next Invoker
}
// func newTraceInvoker(next Invoker) *traceInvoker {
// return &traceInvoker{next: next}
// }
func newTraceInvoker(next Invoker) *traceInvoker {
return &traceInvoker{next: next}
}
// func (i *traceInvoker) Invoke(ctx context.Context, command Commander, opts *CommandOpts) (err error) {
// ctx, span := tracer.Start(ctx, fmt.Sprintf("%T", command))
// defer func() {
// if err != nil {
// span.RecordError(err)
// }
// span.End()
// }()
func (i *traceInvoker) Invoke(ctx context.Context, command Commander, opts *CommandOpts) (err error) {
ctx, span := tracer.Start(ctx, fmt.Sprintf("%T", command))
defer func() {
if err != nil {
span.RecordError(err)
}
span.End()
}()
// if i.next != nil {
// return i.next.Invoke(ctx, command, opts)
// }
// return command.Execute(ctx, opts)
// }
if i.next != nil {
return i.next.Invoke(ctx, command, opts)
}
return command.Execute(ctx, opts)
}
// // loggingInvoker decorates each command with logging.
// // It is an example implementation and logs the command name at the beginning and success or failure after the command got executed.
// type loggingInvoker struct {
// next Invoker
// }
// loggingInvoker decorates each command with logging.
// It is an example implementation and logs the command name at the beginning and success or failure after the command got executed.
type loggingInvoker struct {
next Invoker
}
// func newLoggingInvoker(next Invoker) *loggingInvoker {
// return &loggingInvoker{next: next}
// }
func newLoggingInvoker(next Invoker) *loggingInvoker {
return &loggingInvoker{next: next}
}
// func (i *loggingInvoker) Invoke(ctx context.Context, command Commander, opts *CommandOpts) (err error) {
// logger.InfoContext(ctx, "Invoking command", "command", command.String())
func (i *loggingInvoker) Invoke(ctx context.Context, command Commander, opts *CommandOpts) (err error) {
logger.InfoContext(ctx, "Invoking command", "command", command.String())
// if i.next != nil {
// err = i.next.Invoke(ctx, command, opts)
// } else {
// err = command.Execute(ctx, opts)
// }
if i.next != nil {
err = i.next.Invoke(ctx, command, opts)
} else {
err = command.Execute(ctx, opts)
}
// if err != nil {
// logger.ErrorContext(ctx, "Command invocation failed", "command", command.String(), "error", err)
// return err
// }
// logger.InfoContext(ctx, "Command invocation succeeded", "command", command.String())
// return nil
// }
if err != nil {
logger.ErrorContext(ctx, "Command invocation failed", "command", command.String(), "error", err)
return err
}
logger.InfoContext(ctx, "Command invocation succeeded", "command", command.String())
return nil
}
// type noopInvoker struct {
// next Invoker
// }
type noopInvoker struct {
next Invoker
}
// func (i *noopInvoker) Invoke(ctx context.Context, command Commander, opts *CommandOpts) error {
// if i.next != nil {
// return i.next.Invoke(ctx, command, opts)
// }
// return command.Execute(ctx, opts)
// }
func (i *noopInvoker) Invoke(ctx context.Context, command Commander, opts *CommandOpts) error {
if i.next != nil {
return i.next.Invoke(ctx, command, opts)
}
return command.Execute(ctx, opts)
}
// // cacheInvoker could be used in the future to do the caching.
// // My goal would be to have two interfaces:
// // - cacheSetter: which caches an object
// // - cacheGetter: which gets an object from the cache, this should also skip the command execution
// cacheInvoker could be used in the future to do the caching.
// My goal would be to have two interfaces:
// - cacheSetter: which caches an object
// - cacheGetter: which gets an object from the cache, this should also skip the command execution
// type cacheInvoker struct {
// next Invoker
// }

View File

@@ -0,0 +1,180 @@
package domain
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/backend/v3/storage/database/dbmock"
"github.com/zitadel/zitadel/backend/v3/storage/eventstore"
legacy_es "github.com/zitadel/zitadel/internal/eventstore"
)
type testLegacyEventstore struct{}
func (es *testLegacyEventstore) PushWithNewClient(ctx context.Context, client database.QueryExecutor, commands ...legacy_es.Command) ([]legacy_es.Event, error) {
return nil, nil
}
var _ eventstore.LegacyEventstore = (*testLegacyEventstore)(nil)
type invokeTestCommand struct {
events []legacy_es.Command
execute func(ctx context.Context, opts *CommandOpts) error
}
// Events implements Commander.
func (i *invokeTestCommand) Events(ctx context.Context) []legacy_es.Command {
return i.events
}
// Execute implements Commander.
func (i *invokeTestCommand) Execute(ctx context.Context, opts *CommandOpts) (err error) {
if i.execute == nil {
return nil
}
return i.execute(ctx, opts)
}
// String implements Commander.
func (i *invokeTestCommand) String() string {
return "invokeTestCommand"
}
var _ Commander = (*invokeTestCommand)(nil)
type invokeTestEvent struct {
id string
}
// Aggregate implements [legacy_es.Command].
func (i *invokeTestEvent) Aggregate() *legacy_es.Aggregate {
return &legacy_es.Aggregate{
ID: i.id,
Type: "test",
ResourceOwner: "test",
InstanceID: "test",
Version: legacy_es.Version("v1"),
}
}
// Creator implements [legacy_es.Command].
func (i *invokeTestEvent) Creator() string {
return "test"
}
// Fields implements [legacy_es.Command].
func (i *invokeTestEvent) Fields() []*legacy_es.FieldOperation {
return nil
}
// Payload implements [legacy_es.Command].
func (i *invokeTestEvent) Payload() any {
return nil
}
// Revision implements [legacy_es.Command].
func (i *invokeTestEvent) Revision() uint16 {
return 1
}
// Type implements [legacy_es.Command].
func (i *invokeTestEvent) Type() legacy_es.EventType {
return "test"
}
// UniqueConstraints implements [legacy_es.Command].
func (i *invokeTestEvent) UniqueConstraints() []*legacy_es.UniqueConstraint {
return nil
}
var _ legacy_es.Command = (*invokeTestEvent)(nil)
func Test_eventCollector_Invoke(t *testing.T) {
tests := []struct {
name string
command Commander
expectedErr error
assertCollectedEvents func(t *testing.T, events []legacy_es.Command)
}{
{
name: "simple command with events",
expectedErr: nil,
command: &invokeTestCommand{
events: []legacy_es.Command{&invokeTestEvent{id: "1"}},
},
assertCollectedEvents: func(t *testing.T, events []legacy_es.Command) {
require.Len(t, events, 1)
assert.Equal(t, "1", events[0].Aggregate().ID)
},
},
{
name: "simple command without events",
expectedErr: nil,
command: &invokeTestCommand{},
assertCollectedEvents: func(t *testing.T, events []legacy_es.Command) {
assert.Len(t, events, 0)
},
},
{
name: "command with sub commands",
expectedErr: nil,
command: &invokeTestCommand{
events: []legacy_es.Command{&invokeTestEvent{id: "1"}},
execute: func(ctx context.Context, opts *CommandOpts) error {
return opts.Invoke(ctx, &invokeTestCommand{
events: []legacy_es.Command{&invokeTestEvent{id: "2"}},
})
},
},
assertCollectedEvents: func(t *testing.T, events []legacy_es.Command) {
require.Len(t, events, 2)
assert.Equal(t, "1", events[0].Aggregate().ID)
assert.Equal(t, "2", events[1].Aggregate().ID)
},
},
{
name: "only sub commands with events",
expectedErr: nil,
command: &invokeTestCommand{
execute: func(ctx context.Context, opts *CommandOpts) error {
return opts.Invoke(ctx, &invokeTestCommand{
events: []legacy_es.Command{&invokeTestEvent{id: "2"}},
})
},
},
assertCollectedEvents: func(t *testing.T, events []legacy_es.Command) {
require.Len(t, events, 1)
assert.Equal(t, "2", events[0].Aggregate().ID)
},
},
}
SetLegacyEventstore(new(testLegacyEventstore))
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
pool := dbmock.NewMockPool(ctrl)
tx := dbmock.NewMockTransaction(ctrl)
tx.EXPECT().End(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
pool.EXPECT().Begin(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
i := newEventStoreInvoker(nil)
opts := CommandOpts{
Invoker: i,
DB: pool,
}
err := opts.Invoke(t.Context(), tt.command)
require.ErrorIs(t, err, tt.expectedErr)
tt.assertCollectedEvents(t, i.collector.events)
})
}
}

View File

@@ -1,18 +1,19 @@
package domain
// import (
// "context"
import (
"context"
// "github.com/zitadel/zitadel/backend/v3/storage/eventstore"
// )
legacy_es "github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/org"
)
// // AddOrgCommand adds a new organization.
// // I'm unsure if we should add the Admins here or if this should be a separate command.
// type AddOrgCommand struct {
// ID string `json:"id"`
// Name string `json:"name"`
// Admins []*AddMemberCommand `json:"admins"`
// }
// AddOrgCommand adds a new organization.
// I'm unsure if we should add the Admins here or if this should be a separate command.
type AddOrgCommand struct {
ID string `json:"id"`
Name string `json:"name"`
// Admins []*AddMemberCommand `json:"admins"`
}
// func NewAddOrgCommand(name string, admins ...*AddMemberCommand) *AddOrgCommand {
// return &AddOrgCommand{
@@ -63,17 +64,12 @@ package domain
// return nil
// }
// // Events implements [eventer].
// func (cmd *AddOrgCommand) Events() []*eventstore.Event {
// return []*eventstore.Event{
// {
// AggregateType: "org",
// AggregateID: cmd.ID,
// Type: "org.added",
// Payload: cmd,
// },
// }
// }
// Events implements [eventer].
func (cmd *AddOrgCommand) Events(ctx context.Context) []legacy_es.Command {
return []legacy_es.Command{
org.NewOrgAddedEvent(ctx, &org.NewAggregate(cmd.ID).Aggregate, cmd.Name),
}
}
// var (
// _ Commander = (*AddOrgCommand)(nil)

View File

@@ -12,6 +12,8 @@ type Pool interface {
Acquire(ctx context.Context) (Client, error)
Close(ctx context.Context) error
Ping(ctx context.Context) error
}
type PoolTest interface {
@@ -27,6 +29,8 @@ type Client interface {
Migrator
Release(ctx context.Context) error
Ping(ctx context.Context) error
}
// Querier is a database client that can execute queries and return rows.

View File

@@ -27,7 +27,7 @@ func (c *pgxConn) Begin(ctx context.Context, opts *database.TransactionOptions)
if err != nil {
return nil, wrapError(err)
}
return &pgxTx{tx}, nil
return &Transaction{tx}, nil
}
// Query implements sql.Client.
@@ -56,6 +56,11 @@ func (c *pgxConn) Exec(ctx context.Context, sql string, args ...any) (int64, err
return res.RowsAffected(), nil
}
// Ping implements [database.Pool].
func (c *pgxConn) Ping(ctx context.Context) error {
return wrapError(c.Conn.Ping(ctx))
}
// Migrate implements [database.Migrator].
func (c *pgxConn) Migrate(ctx context.Context) error {
if isMigrated {

View File

@@ -62,7 +62,7 @@ func (c *pgxPool) Begin(ctx context.Context, opts *database.TransactionOptions)
if err != nil {
return nil, wrapError(err)
}
return &pgxTx{tx}, nil
return &Transaction{tx}, nil
}
// Close implements [database.Pool].
@@ -71,6 +71,11 @@ func (c *pgxPool) Close(_ context.Context) error {
return nil
}
// Ping implements [database.Pool].
func (c *pgxPool) Ping(ctx context.Context) error {
return wrapError(c.Pool.Ping(ctx))
}
// Migrate implements [database.Migrator].
func (c *pgxPool) Migrate(ctx context.Context) error {
if isMigrated {

View File

@@ -9,24 +9,30 @@ import (
"github.com/zitadel/zitadel/backend/v3/storage/database"
)
type pgxTx struct{ pgx.Tx }
type Transaction struct{ pgx.Tx }
var _ database.Transaction = (*pgxTx)(nil)
func PGxTx(tx pgx.Tx) *Transaction {
return &Transaction{
Tx: tx,
}
}
var _ database.Transaction = (*Transaction)(nil)
// Commit implements [database.Transaction].
func (tx *pgxTx) Commit(ctx context.Context) error {
func (tx *Transaction) Commit(ctx context.Context) error {
err := tx.Tx.Commit(ctx)
return wrapError(err)
}
// Rollback implements [database.Transaction].
func (tx *pgxTx) Rollback(ctx context.Context) error {
func (tx *Transaction) Rollback(ctx context.Context) error {
err := tx.Tx.Rollback(ctx)
return wrapError(err)
}
// End implements [database.Transaction].
func (tx *pgxTx) End(ctx context.Context, err error) error {
func (tx *Transaction) End(ctx context.Context, err error) error {
if err != nil {
rollbackErr := tx.Rollback(ctx)
if rollbackErr != nil {
@@ -39,7 +45,7 @@ func (tx *pgxTx) End(ctx context.Context, err error) error {
// Query implements [database.Transaction].
// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx.
func (tx *pgxTx) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
func (tx *Transaction) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
rows, err := tx.Tx.Query(ctx, sql, args...)
if err != nil {
return nil, wrapError(err)
@@ -49,13 +55,13 @@ func (tx *pgxTx) Query(ctx context.Context, sql string, args ...any) (database.R
// QueryRow implements [database.Transaction].
// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx.
func (tx *pgxTx) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
func (tx *Transaction) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
return &Row{tx.Tx.QueryRow(ctx, sql, args...)}
}
// Exec implements [database.Transaction].
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
func (tx *pgxTx) Exec(ctx context.Context, sql string, args ...any) (int64, error) {
func (tx *Transaction) Exec(ctx context.Context, sql string, args ...any) (int64, error) {
res, err := tx.Tx.Exec(ctx, sql, args...)
if err != nil {
return 0, wrapError(err)
@@ -65,12 +71,12 @@ func (tx *pgxTx) Exec(ctx context.Context, sql string, args ...any) (int64, erro
// Begin implements [database.Transaction].
// As postgres does not support nested transactions we use savepoints to emulate them.
func (tx *pgxTx) Begin(ctx context.Context) (database.Transaction, error) {
func (tx *Transaction) Begin(ctx context.Context) (database.Transaction, error) {
savepoint, err := tx.Tx.Begin(ctx)
if err != nil {
return nil, wrapError(err)
}
return &pgxTx{savepoint}, nil
return &Transaction{savepoint}, nil
}
func transactionOptionsToPgx(opts *database.TransactionOptions) pgx.TxOptions {

View File

@@ -11,6 +11,10 @@ type sqlConn struct {
*sql.Conn
}
func SQLConn(conn *sql.Conn) database.Client {
return &sqlConn{Conn: conn}
}
var _ database.Client = (*sqlConn)(nil)
// Release implements [database.Client].
@@ -24,7 +28,7 @@ func (c *sqlConn) Begin(ctx context.Context, opts *database.TransactionOptions)
if err != nil {
return nil, wrapError(err)
}
return &sqlTx{tx}, nil
return &Transaction{tx}, nil
}
// Query implements sql.Client.
@@ -54,6 +58,11 @@ func (c *sqlConn) Exec(ctx context.Context, sql string, args ...any) (int64, err
return res.RowsAffected()
}
// Ping implements [database.Pool].
func (c *sqlConn) Ping(ctx context.Context) error {
return wrapError(c.PingContext(ctx))
}
// Migrate implements [database.Migrator].
func (c *sqlConn) Migrate(ctx context.Context) error {
return ErrMigrate

View File

@@ -61,7 +61,12 @@ func (c *sqlPool) Begin(ctx context.Context, opts *database.TransactionOptions)
if err != nil {
return nil, wrapError(err)
}
return &sqlTx{tx}, nil
return &Transaction{tx}, nil
}
// Ping implements [database.Pool].
func (c *sqlPool) Ping(ctx context.Context) error {
return wrapError(c.PingContext(ctx))
}
// Close implements [database.Pool].

View File

@@ -8,28 +8,28 @@ import (
"github.com/zitadel/zitadel/backend/v3/storage/database"
)
type sqlTx struct{ *sql.Tx }
type Transaction struct{ *sql.Tx }
var _ database.Transaction = (*sqlTx)(nil)
var _ database.Transaction = (*Transaction)(nil)
func SQLTx(tx *sql.Tx) *sqlTx {
return &sqlTx{
func SQLTx(tx *sql.Tx) *Transaction {
return &Transaction{
Tx: tx,
}
}
// Commit implements [database.Transaction].
func (tx *sqlTx) Commit(ctx context.Context) error {
func (tx *Transaction) Commit(ctx context.Context) error {
return wrapError(tx.Tx.Commit())
}
// Rollback implements [database.Transaction].
func (tx *sqlTx) Rollback(ctx context.Context) error {
func (tx *Transaction) Rollback(ctx context.Context) error {
return wrapError(tx.Tx.Rollback())
}
// End implements [database.Transaction].
func (tx *sqlTx) End(ctx context.Context, err error) error {
func (tx *Transaction) End(ctx context.Context, err error) error {
if err != nil {
rollbackErr := tx.Rollback(ctx)
if rollbackErr != nil {
@@ -42,7 +42,7 @@ func (tx *sqlTx) End(ctx context.Context, err error) error {
// Query implements [database.Transaction].
// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx.
func (tx *sqlTx) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
func (tx *Transaction) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
//nolint:rowserrcheck // Rows.Close is called by the caller
rows, err := tx.QueryContext(ctx, sql, args...)
if err != nil {
@@ -53,13 +53,13 @@ func (tx *sqlTx) Query(ctx context.Context, sql string, args ...any) (database.R
// QueryRow implements [database.Transaction].
// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx.
func (tx *sqlTx) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
func (tx *Transaction) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
return &Row{tx.QueryRowContext(ctx, sql, args...)}
}
// Exec implements [database.Transaction].
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
func (tx *sqlTx) Exec(ctx context.Context, sql string, args ...any) (int64, error) {
func (tx *Transaction) Exec(ctx context.Context, sql string, args ...any) (int64, error) {
res, err := tx.ExecContext(ctx, sql, args...)
if err != nil {
return 0, wrapError(err)
@@ -69,7 +69,7 @@ func (tx *sqlTx) Exec(ctx context.Context, sql string, args ...any) (int64, erro
// Begin implements [database.Transaction].
// As postgres does not support nested transactions we use savepoints to emulate them.
func (tx *sqlTx) Begin(ctx context.Context) (database.Transaction, error) {
func (tx *Transaction) Begin(ctx context.Context) (database.Transaction, error) {
_, err := tx.ExecContext(ctx, createSavepoint)
if err != nil {
return nil, wrapError(err)

View File

@@ -95,6 +95,10 @@ func (e *IntegrityViolationError) Is(target error) bool {
return ok
}
func (e *IntegrityViolationError) Unwrap() error {
return e.original
}
// CheckError is returned when a check constraint fails.
// It wraps the [IntegrityViolationError] to provide more context.
// It is used to indicate that a check constraint was violated during an insert or update operation.

View File

@@ -4,21 +4,15 @@ import (
"context"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/internal/eventstore"
)
type Event struct {
AggregateType string `json:"aggregateType"`
AggregateID string `json:"aggregateId"`
Type string `json:"type"`
Payload any `json:"payload,omitempty"`
type LegacyEventstore interface {
PushWithNewClient(ctx context.Context, client database.QueryExecutor, commands ...eventstore.Command) ([]eventstore.Event, error)
}
func Publish(ctx context.Context, events []*Event, db database.Executor) error {
for _, event := range events {
_, err := db.Exec(ctx, `INSERT INTO events (aggregate_type, aggregate_id) VALUES ($1, $2)`, event.AggregateType, event.AggregateID)
if err != nil {
return err
}
}
return nil
// Publish writes events to the eventstore using the provided pusher and database client.
func Publish(ctx context.Context, es LegacyEventstore, client database.QueryExecutor, commands ...eventstore.Command) error {
_, err := es.PushWithNewClient(ctx, client, commands...)
return err
}

View File

@@ -2,6 +2,7 @@ package eventstore
import (
"context"
"database/sql"
"errors"
"sort"
"time"
@@ -10,6 +11,8 @@ import (
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
new_db "github.com/zitadel/zitadel/backend/v3/storage/database"
new_sql "github.com/zitadel/zitadel/backend/v3/storage/database/dialect/sql"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -91,6 +94,19 @@ func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error
// PushWithClient pushes the events in a single transaction using the provided database client
// an event needs at least an aggregate
func (es *Eventstore) PushWithClient(ctx context.Context, client database.ContextQueryExecuter, cmds ...Command) ([]Event, error) {
var dbClient new_db.QueryExecutor
switch client := client.(type) {
case *sql.DB:
dbClient = new_sql.SQLPool(client)
case *sql.Tx:
dbClient = new_sql.SQLTx(client)
case *sql.Conn:
dbClient = new_sql.SQLConn(client)
}
return es.PushWithNewClient(ctx, dbClient, cmds...)
}
func (es *Eventstore) PushWithNewClient(ctx context.Context, client new_db.QueryExecutor, cmds ...Command) ([]Event, error) {
if es.PushTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, es.PushTimeout)
@@ -108,7 +124,7 @@ retry:
for i := 0; i <= es.maxRetries; i++ {
events, err = es.pusher.Push(ctx, client, cmds...)
// if there is a transaction passed the calling function needs to retry
if _, ok := client.(database.Tx); ok {
if _, ok := client.(new_db.Transaction); ok {
break retry
}
var pgErr *pgconn.PgError
@@ -284,9 +300,7 @@ type Pusher interface {
// Health checks if the connection to the storage is available
Health(ctx context.Context) error
// Push stores the actions
Push(ctx context.Context, client database.ContextQueryExecuter, commands ...Command) (_ []Event, err error)
// Client returns the underlying database connection
Client() *database.DB
Push(ctx context.Context, client new_db.QueryExecutor, commands ...Command) (_ []Event, err error)
}
type FillFieldsEvent interface {

View File

@@ -1,162 +0,0 @@
package eventstore_test
import (
"context"
_ "embed"
"fmt"
"strconv"
"testing"
"github.com/zitadel/zitadel/internal/eventstore"
)
//go:embed bench_payload.txt
var text string
func Benchmark_Push_SameAggregate(b *testing.B) {
ctx := context.Background()
smallPayload := struct {
Username string
Firstname string
Lastname string
}{
Username: "username",
Firstname: "firstname",
Lastname: "lastname",
}
bigPayload := struct {
Username string
Firstname string
Lastname string
Text string
}{
Username: "username",
Firstname: "firstname",
Lastname: "lastname",
Text: text,
}
commands := map[string][]eventstore.Command{
"no payload one command": {
generateCommand(eventstore.AggregateType(b.Name()), "id"),
},
"small payload one command": {
generateCommand(eventstore.AggregateType(b.Name()), "id", withTestData(smallPayload)),
},
"big payload one command": {
generateCommand(eventstore.AggregateType(b.Name()), "id", withTestData(bigPayload)),
},
"no payload multiple commands": {
generateCommand(eventstore.AggregateType(b.Name()), "id"),
generateCommand(eventstore.AggregateType(b.Name()), "id"),
generateCommand(eventstore.AggregateType(b.Name()), "id"),
},
"mixed payload multiple command": {
generateCommand(eventstore.AggregateType(b.Name()), "id", withTestData(smallPayload)),
generateCommand(eventstore.AggregateType(b.Name()), "id", withTestData(bigPayload)),
generateCommand(eventstore.AggregateType(b.Name()), "id", withTestData(smallPayload)),
generateCommand(eventstore.AggregateType(b.Name()), "id", withTestData(bigPayload)),
},
}
for cmdsKey, cmds := range commands {
for pusherKey, store := range pushers {
b.Run(fmt.Sprintf("Benchmark_Push_SameAggregate-%s-%s", pusherKey, cmdsKey), func(b *testing.B) {
b.StopTimer()
cleanupEventstore(clients[pusherKey])
b.StartTimer()
for n := 0; n < b.N; n++ {
_, err := store.Push(ctx, store.Client().DB, cmds...)
if err != nil {
b.Error(err)
}
}
})
}
}
}
func Benchmark_Push_MultipleAggregate_Parallel(b *testing.B) {
smallPayload := struct {
Username string
Firstname string
Lastname string
}{
Username: "username",
Firstname: "firstname",
Lastname: "lastname",
}
bigPayload := struct {
Username string
Firstname string
Lastname string
Text string
}{
Username: "username",
Firstname: "firstname",
Lastname: "lastname",
Text: text,
}
commandCreators := map[string]func(id string) []eventstore.Command{
"no payload one command": func(id string) []eventstore.Command {
return []eventstore.Command{
generateCommand(eventstore.AggregateType(b.Name()), id),
}
},
"small payload one command": func(id string) []eventstore.Command {
return []eventstore.Command{
generateCommand(eventstore.AggregateType(b.Name()), id, withTestData(smallPayload)),
}
},
"big payload one command": func(id string) []eventstore.Command {
return []eventstore.Command{
generateCommand(eventstore.AggregateType(b.Name()), id, withTestData(bigPayload)),
}
},
"no payload multiple commands": func(id string) []eventstore.Command {
return []eventstore.Command{
generateCommand(eventstore.AggregateType(b.Name()), id),
generateCommand(eventstore.AggregateType(b.Name()), id),
generateCommand(eventstore.AggregateType(b.Name()), id),
}
},
"mixed payload multiple command": func(id string) []eventstore.Command {
return []eventstore.Command{
generateCommand(eventstore.AggregateType(b.Name()), id, withTestData(smallPayload)),
generateCommand(eventstore.AggregateType(b.Name()), id, withTestData(bigPayload)),
generateCommand(eventstore.AggregateType(b.Name()), id, withTestData(smallPayload)),
generateCommand(eventstore.AggregateType(b.Name()), id, withTestData(bigPayload)),
}
},
}
for cmdsKey, commandCreator := range commandCreators {
for pusherKey, store := range pushers {
b.Run(fmt.Sprintf("Benchmark_Push_DifferentAggregate-%s-%s", cmdsKey, pusherKey), func(b *testing.B) {
b.StopTimer()
cleanupEventstore(clients[pusherKey])
ctx, cancel := context.WithCancel(context.Background())
b.StartTimer()
i := 0
b.RunParallel(func(p *testing.PB) {
for p.Next() {
i++
_, err := store.Push(ctx, store.Client().DB, commandCreator(strconv.Itoa(i))...)
if err != nil {
b.Error(err)
}
}
})
cancel()
})
}
}
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
new_db "github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/api/service"
"github.com/zitadel/zitadel/internal/database"
@@ -348,7 +349,7 @@ func (repo *testPusher) Health(ctx context.Context) error {
return nil
}
func (repo *testPusher) Push(_ context.Context, _ database.ContextQueryExecuter, commands ...Command) (events []Event, err error) {
func (repo *testPusher) Push(_ context.Context, _ new_db.QueryExecutor, commands ...Command) (events []Event, err error) {
if len(repo.errs) != 0 {
err, repo.errs = repo.errs[0], repo.errs[1:]
return nil, err

View File

@@ -14,7 +14,8 @@ import (
reflect "reflect"
decimal "github.com/shopspring/decimal"
database "github.com/zitadel/zitadel/internal/database"
database "github.com/zitadel/zitadel/backend/v3/storage/database"
database0 "github.com/zitadel/zitadel/internal/database"
eventstore "github.com/zitadel/zitadel/internal/eventstore"
gomock "go.uber.org/mock/gomock"
)
@@ -43,10 +44,10 @@ func (m *MockQuerier) EXPECT() *MockQuerierMockRecorder {
}
// Client mocks base method.
func (m *MockQuerier) Client() *database.DB {
func (m *MockQuerier) Client() *database0.DB {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Client")
ret0, _ := ret[0].(*database.DB)
ret0, _ := ret[0].(*database0.DB)
return ret0
}
@@ -137,20 +138,6 @@ func (m *MockPusher) EXPECT() *MockPusherMockRecorder {
return m.recorder
}
// Client mocks base method.
func (m *MockPusher) Client() *database.DB {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Client")
ret0, _ := ret[0].(*database.DB)
return ret0
}
// Client indicates an expected call of Client.
func (mr *MockPusherMockRecorder) Client() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Client", reflect.TypeOf((*MockPusher)(nil).Client))
}
// Health mocks base method.
func (m *MockPusher) Health(arg0 context.Context) error {
m.ctrl.T.Helper()
@@ -166,7 +153,7 @@ func (mr *MockPusherMockRecorder) Health(arg0 any) *gomock.Call {
}
// Push mocks base method.
func (m *MockPusher) Push(arg0 context.Context, arg1 database.ContextQueryExecuter, arg2 ...eventstore.Command) ([]eventstore.Event, error) {
func (m *MockPusher) Push(arg0 context.Context, arg1 database.QueryExecutor, arg2 ...eventstore.Command) ([]eventstore.Event, error) {
m.ctrl.T.Helper()
varargs := []any{arg0, arg1}
for _, a := range arg2 {

View File

@@ -15,6 +15,7 @@ import (
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
new_sql "github.com/zitadel/zitadel/backend/v3/storage/database/dialect/sql"
"github.com/zitadel/zitadel/internal/database"
db_mock "github.com/zitadel/zitadel/internal/database/mock"
"github.com/zitadel/zitadel/internal/database/postgres"
@@ -646,7 +647,7 @@ func Test_query_events_with_postgres(t *testing.T) {
pusher := new_es.NewEventstore(dbClient)
// setup initial data for query
if _, err := pusher.Push(context.Background(), dbClient.DB, tt.fields.existingEvents...); err != nil {
if _, err := pusher.Push(context.Background(), new_sql.SQLPool(dbClient.DB), tt.fields.existingEvents...); err != nil {
t.Errorf("error in setup = %v", err)
return
}

View File

@@ -13,6 +13,8 @@ import (
"github.com/jackc/pgx/v5/stdlib"
"github.com/zitadel/logging"
new_db "github.com/zitadel/zitadel/backend/v3/storage/database"
new_sql "github.com/zitadel/zitadel/backend/v3/storage/database/dialect/sql"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
"github.com/zitadel/zitadel/internal/eventstore"
@@ -32,7 +34,7 @@ var (
)
type Eventstore struct {
client *database.DB
client new_db.Pool
queue eventstore.ExecutionQueue
}
@@ -153,14 +155,9 @@ func RegisterEventstoreTypes(ctx context.Context, conn *pgx.Conn) error {
return nil
}
// Client implements the [eventstore.Pusher]
func (es *Eventstore) Client() *database.DB {
return es.client
}
func NewEventstore(client *database.DB, opts ...EventstoreOption) *Eventstore {
es := &Eventstore{
client: client,
client: new_sql.SQLPool(client.DB),
}
for _, opt := range opts {
opt(es)
@@ -169,7 +166,7 @@ func NewEventstore(client *database.DB, opts ...EventstoreOption) *Eventstore {
}
func (es *Eventstore) Health(ctx context.Context) error {
return es.client.PingContext(ctx)
return es.client.Ping(ctx)
}
var errTypesNotFound = errors.New("types not found")
@@ -188,24 +185,24 @@ func CheckExecutionPlan(ctx context.Context, conn *sql.Conn) error {
})
}
func (es *Eventstore) pushTx(ctx context.Context, client database.ContextQueryExecuter) (tx database.Tx, deferrable func(err error) error, err error) {
tx, ok := client.(database.Tx)
func (es *Eventstore) pushTx(ctx context.Context, client new_db.QueryExecutor) (tx new_db.Transaction, deferrable func(err error) error, err error) {
tx, ok := client.(new_db.Transaction)
if ok {
return tx, nil, nil
}
beginner, ok := client.(database.Beginner)
beginner, ok := client.(new_db.Beginner)
if !ok {
beginner = es.client
}
tx, err = beginner.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
ReadOnly: false,
tx, err = beginner.Begin(ctx, &new_db.TransactionOptions{
IsolationLevel: new_db.IsolationLevelReadCommitted,
AccessMode: new_db.AccessModeReadWrite,
})
if err != nil {
return nil, nil, err
}
return tx, func(err error) error { return database.CloseTransaction(tx, err) }, nil
return tx, func(err error) error { return tx.End(ctx, err) }, nil
}
type EventstoreOption func(*Eventstore)

View File

@@ -2,7 +2,6 @@ package eventstore
import (
"context"
"database/sql"
_ "embed"
"encoding/json"
"reflect"
@@ -10,8 +9,8 @@ import (
"strconv"
"strings"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -29,16 +28,12 @@ func (es *Eventstore) FillFields(ctx context.Context, events ...eventstore.FillF
ctx, span := tracing.NewSpan(ctx)
defer span.End()
tx, err := es.client.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
tx, err := es.client.Begin(ctx, &database.TransactionOptions{IsolationLevel: database.IsolationLevelReadCommitted})
if err != nil {
return err
}
defer func() {
if err != nil {
_ = tx.Rollback()
return
}
err = tx.Commit()
err = tx.End(ctx, err)
}()
return handleFieldFillEvents(ctx, tx, events)
@@ -52,40 +47,39 @@ func (es *Eventstore) Search(ctx context.Context, conditions ...map[eventstore.F
var builder strings.Builder
args := buildSearchStatement(ctx, &builder, conditions...)
err = es.client.QueryContext(
rows, err := es.client.Query(
ctx,
func(rows *sql.Rows) error {
for rows.Next() {
var (
res eventstore.SearchResult
value fieldValue
)
err = rows.Scan(
&res.Aggregate.InstanceID,
&res.Aggregate.ResourceOwner,
&res.Aggregate.Type,
&res.Aggregate.ID,
&res.Object.Type,
&res.Object.ID,
&res.Object.Revision,
&res.FieldName,
&value.value,
)
if err != nil {
return err
}
res.Value = &value
result = append(result, &res)
}
return nil
},
builder.String(),
args...,
)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var (
res eventstore.SearchResult
value fieldValue
)
err = rows.Scan(
&res.Aggregate.InstanceID,
&res.Aggregate.ResourceOwner,
&res.Aggregate.Type,
&res.Aggregate.ID,
&res.Object.Type,
&res.Object.ID,
&res.Object.Revision,
&res.FieldName,
&value.value,
)
if err != nil {
return nil, err
}
res.Value = &value
result = append(result, &res)
}
return result, nil
}
@@ -143,7 +137,7 @@ func buildSearchCondition(builder *strings.Builder, index int, conditions map[ev
return args
}
func (es *Eventstore) handleFieldCommands(ctx context.Context, tx database.Tx, commands []eventstore.Command) error {
func (es *Eventstore) handleFieldCommands(ctx context.Context, tx database.Transaction, commands []eventstore.Command) error {
for _, command := range commands {
if len(command.Fields()) > 0 {
if err := handleFieldOperations(ctx, tx, command.Fields()); err != nil {
@@ -154,7 +148,7 @@ func (es *Eventstore) handleFieldCommands(ctx context.Context, tx database.Tx, c
return nil
}
func handleFieldFillEvents(ctx context.Context, tx database.Tx, events []eventstore.FillFieldsEvent) error {
func handleFieldFillEvents(ctx context.Context, tx database.Transaction, events []eventstore.FillFieldsEvent) error {
for _, event := range events {
if len(event.Fields()) == 0 {
continue
@@ -166,7 +160,7 @@ func handleFieldFillEvents(ctx context.Context, tx database.Tx, events []eventst
return nil
}
func handleFieldOperations(ctx context.Context, tx database.Tx, operations []*eventstore.FieldOperation) error {
func handleFieldOperations(ctx context.Context, tx database.Transaction, operations []*eventstore.FieldOperation) error {
for _, operation := range operations {
if operation.Set != nil {
if err := handleFieldSet(ctx, tx, operation.Set); err != nil {
@@ -184,7 +178,7 @@ func handleFieldOperations(ctx context.Context, tx database.Tx, operations []*ev
return nil
}
func handleFieldSet(ctx context.Context, tx database.Tx, field *eventstore.Field) error {
func handleFieldSet(ctx context.Context, tx database.Transaction, field *eventstore.Field) error {
if len(field.UpsertConflictFields) == 0 {
return handleSearchInsert(ctx, tx, field)
}
@@ -195,12 +189,12 @@ const (
insertField = `INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`
)
func handleSearchInsert(ctx context.Context, tx database.Tx, field *eventstore.Field) error {
func handleSearchInsert(ctx context.Context, tx database.Transaction, field *eventstore.Field) error {
value, err := json.Marshal(field.Value.Value)
if err != nil {
return zerrors.ThrowInvalidArgument(err, "V3-fcrW1", "unable to marshal field value")
}
_, err = tx.ExecContext(
_, err = tx.Exec(
ctx,
insertField,
@@ -224,13 +218,13 @@ const (
fieldsUpsertSuffix = ` RETURNING * ) INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 WHERE NOT EXISTS (SELECT 1 FROM upsert)`
)
func handleSearchUpsert(ctx context.Context, tx database.Tx, field *eventstore.Field) error {
func handleSearchUpsert(ctx context.Context, tx database.Transaction, field *eventstore.Field) error {
value, err := json.Marshal(field.Value.Value)
if err != nil {
return zerrors.ThrowInvalidArgument(err, "V3-fcrW1", "unable to marshal field value")
}
_, err = tx.ExecContext(
_, err = tx.Exec(
ctx,
writeUpsertField(field.UpsertConflictFields),
@@ -270,12 +264,12 @@ func writeUpsertField(fields []eventstore.FieldType) string {
const removeSearch = `DELETE FROM eventstore.fields WHERE `
func handleSearchDelete(ctx context.Context, tx database.Tx, clauses map[eventstore.FieldType]any) error {
func handleSearchDelete(ctx context.Context, tx database.Transaction, clauses map[eventstore.FieldType]any) error {
if len(clauses) == 0 {
return zerrors.ThrowInvalidArgument(nil, "V3-oqlBZ", "no conditions")
}
stmt, args := writeDeleteField(clauses)
_, err := tx.ExecContext(ctx, stmt, args...)
_, err := tx.Exec(ctx, stmt, args...)
return err
}

View File

@@ -2,22 +2,22 @@ package eventstore
import (
"context"
"database/sql"
_ "embed"
"fmt"
"github.com/riverqueue/river"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/backend/v3/storage/database/dialect/sql"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/queue"
exec_repo "github.com/zitadel/zitadel/internal/repository/execution"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
)
func (es *Eventstore) Push(ctx context.Context, client database.ContextQueryExecuter, commands ...eventstore.Command) (events []eventstore.Event, err error) {
func (es *Eventstore) Push(ctx context.Context, client database.QueryExecutor, commands ...eventstore.Command) (events []eventstore.Event, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
@@ -29,20 +29,9 @@ func (es *Eventstore) Push(ctx context.Context, client database.ContextQueryExec
return events, err
}
func (es *Eventstore) writeCommands(ctx context.Context, client database.ContextQueryExecuter, commands []eventstore.Command) (_ []eventstore.Event, err error) {
var conn *sql.Conn
switch c := client.(type) {
case database.Client:
conn, err = c.Conn(ctx)
case nil:
conn, err = es.client.Conn(ctx)
client = conn
}
if err != nil {
return nil, err
}
if conn != nil {
defer conn.Close()
func (es *Eventstore) writeCommands(ctx context.Context, client database.QueryExecutor, commands []eventstore.Command) (_ []eventstore.Event, err error) {
if client == nil {
client = es.client
}
tx, close, err := es.pushTx(ctx, client)
@@ -56,7 +45,7 @@ func (es *Eventstore) writeCommands(ctx context.Context, client database.Context
}
// lock the instance for reading events if await events is set for the duration of the transaction.
_, err = tx.ExecContext(ctx, "SELECT pg_advisory_xact_lock_shared('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))", authz.GetInstance(ctx).InstanceID())
_, err = tx.Exec(ctx, "SELECT pg_advisory_xact_lock_shared('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))", authz.GetInstance(ctx).InstanceID())
if err != nil {
return nil, err
}
@@ -82,7 +71,7 @@ func (es *Eventstore) writeCommands(ctx context.Context, client database.Context
return events, nil
}
func writeEvents(ctx context.Context, tx database.Tx, commands []eventstore.Command) (_ []eventstore.Event, err error) {
func writeEvents(ctx context.Context, tx database.Transaction, commands []eventstore.Command) (_ []eventstore.Event, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
@@ -91,7 +80,7 @@ func writeEvents(ctx context.Context, tx database.Tx, commands []eventstore.Comm
return nil, err
}
rows, err := tx.QueryContext(ctx, `select owner, created_at, "sequence", position from eventstore.push($1::eventstore.command[])`, cmds)
rows, err := tx.Query(ctx, `select owner, created_at, "sequence", position from eventstore.push($1::eventstore.command[])`, cmds)
if err != nil {
return nil, err
}
@@ -110,12 +99,12 @@ func writeEvents(ctx context.Context, tx database.Tx, commands []eventstore.Comm
return events, nil
}
func (es *Eventstore) queueExecutions(ctx context.Context, tx database.Tx, events []eventstore.Event) error {
func (es *Eventstore) queueExecutions(ctx context.Context, tx database.Transaction, events []eventstore.Event) error {
if es.queue == nil {
return nil
}
sqlTx, ok := tx.(*sql.Tx)
sqlTx, ok := tx.(*sql.Transaction)
if !ok {
types := make([]string, len(events))
for i, event := range events {
@@ -132,7 +121,7 @@ func (es *Eventstore) queueExecutions(ctx context.Context, tx database.Tx, event
return nil
}
return es.queue.InsertManyFastTx(
ctx, sqlTx, jobArgs,
ctx, sqlTx.Tx, jobArgs,
queue.WithQueueName(exec_repo.QueueName),
)
}

View File

@@ -2,7 +2,6 @@ package eventstore
import (
"context"
"database/sql"
_ "embed"
"testing"
@@ -11,6 +10,9 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
new_db "github.com/zitadel/zitadel/backend/v3/storage/database"
new_pg "github.com/zitadel/zitadel/backend/v3/storage/database/dialect/postgres"
"github.com/zitadel/zitadel/backend/v3/storage/database/dialect/sql"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/postgres"
@@ -268,7 +270,7 @@ func TestEventstore_queueExecutions(t *testing.T) {
}
type args struct {
ctx context.Context
tx database.Tx
tx new_db.Transaction
events []eventstore.Event
}
tests := []struct {
@@ -278,7 +280,7 @@ func TestEventstore_queueExecutions(t *testing.T) {
wantErr bool
}{
{
name: "incorrect Tx type, noop",
name: "no Tx type, noop",
queue: func(t *testing.T) eventstore.ExecutionQueue {
mQueue := mock.NewMockExecutionQueue(gomock.NewController(t))
return mQueue
@@ -290,6 +292,19 @@ func TestEventstore_queueExecutions(t *testing.T) {
},
wantErr: false,
},
{
name: "incorrect Tx type, noop",
queue: func(t *testing.T) eventstore.ExecutionQueue {
mQueue := mock.NewMockExecutionQueue(gomock.NewController(t))
return mQueue
},
args: args{
ctx: context.Background(),
tx: new_pg.PGxTx(nil),
events: events,
},
wantErr: false,
},
{
name: "no events",
queue: func(t *testing.T) eventstore.ExecutionQueue {
@@ -298,7 +313,7 @@ func TestEventstore_queueExecutions(t *testing.T) {
},
args: args{
ctx: context.Background(),
tx: &sql.Tx{},
tx: sql.SQLTx(nil),
events: []eventstore.Event{},
},
wantErr: false,
@@ -311,7 +326,7 @@ func TestEventstore_queueExecutions(t *testing.T) {
},
args: args{
ctx: context.Background(),
tx: &sql.Tx{},
tx: sql.SQLTx(nil),
events: events,
},
wantErr: false,
@@ -331,7 +346,7 @@ func TestEventstore_queueExecutions(t *testing.T) {
},
}),
),
tx: &sql.Tx{},
tx: sql.SQLTx(nil),
events: events,
},
wantErr: false,
@@ -360,7 +375,7 @@ func TestEventstore_queueExecutions(t *testing.T) {
{ExecutionID: "event"},
}),
),
tx: &sql.Tx{},
tx: sql.SQLTx(nil),
events: events,
},
wantErr: false,
@@ -389,7 +404,7 @@ func TestEventstore_queueExecutions(t *testing.T) {
{ExecutionID: "event/ex.removed"},
}),
),
tx: &sql.Tx{},
tx: sql.SQLTx(nil),
events: events,
},
wantErr: false,

View File

@@ -10,7 +10,7 @@ import (
"github.com/jackc/pgx/v5/pgconn"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
@@ -35,7 +35,7 @@ var (
// pushWithoutFunc implements pushing events before setup step 39 was introduced.
// TODO: remove with v3
func (es *Eventstore) pushWithoutFunc(ctx context.Context, client database.ContextQueryExecuter, commands ...eventstore.Command) (events []eventstore.Event, err error) {
func (es *Eventstore) pushWithoutFunc(ctx context.Context, client database.QueryExecutor, commands ...eventstore.Command) (events []eventstore.Event, err error) {
tx, closeTx, err := es.pushTx(ctx, client)
if err != nil {
return nil, err
@@ -69,13 +69,13 @@ func (es *Eventstore) pushWithoutFunc(ctx context.Context, client database.Conte
return events, nil
}
func (es *Eventstore) writeEventsOld(ctx context.Context, tx database.Tx, sequences []*latestSequence, commands []eventstore.Command) ([]eventstore.Event, error) {
func (es *Eventstore) writeEventsOld(ctx context.Context, tx database.Transaction, sequences []*latestSequence, commands []eventstore.Command) ([]eventstore.Event, error) {
events, placeholders, args, err := mapCommands(commands, sequences)
if err != nil {
return nil, err
}
rows, err := tx.QueryContext(ctx, fmt.Sprintf(pushStmt, strings.Join(placeholders, ", ")), args...)
rows, err := tx.Query(ctx, fmt.Sprintf(pushStmt, strings.Join(placeholders, ", ")), args...)
if err != nil {
return nil, err
}

View File

@@ -2,15 +2,14 @@ package eventstore
import (
"context"
"database/sql"
_ "embed"
"fmt"
"strings"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
@@ -23,11 +22,11 @@ type latestSequence struct {
//go:embed sequences_query.sql
var latestSequencesStmt string
func latestSequences(ctx context.Context, tx database.Tx, commands []eventstore.Command) ([]*latestSequence, error) {
func latestSequences(ctx context.Context, tx database.Transaction, commands []eventstore.Command) ([]*latestSequence, error) {
sequences := commandsToSequences(ctx, commands)
conditions, args := sequencesToSql(sequences)
rows, err := tx.QueryContext(ctx, fmt.Sprintf(latestSequencesStmt, strings.Join(conditions, " UNION ALL ")), args...)
rows, err := tx.Query(ctx, fmt.Sprintf(latestSequencesStmt, strings.Join(conditions, " UNION ALL ")), args...)
if err != nil {
return nil, zerrors.ThrowInternal(err, "V3-5jU5z", "Errors.Internal")
}
@@ -104,7 +103,7 @@ func sequencesToSql(sequences []*latestSequence) (conditions []string, args []an
return conditions, args
}
func scanToSequence(rows *sql.Rows, sequences []*latestSequence) error {
func scanToSequence(rows database.Rows, sequences []*latestSequence) error {
var aggregateType eventstore.AggregateType
var aggregateID, instanceID string
var currentSequence uint64

View File

@@ -10,7 +10,7 @@ import (
"github.com/jackc/pgx/v5/pgconn"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -25,7 +25,7 @@ var (
addConstraintStmt string
)
func handleUniqueConstraints(ctx context.Context, tx database.Tx, commands []eventstore.Command) (err error) {
func handleUniqueConstraints(ctx context.Context, tx database.Transaction, commands []eventstore.Command) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
@@ -62,7 +62,7 @@ func handleUniqueConstraints(ctx context.Context, tx database.Tx, commands []eve
}
if len(deletePlaceholders) > 0 {
_, err := tx.ExecContext(ctx, fmt.Sprintf(deleteConstraintStmt, strings.Join(deletePlaceholders, " OR ")), deleteArgs...)
_, err := tx.Exec(ctx, fmt.Sprintf(deleteConstraintStmt, strings.Join(deletePlaceholders, " OR ")), deleteArgs...)
if err != nil {
logging.WithError(err).Warn("delete unique constraint failed")
errMessage := "Errors.Internal"
@@ -73,7 +73,7 @@ func handleUniqueConstraints(ctx context.Context, tx database.Tx, commands []eve
}
}
if len(addPlaceholders) > 0 {
_, err := tx.ExecContext(ctx, fmt.Sprintf(addConstraintStmt, strings.Join(addPlaceholders, ", ")), addArgs...)
_, err := tx.Exec(ctx, fmt.Sprintf(addConstraintStmt, strings.Join(addPlaceholders, ", ")), addArgs...)
if err != nil {
logging.WithError(err).Warn("add unique constraint failed")
errMessage := "Errors.Internal"