diff --git a/internal/admin/repository/eventsourcing/handler/styling.go b/internal/admin/repository/eventsourcing/handler/styling.go index 2f8964b519..a9dfba0108 100644 --- a/internal/admin/repository/eventsourcing/handler/styling.go +++ b/internal/admin/repository/eventsourcing/handler/styling.go @@ -185,7 +185,7 @@ func (s *Styling) Reducers() []handler.AggregateReducer { } func (m *Styling) processLabelPolicy(event eventstore.Event) (_ *handler.Statement, err error) { - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { policy := new(iam_model.LabelPolicyView) switch event.Type() { case instance.LabelPolicyAddedEventType, diff --git a/internal/eventstore/handler/init.go b/internal/eventstore/handler/init.go index 0ae6d2261b..6b1f108a66 100644 --- a/internal/eventstore/handler/init.go +++ b/internal/eventstore/handler/init.go @@ -6,7 +6,7 @@ import "context" type Init func(context.Context, *Check) error type Check struct { - Executes []func(ex Executer, projectionName string) (bool, error) + Executes []func(ctx context.Context, executer Executer, projectionName string) (bool, error) } func (c *Check) IsNoop() bool { diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index fd8b206b38..2f59aeed62 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -646,7 +646,7 @@ func (h *Handler) executeStatements(ctx context.Context, tx *sql.Tx, statements for i, statement := range statements { select { case <-ctx.Done(): - break + return lastProcessedIndex, ctx.Err() default: err := h.executeStatement(ctx, tx, statement) if err != nil { @@ -669,7 +669,7 @@ func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, statement *S return err } - if err = statement.Execute(tx, h.projection.Name()); err != nil { + if err = statement.Execute(ctx, tx, h.projection.Name()); err != nil { h.log().WithError(err).Error("statement execution failed") _, rollbackErr := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT exec_stmt") diff --git a/internal/eventstore/handler/v2/init.go b/internal/eventstore/handler/v2/init.go index ead1c806d0..f797734644 100644 --- a/internal/eventstore/handler/v2/init.go +++ b/internal/eventstore/handler/v2/init.go @@ -200,7 +200,7 @@ func (h *Handler) Init(ctx context.Context) error { } for i, execute := range check.Init().Executes { logging.WithFields("projection", h.projection.Name(), "execute", i).Debug("executing check") - next, err := execute(tx, h.projection.Name()) + next, err := execute(ctx, tx, h.projection.Name()) if err != nil { logging.OnError(tx.Rollback()).Debug("unable to rollback") return err @@ -218,7 +218,7 @@ func NewTableCheck(table *Table, opts ...execOption) *handler.Check { create := func(config execConfig) string { return createTableStatement(table, config.tableName, "") } - executes := make([]func(handler.Executer, string) (bool, error), len(table.indices)+1) + executes := make([]func(context.Context, handler.Executer, string) (bool, error), len(table.indices)+1) executes[0] = execNextIfExists(config, create, opts, true) for i, index := range table.indices { executes[i+1] = execNextIfExists(config, createIndexCheck(index), opts, true) @@ -239,7 +239,7 @@ func NewMultiTableCheck(primaryTable *Table, secondaryTables ...*SuffixedTable) } return &handler.Check{ - Executes: []func(handler.Executer, string) (bool, error){ + Executes: []func(context.Context, handler.Executer, string) (bool, error){ execNextIfExists(config, create, nil, true), }, } @@ -257,14 +257,14 @@ func NewViewCheck(selectStmt string, secondaryTables ...*SuffixedTable) *handler } return &handler.Check{ - Executes: []func(handler.Executer, string) (bool, error){ + Executes: []func(context.Context, handler.Executer, string) (bool, error){ execNextIfExists(config, create, nil, false), }, } } -func execNextIfExists(config execConfig, q query, opts []execOption, executeNext bool) func(handler.Executer, string) (bool, error) { - return func(handler handler.Executer, name string) (shouldExecuteNext bool, err error) { +func execNextIfExists(config execConfig, q query, opts []execOption, executeNext bool) func(ctx context.Context, handler handler.Executer, name string) (bool, error) { + return func(ctx context.Context, handler handler.Executer, name string) (shouldExecuteNext bool, err error) { _, err = handler.Exec("SAVEPOINT exec_stmt") if err != nil { return false, zerrors.ThrowInternal(err, "V2-U1wlz", "create savepoint failed") @@ -280,7 +280,7 @@ func execNextIfExists(config execConfig, q query, opts []execOption, executeNext return } }() - err = exec(config, q, opts)(handler, name) + err = exec(config, q, opts)(ctx, handler, name) return false, err } } diff --git a/internal/eventstore/handler/v2/statement.go b/internal/eventstore/handler/v2/statement.go index 5024c8c945..e584160287 100644 --- a/internal/eventstore/handler/v2/statement.go +++ b/internal/eventstore/handler/v2/statement.go @@ -1,6 +1,7 @@ package handler import ( + "context" "database/sql" "encoding/json" "errors" @@ -91,7 +92,7 @@ type Statement struct { Execute Exec } -type Exec func(ex Executer, projectionName string) error +type Exec func(ctx context.Context, ex Executer, projectionName string) error func WithTableSuffix(name string) func(*execConfig) { return func(o *execConfig) { @@ -670,7 +671,7 @@ type execConfig struct { type query func(config execConfig) string func exec(config execConfig, q query, opts []execOption) Exec { - return func(ex Executer, projectionName string) (err error) { + return func(ctx context.Context, ex Executer, projectionName string) (err error) { if projectionName == "" { return ErrNoProjection } @@ -694,12 +695,12 @@ func exec(config execConfig, q query, opts []execOption) Exec { } func multiExec(execList []Exec) Exec { - return func(ex Executer, projectionName string) error { + return func(ctx context.Context, ex Executer, projectionName string) error { for _, exec := range execList { if exec == nil { continue } - if err := exec(ex, projectionName); err != nil { + if err := exec(ctx, ex, projectionName); err != nil { return err } } diff --git a/internal/eventstore/handler/v2/statement_test.go b/internal/eventstore/handler/v2/statement_test.go index 787ec105e1..8384029ec8 100644 --- a/internal/eventstore/handler/v2/statement_test.go +++ b/internal/eventstore/handler/v2/statement_test.go @@ -1,6 +1,7 @@ package handler import ( + "context" "database/sql" "errors" "reflect" @@ -197,7 +198,7 @@ func TestNewCreateStatement(t *testing.T) { tt.want.executer.t = t stmt := NewCreateStatement(tt.args.event, tt.args.values) - err := stmt.Execute(tt.want.executer, tt.args.table) + err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table) if !tt.want.isErr(err) { t.Errorf("unexpected error: %v", err) } @@ -506,7 +507,7 @@ func TestNewUpsertStatement(t *testing.T) { tt.want.executer.t = t stmt := NewUpsertStatement(tt.args.event, tt.args.conflictCols, tt.args.values) - err := stmt.Execute(tt.want.executer, tt.args.table) + err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table) if !tt.want.isErr(err) { t.Errorf("unexpected error: %v", err) } @@ -710,7 +711,7 @@ func TestNewUpdateStatement(t *testing.T) { tt.want.executer.t = t stmt := NewUpdateStatement(tt.args.event, tt.args.values, tt.args.conditions) - err := stmt.Execute(tt.want.executer, tt.args.table) + err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table) if !tt.want.isErr(err) { t.Errorf("unexpected error: %v", err) } @@ -827,7 +828,7 @@ func TestNewDeleteStatement(t *testing.T) { tt.want.executer.t = t stmt := NewDeleteStatement(tt.args.event, tt.args.conditions) - err := stmt.Execute(tt.want.executer, tt.args.table) + err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table) if !tt.want.isErr(err) { t.Errorf("unexpected error: %v", err) } @@ -878,7 +879,7 @@ func TestNewNoOpStatement(t *testing.T) { return } tt.want.executer.t = t - err := stmt.Execute(tt.want.executer, tt.args.table) + err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table) if !tt.want.isErr(err) { t.Errorf("unexpected error: %v", err) } @@ -1054,7 +1055,7 @@ func TestNewMultiStatement(t *testing.T) { return } tt.want.executer.t = t - err := stmt.Execute(tt.want.executer, tt.args.table) + err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table) if !tt.want.isErr(err) { t.Errorf("unexpected error: %v", err) } @@ -1338,7 +1339,7 @@ func TestNewCopyStatement(t *testing.T) { tt.want.executer.t = t stmt := NewCopyStatement(tt.args.event, tt.args.conflictingCols, tt.args.from, tt.args.to, tt.args.conds) - err := stmt.Execute(tt.want.executer, tt.args.table) + err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table) if !tt.want.isErr(err) { t.Errorf("unexpected error: %v", err) } @@ -1349,7 +1350,7 @@ func TestNewCopyStatement(t *testing.T) { func TestStatement_Execute(t *testing.T) { type fields struct { - execute func(ex Executer, projectionName string) error + execute func(ctx context.Context, ex Executer, projectionName string) error } type want struct { isErr func(error) bool @@ -1366,7 +1367,7 @@ func TestStatement_Execute(t *testing.T) { { name: "execute returns no error", fields: fields{ - execute: func(ex Executer, projectionName string) error { return nil }, + execute: func(ctx context.Context, ex Executer, projectionName string) error { return nil }, }, args: args{ projectionName: "my_projection", @@ -1383,7 +1384,7 @@ func TestStatement_Execute(t *testing.T) { projectionName: "my_projection", }, fields: fields{ - execute: func(ex Executer, projectionName string) error { return errTest }, + execute: func(ctx context.Context, ex Executer, projectionName string) error { return errTest }, }, want: want{ isErr: func(err error) bool { @@ -1397,7 +1398,7 @@ func TestStatement_Execute(t *testing.T) { stmt := &Statement{ Execute: tt.fields.execute, } - if err := stmt.Execute(nil, tt.args.projectionName); !tt.want.isErr(err) { + if err := stmt.Execute(t.Context(), nil, tt.args.projectionName); !tt.want.isErr(err) { t.Errorf("unexpected error: %v", err) } }) diff --git a/internal/execution/ctx.go b/internal/execution/ctx.go index 9e6bac3e30..d63fa13e5d 100644 --- a/internal/execution/ctx.go +++ b/internal/execution/ctx.go @@ -9,8 +9,8 @@ import ( const ExecutionUserID = "EXECUTION" -func HandlerContext(event *eventstore.Aggregate) context.Context { - ctx := authz.WithInstanceID(context.Background(), event.InstanceID) +func HandlerContext(parent context.Context, event *eventstore.Aggregate) context.Context { + ctx := authz.WithInstanceID(parent, event.InstanceID) return authz.SetCtxData(ctx, authz.CtxData{UserID: ExecutionUserID, OrgID: event.ResourceOwner}) } diff --git a/internal/execution/handlers.go b/internal/execution/handlers.go index 030e6d5186..1c27cb5920 100644 --- a/internal/execution/handlers.go +++ b/internal/execution/handlers.go @@ -113,7 +113,7 @@ func idsForEventType(eventType string) []string { } func (u *eventHandler) reduce(e eventstore.Event) (*handler.Statement, error) { - ctx := HandlerContext(e.Aggregate()) + ctx := HandlerContext(context.Background(), e.Aggregate()) targets, err := u.query.TargetsByExecutionID(ctx, idsForEventType(string(e.Type()))) if err != nil { @@ -125,8 +125,8 @@ func (u *eventHandler) reduce(e eventstore.Event) (*handler.Statement, error) { return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(e, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(e.Aggregate()) + return handler.NewStatement(e, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, e.Aggregate()) req, err := NewRequest(e, targets) if err != nil { return err diff --git a/internal/execution/handlers_test.go b/internal/execution/handlers_test.go index de220abcc0..a123947160 100644 --- a/internal/execution/handlers_test.go +++ b/internal/execution/handlers_test.go @@ -440,7 +440,7 @@ func TestActionProjection_reduces(t *testing.T) { assert.Nil(t, stmt.Execute) return } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.stmtErr != nil { w.stmtErr(t, err) return diff --git a/internal/notification/handlers/back_channel_logout.go b/internal/notification/handlers/back_channel_logout.go index 983915ac28..12c2c708e9 100644 --- a/internal/notification/handlers/back_channel_logout.go +++ b/internal/notification/handlers/back_channel_logout.go @@ -95,8 +95,8 @@ func (u *backChannelLogoutNotifier) reduceUserSignedOut(event eventstore.Event) return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Gr63h", "reduce.wrong.event.type %s", user.HumanSignedOutType) } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx, err := u.queries.HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx, err := u.queries.HandlerContext(ctx, event.Aggregate()) if err != nil { return err } @@ -116,8 +116,8 @@ func (u *backChannelLogoutNotifier) reduceSessionTerminated(event eventstore.Eve return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-D6H2h", "reduce.wrong.event.type %s", session.TerminateType) } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx, err := u.queries.HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx, err := u.queries.HandlerContext(ctx, event.Aggregate()) if err != nil { return err } diff --git a/internal/notification/handlers/ctx.go b/internal/notification/handlers/ctx.go index b091a61cdd..2c2599b060 100644 --- a/internal/notification/handlers/ctx.go +++ b/internal/notification/handlers/ctx.go @@ -9,8 +9,8 @@ import ( const NotifyUserID = "NOTIFICATION" //TODO: system? -func HandlerContext(event *eventstore.Aggregate) context.Context { - ctx := authz.WithInstanceID(context.Background(), event.InstanceID) +func HandlerContext(parent context.Context, event *eventstore.Aggregate) context.Context { + ctx := authz.WithInstanceID(parent, event.InstanceID) return authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: event.ResourceOwner}) } @@ -18,12 +18,11 @@ func ContextWithNotifier(ctx context.Context, aggregate *eventstore.Aggregate) c return authz.WithInstanceID(authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: aggregate.ResourceOwner}), aggregate.InstanceID) } -func (n *NotificationQueries) HandlerContext(event *eventstore.Aggregate) (context.Context, error) { - ctx := context.Background() - instance, err := n.InstanceByID(ctx, event.InstanceID) +func (n *NotificationQueries) HandlerContext(parent context.Context, event *eventstore.Aggregate) (context.Context, error) { + instance, err := n.InstanceByID(parent, event.InstanceID) if err != nil { return nil, err } - ctx = authz.WithInstance(ctx, instance) + ctx := authz.WithInstance(parent, instance) return authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: event.ResourceOwner}), nil } diff --git a/internal/notification/handlers/quota_notifier.go b/internal/notification/handlers/quota_notifier.go index f308291243..365888959f 100644 --- a/internal/notification/handlers/quota_notifier.go +++ b/internal/notification/handlers/quota_notifier.go @@ -62,8 +62,8 @@ func (u *quotaNotifier) reduceNotificationDue(event eventstore.Event) (*handler. return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-DLxdE", "reduce.wrong.event.type %s", quota.NotificationDueEventType) } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.NotifiedEventType) if err != nil { return err diff --git a/internal/notification/handlers/telemetry_pusher.go b/internal/notification/handlers/telemetry_pusher.go index 7e510a2b4c..2c32db61c1 100644 --- a/internal/notification/handlers/telemetry_pusher.go +++ b/internal/notification/handlers/telemetry_pusher.go @@ -5,7 +5,6 @@ import ( "net/http" "time" - "github.com/zitadel/zitadel/internal/api/call" "github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler/v2" @@ -69,12 +68,11 @@ func (t *telemetryPusher) Reducers() []handler.AggregateReducer { } func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.Statement, error) { - ctx := call.WithTimestamp(context.Background()) e, ok := event.(*milestone.ReachedEvent) if !ok { return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-lDTs5", "reduce.wrong.event.type %s", event.Type()) } - return handler.NewStatement(event, func(handler.Executer, string) error { + return handler.NewStatement(event, func(ctx context.Context, _ handler.Executer, _ string) error { // Do not push the milestone again if this was a migration event. if e.ReachedDate != nil { return nil diff --git a/internal/notification/handlers/user_notifier.go b/internal/notification/handlers/user_notifier.go index f36f5d828c..6ca753caa9 100644 --- a/internal/notification/handlers/user_notifier.go +++ b/internal/notification/handlers/user_notifier.go @@ -203,8 +203,8 @@ func (u *userNotifier) reduceInitCodeAdded(event eventstore.Event) (*handler.Sta if !ok { return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType) } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.UserV1InitialCodeAddedType, user.UserV1InitialCodeSentType, user.HumanInitialCodeAddedType, user.HumanInitialCodeSentType) @@ -253,8 +253,8 @@ func (u *userNotifier) reduceEmailCodeAdded(event eventstore.Event) (*handler.St return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.UserV1EmailCodeAddedType, user.UserV1EmailCodeSentType, user.HumanEmailCodeAddedType, user.HumanEmailCodeSentType) @@ -309,8 +309,8 @@ func (u *userNotifier) reducePasswordCodeAdded(event eventstore.Event) (*handler return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.UserV1PasswordCodeAddedType, user.UserV1PasswordCodeSentType, user.HumanPasswordCodeAddedType, user.HumanPasswordCodeSentType) @@ -362,8 +362,8 @@ func (u *userNotifier) reduceOTPSMSCodeAdded(event eventstore.Event) (*handler.S return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-ASF3g", "reduce.wrong.event.type %s", user.HumanOTPSMSCodeAddedType) } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.HumanOTPSMSCodeAddedType, user.HumanOTPSMSCodeSentType) @@ -406,8 +406,8 @@ func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*h return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, session.OTPSMSChallengedType, session.OTPSMSSentType) @@ -455,8 +455,8 @@ func (u *userNotifier) reduceOTPEmailCodeAdded(event eventstore.Event) (*handler return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-JL3hw", "reduce.wrong.event.type %s", user.HumanOTPEmailCodeAddedType) } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.HumanOTPEmailCodeAddedType, user.HumanOTPEmailCodeSentType) @@ -507,8 +507,8 @@ func (u *userNotifier) reduceSessionOTPEmailChallenged(event eventstore.Event) ( if e.ReturnCode { return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, session.OTPEmailChallengedType, session.OTPEmailSentType) @@ -573,8 +573,8 @@ func (u *userNotifier) reduceDomainClaimed(event eventstore.Event) (*handler.Sta if !ok { return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType) } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.UserDomainClaimedType, user.UserDomainClaimedSentType) if err != nil { @@ -619,8 +619,8 @@ func (u *userNotifier) reducePasswordlessCodeRequested(event eventstore.Event) ( return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, map[string]interface{}{"id": e.ID}, user.HumanPasswordlessInitCodeSentType) if err != nil { return err @@ -668,8 +668,8 @@ func (u *userNotifier) reducePasswordChanged(event eventstore.Event) (*handler.S return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType) } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType) if err != nil { return err @@ -720,8 +720,8 @@ func (u *userNotifier) reducePhoneCodeAdded(event eventstore.Event) (*handler.St return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.UserV1PhoneCodeAddedType, user.UserV1PhoneCodeSentType, user.HumanPhoneCodeAddedType, user.HumanPhoneCodeSentType) @@ -768,8 +768,8 @@ func (u *userNotifier) reduceInviteCodeAdded(event eventstore.Event) (*handler.S return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.HumanInviteCodeAddedType, user.HumanInviteCodeSentType) if err != nil { diff --git a/internal/notification/handlers/user_notifier_legacy.go b/internal/notification/handlers/user_notifier_legacy.go index 1921510bf3..146c60e10b 100644 --- a/internal/notification/handlers/user_notifier_legacy.go +++ b/internal/notification/handlers/user_notifier_legacy.go @@ -133,8 +133,8 @@ func (u *userNotifierLegacy) reduceInitCodeAdded(event eventstore.Event) (*handl return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType) } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.UserV1InitialCodeAddedType, user.UserV1InitialCodeSentType, user.HumanInitialCodeAddedType, user.HumanInitialCodeSentType) @@ -194,8 +194,8 @@ func (u *userNotifierLegacy) reduceEmailCodeAdded(event eventstore.Event) (*hand return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.UserV1EmailCodeAddedType, user.UserV1EmailCodeSentType, user.HumanEmailCodeAddedType, user.HumanEmailCodeSentType) @@ -254,8 +254,8 @@ func (u *userNotifierLegacy) reducePasswordCodeAdded(event eventstore.Event) (*h return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.UserV1PasswordCodeAddedType, user.UserV1PasswordCodeSentType, user.HumanPasswordCodeAddedType, user.HumanPasswordCodeSentType) @@ -337,7 +337,7 @@ func (u *userNotifierLegacy) reduceSessionOTPSMSChallenged(event eventstore.Even if e.CodeReturned { return handler.NewNoOpStatement(e), nil } - ctx := HandlerContext(event.Aggregate()) + ctx := HandlerContext(context.Background(), event.Aggregate()) s, err := u.queries.SessionByID(ctx, true, e.Aggregate().ID, "", nil) if err != nil { return nil, err @@ -363,7 +363,7 @@ func (u *userNotifierLegacy) reduceOTPSMS( sentCommand func(ctx context.Context, userID, resourceOwner string, generatorInfo *senders.CodeGeneratorInfo) (err error), eventTypes ...eventstore.EventType, ) (*handler.Statement, error) { - ctx := HandlerContext(event.Aggregate()) + ctx := HandlerContext(context.Background(), event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, expiry, nil, eventTypes...) if err != nil { return nil, err @@ -445,7 +445,7 @@ func (u *userNotifierLegacy) reduceSessionOTPEmailChallenged(event eventstore.Ev if e.ReturnCode { return handler.NewNoOpStatement(e), nil } - ctx := HandlerContext(event.Aggregate()) + ctx := HandlerContext(context.Background(), event.Aggregate()) s, err := u.queries.SessionByID(ctx, true, e.Aggregate().ID, "", nil) if err != nil { return nil, err @@ -484,7 +484,7 @@ func (u *userNotifierLegacy) reduceOTPEmail( sentCommand func(ctx context.Context, userID string, resourceOwner string) (err error), eventTypes ...eventstore.EventType, ) (*handler.Statement, error) { - ctx := HandlerContext(event.Aggregate()) + ctx := HandlerContext(context.Background(), event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, expiry, nil, eventTypes...) if err != nil { return nil, err @@ -543,8 +543,8 @@ func (u *userNotifierLegacy) reduceDomainClaimed(event eventstore.Event) (*handl if !ok { return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType) } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.UserDomainClaimedType, user.UserDomainClaimedSentType) if err != nil { @@ -598,8 +598,8 @@ func (u *userNotifierLegacy) reducePasswordlessCodeRequested(event eventstore.Ev return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, map[string]interface{}{"id": e.ID}, user.HumanPasswordlessInitCodeSentType) if err != nil { return err @@ -653,8 +653,8 @@ func (u *userNotifierLegacy) reducePasswordChanged(event eventstore.Event) (*han return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType) } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType) if err != nil { return err @@ -719,8 +719,8 @@ func (u *userNotifierLegacy) reducePhoneCodeAdded(event eventstore.Event) (*hand return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.UserV1PhoneCodeAddedType, user.UserV1PhoneCodeSentType, user.HumanPhoneCodeAddedType, user.HumanPhoneCodeSentType) @@ -777,8 +777,8 @@ func (u *userNotifierLegacy) reduceInviteCodeAdded(event eventstore.Event) (*han return handler.NewNoOpStatement(e), nil } - return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error { - ctx := HandlerContext(event.Aggregate()) + return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error { + ctx = HandlerContext(ctx, event.Aggregate()) alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil, user.HumanInviteCodeAddedType, user.HumanInviteCodeSentType) if err != nil { diff --git a/internal/notification/handlers/user_notifier_legacy_test.go b/internal/notification/handlers/user_notifier_legacy_test.go index a4c24fd196..08461ee706 100644 --- a/internal/notification/handlers/user_notifier_legacy_test.go +++ b/internal/notification/handlers/user_notifier_legacy_test.go @@ -283,7 +283,7 @@ func Test_userNotifierLegacy_reduceInitCodeAdded(t *testing.T) { } else { assert.NoError(t, err) } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -596,7 +596,7 @@ func Test_userNotifierLegacy_reduceEmailCodeAdded(t *testing.T) { } else { assert.NoError(t, err) } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -949,7 +949,7 @@ func Test_userNotifierLegacy_reducePasswordCodeAdded(t *testing.T) { } else { assert.NoError(t, err) } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -1080,7 +1080,7 @@ func Test_userNotifierLegacy_reduceDomainClaimed(t *testing.T) { } else { assert.NoError(t, err) } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -1355,7 +1355,7 @@ func Test_userNotifierLegacy_reducePasswordlessCodeRequested(t *testing.T) { } else { assert.NoError(t, err) } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -1495,7 +1495,7 @@ func Test_userNotifierLegacy_reducePasswordChanged(t *testing.T) { } else { assert.NoError(t, err) } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { diff --git a/internal/notification/handlers/user_notifier_test.go b/internal/notification/handlers/user_notifier_test.go index f7090f0146..874fbdf9af 100644 --- a/internal/notification/handlers/user_notifier_test.go +++ b/internal/notification/handlers/user_notifier_test.go @@ -188,7 +188,7 @@ func Test_userNotifier_reduceInitCodeAdded(t *testing.T) { } else { assert.NoError(t, err) } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -366,7 +366,7 @@ func Test_userNotifier_reduceEmailCodeAdded(t *testing.T) { assert.Nil(t, stmt.Execute) return } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -601,7 +601,7 @@ func Test_userNotifier_reducePasswordCodeAdded(t *testing.T) { assert.Nil(t, stmt.Execute) return } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -731,7 +731,7 @@ func Test_userNotifier_reduceDomainClaimed(t *testing.T) { } else { assert.NoError(t, err) } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -906,7 +906,7 @@ func Test_userNotifier_reducePasswordlessCodeRequested(t *testing.T) { assert.Nil(t, stmt.Execute) return } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -1066,7 +1066,7 @@ func Test_userNotifier_reducePasswordChanged(t *testing.T) { } else { assert.NoError(t, err) } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -1329,7 +1329,7 @@ func Test_userNotifier_reduceOTPEmailChallenged(t *testing.T) { assert.Nil(t, stmt.Execute) return } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -1590,7 +1590,7 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) { assert.Nil(t, stmt.Execute) return } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { @@ -1886,7 +1886,7 @@ func Test_userNotifier_reduceInviteCodeAdded(t *testing.T) { assert.Nil(t, stmt.Execute) return } - err = stmt.Execute(nil, "") + err = stmt.Execute(t.Context(), nil, "") if w.err != nil { w.err(t, err) } else { diff --git a/internal/query/projection/event_test.go b/internal/query/projection/event_test.go index 073f34c688..317efe817e 100644 --- a/internal/query/projection/event_test.go +++ b/internal/query/projection/event_test.go @@ -99,7 +99,7 @@ func assertReduce(t *testing.T, stmt *handler.Statement, err error, projection s want.executer.Validate(t) return } - err = stmt.Execute(want.executer, projection) + err = stmt.Execute(t.Context(), want.executer, projection) if err != nil { t.Errorf("unexpected error: %v", err) }