diff --git a/Makefile b/Makefile index 8c7640ef588..630e676bc1a 100644 --- a/Makefile +++ b/Makefile @@ -138,7 +138,7 @@ core_integration_server_start: core_integration_setup .PHONY: core_integration_test_packages core_integration_test_packages: - go test -race -count 1 -tags integration -timeout 30m $$(go list -tags integration ./... | grep "integration_test") + go test -race -count 1 -tags integration -timeout 60m -parallel 1 $$(go list -tags integration ./... | grep "integration_test") .PHONY: core_integration_server_stop core_integration_server_stop: diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index a0fd815bec8..e0467af41ac 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -387,6 +387,12 @@ Projections: # Maximum amount of instances cached as active # If set to 0, every instance is always considered active MaxActiveInstances: 0 # ZITADEL_PROJECTIONS_MAXACTIVEINSTANCES + # Limits the amount of concurrently running projection triggers + # If set to 0, 1/3 of database.MaxOpenConns is used + # The number must be lower than the Database.MaxOpenConns + # A good starting point is to set this to Database.MaxOpenConns / 3 + # so that there are enough connections free for other operations (e.g. writing events, reading projections, notifications, ...) + MaxParallelTriggers: 0 # ZITADEL_PROJECTIONS_MAXPARALLELTRIGGERS # In the Customizations section, all settings from above can be overwritten for each specific projection Customizations: custom_texts: diff --git a/internal/eventstore/handler/v2/field_handler.go b/internal/eventstore/handler/v2/field_handler.go index 3c25731c833..9d350e016bd 100644 --- a/internal/eventstore/handler/v2/field_handler.go +++ b/internal/eventstore/handler/v2/field_handler.go @@ -10,7 +10,9 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/shopspring/decimal" + "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/zerrors" ) type FieldHandler struct { @@ -69,7 +71,14 @@ func (h *FieldHandler) Trigger(ctx context.Context, opts ...TriggerOpt) (err err defer cancel() for i := 0; ; i++ { - additionalIteration, err := h.processEvents(ctx, config) + var additionalIteration bool + var wg sync.WaitGroup + wg.Add(1) + queue <- func() { + additionalIteration, err = h.processEvents(ctx, config) + wg.Done() + } + wg.Wait() h.log().OnError(err).Info("process events failed") h.log().WithField("iteration", i).Debug("trigger iteration") if !additionalIteration || err != nil { @@ -101,7 +110,7 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig) defer cancel() } - tx, err := h.client.BeginTx(txCtx, &sql.TxOptions{Isolation: sql.LevelReadCommitted}) + tx, err := h.client.BeginTx(txCtx, nil) if err != nil { return false, err } @@ -117,13 +126,19 @@ func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig) } }() + var hasLocked bool + err = tx.QueryRowContext(ctx, "SELECT pg_try_advisory_xact_lock(hashtext($1), hashtext($2))", h.ProjectionName(), authz.GetInstance(ctx).InstanceID()).Scan(&hasLocked) + if err != nil { + return false, err + } + if !hasLocked { + return false, zerrors.ThrowInternal(nil, "V2-xRffO", "projection already locked") + } + // always await currently running transactions config.awaitRunning = true - currentState, err := h.currentState(ctx, tx, config) + currentState, err := h.currentState(ctx, tx) if err != nil { - if errors.Is(err, errJustUpdated) { - return false, nil - } return additionalIteration, err } // stop execution if currentState.eventTimestamp >= config.maxCreatedAt diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index b9e9691e707..ce548428910 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -20,6 +20,7 @@ import ( "github.com/zitadel/zitadel/internal/migration" "github.com/zitadel/zitadel/internal/repository/instance" "github.com/zitadel/zitadel/internal/repository/pseudo" + "github.com/zitadel/zitadel/internal/zerrors" ) type EventStore interface { @@ -405,6 +406,9 @@ type triggerConfig struct { type TriggerOpt func(conf *triggerConfig) +// WithAwaitRunning instructs the projection to wait until previous triggers within the same container are finished +// If multiple containers are involved, we do not await them to finish. If another container is currently projecting the trigger is skipped. +// The reason is that we do not want to cause potential database connection exhaustion. func WithAwaitRunning() TriggerOpt { return func(conf *triggerConfig) { conf.awaitRunning = true @@ -423,6 +427,31 @@ func WithMinPosition(position decimal.Decimal) TriggerOpt { } } +var ( + queue chan func() + queueStart sync.Once +) + +func StartWorkerPool(count uint16) { + queueStart.Do(func() { + queue = make(chan func()) + + for range count { + go worker() + } + }) +} + +func worker() { + for { + processEvents, ok := <-queue + if !ok { + return + } + processEvents() + } +} + func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Context, err error) { if slices.Contains(h.skipInstanceIDs, authz.GetInstance(ctx).InstanceID()) { return call.ResetTimestamp(ctx), nil @@ -439,7 +468,16 @@ func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Co defer cancel() for i := 0; ; i++ { - additionalIteration, err := h.processEvents(ctx, config) + var ( + additionalIteration bool + wg sync.WaitGroup + ) + wg.Add(1) + queue <- func() { + additionalIteration, err = h.processEvents(ctx, config) + wg.Done() + } + wg.Wait() h.log().OnError(err).Info("process events failed") h.log().WithField("iteration", i).Debug("trigger iteration") if !additionalIteration || err != nil { @@ -526,11 +564,17 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add } }() - currentState, err := h.currentState(ctx, tx, config) + var hasLocked bool + err = tx.QueryRowContext(ctx, "SELECT pg_try_advisory_xact_lock(hashtext($1), hashtext($2))", h.ProjectionName(), authz.GetInstance(ctx).InstanceID()).Scan(&hasLocked) + if err != nil { + return false, err + } + if !hasLocked { + return false, zerrors.ThrowInternal(nil, "V2-lpiK0", "projection already locked") + } + + currentState, err := h.currentState(ctx, tx) if err != nil { - if errors.Is(err, errJustUpdated) { - return false, nil - } return additionalIteration, err } // stop execution if currentState.position >= config.maxPosition diff --git a/internal/eventstore/handler/v2/state.go b/internal/eventstore/handler/v2/state.go index c4afaed204b..20febece764 100644 --- a/internal/eventstore/handler/v2/state.go +++ b/internal/eventstore/handler/v2/state.go @@ -27,17 +27,11 @@ type state struct { var ( //go:embed state_get.sql currentStateStmt string - //go:embed state_get_await.sql - currentStateAwaitStmt string //go:embed state_set.sql updateStateStmt string - //go:embed state_lock.sql - lockStateStmt string - - errJustUpdated = errors.New("projection was just updated") ) -func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerConfig) (currentState *state, err error) { +func (h *Handler) currentState(ctx context.Context, tx *sql.Tx) (currentState *state, err error) { currentState = &state{ instanceID: authz.GetInstance(ctx).InstanceID(), } @@ -51,12 +45,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC offset = new(sql.NullInt64) ) - stateQuery := currentStateStmt - if config.awaitRunning { - stateQuery = currentStateAwaitStmt - } - - row := tx.QueryRow(stateQuery, currentState.instanceID, h.projection.Name()) + row := tx.QueryRow(currentStateStmt, currentState.instanceID, h.projection.Name()) err = row.Scan( aggregateID, aggregateType, @@ -65,10 +54,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC position, offset, ) - if errors.Is(err, sql.ErrNoRows) { - err = h.lockState(tx, currentState.instanceID) - } - if err != nil { + if err != nil && !errors.Is(err, sql.ErrNoRows) { h.log().WithError(err).Debug("unable to query current state") return nil, err } @@ -104,17 +90,3 @@ func (h *Handler) setState(tx *sql.Tx, updatedState *state) error { } return nil } - -func (h *Handler) lockState(tx *sql.Tx, instanceID string) error { - res, err := tx.Exec(lockStateStmt, - h.projection.Name(), - instanceID, - ) - if err != nil { - return err - } - if affected, err := res.RowsAffected(); affected == 0 || err != nil { - return zerrors.ThrowInternal(err, "V2-lpiK0", "projection already locked") - } - return nil -} diff --git a/internal/eventstore/handler/v2/state_get.sql b/internal/eventstore/handler/v2/state_get.sql index a2082e48407..a324dbbab34 100644 --- a/internal/eventstore/handler/v2/state_get.sql +++ b/internal/eventstore/handler/v2/state_get.sql @@ -10,4 +10,4 @@ FROM WHERE instance_id = $1 AND projection_name = $2 -FOR UPDATE NOWAIT; \ No newline at end of file +FOR NO KEY UPDATE; \ No newline at end of file diff --git a/internal/eventstore/handler/v2/state_get_await.sql b/internal/eventstore/handler/v2/state_get_await.sql deleted file mode 100644 index 8c7587f7dde..00000000000 --- a/internal/eventstore/handler/v2/state_get_await.sql +++ /dev/null @@ -1,13 +0,0 @@ -SELECT - aggregate_id - , aggregate_type - , "sequence" - , event_date - , "position" - , filter_offset -FROM - projections.current_states -WHERE - instance_id = $1 - AND projection_name = $2 -FOR UPDATE; \ No newline at end of file diff --git a/internal/eventstore/handler/v2/state_lock.sql b/internal/eventstore/handler/v2/state_lock.sql deleted file mode 100644 index 55888e7f38a..00000000000 --- a/internal/eventstore/handler/v2/state_lock.sql +++ /dev/null @@ -1,9 +0,0 @@ -INSERT INTO projections.current_states ( - projection_name - , instance_id - , last_updated -) VALUES ( - $1 - , $2 - , now() -) ON CONFLICT DO NOTHING; \ No newline at end of file diff --git a/internal/eventstore/handler/v2/state_test.go b/internal/eventstore/handler/v2/state_test.go index ef91d78e553..309c5497f1b 100644 --- a/internal/eventstore/handler/v2/state_test.go +++ b/internal/eventstore/handler/v2/state_test.go @@ -19,123 +19,6 @@ import ( "github.com/zitadel/zitadel/internal/zerrors" ) -func TestHandler_lockState(t *testing.T) { - type fields struct { - projection Projection - mock *mock.SQLMock - } - type args struct { - instanceID string - } - tests := []struct { - name string - fields fields - args args - isErr func(t *testing.T, err error) - }{ - { - name: "tx closed", - fields: fields{ - projection: &projection{ - name: "projection", - }, - mock: mock.NewSQLMock(t, - mock.ExpectBegin(nil), - mock.ExcpectExec( - lockStateStmt, - mock.WithExecArgs( - "projection", - "instance", - ), - mock.WithExecErr(sql.ErrTxDone), - ), - ), - }, - args: args{ - instanceID: "instance", - }, - isErr: func(t *testing.T, err error) { - if !errors.Is(err, sql.ErrTxDone) { - t.Errorf("unexpected error, want: %v got: %v", sql.ErrTxDone, err) - } - }, - }, - { - name: "no rows affeced", - fields: fields{ - projection: &projection{ - name: "projection", - }, - mock: mock.NewSQLMock(t, - mock.ExpectBegin(nil), - mock.ExcpectExec( - lockStateStmt, - mock.WithExecArgs( - "projection", - "instance", - ), - mock.WithExecNoRowsAffected(), - ), - ), - }, - args: args{ - instanceID: "instance", - }, - isErr: func(t *testing.T, err error) { - if !errors.Is(err, zerrors.ThrowInternal(nil, "V2-lpiK0", "")) { - t.Errorf("unexpected error: want internal (V2lpiK0), got: %v", err) - } - }, - }, - { - name: "rows affected", - fields: fields{ - projection: &projection{ - name: "projection", - }, - mock: mock.NewSQLMock(t, - mock.ExpectBegin(nil), - mock.ExcpectExec( - lockStateStmt, - mock.WithExecArgs( - "projection", - "instance", - ), - mock.WithExecRowsAffected(1), - ), - ), - }, - args: args{ - instanceID: "instance", - }, - }, - } - for _, tt := range tests { - if tt.isErr == nil { - tt.isErr = func(t *testing.T, err error) { - if err != nil { - t.Error("expected no error got:", err) - } - } - } - t.Run(tt.name, func(t *testing.T) { - h := &Handler{ - projection: tt.fields.projection, - } - - tx, err := tt.fields.mock.DB.BeginTx(context.Background(), nil) - if err != nil { - t.Fatalf("unable to begin transaction: %v", err) - } - - err = h.lockState(tx, tt.args.instanceID) - tt.isErr(t, err) - - tt.fields.mock.Assert(t) - }) - } -} - func TestHandler_updateLastUpdated(t *testing.T) { type fields struct { projection Projection @@ -309,41 +192,6 @@ func TestHandler_currentState(t *testing.T) { }, }, }, - { - name: "no row but lock err", - fields: fields{ - projection: &projection{ - name: "projection", - }, - mock: mock.NewSQLMock(t, - mock.ExpectBegin(nil), - mock.ExpectQuery(currentStateStmt, - mock.WithQueryArgs( - "instance", - "projection", - ), - mock.WithQueryErr(sql.ErrNoRows), - ), - mock.ExcpectExec(lockStateStmt, - mock.WithExecArgs( - "projection", - "instance", - ), - mock.WithExecErr(sql.ErrTxDone), - ), - ), - }, - args: args{ - ctx: authz.WithInstanceID(context.Background(), "instance"), - }, - want: want{ - isErr: func(t *testing.T, err error) { - if !errors.Is(err, sql.ErrTxDone) { - t.Errorf("unexpected error, want: %v, got: %v", sql.ErrTxDone, err) - } - }, - }, - }, { name: "state locked", fields: fields{ @@ -440,7 +288,7 @@ func TestHandler_currentState(t *testing.T) { t.Fatalf("unable to begin transaction: %v", err) } - gotCurrentState, err := h.currentState(tt.args.ctx, tx, new(triggerConfig)) + gotCurrentState, err := h.currentState(tt.args.ctx, tx) tt.want.isErr(t, err) if !reflect.DeepEqual(gotCurrentState, tt.want.currentState) { diff --git a/internal/eventstore/repository/search_query.go b/internal/eventstore/repository/search_query.go index 760f7f616c7..e1f848b1436 100644 --- a/internal/eventstore/repository/search_query.go +++ b/internal/eventstore/repository/search_query.go @@ -16,8 +16,6 @@ type SearchQuery struct { SubQueries [][]*Filter Tx *sql.Tx - LockRows bool - LockOption eventstore.LockOption AwaitOpenTransactions bool Limit uint64 Offset uint32 @@ -135,7 +133,6 @@ func QueryFromBuilder(builder *eventstore.SearchQueryBuilder) (*SearchQuery, err AwaitOpenTransactions: builder.GetAwaitOpenTransactions(), SubQueries: make([][]*Filter, len(builder.GetQueries())), } - query.LockRows, query.LockOption = builder.GetLockRows() for _, f := range []func(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter{ instanceIDFilter, diff --git a/internal/eventstore/repository/sql/postgres.go b/internal/eventstore/repository/sql/postgres.go index 0dc2210f7bf..5d56d913190 100644 --- a/internal/eventstore/repository/sql/postgres.go +++ b/internal/eventstore/repository/sql/postgres.go @@ -17,8 +17,8 @@ import ( // awaitOpenTransactions ensures event ordering, so we don't events younger that open transactions var ( - awaitOpenTransactionsV1 = ` AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')` - awaitOpenTransactionsV2 = ` AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')` + awaitOpenTransactionsV1 = ` AND created_at <= now()` + awaitOpenTransactionsV2 = ` AND "position" <= EXTRACT(EPOCH FROM now())` ) func awaitOpenTransactions(useV1 bool) string { diff --git a/internal/eventstore/repository/sql/query.go b/internal/eventstore/repository/sql/query.go index 8584a82fa08..d3cf2550035 100644 --- a/internal/eventstore/repository/sql/query.go +++ b/internal/eventstore/repository/sql/query.go @@ -12,6 +12,7 @@ import ( "github.com/shopspring/decimal" "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database/dialect" "github.com/zitadel/zitadel/internal/eventstore" @@ -65,6 +66,32 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search if where == "" || query == "" { return zerrors.ThrowInvalidArgument(nil, "SQL-rWeBw", "invalid query factory") } + + var contextQuerier interface { + QueryContext(context.Context, func(rows *sql.Rows) error, string, ...interface{}) error + ExecContext(context.Context, string, ...any) (sql.Result, error) + } + contextQuerier = criteria.Client() + if q.Tx != nil { + contextQuerier = &tx{Tx: q.Tx} + } + + if q.AwaitOpenTransactions && q.Columns == eventstore.ColumnsEvent { + instanceID := authz.GetInstance(ctx).InstanceID() + if q.InstanceID != nil { + instanceID = q.InstanceID.Value.(string) + } + + _, err = contextQuerier.ExecContext(ctx, + "select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))", + instanceID, + ) + if err != nil { + return err + } + + where += awaitOpenTransactions(useV1) + } query += where // instead of using the max function of the database (which doesn't work for postgres) @@ -100,28 +127,8 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search query += " OFFSET ?" } - if q.LockRows { - query += " FOR UPDATE" - switch q.LockOption { - case eventstore.LockOptionWait: // default behavior - case eventstore.LockOptionNoWait: - query += " NOWAIT" - case eventstore.LockOptionSkipLocked: - query += " SKIP LOCKED" - - } - } - query = criteria.placeholder(query) - var contextQuerier interface { - QueryContext(context.Context, func(rows *sql.Rows) error, string, ...interface{}) error - } - contextQuerier = criteria.Client() - if q.Tx != nil { - contextQuerier = &tx{Tx: q.Tx} - } - err = contextQuerier.QueryContext(ctx, func(rows *sql.Rows) error { for rows.Next() { @@ -289,22 +296,6 @@ func prepareConditions(criteria querier, query *repository.SearchQuery, useV1 bo args = append(args, excludeAggregateIDsArgs...) } - if query.AwaitOpenTransactions { - instanceIDs := make(database.TextArray[string], 0, 3) - if query.InstanceID != nil { - instanceIDs = append(instanceIDs, query.InstanceID.Value.(string)) - } else if query.InstanceIDs != nil { - instanceIDs = append(instanceIDs, query.InstanceIDs.Value.(database.TextArray[string])...) - } - - for i := range instanceIDs { - instanceIDs[i] = "zitadel_es_pusher_" + instanceIDs[i] - } - - clauses += awaitOpenTransactions(useV1) - args = append(args, instanceIDs) - } - if clauses == "" { return "", nil } diff --git a/internal/eventstore/repository/sql/query_test.go b/internal/eventstore/repository/sql/query_test.go index 0e2425dd07c..4c6e30daf57 100644 --- a/internal/eventstore/repository/sql/query_test.go +++ b/internal/eventstore/repository/sql/query_test.go @@ -405,8 +405,8 @@ func Test_prepareCondition(t *testing.T) { useV1: true, }, res: res{ - clause: " WHERE aggregate_type = ANY(?) AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')", - values: []interface{}{[]eventstore.AggregateType{"user", "org"}, database.TextArray[string]{}}, + clause: " WHERE aggregate_type = ANY(?)", + values: []interface{}{[]eventstore.AggregateType{"user", "org"}}, }, }, { @@ -422,8 +422,8 @@ func Test_prepareCondition(t *testing.T) { }, }, res: res{ - clause: ` WHERE aggregate_type = ANY(?) AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`, - values: []interface{}{[]eventstore.AggregateType{"user", "org"}, database.TextArray[string]{}}, + clause: ` WHERE aggregate_type = ANY(?)`, + values: []interface{}{[]eventstore.AggregateType{"user", "org"}}, }, }, { @@ -442,8 +442,8 @@ func Test_prepareCondition(t *testing.T) { useV1: true, }, res: res{ - clause: " WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')", - values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}, database.TextArray[string]{}}, + clause: " WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?)", + values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}}, }, }, { @@ -461,8 +461,8 @@ func Test_prepareCondition(t *testing.T) { }, }, res: res{ - clause: ` WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND "position" < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY(?) AND state <> 'idle')`, - values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}, database.TextArray[string]{}}, + clause: ` WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?)`, + values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}}, }, }, } @@ -693,10 +693,14 @@ func Test_query_events_mocked(t *testing.T) { useV1: true, }, fields: fields{ - mock: newMockClient(t).expectQuery( - regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC`), - []driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}}, - ), + mock: newMockClient(t). + expectExec(regexp.QuoteMeta( + `select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`), + []driver.Value{""}). + expectQuery( + regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND created_at <= now() ORDER BY event_sequence DESC`), + []driver.Value{eventstore.AggregateType("user")}, + ), }, res: res{ wantErr: false, @@ -716,10 +720,14 @@ func Test_query_events_mocked(t *testing.T) { useV1: true, }, fields: fields{ - mock: newMockClient(t).expectQuery( - regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence LIMIT $3`), - []driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}, uint64(5)}, - ), + mock: newMockClient(t). + expectExec(regexp.QuoteMeta( + `select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`), + []driver.Value{""}). + expectQuery( + regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND created_at <= now() ORDER BY event_sequence LIMIT $2`), + []driver.Value{eventstore.AggregateType("user"), uint64(5)}, + ), }, res: res{ wantErr: false, @@ -739,76 +747,14 @@ func Test_query_events_mocked(t *testing.T) { useV1: true, }, fields: fields{ - mock: newMockClient(t).expectQuery( - regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC LIMIT $3`), - []driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}, uint64(5)}, - ), - }, - res: res{ - wantErr: false, - }, - }, - { - name: "lock, wait", - args: args{ - dest: &[]*repository.Event{}, - query: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). - OrderDesc(). - Limit(5). - AddQuery(). - AggregateTypes("user"). - Builder().LockRowsDuringTx(nil, eventstore.LockOptionWait), - useV1: true, - }, - fields: fields{ - mock: newMockClient(t).expectQuery( - regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2 FOR UPDATE`), - []driver.Value{eventstore.AggregateType("user"), uint64(5)}, - ), - }, - res: res{ - wantErr: false, - }, - }, - { - name: "lock, no wait", - args: args{ - dest: &[]*repository.Event{}, - query: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). - OrderDesc(). - Limit(5). - AddQuery(). - AggregateTypes("user"). - Builder().LockRowsDuringTx(nil, eventstore.LockOptionNoWait), - useV1: true, - }, - fields: fields{ - mock: newMockClient(t).expectQuery( - regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2 FOR UPDATE NOWAIT`), - []driver.Value{eventstore.AggregateType("user"), uint64(5)}, - ), - }, - res: res{ - wantErr: false, - }, - }, - { - name: "lock, skip locked", - args: args{ - dest: &[]*repository.Event{}, - query: eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). - OrderDesc(). - Limit(5). - AddQuery(). - AggregateTypes("user"). - Builder().LockRowsDuringTx(nil, eventstore.LockOptionSkipLocked), - useV1: true, - }, - fields: fields{ - mock: newMockClient(t).expectQuery( - regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2 FOR UPDATE SKIP LOCKED`), - []driver.Value{eventstore.AggregateType("user"), uint64(5)}, - ), + mock: newMockClient(t). + expectExec(regexp.QuoteMeta( + `select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`), + []driver.Value{""}). + expectQuery( + regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND created_at <= now() ORDER BY event_sequence DESC LIMIT $2`), + []driver.Value{eventstore.AggregateType("user"), uint64(5)}, + ), }, res: res{ wantErr: false, @@ -828,10 +774,14 @@ func Test_query_events_mocked(t *testing.T) { useV1: true, }, fields: fields{ - mock: newMockClient(t).expectQueryErr( - regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC`), - []driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}}, - sql.ErrConnDone), + mock: newMockClient(t). + expectExec(regexp.QuoteMeta( + `select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`), + []driver.Value{""}). + expectQueryErr( + regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND created_at <= now() ORDER BY event_sequence DESC`), + []driver.Value{eventstore.AggregateType("user")}, + sql.ErrConnDone), }, res: res{ wantErr: true, @@ -851,10 +801,14 @@ func Test_query_events_mocked(t *testing.T) { useV1: true, }, fields: fields{ - mock: newMockClient(t).expectQueryScanErr( - regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($2) AND state <> 'idle') ORDER BY event_sequence DESC`), - []driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}}, - &repository.Event{Seq: 100}), + mock: newMockClient(t). + expectExec(regexp.QuoteMeta( + `select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`), + []driver.Value{""}). + expectQueryScanErr( + regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 AND created_at <= now() ORDER BY event_sequence DESC`), + []driver.Value{eventstore.AggregateType("user")}, + &repository.Event{Seq: 100}), }, res: res{ wantErr: true, @@ -886,10 +840,14 @@ func Test_query_events_mocked(t *testing.T) { useV1: true, }, fields: fields{ - mock: newMockClient(t).expectQuery( - regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE (aggregate_type = $1 OR (aggregate_type = $2 AND aggregate_id = $3)) AND EXTRACT(EPOCH FROM created_at) < (SELECT COALESCE(EXTRACT(EPOCH FROM min(xact_start)), EXTRACT(EPOCH FROM now())) FROM pg_stat_activity WHERE datname = current_database() AND application_name = ANY($4) AND state <> 'idle') ORDER BY event_sequence DESC LIMIT $5`), - []driver.Value{eventstore.AggregateType("user"), eventstore.AggregateType("org"), "asdf42", database.TextArray[string]{}, uint64(5)}, - ), + mock: newMockClient(t). + expectExec(regexp.QuoteMeta( + `select pg_advisory_lock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1)), pg_advisory_unlock('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))`), + []driver.Value{""}). + expectQuery( + regexp.QuoteMeta(`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE (aggregate_type = $1 OR (aggregate_type = $2 AND aggregate_id = $3)) AND created_at <= now() ORDER BY event_sequence DESC LIMIT $4`), + []driver.Value{eventstore.AggregateType("user"), eventstore.AggregateType("org"), "asdf42", uint64(5)}, + ), }, res: res{ wantErr: false, @@ -1040,6 +998,11 @@ func (m *dbMock) expectQueryErr(expectedQuery string, args []driver.Value, err e return m } +func (m *dbMock) expectExec(expectedQuery string, args []driver.Value) *dbMock { + m.mock.ExpectExec(expectedQuery).WithArgs(args...).WillReturnResult(sqlmock.NewResult(1, 1)) + return m +} + func newMockClient(t *testing.T) *dbMock { t.Helper() db, mock, err := sqlmock.New(sqlmock.ValueConverterOption(new(db_mock.TypeConverter))) diff --git a/internal/eventstore/search_query.go b/internal/eventstore/search_query.go index dc92f5a4de5..d431c4dcb20 100644 --- a/internal/eventstore/search_query.go +++ b/internal/eventstore/search_query.go @@ -25,8 +25,6 @@ type SearchQueryBuilder struct { queries []*SearchQuery excludeAggregateIDs *ExclusionQuery tx *sql.Tx - lockRows bool - lockOption LockOption positionAtLeast decimal.Decimal awaitOpenTransactions bool creationDateAfter time.Time @@ -98,10 +96,6 @@ func (q SearchQueryBuilder) GetCreationDateBefore() time.Time { return q.creationDateBefore } -func (q SearchQueryBuilder) GetLockRows() (bool, LockOption) { - return q.lockRows, q.lockOption -} - // ensureInstanceID makes sure that the instance id is always set func (b *SearchQueryBuilder) ensureInstanceID(ctx context.Context) { if b.instanceID == nil && len(b.instanceIDs) == 0 && authz.GetInstance(ctx).InstanceID() != "" { @@ -322,27 +316,6 @@ func (builder *SearchQueryBuilder) CreationDateBefore(creationDate time.Time) *S return builder } -type LockOption int - -const ( - // Wait until the previous lock on all of the selected rows is released (default) - LockOptionWait LockOption = iota - // With NOWAIT, the statement reports an error, rather than waiting, if a selected row cannot be locked immediately. - LockOptionNoWait - // With SKIP LOCKED, any selected rows that cannot be immediately locked are skipped. - LockOptionSkipLocked -) - -// LockRowsDuringTx locks the found rows for the duration of the transaction, -// using the [`FOR UPDATE`](https://www.postgresql.org/docs/17/sql-select.html#SQL-FOR-UPDATE-SHARE) lock strength. -// The lock is removed on transaction commit or rollback. -func (builder *SearchQueryBuilder) LockRowsDuringTx(tx *sql.Tx, option LockOption) *SearchQueryBuilder { - builder.tx = tx - builder.lockRows = true - builder.lockOption = option - return builder -} - // AddQuery creates a new sub query. // All fields in the sub query are AND-connected in the storage request. // Multiple sub queries are OR-connected in the storage request. diff --git a/internal/eventstore/v3/push.go b/internal/eventstore/v3/push.go index 862e0c44048..726b4ca6124 100644 --- a/internal/eventstore/v3/push.go +++ b/internal/eventstore/v3/push.go @@ -17,11 +17,6 @@ import ( "github.com/zitadel/zitadel/internal/telemetry/tracing" ) -var pushTxOpts = &sql.TxOptions{ - Isolation: sql.LevelReadCommitted, - ReadOnly: false, -} - func (es *Eventstore) Push(ctx context.Context, client database.ContextQueryExecuter, commands ...eventstore.Command) (events []eventstore.Event, err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() @@ -60,7 +55,8 @@ func (es *Eventstore) writeCommands(ctx context.Context, client database.Context }() } - _, err = tx.ExecContext(ctx, fmt.Sprintf("SET LOCAL application_name = '%s'", fmt.Sprintf("zitadel_es_pusher_%s", authz.GetInstance(ctx).InstanceID()))) + // lock the instance for reading events if await events is set for the duration of the transaction. + _, err = tx.ExecContext(ctx, "SELECT pg_advisory_xact_lock_shared('eventstore.events2'::REGCLASS::OID::INTEGER, hashtext($1))", authz.GetInstance(ctx).InstanceID()) if err != nil { return nil, err } @@ -69,7 +65,6 @@ func (es *Eventstore) writeCommands(ctx context.Context, client database.Context if err != nil { return nil, err } - if err = handleUniqueConstraints(ctx, tx, commands); err != nil { return nil, err } diff --git a/internal/eventstore/v3/sequences_query.sql b/internal/eventstore/v3/sequences_query.sql index 468a2752538..21be2716d46 100644 --- a/internal/eventstore/v3/sequences_query.sql +++ b/internal/eventstore/v3/sequences_query.sql @@ -15,4 +15,4 @@ ON AND e.aggregate_type = existing.aggregate_type AND e.aggregate_id = existing.aggregate_id AND e.sequence = existing.sequence -FOR UPDATE; \ No newline at end of file +FOR NO KEY UPDATE; \ No newline at end of file diff --git a/internal/integration/config/zitadel.yaml b/internal/integration/config/zitadel.yaml index fed746d823a..00d55d0f6da 100644 --- a/internal/integration/config/zitadel.yaml +++ b/internal/integration/config/zitadel.yaml @@ -59,7 +59,7 @@ LogStore: Projections: HandleActiveInstances: 30m - RequeueEvery: 5s + RequeueEvery: 20s Customizations: NotificationsQuotas: RequeueEvery: 1s diff --git a/internal/query/current_state.go b/internal/query/current_state.go index d0a5b369bfc..14e21cb7fe4 100644 --- a/internal/query/current_state.go +++ b/internal/query/current_state.go @@ -130,7 +130,7 @@ func (q *Queries) checkAndLock(tx *sql.Tx, projectionName string) (name string, From(currentStateTable.identifier()). Where(sq.Eq{ CurrentStateColProjectionName.identifier(): projectionName, - }).Suffix("FOR UPDATE"). + }).Suffix("FOR NO KEY UPDATE"). PlaceholderFormat(sq.Dollar). ToSql() if err != nil { diff --git a/internal/query/projection/config.go b/internal/query/projection/config.go index 70ffad0778b..0cc36225ea1 100644 --- a/internal/query/projection/config.go +++ b/internal/query/projection/config.go @@ -17,6 +17,7 @@ type Config struct { ActiveInstancer interface { ActiveInstances() []string } + MaxParallelTriggers uint16 } type CustomConfig struct { diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index b8dac72b9a8..3b22f6c7d7d 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -121,6 +121,14 @@ func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore, ActiveInstancer: config.ActiveInstancer, } + if config.MaxParallelTriggers == 0 { + config.MaxParallelTriggers = uint16(sqlClient.Pool.Config().MaxConns / 3) + } + if sqlClient.Pool.Config().MaxConns <= int32(config.MaxParallelTriggers) { + logging.WithFields("database.MaxOpenConnections", sqlClient.Pool.Config().MaxConns, "projections.MaxParallelTriggers", config.MaxParallelTriggers).Fatal("Number of max parallel triggers must be lower than max open connections") + } + handler.StartWorkerPool(config.MaxParallelTriggers) + OrgProjection = newOrgProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["orgs"])) OrgMetadataProjection = newOrgMetadataProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["org_metadata"])) ActionProjection = newActionProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["actions"])) @@ -214,7 +222,7 @@ func Start(ctx context.Context) error { for _, projection := range projections { table := projection.String() if projectionTableMap[table] { - return fmt.Errorf("projeciton for %s already added", table) + return fmt.Errorf("projection for %s already added", table) } projectionTableMap[table] = true diff --git a/internal/query/projection/projection_test.go b/internal/query/projection/projection_test.go index c7b071163a2..2c5040adb68 100644 --- a/internal/query/projection/projection_test.go +++ b/internal/query/projection/projection_test.go @@ -56,7 +56,7 @@ func TestStart(t *testing.T) { return projections }, - err: fmt.Errorf("projeciton for %s already added", duplicateName), + err: fmt.Errorf("projection for %s already added", duplicateName), }, } for _, tt := range tests { diff --git a/load-test/Makefile b/load-test/Makefile index 808280009cb..83a583a6c39 100644 --- a/load-test/Makefile +++ b/load-test/Makefile @@ -68,13 +68,11 @@ verify_all_user_grants_exist: ensure_modules bundle .PHONY: users_by_metadata_key users_by_metadata_key: ensure_modules bundle - ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/users_by_metadata_key.js --vus ${VUS} --duration ${DURATION} -# --out csv=output/users_by_metadata_${DATE}.csv + ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/users_by_metadata_key.js --vus ${VUS} --duration ${DURATION} --out csv=output/users_by_metadata_${DATE}.csv .PHONY: users_by_metadata_value users_by_metadata_value: ensure_modules bundle - ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/users_by_metadata_value.js --vus ${VUS} --duration ${DURATION} -# --out csv=output/users_by_metadata_${DATE}.csv + ${K6} run --summary-trend-stats "min,avg,max,p(50),p(95),p(99)" dist/users_by_metadata_value.js --vus ${VUS} --duration ${DURATION} --out csv=output/users_by_metadata_${DATE}.csv .PHONY: lint lint: diff --git a/load-test/README.md b/load-test/README.md index f161fac6def..4e942a73d6a 100644 --- a/load-test/README.md +++ b/load-test/README.md @@ -17,16 +17,16 @@ The use cases under tests are defined in `src/use_cases`. The implementation of ### Env vars -- `VUS`: Amount of parallel processes execute the test (default is 20) -- `DURATION`: Defines how long the tests are executed (default is `200s`) -- `ZITADEL_HOST`: URL of ZITADEL (default is `http://localhost:8080`) -- `ADMIN_LOGIN_NAME`: Loginanme of a human user with `IAM_OWNER`-role -- `ADMIN_PASSWORD`: password of the human user +* `VUS`: Amount of parallel processes execute the test (default is 20) +* `DURATION`: Defines how long the tests are executed (default is `200s`) +* `ZITADEL_HOST`: URL of ZITADEL (default is `http://localhost:8080`) +* `ADMIN_LOGIN_NAME`: Loginanme of a human user with `IAM_OWNER`-role +* `ADMIN_PASSWORD`: password of the human user To setup the tests we use the credentials of console and log in using an admin. The user must be able to create organizations and all resources inside organizations. -- `ADMIN_LOGIN_NAME`: `zitadel-admin@zitadel.localhost` -- `ADMIN_PASSWORD`: `Password1!` +* `ADMIN_LOGIN_NAME`: `zitadel-admin@zitadel.localhost` +* `ADMIN_PASSWORD`: `Password1!` ### Test @@ -66,4 +66,14 @@ Before you run the tests you need an initialized user. The tests don't implement test: creates a token and calls user info * `make machine_jwt_profile_grant_single_user` setup: generates private/public key, creates machine user, adds a key - test: creates a token and calls user info in parallel for the same user \ No newline at end of file + test: creates a token and calls user info in parallel for the same user +* `make users_by_metadata_key` + setup: creates for half of the VUS a human user and a machine for the other half, adds 3 metadata to each user + test: calls the list users endpoint and filters by a metadata key +* `make users_by_metadata_value` + setup: creates for half of the VUS a human user and a machine for the other half, adds 3 metadata to each user + test: calls the list users endpoint and filters by a metadata value +* `make verify_all_user_grants_exists` + setup: creates 50 projects, 1 machine per VU + test: creates a machine and grants all projects to the machine + teardown: the organization is not removed to verify the data of the projections are correct. You can find additional information [at the bottom of this file](./src/use_cases/verify_all_user_grants_exist.ts) diff --git a/load-test/src/use_cases/session/add_session.ts b/load-test/src/use_cases/session/add_session.ts index 808bbf03cdb..848029cc172 100644 --- a/load-test/src/use_cases/session/add_session.ts +++ b/load-test/src/use_cases/session/add_session.ts @@ -1,6 +1,6 @@ import { loginByUsernamePassword } from '../../login_ui'; import { createOrg, removeOrg } from '../../org'; -import { User, createHuman } from '../../user'; +import { User, createHuman, createMachine } from '../../user'; import { Trend } from 'k6/metrics'; import { Config, MaxVUs } from '../../config'; import { createSession } from '../../session'; diff --git a/load-test/src/use_cases/verify_all_user_grants_exist.ts b/load-test/src/use_cases/verify_all_user_grants_exist.ts new file mode 100644 index 00000000000..3115f99e3e3 --- /dev/null +++ b/load-test/src/use_cases/verify_all_user_grants_exist.ts @@ -0,0 +1,77 @@ +import { loginByUsernamePassword } from '../login_ui'; +import { createOrg } from '../org'; +import { User, createMachine } from '../user'; +import { Config, MaxVUs } from '../config'; +import { createProject, Project } from '../project'; +import { addUserGrant } from '../user_grant'; + +export async function setup() { + const tokens = loginByUsernamePassword(Config.admin as User); + console.info('setup: admin signed in'); + + const org = await createOrg(tokens.accessToken!); + console.info(`setup: org (${org.organizationId}) created`); + + const projects = await Promise.all( + Array.from({ length: 50 }, (_, i) => { + return createProject(`project-${i}`, org, tokens.accessToken!); + }), + ); + console.log(`setup: ${projects.length} projects created`); + + let machines = ( + await Promise.all( + Array.from({ length: MaxVUs() }, async (_, i) => { + return await createMachine(`zitachine-${i}`, org, tokens.accessToken!); + }), + ) + ).map((machine) => { + return { userId: machine.userId, loginName: machine.loginNames[0] }; + }); + console.log(`setup: ${machines.length} machines created`); + + return { tokens, org, machines, projects }; +} + +export default async function (data: any) { + const machine = await createMachine(`zitachine-${__VU}-${__ITER}`, data.org, data.tokens.accessToken!); + let userGrants = await Promise.all( + data.projects.map((project: Project) => { + return addUserGrant(data.org, machine.userId, project, [], data.tokens.accessToken!); + }), + ); + + return { userGrants }; +} + +export function teardown(data: any) { + // removeOrg(data.org, data.tokens.accessToken); + console.info('teardown: org is not removed to verify correctness of projections, do not forget to remove the org afterwards'); +} + +/** + * To verify the correctness of the projections you can use the following statements: + * + * set the owner of the events: + * + * set my.owner = ''; + * + * check if the amount of events is the same as amount of objects + * + * select * from ( + select 'projections.user_grants5', count(*) from projections.user_grants5 where resource_owner = (select current_setting('my.owner')) + union all + select 'projections.users14', count(*) from projections.users14 where resource_owner = (select current_setting('my.owner')) + union all + select 'projections.sessions8', count(*) from projections.sessions8 where user_resource_owner = (select current_setting('my.owner')) + union all + select aggregate_type, count(*) from eventstore.events2 + where + aggregate_type in ('user', 'usergrant', 'session') + and event_type in ('user.machine.added', 'user.human.added', 'user.grant.added', 'session.user.checked') + and (owner = (select current_setting('my.owner')) + OR payload->>'userResourceOwner' = (select current_setting('my.owner')) + ) + group by aggregate_type +) order by 2; + */ \ No newline at end of file