Files
zitadel/backend/v3/storage/database/database.go
Silvan 22ef817d5c fix(eventstore): Make Eventstore Compatible with Relational Table Package (#10687)
Improves compatibility of eventstore and related database components
with the new relational table package.

## Which problems are solved

1. **Incompatible Database Interfaces**: The existing eventstore was
tightly coupled to the database package, which is incompatible with the
new, more abstract relational table package in v3. This prevented the
new command-side logic from pushing events to the legacy eventstore.
2. **Missing Health Checks**: The database interfaces in the new package
lacked a Ping method, making it impossible to perform health checks on
database connections.
3. **Event Publishing Logic**: The command handling logic in domain
needed a way to collect and push events to the legacy eventstore after a
command was successfully executed.

## How the problems are solved

1. **`LegacyEventstore` Interface**:
* A new `LegacyEventstore` interface is introduced in the new
`database/eventstore` . This interface exposes a `PushWithNewClient`
method that accepts the new `database.QueryExecutor` interface,
decoupling the v3 domain from the legacy implementation.
* The `internal/eventstore.Eventstore` now implements this interface. A
wrapper, PushWithClient, is added to convert the old database client
types (`*sql.DB`, `*sql.Tx`) into the new `QueryExecutor` types before
calling `PushWithNewClient`.
2. **Database Interface Updates**:
* The `database.Pool` and `database.Client` interfaces in
`storage/eventstore` have been updated to include a Ping method,
allowing for consistent health checks across different database
dialects.
* The `postgres` and `sql` dialect implementations have been updated to
support this new method.
3. **Command and Invoker Refactoring**:
* The `Commander` interface in domain now includes an `Events()
[]legacy_es.Command` method. This allows commands to declare which
events they will generate.
* The `eventCollector` in the invoker logic has been redesigned. It now
ensures a database transaction is started before executing a command.
After successful execution, it calls the `Events()` method on the
command to collect the generated events and appends them to a list.
* The `eventStoreInvoker` then pushes all collected events to the legacy
eventstore using the new `LegacyEventstore` interface, ensuring that
events are only pushed if the entire command (and any sub-commands)
executes successfully within the transaction.
4. **Testing**:
* New unit tests have been added for the invoker to verify that events
are correctly collected from single commands, batched commands, and
nested commands.

These changes create a clean bridge between the new v3 command-side
logic and the existing v1 eventstore, allowing for incremental adoption
of the new architecture while maintaining full functionality.

## Additional Information

closes https://github.com/zitadel/zitadel/issues/10442
2025-09-16 18:58:49 +02:00

88 lines
2.1 KiB
Go

package database
import (
"context"
)
// Pool is a connection pool. e.g. pgxpool
type Pool interface {
Beginner
QueryExecutor
Migrator
Acquire(ctx context.Context) (Client, error)
Close(ctx context.Context) error
Ping(ctx context.Context) error
}
type PoolTest interface {
Pool
// MigrateTest is the same as [Migrator] but executes the migrations multiple times instead of only once.
MigrateTest(ctx context.Context) error
}
// Client is a single database connection which can be released back to the pool.
type Client interface {
Beginner
QueryExecutor
Migrator
Release(ctx context.Context) error
Ping(ctx context.Context) error
}
// Querier is a database client that can execute queries and return rows.
type Querier interface {
Query(ctx context.Context, stmt string, args ...any) (Rows, error)
QueryRow(ctx context.Context, stmt string, args ...any) Row
}
// Executor is a database client that can execute statements.
// It returns the number of rows affected or an error
type Executor interface {
Exec(ctx context.Context, stmt string, args ...any) (int64, error)
}
// QueryExecutor is a database client that can execute queries and statements.
type QueryExecutor interface {
Querier
Executor
}
// Scanner scans a single row of data into the destination.
type Scanner interface {
Scan(dest ...any) error
}
// Row is an abstraction of sql.Row.
type Row interface {
Scanner
}
// Rows is an abstraction of sql.Rows.
type Rows interface {
Scanner
Next() bool
Close() error
Err() error
}
type CollectableRows interface {
// Collect collects all rows and scans them into dest.
// dest must be a pointer to a slice of pointer to structs
// e.g. *[]*MyStruct
// Rows are closed after this call.
Collect(dest any) error
// CollectFirst collects the first row and scans it into dest.
// dest must be a pointer to a struct
// e.g. *MyStruct{}
// Rows are closed after this call.
CollectFirst(dest any) error
// CollectExactlyOneRow collects exactly one row and scans it into dest.
// e.g. *MyStruct{}
// Rows are closed after this call.
CollectExactlyOneRow(dest any) error
}