chore!: Introduce ZITADEL v3 (#9645)

This PR summarizes multiple changes that are only available with
ZITADEL v3:

- feat: Web Keys management
(https://github.com/zitadel/zitadel/pull/9526)
- fix(cmd): ensure proper working of mirror
(https://github.com/zitadel/zitadel/pull/9509)
- feat(Authz): system user support for permission check v2
(https://github.com/zitadel/zitadel/pull/9640)
- chore(license): change from Apache to AGPL
(https://github.com/zitadel/zitadel/pull/9597)
- feat(console): list v2 sessions
(https://github.com/zitadel/zitadel/pull/9539)
- fix(console): add loginV2 feature flag
(https://github.com/zitadel/zitadel/pull/9682)
- fix(feature flags): allow reading "own" flags
(https://github.com/zitadel/zitadel/pull/9649)
- feat(console): add Actions V2 UI
(https://github.com/zitadel/zitadel/pull/9591)

BREAKING CHANGE
- feat(webkey): migrate to v2beta API
(https://github.com/zitadel/zitadel/pull/9445)
- chore!: remove CockroachDB Support
(https://github.com/zitadel/zitadel/pull/9444)
- feat(actions): migrate to v2beta API
(https://github.com/zitadel/zitadel/pull/9489)

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com>
Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com>
Co-authored-by: Ramon <mail@conblem.me>
Co-authored-by: Elio Bischof <elio@zitadel.com>
Co-authored-by: Kenta Yamaguchi <56732734+KEY60228@users.noreply.github.com>
Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com>
Co-authored-by: Livio Spring <livio@zitadel.com>
Co-authored-by: Max Peintner <max@caos.ch>
Co-authored-by: Iraq <66622793+kkrime@users.noreply.github.com>
Co-authored-by: Florian Forster <florian@zitadel.com>
Co-authored-by: Tim Möhlmann <tim+github@zitadel.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Max Peintner <peintnerm@gmail.com>
Author: Fabienne Bühler
Date: 2025-04-02 16:53:06 +02:00
Committer: GitHub
Parent: d14a23ae7e
Commit: 07ce3b6905
559 changed files with 14578 additions and 7622 deletions
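
With CockroachDB support removed (see BREAKING CHANGE above), every database setup in this diff targets plain PostgreSQL: the cockroach-go test server is replaced by an embedded Postgres, and local test connections move from port 26257 to 5432. Below is a minimal connection sketch, not part of the commit, assuming a local PostgreSQL instance with the default postgres user; it mirrors the DSN used by the updated connectLocalhost helper further down.

package main

import (
	"database/sql"
	"log"

	_ "github.com/jackc/pgx/v5/stdlib" // registers the "pgx" database/sql driver
)

func main() {
	// Same DSN style as connectLocalhost in the test files below; adjust user/database as needed.
	client, err := sql.Open("pgx", "postgresql://postgres@localhost:5432/postgres?sslmode=disable")
	if err != nil {
		log.Fatalf("unable to open db: %v", err)
	}
	defer client.Close()

	if err := client.Ping(); err != nil {
		log.Fatalf("unable to ping db: %v", err)
	}
}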

View File

@@ -11,7 +11,7 @@ import (
"github.com/zitadel/zitadel/internal/eventstore"
)
func TestCRDB_Push_OneAggregate(t *testing.T) {
func TestEventstore_Push_OneAggregate(t *testing.T) {
type args struct {
ctx context.Context
commands []eventstore.Command
@@ -202,7 +202,7 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
}
}
if _, err := db.Push(tt.args.ctx, tt.args.commands...); (err != nil) != tt.res.wantErr {
t.Errorf("CRDB.Push() error = %v, wantErr %v", err, tt.res.wantErr)
t.Errorf("eventstore.Push() error = %v, wantErr %v", err, tt.res.wantErr)
}
assertEventCount(t,
@@ -218,7 +218,7 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
}
}
func TestCRDB_Push_MultipleAggregate(t *testing.T) {
func TestEventstore_Push_MultipleAggregate(t *testing.T) {
type args struct {
commands []eventstore.Command
}
@@ -312,7 +312,7 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) {
},
)
if _, err := db.Push(context.Background(), tt.args.commands...); (err != nil) != tt.res.wantErr {
t.Errorf("CRDB.Push() error = %v, wantErr %v", err, tt.res.wantErr)
t.Errorf("eventstore.Push() error = %v, wantErr %v", err, tt.res.wantErr)
}
assertEventCount(t, clients[pusherName], tt.res.eventsRes.aggType, tt.res.eventsRes.aggID, tt.res.eventsRes.pushedEventsCount)
@@ -321,7 +321,7 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) {
}
}
func TestCRDB_Push_Parallel(t *testing.T) {
func TestEventstore_Push_Parallel(t *testing.T) {
type args struct {
commands [][]eventstore.Command
}
@@ -453,7 +453,7 @@ func TestCRDB_Push_Parallel(t *testing.T) {
}
}
func TestCRDB_Push_ResourceOwner(t *testing.T) {
func TestEventstore_Push_ResourceOwner(t *testing.T) {
type args struct {
commands []eventstore.Command
}
@@ -587,7 +587,7 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
events, err := db.Push(context.Background(), tt.args.commands...)
if err != nil {
t.Errorf("CRDB.Push() error = %v", err)
t.Errorf("eventstore.Push() error = %v", err)
}
if len(events) != len(tt.res.resourceOwners) {

View File

@@ -7,7 +7,7 @@ import (
"github.com/zitadel/zitadel/internal/eventstore"
)
func TestCRDB_Filter(t *testing.T) {
func TestEventstore_Filter(t *testing.T) {
type args struct {
searchQuery *eventstore.SearchQueryBuilder
}
@@ -120,18 +120,18 @@ func TestCRDB_Filter(t *testing.T) {
events, err := db.Filter(context.Background(), tt.args.searchQuery)
if (err != nil) != tt.wantErr {
t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("eventstore.query() error = %v, wantErr %v", err, tt.wantErr)
}
if len(events) != tt.res.eventCount {
t.Errorf("CRDB.query() expected event count: %d got %d", tt.res.eventCount, len(events))
t.Errorf("eventstore.query() expected event count: %d got %d", tt.res.eventCount, len(events))
}
})
}
}
}
func TestCRDB_LatestSequence(t *testing.T) {
func TestEventstore_LatestSequence(t *testing.T) {
type args struct {
searchQuery *eventstore.SearchQueryBuilder
}
@@ -204,10 +204,10 @@ func TestCRDB_LatestSequence(t *testing.T) {
sequence, err := db.LatestSequence(context.Background(), tt.args.searchQuery)
if (err != nil) != tt.wantErr {
t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("eventstore.query() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.res.sequence > sequence {
t.Errorf("CRDB.query() expected sequence: %v got %v", tt.res.sequence, sequence)
t.Errorf("eventstore.query() expected sequence: %v got %v", tt.res.sequence, sequence)
}
})
}

View File

@@ -289,8 +289,8 @@ func (rm *UserReadModel) Reduce() error {
func TestUserReadModel(t *testing.T) {
es := eventstore.NewEventstore(
&eventstore.Config{
Querier: query_repo.NewCRDB(testCRDBClient),
Pusher: v3.NewEventstore(testCRDBClient),
Querier: query_repo.NewPostgres(testClient),
Pusher: v3.NewEventstore(testClient),
},
)

View File

@@ -665,7 +665,6 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder
builder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AwaitOpenTransactions().
Limit(uint64(h.bulkLimit)).
AllowTimeTravel().
OrderAsc().
InstanceID(currentState.instanceID)

View File

@@ -8,111 +8,100 @@ import (
"testing"
"time"
"github.com/cockroachdb/cockroach-go/v2/testserver"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/cmd/initialise"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/cockroach"
"github.com/zitadel/zitadel/internal/database/dialect"
"github.com/zitadel/zitadel/internal/database/postgres"
"github.com/zitadel/zitadel/internal/eventstore"
es_sql "github.com/zitadel/zitadel/internal/eventstore/repository/sql"
new_es "github.com/zitadel/zitadel/internal/eventstore/v3"
)
var (
testCRDBClient *database.DB
queriers map[string]eventstore.Querier = make(map[string]eventstore.Querier)
pushers map[string]eventstore.Pusher = make(map[string]eventstore.Pusher)
clients map[string]*database.DB = make(map[string]*database.DB)
testClient *database.DB
queriers map[string]eventstore.Querier = make(map[string]eventstore.Querier)
pushers map[string]eventstore.Pusher = make(map[string]eventstore.Pusher)
clients map[string]*database.DB = make(map[string]*database.DB)
)
func TestMain(m *testing.M) {
opts := make([]testserver.TestServerOpt, 0, 1)
if version := os.Getenv("ZITADEL_CRDB_VERSION"); version != "" {
opts = append(opts, testserver.CustomVersionOpt(version))
}
ts, err := testserver.NewTestServer(opts...)
if err != nil {
logging.WithFields("error", err).Fatal("unable to start db")
}
os.Exit(func() int {
config, cleanup := postgres.StartEmbedded()
defer cleanup()
testCRDBClient = &database.DB{
Database: new(testDB),
}
connConfig, err := pgxpool.ParseConfig(ts.PGURL().String())
if err != nil {
logging.WithFields("error", err).Fatal("unable to parse db url")
}
connConfig.AfterConnect = new_es.RegisterEventstoreTypes
pool, err := pgxpool.NewWithConfig(context.Background(), connConfig)
if err != nil {
logging.WithFields("error", err).Fatal("unable to create db pool")
}
testCRDBClient.DB = stdlib.OpenDBFromPool(pool)
if err = testCRDBClient.Ping(); err != nil {
logging.WithFields("error", err).Fatal("unable to ping db")
}
v2 := &es_sql.CRDB{DB: testCRDBClient}
queriers["v2(inmemory)"] = v2
clients["v2(inmemory)"] = testCRDBClient
pushers["v3(inmemory)"] = new_es.NewEventstore(testCRDBClient)
clients["v3(inmemory)"] = testCRDBClient
if localDB, err := connectLocalhost(); err == nil {
if err = initDB(context.Background(), localDB); err != nil {
logging.WithFields("error", err).Fatal("migrations failed")
testClient = &database.DB{
Database: new(testDB),
}
pushers["v3(singlenode)"] = new_es.NewEventstore(localDB)
clients["v3(singlenode)"] = localDB
}
// pushers["v2(inmemory)"] = v2
connConfig, err := pgxpool.ParseConfig(config.GetConnectionURL())
logging.OnError(err).Fatal("unable to parse db url")
defer func() {
testCRDBClient.Close()
ts.Stop()
}()
connConfig.AfterConnect = new_es.RegisterEventstoreTypes
pool, err := pgxpool.NewWithConfig(context.Background(), connConfig)
logging.OnError(err).Fatal("unable to create db pool")
if err = initDB(context.Background(), testCRDBClient); err != nil {
logging.WithFields("error", err).Fatal("migrations failed")
}
testClient.DB = stdlib.OpenDBFromPool(pool)
err = testClient.Ping()
logging.OnError(err).Fatal("unable to ping db")
os.Exit(m.Run())
v2 := &es_sql.Postgres{DB: testClient}
queriers["v2(inmemory)"] = v2
clients["v2(inmemory)"] = testClient
pushers["v3(inmemory)"] = new_es.NewEventstore(testClient)
clients["v3(inmemory)"] = testClient
if localDB, err := connectLocalhost(); err == nil {
err = initDB(context.Background(), localDB)
logging.OnError(err).Fatal("migrations failed")
pushers["v3(singlenode)"] = new_es.NewEventstore(localDB)
clients["v3(singlenode)"] = localDB
}
defer func() {
logging.OnError(testClient.Close()).Error("unable to close db")
}()
err = initDB(context.Background(), &database.DB{DB: testClient.DB, Database: &postgres.Config{Database: "zitadel"}})
logging.OnError(err).Fatal("migrations failed")
return m.Run()
}())
}
func initDB(ctx context.Context, db *database.DB) error {
initialise.ReadStmts("cockroach")
config := new(database.Config)
config.SetConnector(&cockroach.Config{
User: cockroach.User{
Username: "zitadel",
},
Database: "zitadel",
})
config.SetConnector(&postgres.Config{User: postgres.User{Username: "zitadel"}, Database: "zitadel"})
if err := initialise.ReadStmts(); err != nil {
return err
}
err := initialise.Init(ctx, db,
initialise.VerifyUser(config.Username(), ""),
initialise.VerifyDatabase(config.DatabaseName()),
initialise.VerifyGrant(config.DatabaseName(), config.Username()),
initialise.VerifySettings(config.DatabaseName(), config.Username()))
initialise.VerifyGrant(config.DatabaseName(), config.Username()))
if err != nil {
return err
}
err = initialise.VerifyZitadel(ctx, db, *config)
if err != nil {
return err
}
// create old events
_, err = db.Exec(oldEventsTable)
return err
}
func connectLocalhost() (*database.DB, error) {
client, err := sql.Open("pgx", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
client, err := sql.Open("pgx", "postgresql://postgres@localhost:5432/postgres?sslmode=disable")
if err != nil {
return nil, err
}
@@ -134,7 +123,7 @@ func (*testDB) DatabaseName() string { return "db" }
func (*testDB) Username() string { return "user" }
func (*testDB) Type() string { return "cockroach" }
func (*testDB) Type() dialect.DatabaseType { return dialect.DatabaseTypePostgres }
func generateCommand(aggregateType eventstore.AggregateType, aggregateID string, opts ...func(*testEvent)) eventstore.Command {
e := &testEvent{
@@ -177,7 +166,7 @@ func canceledCtx() context.Context {
}
func fillUniqueData(unique_type, field, instanceID string) error {
_, err := testCRDBClient.Exec("INSERT INTO eventstore.unique_constraints (unique_type, unique_field, instance_id) VALUES ($1, $2, $3)", unique_type, field, instanceID)
_, err := testClient.Exec("INSERT INTO eventstore.unique_constraints (unique_type, unique_field, instance_id) VALUES ($1, $2, $3)", unique_type, field, instanceID)
return err
}
@@ -251,5 +240,5 @@ const oldEventsTable = `CREATE TABLE IF NOT EXISTS eventstore.events (
, "position" DECIMAL NOT NULL
, in_tx_order INTEGER NOT NULL
, PRIMARY KEY (instance_id, aggregate_type, aggregate_id, event_sequence DESC)
, PRIMARY KEY (instance_id, aggregate_type, aggregate_id, event_sequence)
);`

View File

@@ -16,7 +16,6 @@ type SearchQuery struct {
Tx *sql.Tx
LockRows bool
LockOption eventstore.LockOption
AllowTimeTravel bool
AwaitOpenTransactions bool
Limit uint64
Offset uint32
@@ -51,11 +50,11 @@ const (
OperationGreater
// OperationLess compares if the given values is less than the stored one
OperationLess
//OperationIn checks if a stored value matches one of the passed value list
// OperationIn checks if a stored value matches one of the passed value list
OperationIn
//OperationJSONContains checks if a stored value matches the given json
// OperationJSONContains checks if a stored value matches the given json
OperationJSONContains
//OperationNotIn checks if a stored value does not match one of the passed value list
// OperationNotIn checks if a stored value does not match one of the passed value list
OperationNotIn
operationCount
@@ -65,25 +64,25 @@ const (
type Field int32
const (
//FieldAggregateType represents the aggregate type field
// FieldAggregateType represents the aggregate type field
FieldAggregateType Field = iota + 1
//FieldAggregateID represents the aggregate id field
// FieldAggregateID represents the aggregate id field
FieldAggregateID
//FieldSequence represents the sequence field
// FieldSequence represents the sequence field
FieldSequence
//FieldResourceOwner represents the resource owner field
// FieldResourceOwner represents the resource owner field
FieldResourceOwner
//FieldInstanceID represents the instance id field
// FieldInstanceID represents the instance id field
FieldInstanceID
//FieldEditorService represents the editor service field
// FieldEditorService represents the editor service field
FieldEditorService
//FieldEditorUser represents the editor user field
// FieldEditorUser represents the editor user field
FieldEditorUser
//FieldEventType represents the event type field
// FieldEventType represents the event type field
FieldEventType
//FieldEventData represents the event data field
// FieldEventData represents the event data field
FieldEventData
//FieldCreationDate represents the creation date field
// FieldCreationDate represents the creation date field
FieldCreationDate
// FieldPosition represents the field of the global sequence
FieldPosition
@@ -129,7 +128,6 @@ func QueryFromBuilder(builder *eventstore.SearchQueryBuilder) (*SearchQuery, err
Offset: builder.GetOffset(),
Desc: builder.GetDesc(),
Tx: builder.GetTx(),
AllowTimeTravel: builder.GetAllowTimeTravel(),
AwaitOpenTransactions: builder.GetAwaitOpenTransactions(),
SubQueries: make([][]*Filter, len(builder.GetQueries())),
}

View File

@@ -1,455 +0,0 @@
package sql
import (
"context"
"database/sql"
"encoding/json"
"errors"
"regexp"
"strconv"
"strings"
"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/jackc/pgx/v5/pgconn"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
)
const (
//as soon as stored procedures are possible in crdb
// we could move the code to migrations and call the procedure
// traking issue: https://github.com/cockroachdb/cockroach/issues/17511
//
//previous_data selects the needed data of the latest event of the aggregate
// and buffers it (crdb inmemory)
crdbInsert = "WITH previous_data (aggregate_type_sequence, aggregate_sequence, resource_owner) AS (" +
"SELECT agg_type.seq, agg.seq, agg.ro FROM " +
"(" +
//max sequence of requested aggregate type
" SELECT MAX(event_sequence) seq, 1 join_me" +
" FROM eventstore.events" +
" WHERE aggregate_type = $2" +
" AND (CASE WHEN $9::TEXT IS NULL THEN instance_id is null else instance_id = $9::TEXT END)" +
") AS agg_type " +
// combined with
"LEFT JOIN " +
"(" +
// max sequence and resource owner of aggregate root
" SELECT event_sequence seq, resource_owner ro, 1 join_me" +
" FROM eventstore.events" +
" WHERE aggregate_type = $2 AND aggregate_id = $3" +
" AND (CASE WHEN $9::TEXT IS NULL THEN instance_id is null else instance_id = $9::TEXT END)" +
" ORDER BY event_sequence DESC" +
" LIMIT 1" +
") AS agg USING(join_me)" +
") " +
"INSERT INTO eventstore.events (" +
" event_type," +
" aggregate_type," +
" aggregate_id," +
" aggregate_version," +
" creation_date," +
" position," +
" event_data," +
" editor_user," +
" editor_service," +
" resource_owner," +
" instance_id," +
" event_sequence," +
" previous_aggregate_sequence," +
" previous_aggregate_type_sequence," +
" in_tx_order" +
") " +
// defines the data to be inserted
"SELECT" +
" $1::VARCHAR AS event_type," +
" $2::VARCHAR AS aggregate_type," +
" $3::VARCHAR AS aggregate_id," +
" $4::VARCHAR AS aggregate_version," +
" hlc_to_timestamp(cluster_logical_timestamp()) AS creation_date," +
" cluster_logical_timestamp() AS position," +
" $5::JSONB AS event_data," +
" $6::VARCHAR AS editor_user," +
" $7::VARCHAR AS editor_service," +
" COALESCE((resource_owner), $8::VARCHAR) AS resource_owner," +
" $9::VARCHAR AS instance_id," +
" COALESCE(aggregate_sequence, 0)+1," +
" aggregate_sequence AS previous_aggregate_sequence," +
" aggregate_type_sequence AS previous_aggregate_type_sequence," +
" $10 AS in_tx_order " +
"FROM previous_data " +
"RETURNING id, event_sequence, creation_date, resource_owner, instance_id"
uniqueInsert = `INSERT INTO eventstore.unique_constraints
(
unique_type,
unique_field,
instance_id
)
VALUES (
$1,
$2,
$3
)`
uniqueDelete = `DELETE FROM eventstore.unique_constraints
WHERE unique_type = $1 and unique_field = $2 and instance_id = $3`
uniqueDeleteInstance = `DELETE FROM eventstore.unique_constraints
WHERE instance_id = $1`
)
// awaitOpenTransactions ensures event ordering, so we don't events younger that open transactions
var (
awaitOpenTransactionsV1 string
awaitOpenTransactionsV2 string
)
func awaitOpenTransactions(useV1 bool) string {
if useV1 {
return awaitOpenTransactionsV1
}
return awaitOpenTransactionsV2
}
type CRDB struct {
*database.DB
}
func NewCRDB(client *database.DB) *CRDB {
switch client.Type() {
case "cockroach":
awaitOpenTransactionsV1 = " AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = ANY(?))"
awaitOpenTransactionsV2 = ` AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = ANY(?))`
case "postgres":
awaitOpenTransactionsV1 = ` AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`
awaitOpenTransactionsV2 = ` AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`
}
return &CRDB{client}
}
func (db *CRDB) Health(ctx context.Context) error { return db.Ping() }
// Push adds all events to the eventstreams of the aggregates.
// This call is transaction save. The transaction will be rolled back if one event fails
func (db *CRDB) Push(ctx context.Context, commands ...eventstore.Command) (events []eventstore.Event, err error) {
events = make([]eventstore.Event, len(commands))
err = crdb.ExecuteTx(ctx, db.DB.DB, nil, func(tx *sql.Tx) error {
var uniqueConstraints []*eventstore.UniqueConstraint
for i, command := range commands {
if command.Aggregate().InstanceID == "" {
command.Aggregate().InstanceID = authz.GetInstance(ctx).InstanceID()
}
var payload []byte
if command.Payload() != nil {
payload, err = json.Marshal(command.Payload())
if err != nil {
return err
}
}
e := &repository.Event{
Typ: command.Type(),
Data: payload,
EditorUser: command.Creator(),
Version: command.Aggregate().Version,
AggregateID: command.Aggregate().ID,
AggregateType: command.Aggregate().Type,
ResourceOwner: sql.NullString{String: command.Aggregate().ResourceOwner, Valid: command.Aggregate().ResourceOwner != ""},
InstanceID: command.Aggregate().InstanceID,
}
err := tx.QueryRowContext(ctx, crdbInsert,
e.Type(),
e.Aggregate().Type,
e.Aggregate().ID,
e.Aggregate().Version,
payload,
e.Creator(),
"zitadel",
e.Aggregate().ResourceOwner,
e.Aggregate().InstanceID,
i,
).Scan(&e.ID, &e.Seq, &e.CreationDate, &e.ResourceOwner, &e.InstanceID)
if err != nil {
logging.WithFields(
"aggregate", e.Aggregate().Type,
"aggregateId", e.Aggregate().ID,
"aggregateType", e.Aggregate().Type,
"eventType", e.Type(),
"instanceID", e.Aggregate().InstanceID,
).WithError(err).Debug("query failed")
return zerrors.ThrowInternal(err, "SQL-SBP37", "unable to create event")
}
uniqueConstraints = append(uniqueConstraints, command.UniqueConstraints()...)
events[i] = e
}
return db.handleUniqueConstraints(ctx, tx, uniqueConstraints...)
})
if err != nil && !errors.Is(err, &zerrors.ZitadelError{}) {
err = zerrors.ThrowInternal(err, "SQL-DjgtG", "unable to store events")
}
return events, err
}
// handleUniqueConstraints adds or removes unique constraints
func (db *CRDB) handleUniqueConstraints(ctx context.Context, tx *sql.Tx, uniqueConstraints ...*eventstore.UniqueConstraint) (err error) {
if len(uniqueConstraints) == 0 || (len(uniqueConstraints) == 1 && uniqueConstraints[0] == nil) {
return nil
}
for _, uniqueConstraint := range uniqueConstraints {
uniqueConstraint.UniqueField = strings.ToLower(uniqueConstraint.UniqueField)
switch uniqueConstraint.Action {
case eventstore.UniqueConstraintAdd:
_, err := tx.ExecContext(ctx, uniqueInsert, uniqueConstraint.UniqueType, uniqueConstraint.UniqueField, authz.GetInstance(ctx).InstanceID())
if err != nil {
logging.WithFields(
"unique_type", uniqueConstraint.UniqueType,
"unique_field", uniqueConstraint.UniqueField).WithError(err).Info("insert unique constraint failed")
if db.isUniqueViolationError(err) {
return zerrors.ThrowAlreadyExists(err, "SQL-wHcEq", uniqueConstraint.ErrorMessage)
}
return zerrors.ThrowInternal(err, "SQL-dM9ds", "unable to create unique constraint")
}
case eventstore.UniqueConstraintRemove:
_, err := tx.ExecContext(ctx, uniqueDelete, uniqueConstraint.UniqueType, uniqueConstraint.UniqueField, authz.GetInstance(ctx).InstanceID())
if err != nil {
logging.WithFields(
"unique_type", uniqueConstraint.UniqueType,
"unique_field", uniqueConstraint.UniqueField).WithError(err).Info("delete unique constraint failed")
return zerrors.ThrowInternal(err, "SQL-6n88i", "unable to remove unique constraint")
}
case eventstore.UniqueConstraintInstanceRemove:
_, err := tx.ExecContext(ctx, uniqueDeleteInstance, authz.GetInstance(ctx).InstanceID())
if err != nil {
logging.WithFields(
"instance_id", authz.GetInstance(ctx).InstanceID()).WithError(err).Info("delete instance unique constraints failed")
return zerrors.ThrowInternal(err, "SQL-6n88i", "unable to remove unique constraints of instance")
}
}
}
return nil
}
// FilterToReducer finds all events matching the given search query and passes them to the reduce function.
func (crdb *CRDB) FilterToReducer(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder, reduce eventstore.Reducer) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
err = query(ctx, crdb, searchQuery, reduce, false)
if err == nil {
return nil
}
pgErr := new(pgconn.PgError)
// check events2 not exists
if errors.As(err, &pgErr) && pgErr.Code == "42P01" {
return query(ctx, crdb, searchQuery, reduce, true)
}
return err
}
// LatestSequence returns the latest sequence found by the search query
func (db *CRDB) LatestSequence(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (float64, error) {
var position sql.NullFloat64
err := query(ctx, db, searchQuery, &position, false)
return position.Float64, err
}
// InstanceIDs returns the instance ids found by the search query
func (db *CRDB) InstanceIDs(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) ([]string, error) {
var ids []string
err := query(ctx, db, searchQuery, &ids, false)
if err != nil {
return nil, err
}
return ids, nil
}
func (db *CRDB) Client() *database.DB {
return db.DB
}
func (db *CRDB) orderByEventSequence(desc, shouldOrderBySequence, useV1 bool) string {
if useV1 {
if desc {
return ` ORDER BY event_sequence DESC`
}
return ` ORDER BY event_sequence`
}
if shouldOrderBySequence {
if desc {
return ` ORDER BY "sequence" DESC`
}
return ` ORDER BY "sequence"`
}
if desc {
return ` ORDER BY "position" DESC, in_tx_order DESC`
}
return ` ORDER BY "position", in_tx_order`
}
func (db *CRDB) eventQuery(useV1 bool) string {
if useV1 {
return "SELECT" +
" creation_date" +
", event_type" +
", event_sequence" +
", event_data" +
", editor_user" +
", resource_owner" +
", instance_id" +
", aggregate_type" +
", aggregate_id" +
", aggregate_version" +
" FROM eventstore.events"
}
return "SELECT" +
" created_at" +
", event_type" +
`, "sequence"` +
`, "position"` +
", payload" +
", creator" +
`, "owner"` +
", instance_id" +
", aggregate_type" +
", aggregate_id" +
", revision" +
" FROM eventstore.events2"
}
func (db *CRDB) maxSequenceQuery(useV1 bool) string {
if useV1 {
return `SELECT event_sequence FROM eventstore.events`
}
return `SELECT "position" FROM eventstore.events2`
}
func (db *CRDB) instanceIDsQuery(useV1 bool) string {
table := "eventstore.events2"
if useV1 {
table = "eventstore.events"
}
return "SELECT DISTINCT instance_id FROM " + table
}
func (db *CRDB) columnName(col repository.Field, useV1 bool) string {
switch col {
case repository.FieldAggregateID:
return "aggregate_id"
case repository.FieldAggregateType:
return "aggregate_type"
case repository.FieldSequence:
if useV1 {
return "event_sequence"
}
return `"sequence"`
case repository.FieldResourceOwner:
if useV1 {
return "resource_owner"
}
return `"owner"`
case repository.FieldInstanceID:
return "instance_id"
case repository.FieldEditorService:
if useV1 {
return "editor_service"
}
return ""
case repository.FieldEditorUser:
if useV1 {
return "editor_user"
}
return "creator"
case repository.FieldEventType:
return "event_type"
case repository.FieldEventData:
if useV1 {
return "event_data"
}
return "payload"
case repository.FieldCreationDate:
if useV1 {
return "creation_date"
}
return "created_at"
case repository.FieldPosition:
return `"position"`
default:
return ""
}
}
func (db *CRDB) conditionFormat(operation repository.Operation) string {
switch operation {
case repository.OperationIn:
return "%s %s ANY(?)"
case repository.OperationNotIn:
return "%s %s ALL(?)"
}
return "%s %s ?"
}
func (db *CRDB) operation(operation repository.Operation) string {
switch operation {
case repository.OperationEquals, repository.OperationIn:
return "="
case repository.OperationGreater:
return ">"
case repository.OperationLess:
return "<"
case repository.OperationJSONContains:
return "@>"
case repository.OperationNotIn:
return "<>"
}
return ""
}
var (
placeholder = regexp.MustCompile(`\?`)
)
// placeholder replaces all "?" with postgres placeholders ($<NUMBER>)
func (db *CRDB) placeholder(query string) string {
occurances := placeholder.FindAllStringIndex(query, -1)
if len(occurances) == 0 {
return query
}
replaced := query[:occurances[0][0]]
for i, l := range occurances {
nextIDX := len(query)
if i < len(occurances)-1 {
nextIDX = occurances[i+1][0]
}
replaced = replaced + "$" + strconv.Itoa(i+1) + query[l[1]:nextIDX]
}
return replaced
}
func (db *CRDB) isUniqueViolationError(err error) bool {
if pgxErr, ok := err.(*pgconn.PgError); ok {
if pgxErr.Code == "23505" {
return true
}
}
return false
}

View File

@@ -7,72 +7,61 @@ import (
"testing"
"time"
"github.com/cockroachdb/cockroach-go/v2/testserver"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/cmd/initialise"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/cockroach"
"github.com/zitadel/zitadel/internal/database/dialect"
"github.com/zitadel/zitadel/internal/database/postgres"
new_es "github.com/zitadel/zitadel/internal/eventstore/v3"
)
var (
testCRDBClient *sql.DB
testClient *sql.DB
)
func TestMain(m *testing.M) {
opts := make([]testserver.TestServerOpt, 0, 1)
if version := os.Getenv("ZITADEL_CRDB_VERSION"); version != "" {
opts = append(opts, testserver.CustomVersionOpt(version))
}
ts, err := testserver.NewTestServer(opts...)
if err != nil {
logging.WithFields("error", err).Fatal("unable to start db")
}
os.Exit(func() int {
config, cleanup := postgres.StartEmbedded()
defer cleanup()
connConfig, err := pgxpool.ParseConfig(ts.PGURL().String())
if err != nil {
logging.WithFields("error", err).Fatal("unable to parse db url")
}
connConfig.AfterConnect = new_es.RegisterEventstoreTypes
pool, err := pgxpool.NewWithConfig(context.Background(), connConfig)
if err != nil {
logging.WithFields("error", err).Fatal("unable to create db pool")
}
connConfig, err := pgxpool.ParseConfig(config.GetConnectionURL())
logging.OnError(err).Fatal("unable to parse db url")
testCRDBClient = stdlib.OpenDBFromPool(pool)
connConfig.AfterConnect = new_es.RegisterEventstoreTypes
pool, err := pgxpool.NewWithConfig(context.Background(), connConfig)
logging.OnError(err).Fatal("unable to create db pool")
if err = testCRDBClient.Ping(); err != nil {
logging.WithFields("error", err).Fatal("unable to ping db")
}
testClient = stdlib.OpenDBFromPool(pool)
defer func() {
testCRDBClient.Close()
ts.Stop()
}()
err = testClient.Ping()
logging.OnError(err).Fatal("unable to ping db")
if err = initDB(context.Background(), &database.DB{DB: testCRDBClient, Database: &cockroach.Config{Database: "zitadel"}}); err != nil {
logging.WithFields("error", err).Fatal("migrations failed")
}
defer func() {
logging.OnError(testClient.Close()).Error("unable to close db")
}()
os.Exit(m.Run())
err = initDB(context.Background(), &database.DB{DB: testClient, Database: &postgres.Config{Database: "zitadel"}})
logging.OnError(err).Fatal("migrations failed")
return m.Run()
}())
}
func initDB(ctx context.Context, db *database.DB) error {
config := new(database.Config)
config.SetConnector(&cockroach.Config{User: cockroach.User{Username: "zitadel"}, Database: "zitadel"})
config.SetConnector(&postgres.Config{User: postgres.User{Username: "zitadel"}, Database: "zitadel"})
if err := initialise.ReadStmts("cockroach"); err != nil {
if err := initialise.ReadStmts(); err != nil {
return err
}
err := initialise.Init(ctx, db,
initialise.VerifyUser(config.Username(), ""),
initialise.VerifyDatabase(config.DatabaseName()),
initialise.VerifyGrant(config.DatabaseName(), config.Username()),
initialise.VerifySettings(config.DatabaseName(), config.Username()))
initialise.VerifyGrant(config.DatabaseName(), config.Username()))
if err != nil {
return err
}
@@ -95,7 +84,7 @@ func (*testDB) DatabaseName() string { return "db" }
func (*testDB) Username() string { return "user" }
func (*testDB) Type() string { return "cockroach" }
func (*testDB) Type() dialect.DatabaseType { return dialect.DatabaseTypePostgres }
const oldEventsTable = `CREATE TABLE IF NOT EXISTS eventstore.events (
id UUID DEFAULT gen_random_uuid()
@@ -116,5 +105,5 @@ const oldEventsTable = `CREATE TABLE IF NOT EXISTS eventstore.events (
, "position" DECIMAL NOT NULL
, in_tx_order INTEGER NOT NULL
, PRIMARY KEY (instance_id, aggregate_type, aggregate_id, event_sequence DESC)
, PRIMARY KEY (instance_id, aggregate_type, aggregate_id, event_sequence)
);`
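
The new TestMain above boots an embedded PostgreSQL instance instead of a cockroach-go test server. The following is a condensed sketch of that wiring, not part of the commit, assuming the internal helpers (postgres.StartEmbedded, config.GetConnectionURL, new_es.RegisterEventstoreTypes) behave exactly as shown in the surrounding diff.

package sql

import (
	"context"
	"database/sql"
	"os"
	"testing"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/jackc/pgx/v5/stdlib"
	"github.com/zitadel/logging"

	"github.com/zitadel/zitadel/internal/database/postgres"
	new_es "github.com/zitadel/zitadel/internal/eventstore/v3"
)

var testClient *sql.DB

func TestMain(m *testing.M) {
	os.Exit(func() int {
		// Start the embedded Postgres used by the tests; cleanup stops it again.
		config, cleanup := postgres.StartEmbedded()
		defer cleanup()

		connConfig, err := pgxpool.ParseConfig(config.GetConnectionURL())
		logging.OnError(err).Fatal("unable to parse db url")
		connConfig.AfterConnect = new_es.RegisterEventstoreTypes

		pool, err := pgxpool.NewWithConfig(context.Background(), connConfig)
		logging.OnError(err).Fatal("unable to create db pool")

		// Expose a database/sql client backed by the pool for the tests.
		testClient = stdlib.OpenDBFromPool(pool)
		logging.OnError(testClient.Ping()).Fatal("unable to ping db")
		defer func() {
			logging.OnError(testClient.Close()).Error("unable to close db")
		}()

		return m.Run()
	}())
}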

View File

@@ -0,0 +1,240 @@
package sql
import (
"context"
"database/sql"
"errors"
"regexp"
"strconv"
"github.com/jackc/pgx/v5/pgconn"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
)
// awaitOpenTransactions ensures event ordering, so we don't return events younger than open transactions
var (
awaitOpenTransactionsV1 = ` AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`
awaitOpenTransactionsV2 = ` AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`
)
func awaitOpenTransactions(useV1 bool) string {
if useV1 {
return awaitOpenTransactionsV1
}
return awaitOpenTransactionsV2
}
type Postgres struct {
*database.DB
}
func NewPostgres(client *database.DB) *Postgres {
return &Postgres{client}
}
func (db *Postgres) Health(ctx context.Context) error { return db.Ping() }
// FilterToReducer finds all events matching the given search query and passes them to the reduce function.
func (psql *Postgres) FilterToReducer(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder, reduce eventstore.Reducer) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
err = query(ctx, psql, searchQuery, reduce, false)
if err == nil {
return nil
}
pgErr := new(pgconn.PgError)
// check whether the events2 table does not exist yet
if errors.As(err, &pgErr) && pgErr.Code == "42P01" {
return query(ctx, psql, searchQuery, reduce, true)
}
return err
}
// LatestSequence returns the latest sequence found by the search query
func (db *Postgres) LatestSequence(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (float64, error) {
var position sql.NullFloat64
err := query(ctx, db, searchQuery, &position, false)
return position.Float64, err
}
// InstanceIDs returns the instance ids found by the search query
func (db *Postgres) InstanceIDs(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) ([]string, error) {
var ids []string
err := query(ctx, db, searchQuery, &ids, false)
if err != nil {
return nil, err
}
return ids, nil
}
func (db *Postgres) Client() *database.DB {
return db.DB
}
func (db *Postgres) orderByEventSequence(desc, shouldOrderBySequence, useV1 bool) string {
if useV1 {
if desc {
return ` ORDER BY event_sequence DESC`
}
return ` ORDER BY event_sequence`
}
if shouldOrderBySequence {
if desc {
return ` ORDER BY "sequence" DESC`
}
return ` ORDER BY "sequence"`
}
if desc {
return ` ORDER BY "position" DESC, in_tx_order DESC`
}
return ` ORDER BY "position", in_tx_order`
}
func (db *Postgres) eventQuery(useV1 bool) string {
if useV1 {
return "SELECT" +
" creation_date" +
", event_type" +
", event_sequence" +
", event_data" +
", editor_user" +
", resource_owner" +
", instance_id" +
", aggregate_type" +
", aggregate_id" +
", aggregate_version" +
" FROM eventstore.events"
}
return "SELECT" +
" created_at" +
", event_type" +
`, "sequence"` +
`, "position"` +
", payload" +
", creator" +
`, "owner"` +
", instance_id" +
", aggregate_type" +
", aggregate_id" +
", revision" +
" FROM eventstore.events2"
}
func (db *Postgres) maxSequenceQuery(useV1 bool) string {
if useV1 {
return `SELECT event_sequence FROM eventstore.events`
}
return `SELECT "position" FROM eventstore.events2`
}
func (db *Postgres) instanceIDsQuery(useV1 bool) string {
table := "eventstore.events2"
if useV1 {
table = "eventstore.events"
}
return "SELECT DISTINCT instance_id FROM " + table
}
func (db *Postgres) columnName(col repository.Field, useV1 bool) string {
switch col {
case repository.FieldAggregateID:
return "aggregate_id"
case repository.FieldAggregateType:
return "aggregate_type"
case repository.FieldSequence:
if useV1 {
return "event_sequence"
}
return `"sequence"`
case repository.FieldResourceOwner:
if useV1 {
return "resource_owner"
}
return `"owner"`
case repository.FieldInstanceID:
return "instance_id"
case repository.FieldEditorService:
if useV1 {
return "editor_service"
}
return ""
case repository.FieldEditorUser:
if useV1 {
return "editor_user"
}
return "creator"
case repository.FieldEventType:
return "event_type"
case repository.FieldEventData:
if useV1 {
return "event_data"
}
return "payload"
case repository.FieldCreationDate:
if useV1 {
return "creation_date"
}
return "created_at"
case repository.FieldPosition:
return `"position"`
default:
return ""
}
}
func (db *Postgres) conditionFormat(operation repository.Operation) string {
switch operation {
case repository.OperationIn:
return "%s %s ANY(?)"
case repository.OperationNotIn:
return "%s %s ALL(?)"
case repository.OperationEquals, repository.OperationGreater, repository.OperationLess, repository.OperationJSONContains:
fallthrough
default:
return "%s %s ?"
}
}
func (db *Postgres) operation(operation repository.Operation) string {
switch operation {
case repository.OperationEquals, repository.OperationIn:
return "="
case repository.OperationGreater:
return ">"
case repository.OperationLess:
return "<"
case repository.OperationJSONContains:
return "@>"
case repository.OperationNotIn:
return "<>"
}
return ""
}
var (
placeholder = regexp.MustCompile(`\?`)
)
// placeholder replaces all "?" with postgres placeholders ($<NUMBER>)
func (db *Postgres) placeholder(query string) string {
occurrences := placeholder.FindAllStringIndex(query, -1)
if len(occurrences) == 0 {
return query
}
replaced := query[:occurrences[0][0]]
for i, l := range occurrences {
nextIDX := len(query)
if i < len(occurrences)-1 {
nextIDX = occurrences[i+1][0]
}
replaced = replaced + "$" + strconv.Itoa(i+1) + query[l[1]:nextIDX]
}
return replaced
}
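
The placeholder helper above is what turns the driver-agnostic "?" markers, including the ones in the open-transaction guard, into PostgreSQL's numbered parameters. Below is a minimal sketch of its effect, not part of the commit, written as an in-package test so it can reach the unexported Postgres type; the query text reuses fragments that appear in this file.

package sql

import "testing"

// Sketch only: placeholder rewrites each "?" into $1, $2, ... so the final
// statement sent to PostgreSQL uses numbered parameters.
func TestPostgres_placeholder_sketch(t *testing.T) {
	db := &Postgres{}
	got := db.placeholder(`SELECT "position" FROM eventstore.events2 WHERE aggregate_type = ANY(?) AND instance_id = ?`)
	want := `SELECT "position" FROM eventstore.events2 WHERE aggregate_type = ANY($1) AND instance_id = $2`
	if got != want {
		t.Errorf("placeholder() = %s, want %s", got, want)
	}
}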

View File

@@ -8,7 +8,7 @@ import (
"github.com/zitadel/zitadel/internal/eventstore/repository"
)
func TestCRDB_placeholder(t *testing.T) {
func TestPostgres_placeholder(t *testing.T) {
type args struct {
query string
}
@@ -50,15 +50,15 @@ func TestCRDB_placeholder(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{}
db := &Postgres{}
if query := db.placeholder(tt.args.query); query != tt.res.query {
t.Errorf("CRDB.placeholder() = %v, want %v", query, tt.res.query)
t.Errorf("Postgres.placeholder() = %v, want %v", query, tt.res.query)
}
})
}
}
func TestCRDB_operation(t *testing.T) {
func TestPostgres_operation(t *testing.T) {
type res struct {
op string
}
@@ -118,15 +118,15 @@ func TestCRDB_operation(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{}
db := &Postgres{}
if got := db.operation(tt.args.operation); got != tt.res.op {
t.Errorf("CRDB.operation() = %v, want %v", got, tt.res.op)
t.Errorf("Postgres.operation() = %v, want %v", got, tt.res.op)
}
})
}
}
func TestCRDB_conditionFormat(t *testing.T) {
func TestPostgres_conditionFormat(t *testing.T) {
type res struct {
format string
}
@@ -159,15 +159,15 @@ func TestCRDB_conditionFormat(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{}
db := &Postgres{}
if got := db.conditionFormat(tt.args.operation); got != tt.res.format {
t.Errorf("CRDB.conditionFormat() = %v, want %v", got, tt.res.format)
t.Errorf("Postgres.conditionFormat() = %v, want %v", got, tt.res.format)
}
})
}
}
func TestCRDB_columnName(t *testing.T) {
func TestPostgres_columnName(t *testing.T) {
type res struct {
name string
}
@@ -295,9 +295,9 @@ func TestCRDB_columnName(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{}
db := &Postgres{}
if got := db.columnName(tt.args.field, tt.args.useV1); got != tt.res.name {
t.Errorf("CRDB.operation() = %v, want %v", got, tt.res.name)
t.Errorf("Postgres.operation() = %v, want %v", got, tt.res.name)
}
})
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
"github.com/zitadel/zitadel/internal/eventstore"
@@ -65,11 +64,6 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
if where == "" || query == "" {
return zerrors.ThrowInvalidArgument(nil, "SQL-rWeBw", "invalid query factory")
}
if q.Tx == nil {
if travel := prepareTimeTravel(ctx, criteria, q.AllowTimeTravel); travel != "" {
query += travel
}
}
query += where
// instead of using the max function of the database (which doesn't work for postgres)
@@ -158,15 +152,7 @@ func prepareColumns(criteria querier, columns eventstore.Columns, useV1 bool) (s
}
}
func prepareTimeTravel(ctx context.Context, criteria querier, allow bool) string {
if !allow {
return ""
}
took := call.Took(ctx)
return criteria.Timetravel(took)
}
func maxSequenceScanner(row scan, dest interface{}) (err error) {
func maxSequenceScanner(row scan, dest any) (err error) {
position, ok := dest.(*sql.NullFloat64)
if !ok {
return zerrors.ThrowInvalidArgumentf(nil, "SQL-NBjA9", "type must be sql.NullInt64 got: %T", dest)

View File

@@ -14,10 +14,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/cockroach"
db_mock "github.com/zitadel/zitadel/internal/database/mock"
"github.com/zitadel/zitadel/internal/database/postgres"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository"
new_es "github.com/zitadel/zitadel/internal/eventstore/v3"
"github.com/zitadel/zitadel/internal/zerrors"
)
@@ -68,7 +69,7 @@ func Test_getCondition(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{}
db := &Postgres{}
if got := getCondition(db, tt.args.filter, false); got != tt.want {
t.Errorf("getCondition() = %v, want %v", got, tt.want)
}
@@ -236,8 +237,7 @@ func Test_prepareColumns(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
crdb := &CRDB{}
query, rowScanner := prepareColumns(crdb, tt.args.columns, tt.args.useV1)
query, rowScanner := prepareColumns(new(Postgres), tt.args.columns, tt.args.useV1)
if query != tt.res.query {
t.Errorf("prepareColumns() got = %s, want %s", query, tt.res.query)
}
@@ -267,7 +267,7 @@ func Test_prepareColumns(t *testing.T) {
got := reflect.Indirect(reflect.ValueOf(tt.args.dest)).Interface()
if !reflect.DeepEqual(got, tt.res.expected) {
t.Errorf("unexpected result from rowScanner \nwant: %+v \ngot: %+v", tt.res.expected, got)
t.Errorf("unexpected result from rowScanner nwant: %+v ngot: %+v", tt.res.expected, got)
}
})
}
@@ -403,7 +403,7 @@ func Test_prepareCondition(t *testing.T) {
useV1: true,
},
res: res{
clause: " WHERE aggregate_type = ANY(?) AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = ANY(?))",
clause: " WHERE aggregate_type = ANY(?) AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')",
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, database.TextArray[string]{}},
},
},
@@ -420,7 +420,7 @@ func Test_prepareCondition(t *testing.T) {
},
},
res: res{
clause: ` WHERE aggregate_type = ANY(?) AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = ANY(?))`,
clause: ` WHERE aggregate_type = ANY(?) AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`,
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, database.TextArray[string]{}},
},
},
@@ -440,7 +440,7 @@ func Test_prepareCondition(t *testing.T) {
useV1: true,
},
res: res{
clause: " WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = ANY(?))",
clause: " WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')",
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}, database.TextArray[string]{}},
},
},
@@ -459,15 +459,14 @@ func Test_prepareCondition(t *testing.T) {
},
},
res: res{
clause: ` WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = ANY(?))`,
clause: ` WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`,
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}, database.TextArray[string]{}},
},
},
}
crdb := NewCRDB(&database.DB{Database: new(cockroach.Config)})
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotClause, gotValues := prepareConditions(crdb, tt.args.query, tt.args.useV1)
gotClause, gotValues := prepareConditions(NewPostgres(&database.DB{Database: new(postgres.Config)}), tt.args.query, tt.args.useV1)
if gotClause != tt.res.clause {
t.Errorf("prepareCondition() gotClause = %v, want %v", gotClause, tt.res.clause)
}
@@ -484,7 +483,7 @@ func Test_prepareCondition(t *testing.T) {
}
}
func Test_query_events_with_crdb(t *testing.T) {
func Test_query_events_with_postgres(t *testing.T) {
type args struct {
searchQuery *eventstore.SearchQueryBuilder
}
@@ -511,7 +510,7 @@ func Test_query_events_with_crdb(t *testing.T) {
Builder(),
},
fields: fields{
client: testCRDBClient,
client: testClient,
existingEvents: []eventstore.Command{
generateEvent(t, "300"),
generateEvent(t, "300"),
@@ -532,7 +531,7 @@ func Test_query_events_with_crdb(t *testing.T) {
Builder(),
},
fields: fields{
client: testCRDBClient,
client: testClient,
existingEvents: []eventstore.Command{
generateEvent(t, "301"),
generateEvent(t, "302"),
@@ -555,7 +554,7 @@ func Test_query_events_with_crdb(t *testing.T) {
Builder(),
},
fields: fields{
client: testCRDBClient,
client: testClient,
existingEvents: []eventstore.Command{
generateEvent(t, "303"),
generateEvent(t, "303"),
@@ -576,7 +575,7 @@ func Test_query_events_with_crdb(t *testing.T) {
ResourceOwner("caos"),
},
fields: fields{
client: testCRDBClient,
client: testClient,
existingEvents: []eventstore.Command{
generateEvent(t, "306", func(e *repository.Event) { e.ResourceOwner = sql.NullString{String: "caos", Valid: true} }),
generateEvent(t, "307", func(e *repository.Event) { e.ResourceOwner = sql.NullString{String: "caos", Valid: true} }),
@@ -599,7 +598,7 @@ func Test_query_events_with_crdb(t *testing.T) {
Builder(),
},
fields: fields{
client: testCRDBClient,
client: testClient,
existingEvents: []eventstore.Command{
generateEvent(t, "311", func(e *repository.Event) { e.Typ = "user.created" }),
generateEvent(t, "311", func(e *repository.Event) { e.Typ = "user.updated" }),
@@ -623,7 +622,7 @@ func Test_query_events_with_crdb(t *testing.T) {
searchQuery: eventstore.NewSearchQueryBuilder(eventstore.Columns(-1)),
},
fields: fields{
client: testCRDBClient,
client: testClient,
existingEvents: []eventstore.Command{},
},
res: res{
@@ -634,117 +633,37 @@ func Test_query_events_with_crdb(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
DB: &database.DB{
DB: tt.fields.client,
Database: new(testDB),
},
dbClient := &database.DB{
DB: tt.fields.client,
Database: new(testDB),
}
client := &Postgres{
DB: dbClient,
}
pusher := new_es.NewEventstore(dbClient)
// setup initial data for query
if _, err := db.Push(context.Background(), tt.fields.existingEvents...); err != nil {
if _, err := pusher.Push(context.Background(), dbClient.DB, tt.fields.existingEvents...); err != nil {
t.Errorf("error in setup = %v", err)
return
}
events := []eventstore.Event{}
if err := query(context.Background(), db, tt.args.searchQuery, eventstore.Reducer(func(event eventstore.Event) error {
if err := query(context.Background(), client, tt.args.searchQuery, eventstore.Reducer(func(event eventstore.Event) error {
events = append(events, event)
return nil
}), true); (err != nil) != tt.wantErr {
t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("eventstore.query() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
/* Cockroach test DB doesn't seem to lock
func Test_query_events_with_crdb_locking(t *testing.T) {
type args struct {
searchQuery *eventstore.SearchQueryBuilder
}
type fields struct {
existingEvents []eventstore.Command
client *sql.DB
}
tests := []struct {
name string
fields fields
args args
lockOption eventstore.LockOption
wantErr bool
}{
{
name: "skip locked",
fields: fields{
client: testCRDBClient,
existingEvents: []eventstore.Command{
generateEvent(t, "306", func(e *repository.Event) { e.ResourceOwner = sql.NullString{String: "caos", Valid: true} }),
generateEvent(t, "307", func(e *repository.Event) { e.ResourceOwner = sql.NullString{String: "caos", Valid: true} }),
generateEvent(t, "308", func(e *repository.Event) { e.ResourceOwner = sql.NullString{String: "caos", Valid: true} }),
},
},
args: args{
searchQuery: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
ResourceOwner("caos"),
},
lockOption: eventstore.LockOptionNoWait,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
DB: &database.DB{
DB: tt.fields.client,
Database: new(testDB),
},
}
// setup initial data for query
if _, err := db.Push(context.Background(), tt.fields.existingEvents...); err != nil {
t.Errorf("error in setup = %v", err)
return
}
// first TX should lock and return all events
tx1, err := db.DB.Begin()
require.NoError(t, err)
defer func() {
require.NoError(t, tx1.Rollback())
}()
searchQuery1 := tt.args.searchQuery.LockRowsDuringTx(tx1, tt.lockOption)
gotEvents1 := []eventstore.Event{}
err = query(context.Background(), db, searchQuery1, eventstore.Reducer(func(event eventstore.Event) error {
gotEvents1 = append(gotEvents1, event)
return nil
}), true)
require.NoError(t, err)
assert.Len(t, gotEvents1, len(tt.fields.existingEvents))
// second TX should not return the events, and might return an error
tx2, err := db.DB.Begin()
require.NoError(t, err)
defer func() {
require.NoError(t, tx2.Rollback())
}()
searchQuery2 := tt.args.searchQuery.LockRowsDuringTx(tx1, tt.lockOption)
gotEvents2 := []eventstore.Event{}
err = query(context.Background(), db, searchQuery2, eventstore.Reducer(func(event eventstore.Event) error {
gotEvents2 = append(gotEvents2, event)
return nil
}), true)
if tt.wantErr {
require.Error(t, err)
}
require.NoError(t, err)
assert.Len(t, gotEvents2, 0)
})
}
}
*/
func Test_query_events_mocked(t *testing.T) {
type args struct {
query *eventstore.SearchQueryBuilder
dest interface{}
dest any
useV1 bool
}
type res struct {
@@ -772,8 +691,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence DESC`,
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC`),
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}},
),
},
@@ -795,8 +714,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence LIMIT \$3`,
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence LIMIT $3`),
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}, uint64(5)},
),
},
@@ -818,32 +737,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence DESC LIMIT \$3`,
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}, uint64(5)},
),
},
res: res{
wantErr: false,
},
},
{
name: "with limit and order by desc as of system time",
args: args{
dest: &[]*repository.Event{},
query: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
OrderDesc().
AwaitOpenTransactions().
Limit(5).
AllowTimeTravel().
AddQuery().
AggregateTypes("user").
Builder(),
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence DESC LIMIT \$3`,
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC LIMIT $3`),
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}, uint64(5)},
),
},
@@ -864,8 +759,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 ORDER BY event_sequence DESC LIMIT \$2 FOR UPDATE`,
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2 FOR UPDATE`),
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
),
},
@@ -886,8 +781,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 ORDER BY event_sequence DESC LIMIT \$2 FOR UPDATE NOWAIT`,
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2 FOR UPDATE NOWAIT`),
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
),
},
@@ -908,8 +803,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 ORDER BY event_sequence DESC LIMIT \$2 FOR UPDATE SKIP LOCKED`,
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2 FOR UPDATE SKIP LOCKED`),
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
),
},
@@ -931,8 +826,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQueryErr(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence DESC`,
mock: newMockClient(t).expectQueryErr(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC`),
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}},
sql.ErrConnDone),
},
@@ -954,8 +849,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQueryScanErr(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence DESC`,
mock: newMockClient(t).expectQueryScanErr(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC`),
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}},
&repository.Event{Seq: 100}),
},
@@ -989,8 +884,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \(aggregate_type = \$1 OR \(aggregate_type = \$2 AND aggregate_id = \$3\)\) AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$4\)\) ORDER BY event_sequence DESC LIMIT \$5`,
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE (aggregate_type = $1 OR (aggregate_type = $2 AND aggregate_id = $3)) AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($4) AND state <> 'idle') ORDER BY event_sequence DESC LIMIT $5`),
[]driver.Value{eventstore.AggregateType("user"), eventstore.AggregateType("org"), "asdf42", database.TextArray[string]{}, uint64(5)},
),
},
@@ -1018,10 +913,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
regexp.QuoteMeta(
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" > $8) ORDER BY event_sequence DESC LIMIT $9`,
),
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" > $8) ORDER BY event_sequence DESC LIMIT $9`),
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), 123.456, eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", 123.456, uint64(5)},
),
},
@@ -1049,10 +942,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: false,
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
regexp.QuoteMeta(
`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" > $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`,
),
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND "position" > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND "position" > $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`),
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), 123.456, eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", 123.456, uint64(5)},
),
},
@@ -1080,10 +971,8 @@ func Test_query_events_mocked(t *testing.T) {
useV1: false,
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
regexp.QuoteMeta(
`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND created_at > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND created_at > $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`,
),
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 WHERE instance_id = $1 AND aggregate_type = $2 AND event_type = $3 AND created_at > $4 AND aggregate_id NOT IN (SELECT aggregate_id FROM eventstore.events2 WHERE aggregate_type = $5 AND event_type = ANY($6) AND instance_id = $7 AND created_at > $8) ORDER BY "position" DESC, in_tx_order DESC LIMIT $9`),
[]driver.Value{"instanceID", eventstore.AggregateType("notify"), eventstore.EventType("notify.foo.bar"), time.Unix(123, 456), eventstore.AggregateType("notify"), []eventstore.EventType{"notification.failed", "notification.success"}, "instanceID", time.Unix(123, 456), uint64(5)},
),
},
@@ -1092,14 +981,14 @@ func Test_query_events_mocked(t *testing.T) {
},
},
}
crdb := NewCRDB(&database.DB{Database: new(testDB)})
client := NewPostgres(&database.DB{Database: new(testDB)})
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.fields.mock != nil {
crdb.DB.DB = tt.fields.mock.client
client.DB.DB = tt.fields.mock.client
}
err := query(context.Background(), crdb, tt.args.query, tt.args.dest, tt.args.useV1)
err := query(context.Background(), client, tt.args.query, tt.args.dest, tt.args.useV1)
if (err != nil) != tt.res.wantErr {
t.Errorf("query() error = %v, wantErr %v", err, tt.res.wantErr)
}
@@ -1120,7 +1009,7 @@ type dbMock struct {
client *sql.DB
}
func (m *dbMock) expectQuery(t *testing.T, expectedQuery string, args []driver.Value, events ...*repository.Event) *dbMock {
func (m *dbMock) expectQuery(expectedQuery string, args []driver.Value, events ...*repository.Event) *dbMock {
query := m.mock.ExpectQuery(expectedQuery).WithArgs(args...)
rows := m.mock.NewRows([]string{"sequence"})
for _, event := range events {
@@ -1130,7 +1019,7 @@ func (m *dbMock) expectQuery(t *testing.T, expectedQuery string, args []driver.V
return m
}
func (m *dbMock) expectQueryScanErr(t *testing.T, expectedQuery string, args []driver.Value, events ...*repository.Event) *dbMock {
func (m *dbMock) expectQueryScanErr(expectedQuery string, args []driver.Value, events ...*repository.Event) *dbMock {
query := m.mock.ExpectQuery(expectedQuery).WithArgs(args...)
rows := m.mock.NewRows([]string{"sequence"})
for _, event := range events {
@@ -1140,7 +1029,7 @@ func (m *dbMock) expectQueryScanErr(t *testing.T, expectedQuery string, args []d
return m
}
func (m *dbMock) expectQueryErr(t *testing.T, expectedQuery string, args []driver.Value, err error) *dbMock {
func (m *dbMock) expectQueryErr(expectedQuery string, args []driver.Value, err error) *dbMock {
m.mock.ExpectQuery(expectedQuery).WithArgs(args...).WillReturnError(err)
return m
}
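The three helpers above drop their unused *testing.T parameter, and the call sites now wrap literal SQL in regexp.QuoteMeta instead of maintaining hand-escaped patterns. A minimal sketch of why that works, assuming the helpers sit on top of github.com/DATA-DOG/go-sqlmock (which the ExpectQuery/NewRows calls suggest); the package clause, test name, and table/column names are illustrative:

package sql_test

import (
	"regexp"
	"testing"

	"github.com/DATA-DOG/go-sqlmock"
)

func TestQuoteMetaSketch(t *testing.T) {
	db, mock, err := sqlmock.New()
	if err != nil {
		t.Fatal(err)
	}
	defer db.Close()

	// sqlmock interprets the expected query as a regular expression, so literal SQL
	// containing $1, parentheses, and dots must be escaped; QuoteMeta does exactly that.
	mock.ExpectQuery(
		regexp.QuoteMeta(`SELECT event_sequence FROM eventstore.events WHERE aggregate_type = $1`),
	).WithArgs("user").
		WillReturnRows(sqlmock.NewRows([]string{"sequence"}).AddRow(1))

	rows, err := db.Query(`SELECT event_sequence FROM eventstore.events WHERE aggregate_type = $1`, "user")
	if err != nil {
		t.Fatal(err)
	}
	defer rows.Close()
}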

View File

@@ -25,7 +25,6 @@ type SearchQueryBuilder struct {
tx *sql.Tx
lockRows bool
lockOption LockOption
allowTimeTravel bool
positionAfter float64
awaitOpenTransactions bool
creationDateAfter time.Time
@@ -77,10 +76,6 @@ func (b *SearchQueryBuilder) GetTx() *sql.Tx {
return b.tx
}
func (b *SearchQueryBuilder) GetAllowTimeTravel() bool {
return b.allowTimeTravel
}
func (b SearchQueryBuilder) GetPositionAfter() float64 {
return b.positionAfter
}
@@ -289,13 +284,6 @@ func (builder *SearchQueryBuilder) EditorUser(id string) *SearchQueryBuilder {
return builder
}
// AllowTimeTravel activates the time travel feature of the database if supported
// The queries will be made based on the call time
func (builder *SearchQueryBuilder) AllowTimeTravel() *SearchQueryBuilder {
builder.allowTimeTravel = true
return builder
}
// PositionAfter filters for events which happened after the specified time
func (builder *SearchQueryBuilder) PositionAfter(position float64) *SearchQueryBuilder {
builder.positionAfter = position
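With AllowTimeTravel removed from the builder, the query used throughout the mocked tests is built without that extra step (it used to emit "AS OF SYSTEM TIME '-1 ms'" on CockroachDB). A hedged sketch of the remaining chain, using only methods that appear in the test cases of this diff; the wrapper function, package clause, and return type are assumptions:

package eventstore_test

import "github.com/zitadel/zitadel/internal/eventstore"

// buildUserQuery mirrors the builder chain from the mocked test cases, minus the
// removed AllowTimeTravel() call.
func buildUserQuery() *eventstore.SearchQueryBuilder {
	return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
		OrderDesc().
		AwaitOpenTransactions().
		Limit(5).
		AddQuery().
		AggregateTypes("user").
		Builder()
}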

View File

@@ -45,16 +45,6 @@ func testSetLimit(limit uint64) func(builder *SearchQueryBuilder) *SearchQueryBu
}
}
func testOr(queryFuncs ...func(*SearchQuery) *SearchQuery) func(*SearchQuery) *SearchQuery {
return func(query *SearchQuery) *SearchQuery {
subQuery := query.Or()
for _, queryFunc := range queryFuncs {
queryFunc(subQuery)
}
return subQuery
}
}
func testSetAggregateTypes(types ...AggregateType) func(*SearchQuery) *SearchQuery {
return func(query *SearchQuery) *SearchQuery {
query = query.AggregateTypes(types...)

View File

@@ -24,9 +24,9 @@ func init() {
var (
// pushPlaceholderFmt defines how data are inserted into the events table
pushPlaceholderFmt string
pushPlaceholderFmt = "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp()), $%d)"
// uniqueConstraintPlaceholderFmt defines the format of the unique constraint error returned from the database
uniqueConstraintPlaceholderFmt string
uniqueConstraintPlaceholderFmt = "(%s, %s, %s)"
_ eventstore.Pusher = (*Eventstore)(nil)
)
@@ -158,15 +158,6 @@ func (es *Eventstore) Client() *database.DB {
}
func NewEventstore(client *database.DB) *Eventstore {
switch client.Type() {
case "cockroach":
pushPlaceholderFmt = "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, hlc_to_timestamp(cluster_logical_timestamp()), cluster_logical_timestamp(), $%d)"
uniqueConstraintPlaceholderFmt = "('%s', '%s', '%s')"
case "postgres":
pushPlaceholderFmt = "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp()), $%d)"
uniqueConstraintPlaceholderFmt = "(%s, %s, %s)"
}
return &Eventstore{client: client}
}
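With the cockroach branch gone, pushPlaceholderFmt is a fixed PostgreSQL format string. A small sketch of how such a format expands per event; the helper name and the fmt.Sprintf numbering are assumptions, chosen to be consistent with the Test_mapCommands expectations later in this diff:

package eventstore

import "fmt"

// placeholdersForEvent numbers the bind parameters for the i-th event of a multi-row
// INSERT; created_at and position need no placeholder because they are produced
// server-side by statement_timestamp() and clock_timestamp().
func placeholdersForEvent(i int) string {
	const format = "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp()), $%d)"
	base := i * 10
	return fmt.Sprintf(format,
		base+1, base+2, base+3, base+4, base+5, base+6, base+7, base+8, base+9, base+10)
}

// placeholdersForEvent(0) yields "($1, ..., $9, statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp()), $10)"
// placeholdersForEvent(1) yields "($11, ..., $19, statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp()), $20)"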
@@ -200,14 +191,8 @@ func (es *Eventstore) pushTx(ctx context.Context, client database.ContextQueryEx
beginner = es.client
}
isolationLevel := sql.LevelReadCommitted
// cockroach requires serializable to execute the push function
// because we use [cluster_logical_timestamp()](https://www.cockroachlabs.com/docs/stable/functions-and-operators#system-info-functions)
if es.client.Type() == "cockroach" {
isolationLevel = sql.LevelSerializable
}
tx, err = beginner.BeginTx(ctx, &sql.TxOptions{
Isolation: isolationLevel,
Isolation: sql.LevelReadCommitted,
ReadOnly: false,
})
if err != nil {

View File

@@ -156,10 +156,11 @@ func (es *Eventstore) handleFieldCommands(ctx context.Context, tx database.Tx, c
func handleFieldFillEvents(ctx context.Context, tx database.Tx, events []eventstore.FillFieldsEvent) error {
for _, event := range events {
if len(event.Fields()) > 0 {
if err := handleFieldOperations(ctx, tx, event.Fields()); err != nil {
return err
}
if len(event.Fields()) == 0 {
continue
}
if err := handleFieldOperations(ctx, tx, event.Fields()); err != nil {
return err
}
}
return nil

View File

@@ -8,7 +8,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/cockroach"
"github.com/zitadel/zitadel/internal/database/postgres"
"github.com/zitadel/zitadel/internal/eventstore"
)
@@ -65,7 +65,7 @@ func Test_mapCommands(t *testing.T) {
),
},
placeHolders: []string{
"($1, $2, $3, $4, $5, $6, $7, $8, $9, hlc_to_timestamp(cluster_logical_timestamp()), cluster_logical_timestamp(), $10)",
"($1, $2, $3, $4, $5, $6, $7, $8, $9, statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp()), $10)",
},
args: []any{
"instance",
@@ -114,8 +114,8 @@ func Test_mapCommands(t *testing.T) {
),
},
placeHolders: []string{
"($1, $2, $3, $4, $5, $6, $7, $8, $9, hlc_to_timestamp(cluster_logical_timestamp()), cluster_logical_timestamp(), $10)",
"($11, $12, $13, $14, $15, $16, $17, $18, $19, hlc_to_timestamp(cluster_logical_timestamp()), cluster_logical_timestamp(), $20)",
"($1, $2, $3, $4, $5, $6, $7, $8, $9, statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp()), $10)",
"($11, $12, $13, $14, $15, $16, $17, $18, $19, statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp()), $20)",
},
args: []any{
// first event
@@ -180,8 +180,8 @@ func Test_mapCommands(t *testing.T) {
),
},
placeHolders: []string{
"($1, $2, $3, $4, $5, $6, $7, $8, $9, hlc_to_timestamp(cluster_logical_timestamp()), cluster_logical_timestamp(), $10)",
"($11, $12, $13, $14, $15, $16, $17, $18, $19, hlc_to_timestamp(cluster_logical_timestamp()), cluster_logical_timestamp(), $20)",
"($1, $2, $3, $4, $5, $6, $7, $8, $9, statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp()), $10)",
"($11, $12, $13, $14, $15, $16, $17, $18, $19, statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp()), $20)",
},
args: []any{
// first event
@@ -236,7 +236,7 @@ func Test_mapCommands(t *testing.T) {
}
}
// is used to set the [pushPlaceholderFmt]
NewEventstore(&database.DB{Database: new(cockroach.Config)})
NewEventstore(&database.DB{Database: new(postgres.Config)})
t.Run(tt.name, func(t *testing.T) {
defer func() {
cause := recover()

View File

@@ -7,7 +7,6 @@ import (
"fmt"
"strings"
"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/jackc/pgx/v5/pgconn"
"github.com/zitadel/logging"
@@ -16,25 +15,6 @@ import (
"github.com/zitadel/zitadel/internal/zerrors"
)
type transaction struct {
database.Tx
}
var _ crdb.Tx = (*transaction)(nil)
func (t *transaction) Exec(ctx context.Context, query string, args ...interface{}) error {
_, err := t.Tx.ExecContext(ctx, query, args...)
return err
}
func (t *transaction) Commit(ctx context.Context) error {
return t.Tx.Commit()
}
func (t *transaction) Rollback(ctx context.Context) error {
return t.Tx.Rollback()
}
// checks whether the error was caused by setup step 39 not having been executed
func isSetupNotExecutedError(err error) bool {
if err == nil {
@@ -64,7 +44,6 @@ func (es *Eventstore) pushWithoutFunc(ctx context.Context, client database.Conte
err = closeTx(err)
}()
// tx is not closed because [crdb.ExecuteInTx] takes care of that
var (
sequences []*latestSequence
)