fix(projections): overhaul the event projection system (#10560)

This PR overhauls our event projection system to make it more robust and
prevent skipped events under high load. The core change replaces our
custom, transaction-based locking with standard PostgreSQL advisory
locks. We also introduce a worker pool to manage concurrency and prevent
database connection exhaustion.

### Key Changes

* **Advisory Locks for Projections:** Replaces exclusive row locks and
inspection of `pg_stat_activity` with PostgreSQL advisory locks for
managing projection state. This is a more reliable and standard approach
to distributed locking.
* **Simplified Await Logic:** Removes the complex logic for awaiting
open transactions, simplifying it to a more straightforward time-based
filtering of events.
* **Projection Worker Pool:** Implements a worker pool to limit
concurrent projection triggers, preventing connection exhaustion and
improving stability under load. A new `MaxParallelTriggers`
configuration option is introduced.

### Problem Solved

Under high throughput, a race condition could cause projections to miss
events from the eventstore. This led to inconsistent data in projection
tables (e.g., a user grant might be missing). This PR fixes the
underlying locking and concurrency issues to ensure all events are
processed reliably.

### How it Works

1. **Event Writing:** When writing events, a *shared* advisory lock is
taken. This signals that a write is in progress.
2.  **Event Handling (Projections):**
* A projection worker attempts to acquire an *exclusive* advisory lock
for that specific projection. If the lock is already held, it means
another worker is on the job, so the current one backs off.
* Once the lock is acquired, the worker briefly acquires and releases
the same *shared* lock used by event writers. This acts as a barrier,
ensuring it waits for any in-flight writes to complete.
* Finally, it processes all events that occurred before its transaction
began.

### Additional Information

* ZITADEL no longer modifies the `application_name` PostgreSQL variable
during event writes.
*   The lock on the `current_states` table is now `FOR NO KEY UPDATE`.
*   Fixes https://github.com/zitadel/zitadel/issues/8509

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Tim Möhlmann <tim+github@zitadel.com>
This commit is contained in:
Silvan
2025-09-03 17:29:00 +02:00
committed by GitHub
parent bdefd9147f
commit 0575f67e94
25 changed files with 286 additions and 410 deletions

View File

@@ -138,7 +138,7 @@ core_integration_server_start: core_integration_setup
.PHONY: core_integration_test_packages
core_integration_test_packages:
go test -race -count 1 -tags integration -timeout 30m $$(go list -tags integration ./... | grep "integration_test")
go test -race -count 1 -tags integration -timeout 60m -parallel 1 $$(go list -tags integration ./... | grep "integration_test")
.PHONY: core_integration_server_stop
core_integration_server_stop:

View File

@@ -387,6 +387,12 @@ Projections:
# Maximum amount of instances cached as active
# If set to 0, every instance is always considered active
MaxActiveInstances: 0 # ZITADEL_PROJECTIONS_MAXACTIVEINSTANCES
# Limits the number of concurrently running projection triggers
# If set to 0, 1/3 of Database.MaxOpenConns is used
# The number must be lower than Database.MaxOpenConns
# A good starting point is to set this to Database.MaxOpenConns / 3
# so that there are enough connections free for other operations (e.g. writing events, reading projections, notifications, ...)
MaxParallelTriggers: 0 # ZITADEL_PROJECTIONS_MAXPARALLELTRIGGERS
# In the Customizations section, all settings from above can be overwritten for each specific projection
Customizations:
custom_texts:

View File

@@ -10,7 +10,9 @@ import (
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
type FieldHandler struct {
@@ -69,7 +71,14 @@ func (h *FieldHandler) Trigger(ctx context.Context, opts ...TriggerOpt) (err err
defer cancel()
for i := 0; ; i++ {
additionalIteration, err := h.processEvents(ctx, config)
var additionalIteration bool
var wg sync.WaitGroup
wg.Add(1)
queue <- func() {
additionalIteration, err = h.processEvents(ctx, config)
wg.Done()
}
wg.Wait()
h.log().OnError(err).Info("process events failed")
h.log().WithField("iteration", i).Debug("trigger iteration")
if !additionalIteration || err != nil {
@@ -101,7 +110,7 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig)
defer cancel()
}
tx, err := h.client.BeginTx(txCtx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
tx, err := h.client.BeginTx(txCtx, nil)
if err != nil {
return false, err
}
@@ -117,13 +126,19 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig)
}
}()
var hasLocked bool
err = tx.QueryRowContext(ctx, "SELECT pg_try_advisory_xact_lock(hashtext($1), hashtext($2))", h.ProjectionName(), authz.GetInstance(ctx).InstanceID()).Scan(&hasLocked)
if err != nil {
return false, err
}
if !hasLocked {
return false, zerrors.ThrowInternal(nil, "V2-xRffO", "projection already locked")
}
// always await currently running transactions
config.awaitRunning = true
currentState, err := h.currentState(ctx, tx, config)
currentState, err := h.currentState(ctx, tx)
if err != nil {
if errors.Is(err, errJustUpdated) {
return false, nil
}
return additionalIteration, err
}
// stop execution if currentState.eventTimestamp >= config.maxCreatedAt

View File

@@ -20,6 +20,7 @@ import (
"github.com/zitadel/zitadel/internal/migration"
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/repository/pseudo"
"github.com/zitadel/zitadel/internal/zerrors"
)
type EventStore interface {
@@ -405,6 +406,9 @@ type triggerConfig struct {
type TriggerOpt func(conf *triggerConfig)
// WithAwaitRunning instructs the projection to wait until previous triggers within the same container are finished.
// If multiple containers are involved, we do not wait for the other containers to finish: if another container is currently projecting, the trigger is skipped.
// The reason is that we do not want to cause potential database connection exhaustion.
func WithAwaitRunning() TriggerOpt {
return func(conf *triggerConfig) {
conf.awaitRunning = true
@@ -423,6 +427,31 @@ func WithMinPosition(position decimal.Decimal) TriggerOpt {
}
}
var (
queue chan func()
queueStart sync.Once
)
// StartWorkerPool starts the package-wide pool of goroutines which execute
// the functions sent to queue by the Trigger methods.
//
// count is the number of workers and therefore the upper bound of
// concurrently executed functions. queue is unbuffered, so a sender blocks
// until a worker receives its function. A pool without any worker would
// block every sender forever, which is why a count of 0 is raised to 1.
//
// Only the first call has an effect; subsequent calls are no-ops.
func StartWorkerPool(count uint16) {
	queueStart.Do(func() {
		// at least one worker is required to drain the unbuffered queue
		workers := max(count, 1)
		queue = make(chan func())
		for range workers {
			go worker()
		}
	})
}
// worker runs the functions received from queue one after another.
// It returns once queue is closed.
func worker() {
	for task := range queue {
		task()
	}
}
func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Context, err error) {
if slices.Contains(h.skipInstanceIDs, authz.GetInstance(ctx).InstanceID()) {
return call.ResetTimestamp(ctx), nil
@@ -439,7 +468,16 @@ func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Co
defer cancel()
for i := 0; ; i++ {
additionalIteration, err := h.processEvents(ctx, config)
var (
additionalIteration bool
wg sync.WaitGroup
)
wg.Add(1)
queue <- func() {
additionalIteration, err = h.processEvents(ctx, config)
wg.Done()
}
wg.Wait()
h.log().OnError(err).Info("process events failed")
h.log().WithField("iteration", i).Debug("trigger iteration")
if !additionalIteration || err != nil {
@@ -526,11 +564,17 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
}
}()
currentState, err := h.currentState(ctx, tx, config)
var hasLocked bool
err = tx.QueryRowContext(ctx, "SELECT pg_try_advisory_xact_lock(hashtext($1), hashtext($2))", h.ProjectionName(), authz.GetInstance(ctx).InstanceID()).Scan(&hasLocked)
if err != nil {
return false, err
}
if !hasLocked {
return false, zerrors.ThrowInternal(nil, "V2-lpiK0", "projection already locked")
}
currentState, err := h.currentState(ctx, tx)
if err != nil {
if errors.Is(err, errJustUpdated) {
return false, nil
}
return additionalIteration, err
}
// stop execution if currentState.position >= config.maxPosition

View File

@@ -27,17 +27,11 @@ type state struct {
var (
//go:embed state_get.sql
currentStateStmt string
//go:embed state_get_await.sql
currentStateAwaitStmt string
//go:embed state_set.sql
updateStateStmt string
//go:embed state_lock.sql
lockStateStmt string
errJustUpdated = errors.New("projection was just updated")
)
func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerConfig) (currentState *state, err error) {
func (h *Handler) currentState(ctx context.Context, tx *sql.Tx) (currentState *state, err error) {
currentState = &state{
instanceID: authz.GetInstance(ctx).InstanceID(),
}
@@ -51,12 +45,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
offset = new(sql.NullInt64)
)
stateQuery := currentStateStmt
if config.awaitRunning {
stateQuery = currentStateAwaitStmt
}
row := tx.QueryRow(stateQuery, currentState.instanceID, h.projection.Name())
row := tx.QueryRow(currentStateStmt, currentState.instanceID, h.projection.Name())
err = row.Scan(
aggregateID,
aggregateType,
@@ -65,10 +54,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
position,
offset,
)
if errors.Is(err, sql.ErrNoRows) {
err = h.lockState(tx, currentState.instanceID)
}
if err != nil {
if err != nil && !errors.Is(err, sql.ErrNoRows) {
h.log().WithError(err).Debug("unable to query current state")
return nil, err
}
@@ -104,17 +90,3 @@ func (h *Handler) setState(tx *sql.Tx, updatedState *state) error {
}
return nil
}
func (h *Handler) lockState(tx *sql.Tx, instanceID string) error {
res, err := tx.Exec(lockStateStmt,
h.projection.Name(),
instanceID,
)
if err != nil {
return err
}
if affected, err := res.RowsAffected(); affected == 0 || err != nil {
return zerrors.ThrowInternal(err, "V2-lpiK0", "projection already locked")
}
return nil
}

View File

@@ -10,4 +10,4 @@ FROM
WHERE
instance_id = $1
AND projection_name = $2
FOR UPDATE NOWAIT;
FOR NO KEY UPDATE;

View File

@@ -1,13 +0,0 @@
SELECT
aggregate_id
, aggregate_type
, "sequence"
, event_date
, "position"
, filter_offset
FROM
projections.current_states
WHERE
instance_id = $1
AND projection_name = $2
FOR UPDATE;

View File

@@ -1,9 +0,0 @@
INSERT INTO projections.current_states (
projection_name
, instance_id
, last_updated
) VALUES (
$1
, $2
, now()
) ON CONFLICT DO NOTHING;

View File

@@ -19,123 +19,6 @@ import (
"github.com/zitadel/zitadel/internal/zerrors"
)
func TestHandler_lockState(t *testing.T) {
type fields struct {
projection Projection
mock *mock.SQLMock
}
type args struct {
instanceID string
}
tests := []struct {
name string
fields fields
args args
isErr func(t *testing.T, err error)
}{
{
name: "tx closed",
fields: fields{
projection: &projection{
name: "projection",
},
mock: mock.NewSQLMock(t,
mock.ExpectBegin(nil),
mock.ExcpectExec(
lockStateStmt,
mock.WithExecArgs(
"projection",
"instance",
),
mock.WithExecErr(sql.ErrTxDone),
),
),
},
args: args{
instanceID: "instance",
},
isErr: func(t *testing.T, err error) {
if !errors.Is(err, sql.ErrTxDone) {
t.Errorf("unexpected error, want: %v got: %v", sql.ErrTxDone, err)
}
},
},
{
name: "no rows affeced",
fields: fields{
projection: &projection{
name: "projection",
},
mock: mock.NewSQLMock(t,
mock.ExpectBegin(nil),
mock.ExcpectExec(
lockStateStmt,
mock.WithExecArgs(
"projection",
"instance",
),
mock.WithExecNoRowsAffected(),
),
),
},
args: args{
instanceID: "instance",
},
isErr: func(t *testing.T, err error) {
if !errors.Is(err, zerrors.ThrowInternal(nil, "V2-lpiK0", "")) {
t.Errorf("unexpected error: want internal (V2lpiK0), got: %v", err)
}
},
},
{
name: "rows affected",
fields: fields{
projection: &projection{
name: "projection",
},
mock: mock.NewSQLMock(t,
mock.ExpectBegin(nil),
mock.ExcpectExec(
lockStateStmt,
mock.WithExecArgs(
"projection",
"instance",
),
mock.WithExecRowsAffected(1),
),
),
},
args: args{
instanceID: "instance",
},
},
}
for _, tt := range tests {
if tt.isErr == nil {
tt.isErr = func(t *testing.T, err error) {
if err != nil {
t.Error("expected no error got:", err)
}
}
}
t.Run(tt.name, func(t *testing.T) {
h := &Handler{
projection: tt.fields.projection,
}
tx, err := tt.fields.mock.DB.BeginTx(context.Background(), nil)
if err != nil {
t.Fatalf("unable to begin transaction: %v", err)
}
err = h.lockState(tx, tt.args.instanceID)
tt.isErr(t, err)
tt.fields.mock.Assert(t)
})
}
}
func TestHandler_updateLastUpdated(t *testing.T) {
type fields struct {
projection Projection
@@ -309,41 +192,6 @@ func TestHandler_currentState(t *testing.T) {
},
},
},
{
name: "no row but lock err",
fields: fields{
projection: &projection{
name: "projection",
},
mock: mock.NewSQLMock(t,
mock.ExpectBegin(nil),
mock.ExpectQuery(currentStateStmt,
mock.WithQueryArgs(
"instance",
"projection",
),
mock.WithQueryErr(sql.ErrNoRows),
),
mock.ExcpectExec(lockStateStmt,
mock.WithExecArgs(
"projection",
"instance",
),
mock.WithExecErr(sql.ErrTxDone),
),
),
},
args: args{
ctx: authz.WithInstanceID(context.Background(), "instance"),
},
want: want{
isErr: func(t *testing.T, err error) {
if !errors.Is(err, sql.ErrTxDone) {
t.Errorf("unexpected error, want: %v, got: %v", sql.ErrTxDone, err)
}
},
},
},
{
name: "state locked",
fields: fields{
@@ -440,7 +288,7 @@ func TestHandler_currentState(t *testing.T) {
t.Fatalf("unable to begin transaction: %v", err)
}
gotCurrentState, err := h.currentState(tt.args.ctx, tx, new(triggerConfig))
gotCurrentState, err := h.currentState(tt.args.ctx, tx)
tt.want.isErr(t, err)
if !reflect.DeepEqual(gotCurrentState, tt.want.currentState) {

View File

@@ -16,8 +16,6 @@ type SearchQuery struct {
SubQueries [][]*Filter
Tx *sql.Tx
LockRows bool
LockOption eventstore.LockOption
AwaitOpenTransactions bool
Limit uint64
Offset uint32
@@ -135,7 +133,6 @@ func QueryFromBuilder(builder *eventstore.SearchQueryBuilder) (*SearchQuery, err
AwaitOpenTransactions: builder.GetAwaitOpenTransactions(),
SubQueries: make([][]*Filter, len(builder.GetQueries())),
}
query.LockRows, query.LockOption = builder.GetLockRows()
for _, f := range []func(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter{
instanceIDFilter,

View File

@@ -17,8 +17,8 @@ import (
// awaitOpenTransactions ensures event ordering, so we don't read events younger than open transactions
var (
awaitOpenTransactionsV1 = ` AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`
awaitOpenTransactionsV2 = ` AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`
awaitOpenTransactionsV1 = ` AND created_at <= now()`
awaitOpenTransactionsV2 = ` AND "position" <= EXTRACT(EPOCH FROM now())`
)
func awaitOpenTransactions(useV1 bool) string {

View File

@@ -12,6 +12,7 @@ import (
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
"github.com/zitadel/zitadel/internal/eventstore"
@@ -65,6 +66,32 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
if where == "" || query == "" {
return zerrors.ThrowInvalidArgument(nil, "SQL-rWeBw", "invalid query factory")
}
var contextQuerier interface {
QueryContext(context.Context, func(rows *sql.Rows) error, string, ...interface{}) error
ExecContext(context.Context, string, ...any) (sql.Result, error)
}
contextQuerier = criteria.Client()
if q.Tx != nil {
contextQuerier = &tx{Tx: q.Tx}
}
if q.AwaitOpenTransactions && q.Columns == eventstore.ColumnsEvent {
instanceID := authz.GetInstance(ctx).InstanceID()
if q.InstanceID != nil {
instanceID = q.InstanceID.Value.(string)
}
_, err = contextQuerier.ExecContext(ctx,
"select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))",
instanceID,
)
if err != nil {
return err
}
where += awaitOpenTransactions(useV1)
}
query += where
// instead of using the max function of the database (which doesn't work for postgres)
@@ -100,28 +127,8 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
query += " OFFSET ?"
}
if q.LockRows {
query += " FOR UPDATE"
switch q.LockOption {
case eventstore.LockOptionWait: // default behavior
case eventstore.LockOptionNoWait:
query += " NOWAIT"
case eventstore.LockOptionSkipLocked:
query += " SKIP LOCKED"
}
}
query = criteria.placeholder(query)
var contextQuerier interface {
QueryContext(context.Context, func(rows *sql.Rows) error, string, ...interface{}) error
}
contextQuerier = criteria.Client()
if q.Tx != nil {
contextQuerier = &tx{Tx: q.Tx}
}
err = contextQuerier.QueryContext(ctx,
func(rows *sql.Rows) error {
for rows.Next() {
@@ -289,22 +296,6 @@ func prepareConditions(criteria querier, query *repository.SearchQuery, useV1 bo
args = append(args, excludeAggregateIDsArgs...)
}
if query.AwaitOpenTransactions {
instanceIDs := make(database.TextArray[string], 0, 3)
if query.InstanceID != nil {
instanceIDs = append(instanceIDs, query.InstanceID.Value.(string))
} else if query.InstanceIDs != nil {
instanceIDs = append(instanceIDs, query.InstanceIDs.Value.(database.TextArray[string])...)
}
for i := range instanceIDs {
instanceIDs[i] = "zitadel_es_pusher_" + instanceIDs[i]
}
clauses += awaitOpenTransactions(useV1)
args = append(args, instanceIDs)
}
if clauses == "" {
return "", nil
}

View File

@@ -405,8 +405,8 @@ func Test_prepareCondition(t *testing.T) {
useV1: true,
},
res: res{
clause: " WHERE aggregate_type = ANY(?) AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')",
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, database.TextArray[string]{}},
clause: " WHERE aggregate_type = ANY(?)",
values: []interface{}{[]eventstore.AggregateType{"user", "org"}},
},
},
{
@@ -422,8 +422,8 @@ func Test_prepareCondition(t *testing.T) {
},
},
res: res{
clause: ` WHERE aggregate_type = ANY(?) AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`,
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, database.TextArray[string]{}},
clause: ` WHERE aggregate_type = ANY(?)`,
values: []interface{}{[]eventstore.AggregateType{"user", "org"}},
},
},
{
@@ -442,8 +442,8 @@ func Test_prepareCondition(t *testing.T) {
useV1: true,
},
res: res{
clause: " WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')",
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}, database.TextArray[string]{}},
clause: " WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?)",
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}},
},
},
{
@@ -461,8 +461,8 @@ func Test_prepareCondition(t *testing.T) {
},
},
res: res{
clause: ` WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`,
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}, database.TextArray[string]{}},
clause: ` WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?)`,
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}},
},
},
}
@@ -693,10 +693,14 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC`),
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}},
),
mock: newMockClient(t).
expectExec(regexp.QuoteMeta(
`select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`),
[]driver.Value{""}).
expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND created_at <= now() ORDER BY event_sequence DESC`),
[]driver.Value{eventstore.AggregateType("user")},
),
},
res: res{
wantErr: false,
@@ -716,10 +720,14 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence LIMIT $3`),
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}, uint64(5)},
),
mock: newMockClient(t).
expectExec(regexp.QuoteMeta(
`select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`),
[]driver.Value{""}).
expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND created_at <= now() ORDER BY event_sequence LIMIT $2`),
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
),
},
res: res{
wantErr: false,
@@ -739,76 +747,14 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC LIMIT $3`),
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}, uint64(5)},
),
},
res: res{
wantErr: false,
},
},
{
name: "lock, wait",
args: args{
dest: &[]*repository.Event{},
query: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
OrderDesc().
Limit(5).
AddQuery().
AggregateTypes("user").
Builder().LockRowsDuringTx(nil, eventstore.LockOptionWait),
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2 FOR UPDATE`),
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
),
},
res: res{
wantErr: false,
},
},
{
name: "lock, no wait",
args: args{
dest: &[]*repository.Event{},
query: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
OrderDesc().
Limit(5).
AddQuery().
AggregateTypes("user").
Builder().LockRowsDuringTx(nil, eventstore.LockOptionNoWait),
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2 FOR UPDATE NOWAIT`),
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
),
},
res: res{
wantErr: false,
},
},
{
name: "lock, skip locked",
args: args{
dest: &[]*repository.Event{},
query: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
OrderDesc().
Limit(5).
AddQuery().
AggregateTypes("user").
Builder().LockRowsDuringTx(nil, eventstore.LockOptionSkipLocked),
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2 FOR UPDATE SKIP LOCKED`),
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
),
mock: newMockClient(t).
expectExec(regexp.QuoteMeta(
`select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`),
[]driver.Value{""}).
expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND created_at <= now() ORDER BY event_sequence DESC LIMIT $2`),
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
),
},
res: res{
wantErr: false,
@@ -828,10 +774,14 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQueryErr(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC`),
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}},
sql.ErrConnDone),
mock: newMockClient(t).
expectExec(regexp.QuoteMeta(
`select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`),
[]driver.Value{""}).
expectQueryErr(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND created_at <= now() ORDER BY event_sequence DESC`),
[]driver.Value{eventstore.AggregateType("user")},
sql.ErrConnDone),
},
res: res{
wantErr: true,
@@ -851,10 +801,14 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQueryScanErr(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC`),
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}},
&repository.Event{Seq: 100}),
mock: newMockClient(t).
expectExec(regexp.QuoteMeta(
`select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`),
[]driver.Value{""}).
expectQueryScanErr(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND created_at <= now() ORDER BY event_sequence DESC`),
[]driver.Value{eventstore.AggregateType("user")},
&repository.Event{Seq: 100}),
},
res: res{
wantErr: true,
@@ -886,10 +840,14 @@ func Test_query_events_mocked(t *testing.T) {
useV1: true,
},
fields: fields{
mock: newMockClient(t).expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE (aggregate_type = $1 OR (aggregate_type = $2 AND aggregate_id = $3)) AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($4) AND state <> 'idle') ORDER BY event_sequence DESC LIMIT $5`),
[]driver.Value{eventstore.AggregateType("user"), eventstore.AggregateType("org"), "asdf42", database.TextArray[string]{}, uint64(5)},
),
mock: newMockClient(t).
expectExec(regexp.QuoteMeta(
`select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`),
[]driver.Value{""}).
expectQuery(
regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE (aggregate_type = $1 OR (aggregate_type = $2 AND aggregate_id = $3)) AND created_at <= now() ORDER BY event_sequence DESC LIMIT $4`),
[]driver.Value{eventstore.AggregateType("user"), eventstore.AggregateType("org"), "asdf42", uint64(5)},
),
},
res: res{
wantErr: false,
@@ -1040,6 +998,11 @@ func (m *dbMock) expectQueryErr(expectedQuery string, args []driver.Value, err e
return m
}
// expectExec registers the expectation that expectedQuery is executed with args.
// The mocked execution succeeds with the result created by sqlmock.NewResult(1, 1).
// The receiver is returned so that further expectations can be chained.
func (m *dbMock) expectExec(expectedQuery string, args []driver.Value) *dbMock {
	result := sqlmock.NewResult(1, 1)
	expectation := m.mock.ExpectExec(expectedQuery)
	expectation.WithArgs(args...).WillReturnResult(result)
	return m
}
func newMockClient(t *testing.T) *dbMock {
t.Helper()
db, mock, err := sqlmock.New(sqlmock.ValueConverterOption(new(db_mock.TypeConverter)))

View File

@@ -25,8 +25,6 @@ type SearchQueryBuilder struct {
queries []*SearchQuery
excludeAggregateIDs *ExclusionQuery
tx *sql.Tx
lockRows bool
lockOption LockOption
positionAtLeast decimal.Decimal
awaitOpenTransactions bool
creationDateAfter time.Time
@@ -98,10 +96,6 @@ func (q SearchQueryBuilder) GetCreationDateBefore() time.Time {
return q.creationDateBefore
}
func (q SearchQueryBuilder) GetLockRows() (bool, LockOption) {
return q.lockRows, q.lockOption
}
// ensureInstanceID makes sure that the instance id is always set
func (b *SearchQueryBuilder) ensureInstanceID(ctx context.Context) {
if b.instanceID == nil && len(b.instanceIDs) == 0 && authz.GetInstance(ctx).InstanceID() != "" {
@@ -322,27 +316,6 @@ func (builder *SearchQueryBuilder) CreationDateBefore(creationDate time.Time) *S
return builder
}
// LockOption selects how a row-locking query behaves when a selected row
// is already locked. The zero value is LockOptionWait.
type LockOption int
const (
// Wait until the previous lock on all of the selected rows is released (default)
LockOptionWait LockOption = iota
// With NOWAIT, the statement reports an error, rather than waiting, if a selected row cannot be locked immediately.
LockOptionNoWait
// With SKIP LOCKED, any selected rows that cannot be immediately locked are skipped.
LockOptionSkipLocked
)
// LockRowsDuringTx configures the builder to lock the rows it finds for the
// duration of the given transaction, using the
// [`FOR UPDATE`](https://www.postgresql.org/docs/17/sql-select.html#SQL-FOR-UPDATE-SHARE) lock strength.
// The option is stored as the LockOption for that lock.
// The lock is released when the transaction is committed or rolled back.
// It returns the builder to allow call chaining.
func (builder *SearchQueryBuilder) LockRowsDuringTx(tx *sql.Tx, option LockOption) *SearchQueryBuilder {
	builder.lockRows, builder.lockOption = true, option
	builder.tx = tx
	return builder
}
// AddQuery creates a new sub query.
// All fields in the sub query are AND-connected in the storage request.
// Multiple sub queries are OR-connected in the storage request.

View File

@@ -17,11 +17,6 @@ import (
"github.com/zitadel/zitadel/internal/telemetry/tracing"
)
// pushTxOpts describes a read-write transaction with READ COMMITTED isolation.
// NOTE(review): presumably the options used to open the transaction that pushes
// events; the usage is not visible in this hunk, confirm against the caller.
var pushTxOpts = &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
ReadOnly: false,
}
func (es *Eventstore) Push(ctx context.Context, client database.ContextQueryExecuter, commands ...eventstore.Command) (events []eventstore.Event, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
@@ -60,7 +55,8 @@ func (es *Eventstore) writeCommands(ctx context.Context, client database.Context
}()
}
_, err = tx.ExecContext(ctx, fmt.Sprintf("SET LOCAL application_name = '%s'", fmt.Sprintf("zitadel_es_pusher_%s", authz.GetInstance(ctx).InstanceID())))
// lock the instance for reading events if await events is set for the duration of the transaction.
_, err = tx.ExecContext(ctx, "SELECT pg_advisory_xact_lock_shared('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))", authz.GetInstance(ctx).InstanceID())
if err != nil {
return nil, err
}
@@ -69,7 +65,6 @@ func (es *Eventstore) writeCommands(ctx context.Context, client database.Context
if err != nil {
return nil, err
}
if err = handleUniqueConstraints(ctx, tx, commands); err != nil {
return nil, err
}

View File

@@ -15,4 +15,4 @@ ON
AND e.aggregate_type = existing.aggregate_type
AND e.aggregate_id = existing.aggregate_id
AND e.sequence = existing.sequence
FOR UPDATE;
FOR NO KEY UPDATE;

View File

@@ -59,7 +59,7 @@ LogStore:
Projections:
HandleActiveInstances: 30m
RequeueEvery: 5s
RequeueEvery: 20s
Customizations:
NotificationsQuotas:
RequeueEvery: 1s

View File

@@ -130,7 +130,7 @@ func (q *Queries) checkAndLock(tx *sql.Tx, projectionName string) (name string,
From(currentStateTable.identifier()).
Where(sq.Eq{
CurrentStateColProjectionName.identifier(): projectionName,
}).Suffix("FOR UPDATE").
}).Suffix("FOR NO KEY UPDATE").
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {

View File

@@ -17,6 +17,7 @@ type Config struct {
ActiveInstancer interface {
ActiveInstances() []string
}
MaxParallelTriggers uint16
}
type CustomConfig struct {

View File

@@ -121,6 +121,14 @@ func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore,
ActiveInstancer: config.ActiveInstancer,
}
if config.MaxParallelTriggers == 0 {
config.MaxParallelTriggers = uint16(sqlClient.Pool.Config().MaxConns / 3)
}
if sqlClient.Pool.Config().MaxConns <= int32(config.MaxParallelTriggers) {
logging.WithFields("database.MaxOpenConnections", sqlClient.Pool.Config().MaxConns, "projections.MaxParallelTriggers", config.MaxParallelTriggers).Fatal("Number of max parallel triggers must be lower than max open connections")
}
handler.StartWorkerPool(config.MaxParallelTriggers)
OrgProjection = newOrgProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["orgs"]))
OrgMetadataProjection = newOrgMetadataProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["org_metadata"]))
ActionProjection = newActionProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["actions"]))
@@ -214,7 +222,7 @@ func Start(ctx context.Context) error {
for _, projection := range projections {
table := projection.String()
if projectionTableMap[table] {
return fmt.Errorf("projeciton for %s already added", table)
return fmt.Errorf("projection for %s already added", table)
}
projectionTableMap[table] = true

View File

@@ -56,7 +56,7 @@ func TestStart(t *testing.T) {
return projections
},
err: fmt.Errorf("projeciton for %s already added", duplicateName),
err: fmt.Errorf("projection for %s already added", duplicateName),
},
}
for _, tt := range tests {

View File

@@ -68,13 +68,11 @@ verify_all_user_grants_exist: ensure_modules bundle
.PHONY: users_by_metadata_key
users_by_metadata_key: ensure_modules bundle
${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/users_by_metadata_key.js --vus ${VUS} --duration ${DURATION}
# --out csv=output/users_by_metadata_${DATE}.csv
${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/users_by_metadata_key.js --vus ${VUS} --duration ${DURATION} --out csv=output/users_by_metadata_${DATE}.csv
.PHONY: users_by_metadata_value
users_by_metadata_value: ensure_modules bundle
${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/users_by_metadata_value.js --vus ${VUS} --duration ${DURATION}
# --out csv=output/users_by_metadata_${DATE}.csv
${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/users_by_metadata_value.js --vus ${VUS} --duration ${DURATION} --out csv=output/users_by_metadata_${DATE}.csv
.PHONY: lint
lint:

View File

@@ -17,16 +17,16 @@ The use cases under tests are defined in `src/use_cases`. The implementation of
### Env vars
- `VUS`: Amount of parallel processes execute the test (default is 20)
- `DURATION`: Defines how long the tests are executed (default is `200s`)
- `ZITADEL_HOST`: URL of ZITADEL (default is `http://localhost:8080`)
- `ADMIN_LOGIN_NAME`: Loginanme of a human user with `IAM_OWNER`-role
- `ADMIN_PASSWORD`: password of the human user
* `VUS`: Number of parallel processes that execute the test (default is 20)
* `DURATION`: Defines how long the tests are executed (default is `200s`)
* `ZITADEL_HOST`: URL of ZITADEL (default is `http://localhost:8080`)
* `ADMIN_LOGIN_NAME`: Login name of a human user with `IAM_OWNER`-role
* `ADMIN_PASSWORD`: password of the human user
To set up the tests, we use the console credentials and log in as an admin. The user must be able to create organizations and all resources inside organizations.
- `ADMIN_LOGIN_NAME`: `zitadel-admin@zitadel.localhost`
- `ADMIN_PASSWORD`: `Password1!`
* `ADMIN_LOGIN_NAME`: `zitadel-admin@zitadel.localhost`
* `ADMIN_PASSWORD`: `Password1!`
### Test
@@ -66,4 +66,14 @@ Before you run the tests you need an initialized user. The tests don't implement
test: creates a token and calls user info
* `make machine_jwt_profile_grant_single_user`
setup: generates private/public key, creates machine user, adds a key
test: creates a token and calls user info in parallel for the same user
test: creates a token and calls user info in parallel for the same user
* `make users_by_metadata_key`
setup: creates a human user for half of the VUs and a machine user for the other half, then adds 3 metadata entries to each user
test: calls the list users endpoint and filters by a metadata key
* `make users_by_metadata_value`
setup: creates a human user for half of the VUs and a machine user for the other half, then adds 3 metadata entries to each user
test: calls the list users endpoint and filters by a metadata value
* `make verify_all_user_grants_exist`
setup: creates 50 projects, 1 machine per VU
test: creates a machine and grants all projects to the machine
teardown: the organization is not removed so that you can verify that the projection data is correct. You can find additional information [at the bottom of the use case file](./src/use_cases/verify_all_user_grants_exist.ts)

View File

@@ -1,6 +1,6 @@
import { loginByUsernamePassword } from '../../login_ui';
import { createOrg, removeOrg } from '../../org';
import { User, createHuman } from '../../user';
import { User, createHuman, createMachine } from '../../user';
import { Trend } from 'k6/metrics';
import { Config, MaxVUs } from '../../config';
import { createSession } from '../../session';

View File

@@ -0,0 +1,77 @@
import { loginByUsernamePassword } from '../login_ui';
import { createOrg } from '../org';
import { User, createMachine } from '../user';
import { Config, MaxVUs } from '../config';
import { createProject, Project } from '../project';
import { addUserGrant } from '../user_grant';
/**
 * k6 setup stage: signs in with the configured admin user, creates a new
 * organization and seeds it with 50 projects and one machine user per
 * virtual user (as reported by `MaxVUs()`).
 *
 * Projects are created concurrently first; the machine users are created
 * concurrently once all projects exist.
 *
 * @returns the admin tokens, the created org, the seeded machines (reduced to
 *   `userId` and their first login name) and the created projects
 */
export async function setup() {
  const tokens = loginByUsernamePassword(Config.admin as User);
  console.info('setup: admin signed in');

  const org = await createOrg(tokens.accessToken!);
  console.info(`setup: org (${org.organizationId}) created`);

  const projectRequests = Array.from({ length: 50 }, (_, index) =>
    createProject(`project-${index}`, org, tokens.accessToken!),
  );
  const projects = await Promise.all(projectRequests);
  console.log(`setup: ${projects.length} projects created`);

  const machineRequests = Array.from({ length: MaxVUs() }, async (_, index) =>
    createMachine(`zitachine-${index}`, org, tokens.accessToken!),
  );
  const createdMachines = await Promise.all(machineRequests);
  // Keep only the fields needed later instead of the full creation response.
  const machines = createdMachines.map((created) => ({
    userId: created.userId,
    loginName: created.loginNames[0],
  }));
  console.log(`setup: ${machines.length} machines created`);

  return { tokens, org, machines, projects };
}
/**
 * k6 iteration: creates a new machine user whose name is unique per virtual
 * user and iteration, then grants it every project from the setup data.
 * All grant requests are issued concurrently.
 *
 * @param data - the setup data; `org`, `tokens` and `projects` are read from it
 * @returns the results of all user grant requests
 */
export default async function (data: any) {
  const { org, projects, tokens } = data;
  const machineName = `zitachine-${__VU}-${__ITER}`;
  const machine = await createMachine(machineName, org, tokens.accessToken!);

  const grantRequests = projects.map((project: Project) =>
    addUserGrant(org, machine.userId, project, [], tokens.accessToken!),
  );
  const userGrants = await Promise.all(grantRequests);

  return { userGrants };
}
/**
 * k6 teardown stage: performs no cleanup and only logs a reminder that the
 * organization still exists.
 *
 * @param data - the setup data; unused because nothing is removed
 */
export function teardown(data: any) {
  // Intentionally no `removeOrg(data.org, data.tokens.accessToken)` here:
  // the org is kept so the projection data can be checked after the run.
  const reminder =
    'teardown: org is not removed to verify correctness of projections, do not forget to remove the org afterwards';
  console.info(reminder);
}
/**
* To verify the correctness of the projections you can use the following statements:
*
* set the owner of the events:
*
* set my.owner = '<org id of the created org>';
*
* check that the number of events matches the number of projected objects
*
* select * from (
select 'projections.user_grants5', count(*) from projections.user_grants5 where resource_owner = (select current_setting('my.owner'))
union all
select 'projections.users14', count(*) from projections.users14 where resource_owner = (select current_setting('my.owner'))
union all
select 'projections.sessions8', count(*) from projections.sessions8 where user_resource_owner = (select current_setting('my.owner'))
union all
select aggregate_type, count(*) from eventstore.events2
where
aggregate_type in ('user', 'usergrant', 'session')
and event_type in ('user.machine.added', 'user.human.added', 'user.grant.added', 'session.user.checked')
and (owner = (select current_setting('my.owner'))
OR payload->>'userResourceOwner' = (select current_setting('my.owner'))
)
group by aggregate_type
) as counts order by 2;
*/