mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 13:07:46 +00:00
Merge branch 'fix(handler)--pass-context-to-statement-execution-method' into rt-domains
This commit is contained in:
@@ -185,7 +185,7 @@ func (s *Styling) Reducers() []handler.AggregateReducer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Styling) processLabelPolicy(event eventstore.Event) (_ *handler.Statement, err error) {
|
func (m *Styling) processLabelPolicy(event eventstore.Event) (_ *handler.Statement, err error) {
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
policy := new(iam_model.LabelPolicyView)
|
policy := new(iam_model.LabelPolicyView)
|
||||||
switch event.Type() {
|
switch event.Type() {
|
||||||
case instance.LabelPolicyAddedEventType,
|
case instance.LabelPolicyAddedEventType,
|
||||||
|
@@ -6,7 +6,7 @@ import "context"
|
|||||||
type Init func(context.Context, *Check) error
|
type Init func(context.Context, *Check) error
|
||||||
|
|
||||||
type Check struct {
|
type Check struct {
|
||||||
Executes []func(ex Executer, projectionName string) (bool, error)
|
Executes []func(ctx context.Context, executer Executer, projectionName string) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Check) IsNoop() bool {
|
func (c *Check) IsNoop() bool {
|
||||||
|
@@ -646,7 +646,7 @@ func (h *Handler) executeStatements(ctx context.Context, tx *sql.Tx, statements
|
|||||||
for i, statement := range statements {
|
for i, statement := range statements {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
break
|
return lastProcessedIndex, nil
|
||||||
default:
|
default:
|
||||||
err := h.executeStatement(ctx, tx, statement)
|
err := h.executeStatement(ctx, tx, statement)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -669,7 +669,7 @@ func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, statement *S
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = statement.Execute(tx, h.projection.Name()); err != nil {
|
if err = statement.Execute(ctx, tx, h.projection.Name()); err != nil {
|
||||||
h.log().WithError(err).Error("statement execution failed")
|
h.log().WithError(err).Error("statement execution failed")
|
||||||
|
|
||||||
_, rollbackErr := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT exec_stmt")
|
_, rollbackErr := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT exec_stmt")
|
||||||
|
@@ -200,7 +200,7 @@ func (h *Handler) Init(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
for i, execute := range check.Init().Executes {
|
for i, execute := range check.Init().Executes {
|
||||||
logging.WithFields("projection", h.projection.Name(), "execute", i).Debug("executing check")
|
logging.WithFields("projection", h.projection.Name(), "execute", i).Debug("executing check")
|
||||||
next, err := execute(tx, h.projection.Name())
|
next, err := execute(ctx, tx, h.projection.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logging.OnError(tx.Rollback()).Debug("unable to rollback")
|
logging.OnError(tx.Rollback()).Debug("unable to rollback")
|
||||||
return err
|
return err
|
||||||
@@ -218,7 +218,7 @@ func NewTableCheck(table *Table, opts ...execOption) *handler.Check {
|
|||||||
create := func(config execConfig) string {
|
create := func(config execConfig) string {
|
||||||
return createTableStatement(table, config.tableName, "")
|
return createTableStatement(table, config.tableName, "")
|
||||||
}
|
}
|
||||||
executes := make([]func(handler.Executer, string) (bool, error), len(table.indices)+1)
|
executes := make([]func(context.Context, handler.Executer, string) (bool, error), len(table.indices)+1)
|
||||||
executes[0] = execNextIfExists(config, create, opts, true)
|
executes[0] = execNextIfExists(config, create, opts, true)
|
||||||
for i, index := range table.indices {
|
for i, index := range table.indices {
|
||||||
executes[i+1] = execNextIfExists(config, createIndexCheck(index), opts, true)
|
executes[i+1] = execNextIfExists(config, createIndexCheck(index), opts, true)
|
||||||
@@ -239,7 +239,7 @@ func NewMultiTableCheck(primaryTable *Table, secondaryTables ...*SuffixedTable)
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &handler.Check{
|
return &handler.Check{
|
||||||
Executes: []func(handler.Executer, string) (bool, error){
|
Executes: []func(context.Context, handler.Executer, string) (bool, error){
|
||||||
execNextIfExists(config, create, nil, true),
|
execNextIfExists(config, create, nil, true),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -257,14 +257,14 @@ func NewViewCheck(selectStmt string, secondaryTables ...*SuffixedTable) *handler
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &handler.Check{
|
return &handler.Check{
|
||||||
Executes: []func(handler.Executer, string) (bool, error){
|
Executes: []func(context.Context, handler.Executer, string) (bool, error){
|
||||||
execNextIfExists(config, create, nil, false),
|
execNextIfExists(config, create, nil, false),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func execNextIfExists(config execConfig, q query, opts []execOption, executeNext bool) func(handler.Executer, string) (bool, error) {
|
func execNextIfExists(config execConfig, q query, opts []execOption, executeNext bool) func(ctx context.Context, handler handler.Executer, name string) (bool, error) {
|
||||||
return func(handler handler.Executer, name string) (shouldExecuteNext bool, err error) {
|
return func(ctx context.Context, handler handler.Executer, name string) (shouldExecuteNext bool, err error) {
|
||||||
_, err = handler.Exec("SAVEPOINT exec_stmt")
|
_, err = handler.Exec("SAVEPOINT exec_stmt")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, zerrors.ThrowInternal(err, "V2-U1wlz", "create savepoint failed")
|
return false, zerrors.ThrowInternal(err, "V2-U1wlz", "create savepoint failed")
|
||||||
@@ -280,7 +280,7 @@ func execNextIfExists(config execConfig, q query, opts []execOption, executeNext
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
err = exec(config, q, opts)(handler, name)
|
err = exec(config, q, opts)(ctx, handler, name)
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
@@ -91,7 +92,7 @@ type Statement struct {
|
|||||||
Execute Exec
|
Execute Exec
|
||||||
}
|
}
|
||||||
|
|
||||||
type Exec func(ex Executer, projectionName string) error
|
type Exec func(ctx context.Context, ex Executer, projectionName string) error
|
||||||
|
|
||||||
func WithTableSuffix(name string) func(*execConfig) {
|
func WithTableSuffix(name string) func(*execConfig) {
|
||||||
return func(o *execConfig) {
|
return func(o *execConfig) {
|
||||||
@@ -670,7 +671,7 @@ type execConfig struct {
|
|||||||
type query func(config execConfig) string
|
type query func(config execConfig) string
|
||||||
|
|
||||||
func exec(config execConfig, q query, opts []execOption) Exec {
|
func exec(config execConfig, q query, opts []execOption) Exec {
|
||||||
return func(ex Executer, projectionName string) (err error) {
|
return func(ctx context.Context, ex Executer, projectionName string) (err error) {
|
||||||
if projectionName == "" {
|
if projectionName == "" {
|
||||||
return ErrNoProjection
|
return ErrNoProjection
|
||||||
}
|
}
|
||||||
@@ -694,12 +695,12 @@ func exec(config execConfig, q query, opts []execOption) Exec {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func multiExec(execList []Exec) Exec {
|
func multiExec(execList []Exec) Exec {
|
||||||
return func(ex Executer, projectionName string) error {
|
return func(ctx context.Context, ex Executer, projectionName string) error {
|
||||||
for _, exec := range execList {
|
for _, exec := range execList {
|
||||||
if exec == nil {
|
if exec == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := exec(ex, projectionName); err != nil {
|
if err := exec(ctx, ex, projectionName); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"reflect"
|
"reflect"
|
||||||
@@ -197,7 +198,7 @@ func TestNewCreateStatement(t *testing.T) {
|
|||||||
tt.want.executer.t = t
|
tt.want.executer.t = t
|
||||||
stmt := NewCreateStatement(tt.args.event, tt.args.values)
|
stmt := NewCreateStatement(tt.args.event, tt.args.values)
|
||||||
|
|
||||||
err := stmt.Execute(tt.want.executer, tt.args.table)
|
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
|
||||||
if !tt.want.isErr(err) {
|
if !tt.want.isErr(err) {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -506,7 +507,7 @@ func TestNewUpsertStatement(t *testing.T) {
|
|||||||
tt.want.executer.t = t
|
tt.want.executer.t = t
|
||||||
stmt := NewUpsertStatement(tt.args.event, tt.args.conflictCols, tt.args.values)
|
stmt := NewUpsertStatement(tt.args.event, tt.args.conflictCols, tt.args.values)
|
||||||
|
|
||||||
err := stmt.Execute(tt.want.executer, tt.args.table)
|
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
|
||||||
if !tt.want.isErr(err) {
|
if !tt.want.isErr(err) {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -710,7 +711,7 @@ func TestNewUpdateStatement(t *testing.T) {
|
|||||||
tt.want.executer.t = t
|
tt.want.executer.t = t
|
||||||
stmt := NewUpdateStatement(tt.args.event, tt.args.values, tt.args.conditions)
|
stmt := NewUpdateStatement(tt.args.event, tt.args.values, tt.args.conditions)
|
||||||
|
|
||||||
err := stmt.Execute(tt.want.executer, tt.args.table)
|
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
|
||||||
if !tt.want.isErr(err) {
|
if !tt.want.isErr(err) {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -827,7 +828,7 @@ func TestNewDeleteStatement(t *testing.T) {
|
|||||||
tt.want.executer.t = t
|
tt.want.executer.t = t
|
||||||
stmt := NewDeleteStatement(tt.args.event, tt.args.conditions)
|
stmt := NewDeleteStatement(tt.args.event, tt.args.conditions)
|
||||||
|
|
||||||
err := stmt.Execute(tt.want.executer, tt.args.table)
|
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
|
||||||
if !tt.want.isErr(err) {
|
if !tt.want.isErr(err) {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -878,7 +879,7 @@ func TestNewNoOpStatement(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
tt.want.executer.t = t
|
tt.want.executer.t = t
|
||||||
err := stmt.Execute(tt.want.executer, tt.args.table)
|
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
|
||||||
if !tt.want.isErr(err) {
|
if !tt.want.isErr(err) {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -1054,7 +1055,7 @@ func TestNewMultiStatement(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
tt.want.executer.t = t
|
tt.want.executer.t = t
|
||||||
err := stmt.Execute(tt.want.executer, tt.args.table)
|
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
|
||||||
if !tt.want.isErr(err) {
|
if !tt.want.isErr(err) {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -1338,7 +1339,7 @@ func TestNewCopyStatement(t *testing.T) {
|
|||||||
tt.want.executer.t = t
|
tt.want.executer.t = t
|
||||||
stmt := NewCopyStatement(tt.args.event, tt.args.conflictingCols, tt.args.from, tt.args.to, tt.args.conds)
|
stmt := NewCopyStatement(tt.args.event, tt.args.conflictingCols, tt.args.from, tt.args.to, tt.args.conds)
|
||||||
|
|
||||||
err := stmt.Execute(tt.want.executer, tt.args.table)
|
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
|
||||||
if !tt.want.isErr(err) {
|
if !tt.want.isErr(err) {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -1349,7 +1350,7 @@ func TestNewCopyStatement(t *testing.T) {
|
|||||||
|
|
||||||
func TestStatement_Execute(t *testing.T) {
|
func TestStatement_Execute(t *testing.T) {
|
||||||
type fields struct {
|
type fields struct {
|
||||||
execute func(ex Executer, projectionName string) error
|
execute func(ctx context.Context, ex Executer, projectionName string) error
|
||||||
}
|
}
|
||||||
type want struct {
|
type want struct {
|
||||||
isErr func(error) bool
|
isErr func(error) bool
|
||||||
@@ -1366,7 +1367,7 @@ func TestStatement_Execute(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "execute returns no error",
|
name: "execute returns no error",
|
||||||
fields: fields{
|
fields: fields{
|
||||||
execute: func(ex Executer, projectionName string) error { return nil },
|
execute: func(ctx context.Context, ex Executer, projectionName string) error { return nil },
|
||||||
},
|
},
|
||||||
args: args{
|
args: args{
|
||||||
projectionName: "my_projection",
|
projectionName: "my_projection",
|
||||||
@@ -1383,7 +1384,7 @@ func TestStatement_Execute(t *testing.T) {
|
|||||||
projectionName: "my_projection",
|
projectionName: "my_projection",
|
||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
execute: func(ex Executer, projectionName string) error { return errTest },
|
execute: func(ctx context.Context, ex Executer, projectionName string) error { return errTest },
|
||||||
},
|
},
|
||||||
want: want{
|
want: want{
|
||||||
isErr: func(err error) bool {
|
isErr: func(err error) bool {
|
||||||
@@ -1397,7 +1398,7 @@ func TestStatement_Execute(t *testing.T) {
|
|||||||
stmt := &Statement{
|
stmt := &Statement{
|
||||||
Execute: tt.fields.execute,
|
Execute: tt.fields.execute,
|
||||||
}
|
}
|
||||||
if err := stmt.Execute(nil, tt.args.projectionName); !tt.want.isErr(err) {
|
if err := stmt.Execute(t.Context(), nil, tt.args.projectionName); !tt.want.isErr(err) {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@@ -9,8 +9,8 @@ import (
|
|||||||
|
|
||||||
const ExecutionUserID = "EXECUTION"
|
const ExecutionUserID = "EXECUTION"
|
||||||
|
|
||||||
func HandlerContext(event *eventstore.Aggregate) context.Context {
|
func HandlerContext(parent context.Context, event *eventstore.Aggregate) context.Context {
|
||||||
ctx := authz.WithInstanceID(context.Background(), event.InstanceID)
|
ctx := authz.WithInstanceID(parent, event.InstanceID)
|
||||||
return authz.SetCtxData(ctx, authz.CtxData{UserID: ExecutionUserID, OrgID: event.ResourceOwner})
|
return authz.SetCtxData(ctx, authz.CtxData{UserID: ExecutionUserID, OrgID: event.ResourceOwner})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -113,7 +113,7 @@ func idsForEventType(eventType string) []string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (u *eventHandler) reduce(e eventstore.Event) (*handler.Statement, error) {
|
func (u *eventHandler) reduce(e eventstore.Event) (*handler.Statement, error) {
|
||||||
ctx := HandlerContext(e.Aggregate())
|
ctx := HandlerContext(context.Background(), e.Aggregate())
|
||||||
|
|
||||||
targets, err := u.query.TargetsByExecutionID(ctx, idsForEventType(string(e.Type())))
|
targets, err := u.query.TargetsByExecutionID(ctx, idsForEventType(string(e.Type())))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -125,8 +125,8 @@ func (u *eventHandler) reduce(e eventstore.Event) (*handler.Statement, error) {
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(e, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(e, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(e.Aggregate())
|
ctx = HandlerContext(ctx, e.Aggregate())
|
||||||
req, err := NewRequest(e, targets)
|
req, err := NewRequest(e, targets)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@@ -440,7 +440,7 @@ func TestActionProjection_reduces(t *testing.T) {
|
|||||||
assert.Nil(t, stmt.Execute)
|
assert.Nil(t, stmt.Execute)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.stmtErr != nil {
|
if w.stmtErr != nil {
|
||||||
w.stmtErr(t, err)
|
w.stmtErr(t, err)
|
||||||
return
|
return
|
||||||
|
@@ -95,8 +95,8 @@ func (u *backChannelLogoutNotifier) reduceUserSignedOut(event eventstore.Event)
|
|||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Gr63h", "reduce.wrong.event.type %s", user.HumanSignedOutType)
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Gr63h", "reduce.wrong.event.type %s", user.HumanSignedOutType)
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx, err := u.queries.HandlerContext(event.Aggregate())
|
ctx, err := u.queries.HandlerContext(ctx, event.Aggregate())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -116,8 +116,8 @@ func (u *backChannelLogoutNotifier) reduceSessionTerminated(event eventstore.Eve
|
|||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-D6H2h", "reduce.wrong.event.type %s", session.TerminateType)
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-D6H2h", "reduce.wrong.event.type %s", session.TerminateType)
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx, err := u.queries.HandlerContext(event.Aggregate())
|
ctx, err := u.queries.HandlerContext(ctx, event.Aggregate())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@@ -9,8 +9,8 @@ import (
|
|||||||
|
|
||||||
const NotifyUserID = "NOTIFICATION" //TODO: system?
|
const NotifyUserID = "NOTIFICATION" //TODO: system?
|
||||||
|
|
||||||
func HandlerContext(event *eventstore.Aggregate) context.Context {
|
func HandlerContext(parent context.Context, event *eventstore.Aggregate) context.Context {
|
||||||
ctx := authz.WithInstanceID(context.Background(), event.InstanceID)
|
ctx := authz.WithInstanceID(parent, event.InstanceID)
|
||||||
return authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: event.ResourceOwner})
|
return authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: event.ResourceOwner})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -18,12 +18,11 @@ func ContextWithNotifier(ctx context.Context, aggregate *eventstore.Aggregate) c
|
|||||||
return authz.WithInstanceID(authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: aggregate.ResourceOwner}), aggregate.InstanceID)
|
return authz.WithInstanceID(authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: aggregate.ResourceOwner}), aggregate.InstanceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NotificationQueries) HandlerContext(event *eventstore.Aggregate) (context.Context, error) {
|
func (n *NotificationQueries) HandlerContext(parent context.Context, event *eventstore.Aggregate) (context.Context, error) {
|
||||||
ctx := context.Background()
|
instance, err := n.InstanceByID(parent, event.InstanceID)
|
||||||
instance, err := n.InstanceByID(ctx, event.InstanceID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ctx = authz.WithInstance(ctx, instance)
|
ctx := authz.WithInstance(parent, instance)
|
||||||
return authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: event.ResourceOwner}), nil
|
return authz.SetCtxData(ctx, authz.CtxData{UserID: NotifyUserID, OrgID: event.ResourceOwner}), nil
|
||||||
}
|
}
|
||||||
|
@@ -62,8 +62,8 @@ func (u *quotaNotifier) reduceNotificationDue(event eventstore.Event) (*handler.
|
|||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-DLxdE", "reduce.wrong.event.type %s", quota.NotificationDueEventType)
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-DLxdE", "reduce.wrong.event.type %s", quota.NotificationDueEventType)
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.NotifiedEventType)
|
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.NotifiedEventType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@@ -5,7 +5,6 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/zitadel/zitadel/internal/api/call"
|
|
||||||
"github.com/zitadel/zitadel/internal/command"
|
"github.com/zitadel/zitadel/internal/command"
|
||||||
"github.com/zitadel/zitadel/internal/eventstore"
|
"github.com/zitadel/zitadel/internal/eventstore"
|
||||||
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
|
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
|
||||||
@@ -69,12 +68,11 @@ func (t *telemetryPusher) Reducers() []handler.AggregateReducer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.Statement, error) {
|
func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.Statement, error) {
|
||||||
ctx := call.WithTimestamp(context.Background())
|
|
||||||
e, ok := event.(*milestone.ReachedEvent)
|
e, ok := event.(*milestone.ReachedEvent)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-lDTs5", "reduce.wrong.event.type %s", event.Type())
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-lDTs5", "reduce.wrong.event.type %s", event.Type())
|
||||||
}
|
}
|
||||||
return handler.NewStatement(event, func(handler.Executer, string) error {
|
return handler.NewStatement(event, func(ctx context.Context, _ handler.Executer, _ string) error {
|
||||||
// Do not push the milestone again if this was a migration event.
|
// Do not push the milestone again if this was a migration event.
|
||||||
if e.ReachedDate != nil {
|
if e.ReachedDate != nil {
|
||||||
return nil
|
return nil
|
||||||
|
@@ -203,8 +203,8 @@ func (u *userNotifier) reduceInitCodeAdded(event eventstore.Event) (*handler.Sta
|
|||||||
if !ok {
|
if !ok {
|
||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType)
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType)
|
||||||
}
|
}
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.UserV1InitialCodeAddedType, user.UserV1InitialCodeSentType,
|
user.UserV1InitialCodeAddedType, user.UserV1InitialCodeSentType,
|
||||||
user.HumanInitialCodeAddedType, user.HumanInitialCodeSentType)
|
user.HumanInitialCodeAddedType, user.HumanInitialCodeSentType)
|
||||||
@@ -253,8 +253,8 @@ func (u *userNotifier) reduceEmailCodeAdded(event eventstore.Event) (*handler.St
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.UserV1EmailCodeAddedType, user.UserV1EmailCodeSentType,
|
user.UserV1EmailCodeAddedType, user.UserV1EmailCodeSentType,
|
||||||
user.HumanEmailCodeAddedType, user.HumanEmailCodeSentType)
|
user.HumanEmailCodeAddedType, user.HumanEmailCodeSentType)
|
||||||
@@ -309,8 +309,8 @@ func (u *userNotifier) reducePasswordCodeAdded(event eventstore.Event) (*handler
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.UserV1PasswordCodeAddedType, user.UserV1PasswordCodeSentType,
|
user.UserV1PasswordCodeAddedType, user.UserV1PasswordCodeSentType,
|
||||||
user.HumanPasswordCodeAddedType, user.HumanPasswordCodeSentType)
|
user.HumanPasswordCodeAddedType, user.HumanPasswordCodeSentType)
|
||||||
@@ -362,8 +362,8 @@ func (u *userNotifier) reduceOTPSMSCodeAdded(event eventstore.Event) (*handler.S
|
|||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-ASF3g", "reduce.wrong.event.type %s", user.HumanOTPSMSCodeAddedType)
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-ASF3g", "reduce.wrong.event.type %s", user.HumanOTPSMSCodeAddedType)
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.HumanOTPSMSCodeAddedType,
|
user.HumanOTPSMSCodeAddedType,
|
||||||
user.HumanOTPSMSCodeSentType)
|
user.HumanOTPSMSCodeSentType)
|
||||||
@@ -406,8 +406,8 @@ func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*h
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
session.OTPSMSChallengedType,
|
session.OTPSMSChallengedType,
|
||||||
session.OTPSMSSentType)
|
session.OTPSMSSentType)
|
||||||
@@ -455,8 +455,8 @@ func (u *userNotifier) reduceOTPEmailCodeAdded(event eventstore.Event) (*handler
|
|||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-JL3hw", "reduce.wrong.event.type %s", user.HumanOTPEmailCodeAddedType)
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-JL3hw", "reduce.wrong.event.type %s", user.HumanOTPEmailCodeAddedType)
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.HumanOTPEmailCodeAddedType,
|
user.HumanOTPEmailCodeAddedType,
|
||||||
user.HumanOTPEmailCodeSentType)
|
user.HumanOTPEmailCodeSentType)
|
||||||
@@ -507,8 +507,8 @@ func (u *userNotifier) reduceSessionOTPEmailChallenged(event eventstore.Event) (
|
|||||||
if e.ReturnCode {
|
if e.ReturnCode {
|
||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
session.OTPEmailChallengedType,
|
session.OTPEmailChallengedType,
|
||||||
session.OTPEmailSentType)
|
session.OTPEmailSentType)
|
||||||
@@ -573,8 +573,8 @@ func (u *userNotifier) reduceDomainClaimed(event eventstore.Event) (*handler.Sta
|
|||||||
if !ok {
|
if !ok {
|
||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType)
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType)
|
||||||
}
|
}
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil,
|
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil,
|
||||||
user.UserDomainClaimedType, user.UserDomainClaimedSentType)
|
user.UserDomainClaimedType, user.UserDomainClaimedSentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -619,8 +619,8 @@ func (u *userNotifier) reducePasswordlessCodeRequested(event eventstore.Event) (
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, map[string]interface{}{"id": e.ID}, user.HumanPasswordlessInitCodeSentType)
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, map[string]interface{}{"id": e.ID}, user.HumanPasswordlessInitCodeSentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -668,8 +668,8 @@ func (u *userNotifier) reducePasswordChanged(event eventstore.Event) (*handler.S
|
|||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType)
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType)
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType)
|
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -720,8 +720,8 @@ func (u *userNotifier) reducePhoneCodeAdded(event eventstore.Event) (*handler.St
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.UserV1PhoneCodeAddedType, user.UserV1PhoneCodeSentType,
|
user.UserV1PhoneCodeAddedType, user.UserV1PhoneCodeSentType,
|
||||||
user.HumanPhoneCodeAddedType, user.HumanPhoneCodeSentType)
|
user.HumanPhoneCodeAddedType, user.HumanPhoneCodeSentType)
|
||||||
@@ -768,8 +768,8 @@ func (u *userNotifier) reduceInviteCodeAdded(event eventstore.Event) (*handler.S
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.HumanInviteCodeAddedType, user.HumanInviteCodeSentType)
|
user.HumanInviteCodeAddedType, user.HumanInviteCodeSentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -133,8 +133,8 @@ func (u *userNotifierLegacy) reduceInitCodeAdded(event eventstore.Event) (*handl
|
|||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType)
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-EFe2f", "reduce.wrong.event.type %s", user.HumanInitialCodeAddedType)
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.UserV1InitialCodeAddedType, user.UserV1InitialCodeSentType,
|
user.UserV1InitialCodeAddedType, user.UserV1InitialCodeSentType,
|
||||||
user.HumanInitialCodeAddedType, user.HumanInitialCodeSentType)
|
user.HumanInitialCodeAddedType, user.HumanInitialCodeSentType)
|
||||||
@@ -194,8 +194,8 @@ func (u *userNotifierLegacy) reduceEmailCodeAdded(event eventstore.Event) (*hand
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.UserV1EmailCodeAddedType, user.UserV1EmailCodeSentType,
|
user.UserV1EmailCodeAddedType, user.UserV1EmailCodeSentType,
|
||||||
user.HumanEmailCodeAddedType, user.HumanEmailCodeSentType)
|
user.HumanEmailCodeAddedType, user.HumanEmailCodeSentType)
|
||||||
@@ -254,8 +254,8 @@ func (u *userNotifierLegacy) reducePasswordCodeAdded(event eventstore.Event) (*h
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.UserV1PasswordCodeAddedType, user.UserV1PasswordCodeSentType,
|
user.UserV1PasswordCodeAddedType, user.UserV1PasswordCodeSentType,
|
||||||
user.HumanPasswordCodeAddedType, user.HumanPasswordCodeSentType)
|
user.HumanPasswordCodeAddedType, user.HumanPasswordCodeSentType)
|
||||||
@@ -337,7 +337,7 @@ func (u *userNotifierLegacy) reduceSessionOTPSMSChallenged(event eventstore.Even
|
|||||||
if e.CodeReturned {
|
if e.CodeReturned {
|
||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx := HandlerContext(context.Background(), event.Aggregate())
|
||||||
s, err := u.queries.SessionByID(ctx, true, e.Aggregate().ID, "", nil)
|
s, err := u.queries.SessionByID(ctx, true, e.Aggregate().ID, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -363,7 +363,7 @@ func (u *userNotifierLegacy) reduceOTPSMS(
|
|||||||
sentCommand func(ctx context.Context, userID, resourceOwner string, generatorInfo *senders.CodeGeneratorInfo) (err error),
|
sentCommand func(ctx context.Context, userID, resourceOwner string, generatorInfo *senders.CodeGeneratorInfo) (err error),
|
||||||
eventTypes ...eventstore.EventType,
|
eventTypes ...eventstore.EventType,
|
||||||
) (*handler.Statement, error) {
|
) (*handler.Statement, error) {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx := HandlerContext(context.Background(), event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, expiry, nil, eventTypes...)
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, expiry, nil, eventTypes...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -445,7 +445,7 @@ func (u *userNotifierLegacy) reduceSessionOTPEmailChallenged(event eventstore.Ev
|
|||||||
if e.ReturnCode {
|
if e.ReturnCode {
|
||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx := HandlerContext(context.Background(), event.Aggregate())
|
||||||
s, err := u.queries.SessionByID(ctx, true, e.Aggregate().ID, "", nil)
|
s, err := u.queries.SessionByID(ctx, true, e.Aggregate().ID, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -484,7 +484,7 @@ func (u *userNotifierLegacy) reduceOTPEmail(
|
|||||||
sentCommand func(ctx context.Context, userID string, resourceOwner string) (err error),
|
sentCommand func(ctx context.Context, userID string, resourceOwner string) (err error),
|
||||||
eventTypes ...eventstore.EventType,
|
eventTypes ...eventstore.EventType,
|
||||||
) (*handler.Statement, error) {
|
) (*handler.Statement, error) {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx := HandlerContext(context.Background(), event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, expiry, nil, eventTypes...)
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, expiry, nil, eventTypes...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -543,8 +543,8 @@ func (u *userNotifierLegacy) reduceDomainClaimed(event eventstore.Event) (*handl
|
|||||||
if !ok {
|
if !ok {
|
||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType)
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType)
|
||||||
}
|
}
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil,
|
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil,
|
||||||
user.UserDomainClaimedType, user.UserDomainClaimedSentType)
|
user.UserDomainClaimedType, user.UserDomainClaimedSentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -598,8 +598,8 @@ func (u *userNotifierLegacy) reducePasswordlessCodeRequested(event eventstore.Ev
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, map[string]interface{}{"id": e.ID}, user.HumanPasswordlessInitCodeSentType)
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, map[string]interface{}{"id": e.ID}, user.HumanPasswordlessInitCodeSentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -653,8 +653,8 @@ func (u *userNotifierLegacy) reducePasswordChanged(event eventstore.Event) (*han
|
|||||||
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType)
|
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType)
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType)
|
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -719,8 +719,8 @@ func (u *userNotifierLegacy) reducePhoneCodeAdded(event eventstore.Event) (*hand
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.UserV1PhoneCodeAddedType, user.UserV1PhoneCodeSentType,
|
user.UserV1PhoneCodeAddedType, user.UserV1PhoneCodeSentType,
|
||||||
user.HumanPhoneCodeAddedType, user.HumanPhoneCodeSentType)
|
user.HumanPhoneCodeAddedType, user.HumanPhoneCodeSentType)
|
||||||
@@ -777,8 +777,8 @@ func (u *userNotifierLegacy) reduceInviteCodeAdded(event eventstore.Event) (*han
|
|||||||
return handler.NewNoOpStatement(e), nil
|
return handler.NewNoOpStatement(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return handler.NewStatement(event, func(ex handler.Executer, projectionName string) error {
|
return handler.NewStatement(event, func(ctx context.Context, ex handler.Executer, projectionName string) error {
|
||||||
ctx := HandlerContext(event.Aggregate())
|
ctx = HandlerContext(ctx, event.Aggregate())
|
||||||
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
alreadyHandled, err := u.checkIfCodeAlreadyHandledOrExpired(ctx, event, e.Expiry, nil,
|
||||||
user.HumanInviteCodeAddedType, user.HumanInviteCodeSentType)
|
user.HumanInviteCodeAddedType, user.HumanInviteCodeSentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -283,7 +283,7 @@ func Test_userNotifierLegacy_reduceInitCodeAdded(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -596,7 +596,7 @@ func Test_userNotifierLegacy_reduceEmailCodeAdded(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -949,7 +949,7 @@ func Test_userNotifierLegacy_reducePasswordCodeAdded(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -1080,7 +1080,7 @@ func Test_userNotifierLegacy_reduceDomainClaimed(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -1355,7 +1355,7 @@ func Test_userNotifierLegacy_reducePasswordlessCodeRequested(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -1495,7 +1495,7 @@ func Test_userNotifierLegacy_reducePasswordChanged(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
|
@@ -188,7 +188,7 @@ func Test_userNotifier_reduceInitCodeAdded(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -366,7 +366,7 @@ func Test_userNotifier_reduceEmailCodeAdded(t *testing.T) {
|
|||||||
assert.Nil(t, stmt.Execute)
|
assert.Nil(t, stmt.Execute)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -601,7 +601,7 @@ func Test_userNotifier_reducePasswordCodeAdded(t *testing.T) {
|
|||||||
assert.Nil(t, stmt.Execute)
|
assert.Nil(t, stmt.Execute)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -731,7 +731,7 @@ func Test_userNotifier_reduceDomainClaimed(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -906,7 +906,7 @@ func Test_userNotifier_reducePasswordlessCodeRequested(t *testing.T) {
|
|||||||
assert.Nil(t, stmt.Execute)
|
assert.Nil(t, stmt.Execute)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -1066,7 +1066,7 @@ func Test_userNotifier_reducePasswordChanged(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -1329,7 +1329,7 @@ func Test_userNotifier_reduceOTPEmailChallenged(t *testing.T) {
|
|||||||
assert.Nil(t, stmt.Execute)
|
assert.Nil(t, stmt.Execute)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -1590,7 +1590,7 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
|
|||||||
assert.Nil(t, stmt.Execute)
|
assert.Nil(t, stmt.Execute)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
@@ -1886,7 +1886,7 @@ func Test_userNotifier_reduceInviteCodeAdded(t *testing.T) {
|
|||||||
assert.Nil(t, stmt.Execute)
|
assert.Nil(t, stmt.Execute)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = stmt.Execute(nil, "")
|
err = stmt.Execute(t.Context(), nil, "")
|
||||||
if w.err != nil {
|
if w.err != nil {
|
||||||
w.err(t, err)
|
w.err(t, err)
|
||||||
} else {
|
} else {
|
||||||
|
@@ -99,7 +99,7 @@ func assertReduce(t *testing.T, stmt *handler.Statement, err error, projection s
|
|||||||
want.executer.Validate(t)
|
want.executer.Validate(t)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = stmt.Execute(want.executer, projection)
|
err = stmt.Execute(t.Context(), want.executer, projection)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user