From 1ac1492fd335899f34c2368a5737e087d90d8b15 Mon Sep 17 00:00:00 2001 From: Livio Amstutz Date: Wed, 8 Sep 2021 13:54:31 +0200 Subject: [PATCH] fix: handle multiple statements for a single event in projections (#2313) * fix: handle multiple statements for a single event in projections * export func type * fix test * Update internal/eventstore/handler/crdb/statement.go Co-authored-by: Silvan * Update internal/eventstore/handler/crdb/statement.go Co-authored-by: Silvan * change to pointers * add error test case Co-authored-by: Silvan --- .../eventstore/handler/crdb/failed_stmt.go | 2 +- .../eventstore/handler/crdb/handler_stmt.go | 14 +- .../handler/crdb/handler_stmt_test.go | 69 ++++-- internal/eventstore/handler/crdb/reduce.go | 4 +- internal/eventstore/handler/crdb/statement.go | 75 +++++- .../eventstore/handler/crdb/statement_test.go | 227 ++++++++++++++++-- .../eventstore/handler/handler_projection.go | 10 +- .../handler/handler_projection_test.go | 76 +++--- internal/query/projection/org.go | 124 +++++----- .../query/projection/org/owner/projection.go | 128 +++++----- internal/query/projection/project.go | 118 +++++---- 11 files changed, 546 insertions(+), 301 deletions(-) diff --git a/internal/eventstore/handler/crdb/failed_stmt.go b/internal/eventstore/handler/crdb/failed_stmt.go index 2f41170a24..c509f8e95a 100644 --- a/internal/eventstore/handler/crdb/failed_stmt.go +++ b/internal/eventstore/handler/crdb/failed_stmt.go @@ -20,7 +20,7 @@ const ( ") AS failure_count" ) -func (h *StatementHandler) handleFailedStmt(tx *sql.Tx, stmt handler.Statement, execErr error) (shouldContinue bool) { +func (h *StatementHandler) handleFailedStmt(tx *sql.Tx, stmt *handler.Statement, execErr error) (shouldContinue bool) { failureCount, err := h.failureCount(tx, stmt.Sequence) if err != nil { logging.LogWithFields("CRDB-WJaFk", "projection", h.ProjectionName, "seq", stmt.Sequence).WithError(err).Warn("unable to get failure count") diff --git a/internal/eventstore/handler/crdb/handler_stmt.go b/internal/eventstore/handler/crdb/handler_stmt.go index f278c7fe7f..916424aad6 100644 --- a/internal/eventstore/handler/crdb/handler_stmt.go +++ b/internal/eventstore/handler/crdb/handler_stmt.go @@ -116,7 +116,7 @@ func (h *StatementHandler) SearchQuery() (*eventstore.SearchQueryBuilder, uint64 } //Update implements handler.Update -func (h *StatementHandler) Update(ctx context.Context, stmts []handler.Statement, reduce handler.Reduce) (unexecutedStmts []handler.Statement, err error) { +func (h *StatementHandler) Update(ctx context.Context, stmts []*handler.Statement, reduce handler.Reduce) (unexecutedStmts []*handler.Statement, err error) { tx, err := h.client.BeginTx(ctx, nil) if err != nil { return stmts, errors.ThrowInternal(err, "CRDB-e89Gq", "begin failed") @@ -158,7 +158,7 @@ func (h *StatementHandler) Update(ctx context.Context, stmts []handler.Statement return stmts, handler.ErrSomeStmtsFailed } - unexecutedStmts = make([]handler.Statement, len(stmts)-(lastSuccessfulIdx+1)) + unexecutedStmts = make([]*handler.Statement, len(stmts)-(lastSuccessfulIdx+1)) copy(unexecutedStmts, stmts[lastSuccessfulIdx+1:]) stmts = nil @@ -174,7 +174,7 @@ func (h *StatementHandler) fetchPreviousStmts( stmtSeq uint64, sequences currentSequences, reduce handler.Reduce, -) (previousStmts []handler.Statement, err error) { +) (previousStmts []*handler.Statement, err error) { query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent) queriesAdded := false @@ -202,18 +202,18 @@ func (h *StatementHandler) fetchPreviousStmts( } for _, event := range events { - stmts, err := reduce(event) + stmt, err := reduce(event) if err != nil { return nil, err } - previousStmts = append(previousStmts, stmts...) + previousStmts = append(previousStmts, stmt) } return previousStmts, nil } func (h *StatementHandler) executeStmts( tx *sql.Tx, - stmts []handler.Statement, + stmts []*handler.Statement, sequences currentSequences, ) int { @@ -244,7 +244,7 @@ func (h *StatementHandler) executeStmts( //executeStmt handles sql statements //an error is returned if the statement could not be inserted properly -func (h *StatementHandler) executeStmt(tx *sql.Tx, stmt handler.Statement) error { +func (h *StatementHandler) executeStmt(tx *sql.Tx, stmt *handler.Statement) error { if stmt.IsNoop() { return nil } diff --git a/internal/eventstore/handler/crdb/handler_stmt_test.go b/internal/eventstore/handler/crdb/handler_stmt_test.go index 16c2c7a7d1..7d78cc9670 100644 --- a/internal/eventstore/handler/crdb/handler_stmt_test.go +++ b/internal/eventstore/handler/crdb/handler_stmt_test.go @@ -162,7 +162,7 @@ func TestStatementHandler_Update(t *testing.T) { } type args struct { ctx context.Context - stmts []handler.Statement + stmts []*handler.Statement reduce handler.Reduce } tests := []struct { @@ -212,7 +212,7 @@ func TestStatementHandler_Update(t *testing.T) { }, args: args{ ctx: context.Background(), - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewNoOpStatement(&testEvent{ aggregateType: "agg", sequence: 6, @@ -242,7 +242,7 @@ func TestStatementHandler_Update(t *testing.T) { }, args: args{ ctx: context.Background(), - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewCreateStatement( &testEvent{ aggregateType: "testAgg", @@ -279,7 +279,7 @@ func TestStatementHandler_Update(t *testing.T) { }, args: args{ ctx: context.Background(), - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewCreateStatement( &testEvent{ aggregateType: "agg", @@ -320,7 +320,7 @@ func TestStatementHandler_Update(t *testing.T) { }, args: args{ ctx: context.Background(), - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewCreateStatement( &testEvent{ aggregateType: "agg", @@ -361,7 +361,7 @@ func TestStatementHandler_Update(t *testing.T) { }, args: args{ ctx: context.Background(), - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewNoOpStatement(&testEvent{ aggregateType: "testAgg", sequence: 7, @@ -392,7 +392,7 @@ func TestStatementHandler_Update(t *testing.T) { }, args: args{ ctx: context.Background(), - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewNoOpStatement(&testEvent{ aggregateType: "testAgg", sequence: 7, @@ -428,7 +428,7 @@ func TestStatementHandler_Update(t *testing.T) { }, args: args{ ctx: context.Background(), - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewNoOpStatement(&testEvent{ aggregateType: "testAgg", sequence: 7, @@ -684,7 +684,7 @@ func TestStatementHandler_executeStmts(t *testing.T) { failedEventsTable string } type args struct { - stmts []handler.Statement + stmts []*handler.Statement sequences currentSequences } type want struct { @@ -703,7 +703,7 @@ func TestStatementHandler_executeStmts(t *testing.T) { projectionName: "my_projection", }, args: args{ - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewCreateStatement( &testEvent{ aggregateType: "agg", @@ -732,7 +732,7 @@ func TestStatementHandler_executeStmts(t *testing.T) { projectionName: "my_projection", }, args: args{ - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewCreateStatement( &testEvent{ aggregateType: "agg", @@ -780,7 +780,7 @@ func TestStatementHandler_executeStmts(t *testing.T) { failedEventsTable: "failed_events", }, args: args{ - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewCreateStatement( &testEvent{ aggregateType: "agg", @@ -844,7 +844,7 @@ func TestStatementHandler_executeStmts(t *testing.T) { failedEventsTable: "failed_events", }, args: args{ - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewCreateStatement( &testEvent{ aggregateType: "agg", @@ -909,7 +909,7 @@ func TestStatementHandler_executeStmts(t *testing.T) { projectionName: "my_projection", }, args: args{ - stmts: []handler.Statement{ + stmts: []*handler.Statement{ NewCreateStatement( &testEvent{ aggregateType: "agg", @@ -946,6 +946,29 @@ func TestStatementHandler_executeStmts(t *testing.T) { Value: "val", }, }), + NewMultiStatement( + &testEvent{ + aggregateType: "agg", + sequence: 8, + previousSequence: 7, + }, + AddCreateStatement( + []handler.Column{ + { + Name: "col", + Value: "val", + }, + }, + ), + AddCreateStatement( + []handler.Column{ + { + Name: "col", + Value: "val", + }, + }, + ), + ), }, sequences: currentSequences{ "agg": 2, @@ -962,8 +985,12 @@ func TestStatementHandler_executeStmts(t *testing.T) { expectSavePoint(), expectCreate("my_projection", []string{"col"}, []string{"$1"}), expectSavePointRelease(), + expectSavePoint(), + expectCreate("my_projection", []string{"col"}, []string{"$1"}), + expectCreate("my_projection", []string{"col"}, []string{"$1"}), + expectSavePointRelease(), }, - idx: 2, + idx: 3, }, }, } @@ -1025,7 +1052,7 @@ func TestStatementHandler_executeStmt(t *testing.T) { projectionName string } type args struct { - stmt handler.Statement + stmt *handler.Statement } type want struct { expectations []mockExpectation @@ -1532,16 +1559,14 @@ func TestStatementHandler_updateCurrentSequence(t *testing.T) { } } -func testReduce(stmts ...handler.Statement) handler.Reduce { - return func(event eventstore.EventReader) ([]handler.Statement, error) { - return []handler.Statement{ - NewNoOpStatement(event), - }, nil +func testReduce() handler.Reduce { + return func(event eventstore.EventReader) (*handler.Statement, error) { + return NewNoOpStatement(event), nil } } func testReduceErr(err error) handler.Reduce { - return func(event eventstore.EventReader) ([]handler.Statement, error) { + return func(event eventstore.EventReader) (*handler.Statement, error) { return nil, err } } diff --git a/internal/eventstore/handler/crdb/reduce.go b/internal/eventstore/handler/crdb/reduce.go index 4ee4f53a4c..355d89ded0 100644 --- a/internal/eventstore/handler/crdb/reduce.go +++ b/internal/eventstore/handler/crdb/reduce.go @@ -6,10 +6,10 @@ import ( ) //reduce implements handler.Reduce function -func (h *StatementHandler) reduce(event eventstore.EventReader) ([]handler.Statement, error) { +func (h *StatementHandler) reduce(event eventstore.EventReader) (*handler.Statement, error) { reduce, ok := h.reduces[event.Type()] if !ok { - return []handler.Statement{NewNoOpStatement(event)}, nil + return NewNoOpStatement(event), nil } return reduce(event) diff --git a/internal/eventstore/handler/crdb/statement.go b/internal/eventstore/handler/crdb/statement.go index 1c79e0cfbb..2ecf5b615b 100644 --- a/internal/eventstore/handler/crdb/statement.go +++ b/internal/eventstore/handler/crdb/statement.go @@ -23,7 +23,7 @@ func WithTableSuffix(name string) func(*execConfig) { } } -func NewCreateStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) handler.Statement { +func NewCreateStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) *handler.Statement { cols, params, args := columnsToQuery(values) columnNames := strings.Join(cols, ", ") valuesPlaceholder := strings.Join(params, ", ") @@ -40,7 +40,7 @@ func NewCreateStatement(event eventstore.EventReader, values []handler.Column, o return "INSERT INTO " + config.tableName + " (" + columnNames + ") VALUES (" + valuesPlaceholder + ")" } - return handler.Statement{ + return &handler.Statement{ AggregateType: event.Aggregate().Type, Sequence: event.Sequence(), PreviousSequence: event.PreviousAggregateTypeSequence(), @@ -48,7 +48,7 @@ func NewCreateStatement(event eventstore.EventReader, values []handler.Column, o } } -func NewUpsertStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) handler.Statement { +func NewUpsertStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) *handler.Statement { cols, params, args := columnsToQuery(values) columnNames := strings.Join(cols, ", ") valuesPlaceholder := strings.Join(params, ", ") @@ -65,7 +65,7 @@ func NewUpsertStatement(event eventstore.EventReader, values []handler.Column, o return "UPSERT INTO " + config.tableName + " (" + columnNames + ") VALUES (" + valuesPlaceholder + ")" } - return handler.Statement{ + return &handler.Statement{ AggregateType: event.Aggregate().Type, Sequence: event.Sequence(), PreviousSequence: event.PreviousAggregateTypeSequence(), @@ -73,7 +73,7 @@ func NewUpsertStatement(event eventstore.EventReader, values []handler.Column, o } } -func NewUpdateStatement(event eventstore.EventReader, values []handler.Column, conditions []handler.Condition, opts ...execOption) handler.Statement { +func NewUpdateStatement(event eventstore.EventReader, values []handler.Column, conditions []handler.Condition, opts ...execOption) *handler.Statement { cols, params, args := columnsToQuery(values) wheres, whereArgs := conditionsToWhere(conditions, len(params)) args = append(args, whereArgs...) @@ -98,7 +98,7 @@ func NewUpdateStatement(event eventstore.EventReader, values []handler.Column, c return "UPDATE " + config.tableName + " SET (" + columnNames + ") = (" + valuesPlaceholder + ") WHERE " + wheresPlaceholders } - return handler.Statement{ + return &handler.Statement{ AggregateType: event.Aggregate().Type, Sequence: event.Sequence(), PreviousSequence: event.PreviousAggregateTypeSequence(), @@ -106,7 +106,7 @@ func NewUpdateStatement(event eventstore.EventReader, values []handler.Column, c } } -func NewDeleteStatement(event eventstore.EventReader, conditions []handler.Condition, opts ...execOption) handler.Statement { +func NewDeleteStatement(event eventstore.EventReader, conditions []handler.Condition, opts ...execOption) *handler.Statement { wheres, args := conditionsToWhere(conditions, 0) wheresPlaceholders := strings.Join(wheres, " AND ") @@ -123,7 +123,7 @@ func NewDeleteStatement(event eventstore.EventReader, conditions []handler.Condi return "DELETE FROM " + config.tableName + " WHERE " + wheresPlaceholders } - return handler.Statement{ + return &handler.Statement{ AggregateType: event.Aggregate().Type, Sequence: event.Sequence(), PreviousSequence: event.PreviousAggregateTypeSequence(), @@ -131,14 +131,56 @@ func NewDeleteStatement(event eventstore.EventReader, conditions []handler.Condi } } -func NewNoOpStatement(event eventstore.EventReader) handler.Statement { - return handler.Statement{ +func NewNoOpStatement(event eventstore.EventReader) *handler.Statement { + return &handler.Statement{ AggregateType: event.Aggregate().Type, Sequence: event.Sequence(), PreviousSequence: event.PreviousAggregateTypeSequence(), } } +func NewMultiStatement(event eventstore.EventReader, opts ...func(eventstore.EventReader) Exec) *handler.Statement { + if len(opts) == 0 { + return NewNoOpStatement(event) + } + execs := make([]Exec, len(opts)) + for i, opt := range opts { + execs[i] = opt(event) + } + return &handler.Statement{ + AggregateType: event.Aggregate().Type, + Sequence: event.Sequence(), + PreviousSequence: event.PreviousAggregateTypeSequence(), + Execute: multiExec(execs), + } +} + +type Exec func(ex handler.Executer, projectionName string) error + +func AddCreateStatement(columns []handler.Column, opts ...execOption) func(eventstore.EventReader) Exec { + return func(event eventstore.EventReader) Exec { + return NewCreateStatement(event, columns, opts...).Execute + } +} + +func AddUpsertStatement(values []handler.Column, opts ...execOption) func(eventstore.EventReader) Exec { + return func(event eventstore.EventReader) Exec { + return NewUpsertStatement(event, values, opts...).Execute + } +} + +func AddUpdateStatement(values []handler.Column, conditions []handler.Condition, opts ...execOption) func(eventstore.EventReader) Exec { + return func(event eventstore.EventReader) Exec { + return NewUpdateStatement(event, values, conditions, opts...).Execute + } +} + +func AddDeleteStatement(conditions []handler.Condition, opts ...execOption) func(eventstore.EventReader) Exec { + return func(event eventstore.EventReader) Exec { + return NewDeleteStatement(event, conditions, opts...).Execute + } +} + func columnsToQuery(cols []handler.Column) (names []string, parameters []string, values []interface{}) { names = make([]string, len(cols)) values = make([]interface{}, len(cols)) @@ -166,7 +208,7 @@ func conditionsToWhere(cols []handler.Condition, paramOffset int) (wheres []stri type query func(config execConfig) string -func exec(config execConfig, q query, opts []execOption) func(ex handler.Executer, projectionName string) error { +func exec(config execConfig, q query, opts []execOption) Exec { return func(ex handler.Executer, projectionName string) error { if projectionName == "" { return handler.ErrNoProjection @@ -188,3 +230,14 @@ func exec(config execConfig, q query, opts []execOption) func(ex handler.Execute return nil } } + +func multiExec(execList []Exec) Exec { + return func(ex handler.Executer, projectionName string) error { + for _, exec := range execList { + if err := exec(ex, projectionName); err != nil { + return err + } + } + return nil + } +} diff --git a/internal/eventstore/handler/crdb/statement_test.go b/internal/eventstore/handler/crdb/statement_test.go index 858aa247b3..3558a6a7ef 100644 --- a/internal/eventstore/handler/crdb/statement_test.go +++ b/internal/eventstore/handler/crdb/statement_test.go @@ -11,13 +11,18 @@ import ( ) type wantExecuter struct { - query string - args []interface{} + params []params + i int t *testing.T wasExecuted bool shouldExecute bool } +type params struct { + query string + args []interface{} +} + var errTestErr = errors.New("some error") func (ex *wantExecuter) check(t *testing.T) { @@ -34,12 +39,18 @@ func (ex *wantExecuter) check(t *testing.T) { func (ex *wantExecuter) Exec(query string, args ...interface{}) (sql.Result, error) { ex.t.Helper() ex.wasExecuted = true - if query != ex.query { - ex.t.Errorf("wrong query:\n expected:\n %q\n got:\n %q", ex.query, query) + if ex.i >= len(ex.params) { + ex.t.Errorf("did not expect more exec, but got:\n %q with %q", query, args) + return nil, nil } - if !reflect.DeepEqual(ex.args, args) { - ex.t.Errorf("wrong args:\n expected:\n %v\n got:\n %v", ex.args, args) + p := ex.params[ex.i] + if query != p.query { + ex.t.Errorf("wrong query:\n expected:\n %q\n got:\n %q", p.query, query) } + if !reflect.DeepEqual(p.args, args) { + ex.t.Errorf("wrong args:\n expected:\n %v\n got:\n %v", p.args, args) + } + ex.i++ return nil, nil } @@ -137,9 +148,13 @@ func TestNewCreateStatement(t *testing.T) { sequence: 1, previousSequence: 1, executer: &wantExecuter{ - query: "INSERT INTO my_table (col1) VALUES ($1)", + params: []params{ + { + query: "INSERT INTO my_table (col1) VALUES ($1)", + args: []interface{}{"val"}, + }, + }, shouldExecute: true, - args: []interface{}{"val"}, }, isErr: func(err error) bool { return err == nil @@ -255,9 +270,13 @@ func TestNewUpsertStatement(t *testing.T) { sequence: 1, previousSequence: 1, executer: &wantExecuter{ - query: "UPSERT INTO my_table (col1) VALUES ($1)", + params: []params{ + { + query: "UPSERT INTO my_table (col1) VALUES ($1)", + args: []interface{}{"val"}, + }, + }, shouldExecute: true, - args: []interface{}{"val"}, }, isErr: func(err error) bool { return err == nil @@ -422,9 +441,13 @@ func TestNewUpdateStatement(t *testing.T) { sequence: 1, previousSequence: 1, executer: &wantExecuter{ - query: "UPDATE my_table SET (col1) = ($1) WHERE (col2 = $2)", + params: []params{ + { + query: "UPDATE my_table SET (col1) = ($1) WHERE (col2 = $2)", + args: []interface{}{"val", 1}, + }, + }, shouldExecute: true, - args: []interface{}{"val", 1}, }, isErr: func(err error) bool { return err == nil @@ -541,9 +564,13 @@ func TestNewDeleteStatement(t *testing.T) { sequence: 1, previousSequence: 1, executer: &wantExecuter{ - query: "DELETE FROM my_table WHERE (col1 = $1)", + params: []params{ + { + query: "DELETE FROM my_table WHERE (col1 = $1)", + args: []interface{}{1}, + }, + }, shouldExecute: true, - args: []interface{}{1}, }, isErr: func(err error) bool { return err == nil @@ -572,7 +599,7 @@ func TestNewNoOpStatement(t *testing.T) { tests := []struct { name string args args - want handler.Statement + want *handler.Statement }{ { name: "generate correctly", @@ -583,7 +610,7 @@ func TestNewNoOpStatement(t *testing.T) { previousSequence: 3, }, }, - want: handler.Statement{ + want: &handler.Statement{ AggregateType: "agg", Execute: nil, Sequence: 5, @@ -600,6 +627,174 @@ func TestNewNoOpStatement(t *testing.T) { } } +func TestNewMultiStatement(t *testing.T) { + type args struct { + table string + event *testEvent + execs []func(eventstore.EventReader) Exec + } + + type want struct { + table string + aggregateType eventstore.AggregateType + sequence uint64 + previousSequence uint64 + executer *wantExecuter + isErr func(error) bool + } + tests := []struct { + name string + args args + want want + }{ + { + name: "no op", + args: args{ + table: "my_table", + event: &testEvent{ + aggregateType: "agg", + sequence: 1, + previousSequence: 0, + }, + execs: nil, + }, + want: want{ + executer: nil, + }, + }, + { + name: "no condition", + args: args{ + table: "my_table", + event: &testEvent{ + aggregateType: "agg", + sequence: 1, + previousSequence: 0, + }, + execs: []func(eventstore.EventReader) Exec{ + AddDeleteStatement( + []handler.Condition{}, + ), + AddCreateStatement( + []handler.Column{ + { + Name: "col1", + Value: 1, + }, + }), + }, + }, + want: want{ + table: "my_table", + aggregateType: "agg", + sequence: 1, + previousSequence: 1, + executer: &wantExecuter{ + shouldExecute: false, + }, + isErr: func(err error) bool { + return errors.Is(err, handler.ErrNoCondition) + }, + }, + }, + { + name: "correct", + args: args{ + table: "my_table", + event: &testEvent{ + sequence: 1, + previousSequence: 0, + aggregateType: "agg", + }, + execs: []func(eventstore.EventReader) Exec{ + AddDeleteStatement( + []handler.Condition{ + { + Name: "col1", + Value: 1, + }, + }), + AddCreateStatement( + []handler.Column{ + { + Name: "col1", + Value: 1, + }, + }), + AddUpsertStatement( + []handler.Column{ + { + Name: "col1", + Value: 1, + }, + }), + AddUpdateStatement( + []handler.Column{ + { + Name: "col1", + Value: 1, + }, + }, + []handler.Condition{ + { + Name: "col1", + Value: 1, + }, + }), + }, + }, + want: want{ + table: "my_table", + aggregateType: "agg", + sequence: 1, + previousSequence: 1, + executer: &wantExecuter{ + params: []params{ + { + query: "DELETE FROM my_table WHERE (col1 = $1)", + args: []interface{}{1}, + }, + { + query: "INSERT INTO my_table (col1) VALUES ($1)", + args: []interface{}{1}, + }, + { + query: "UPSERT INTO my_table (col1) VALUES ($1)", + args: []interface{}{1}, + }, + { + query: "UPDATE my_table SET (col1) = ($1) WHERE (col1 = $2)", + args: []interface{}{1, 1}, + }, + }, + shouldExecute: true, + }, + isErr: func(err error) bool { + return err == nil + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stmt := NewMultiStatement(tt.args.event, tt.args.execs...) + + if tt.want.executer != nil && stmt.Execute == nil { + t.Error("expected executer, but was nil") + } + if stmt.Execute == nil { + return + } + tt.want.executer.t = t + err := stmt.Execute(tt.want.executer, tt.args.table) + if !tt.want.isErr(err) { + t.Errorf("unexpected error: %v", err) + } + tt.want.executer.check(t) + }) + } +} + func TestStatement_Execute(t *testing.T) { type fields struct { execute func(ex handler.Executer, projectionName string) error diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index 98ebaba7d2..f21af035f0 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -18,11 +18,11 @@ type ProjectionHandlerConfig struct { } //Update updates the projection with the given statements -type Update func(context.Context, []Statement, Reduce) (unexecutedStmts []Statement, err error) +type Update func(context.Context, []*Statement, Reduce) (unexecutedStmts []*Statement, err error) //Reduce reduces the given event to a statement //which is used to update the projection -type Reduce func(eventstore.EventReader) ([]Statement, error) +type Reduce func(eventstore.EventReader) (*Statement, error) //Lock is used for mutex handling if needed on the projection type Lock func(context.Context, time.Duration) <-chan error @@ -46,7 +46,7 @@ type ProjectionHandler struct { ProjectionName string lockMu sync.Mutex - stmts []Statement + stmts []*Statement } func NewProjectionHandler(config ProjectionHandlerConfig) *ProjectionHandler { @@ -156,7 +156,7 @@ func (h *ProjectionHandler) processEvent( event eventstore.EventReader, reduce Reduce, ) error { - stmts, err := reduce(event) + stmt, err := reduce(event) if err != nil { logging.Log("EVENT-PTr4j").WithError(err).Warn("unable to process event") return err @@ -165,7 +165,7 @@ func (h *ProjectionHandler) processEvent( h.lockMu.Lock() defer h.lockMu.Unlock() - h.stmts = append(h.stmts, stmts...) + h.stmts = append(h.stmts, stmt) return nil } diff --git a/internal/eventstore/handler/handler_projection_test.go b/internal/eventstore/handler/handler_projection_test.go index f4124dcb2b..033fe2f053 100644 --- a/internal/eventstore/handler/handler_projection_test.go +++ b/internal/eventstore/handler/handler_projection_test.go @@ -24,8 +24,9 @@ var ( ErrUpdate = errors.New("update err") ) -func newTestStatement(seq, previousSeq uint64) Statement { - return Statement{ +func newTestStatement(aggType eventstore.AggregateType, seq, previousSeq uint64) *Statement { + return &Statement{ + AggregateType: aggType, Sequence: seq, PreviousSequence: previousSeq, } @@ -39,7 +40,7 @@ func initTimer() *time.Timer { func TestProjectionHandler_processEvent(t *testing.T) { type fields struct { - stmts []Statement + stmts []*Statement pushSet bool shouldPush *time.Timer } @@ -50,7 +51,7 @@ func TestProjectionHandler_processEvent(t *testing.T) { } type want struct { isErr func(err error) bool - stmts []Statement + stmts []*Statement } tests := []struct { name string @@ -76,41 +77,43 @@ func TestProjectionHandler_processEvent(t *testing.T) { }, }, { - name: "no stmts", + name: "single new stmt", fields: fields{ stmts: nil, pushSet: false, shouldPush: initTimer(), }, args: args{ - reduce: testReduce(), + reduce: testReduce(newTestStatement("aggregate1", 1, 0)), }, want: want{ isErr: func(err error) bool { return err == nil }, - stmts: nil, + stmts: []*Statement{ + newTestStatement("aggregate1", 1, 0), + }, }, }, { name: "existing stmts", fields: fields{ - stmts: []Statement{ - newTestStatement(1, 0), + stmts: []*Statement{ + newTestStatement("aggregate1", 1, 0), }, pushSet: false, shouldPush: initTimer(), }, args: args{ - reduce: testReduce(newTestStatement(2, 1)), + reduce: testReduce(newTestStatement("aggregate1", 2, 1)), }, want: want{ isErr: func(err error) bool { return err == nil }, - stmts: []Statement{ - newTestStatement(1, 0), - newTestStatement(2, 1), + stmts: []*Statement{ + newTestStatement("aggregate1", 1, 0), + newTestStatement("aggregate1", 2, 1), }, }, }, @@ -163,7 +166,7 @@ func TestProjectionHandler_fetchBulkStmts(t *testing.T) { args: args{ ctx: context.Background(), query: testQuery(nil, 0, ErrQuery), - reduce: testReduce(), + reduce: testReduce(newTestStatement("aggregate1", 1, 0)), }, fields: fields{}, want: want{ @@ -185,7 +188,7 @@ func TestProjectionHandler_fetchBulkStmts(t *testing.T) { 5, nil, ), - reduce: testReduce(), + reduce: testReduce(newTestStatement("test", 1, 0)), }, fields: fields{ eventstore: eventstore.NewEventstore( @@ -211,7 +214,7 @@ func TestProjectionHandler_fetchBulkStmts(t *testing.T) { 5, nil, ), - reduce: testReduce(), + reduce: testReduce(newTestStatement("test", 1, 0)), }, fields: fields{ eventstore: eventstore.NewEventstore( @@ -237,7 +240,7 @@ func TestProjectionHandler_fetchBulkStmts(t *testing.T) { 5, nil, ), - reduce: testReduce(), + reduce: testReduce(newTestStatement("test", 1, 0)), }, fields: fields{ eventstore: eventstore.NewEventstore( @@ -284,7 +287,7 @@ func TestProjectionHandler_fetchBulkStmts(t *testing.T) { 2, nil, ), - reduce: testReduce(), + reduce: testReduce(newTestStatement("test", 1, 0)), }, fields: fields{ eventstore: eventstore.NewEventstore( @@ -343,7 +346,7 @@ func TestProjectionHandler_fetchBulkStmts(t *testing.T) { func TestProjectionHandler_push(t *testing.T) { type fields struct { - stmts []Statement + stmts []*Statement pushSet bool } type args struct { @@ -365,9 +368,9 @@ func TestProjectionHandler_push(t *testing.T) { { name: "previous lock", fields: fields{ - stmts: []Statement{ - newTestStatement(1, 0), - newTestStatement(2, 1), + stmts: []*Statement{ + newTestStatement("aggregate1", 1, 0), + newTestStatement("aggregate1", 2, 1), }, pushSet: true, }, @@ -375,7 +378,7 @@ func TestProjectionHandler_push(t *testing.T) { ctx: context.Background(), previousLock: 200 * time.Millisecond, update: testUpdate(t, 2, nil), - reduce: testReduce(), + reduce: testReduce(newTestStatement("aggregate1", 1, 0)), }, want: want{ isErr: func(err error) bool { return err == nil }, @@ -385,16 +388,16 @@ func TestProjectionHandler_push(t *testing.T) { { name: "error in update", fields: fields{ - stmts: []Statement{ - newTestStatement(1, 0), - newTestStatement(2, 1), + stmts: []*Statement{ + newTestStatement("aggregate1", 1, 0), + newTestStatement("aggregate1", 2, 1), }, pushSet: true, }, args: args{ ctx: context.Background(), update: testUpdate(t, 2, errors.New("some error")), - reduce: testReduce(), + reduce: testReduce(newTestStatement("test", 1, 0)), }, want: want{ isErr: func(err error) bool { return err.Error() == "some error" }, @@ -668,7 +671,7 @@ func TestProjectionHandler_prepareExecuteBulk(t *testing.T) { type fields struct { Handler Handler SequenceTable string - stmts []Statement + stmts []*Statement pushSet bool shouldPush *time.Timer } @@ -754,7 +757,7 @@ func TestProjectionHandler_prepareExecuteBulk(t *testing.T) { nil, ), reduce: testReduce( - newTestStatement(2, 1), + newTestStatement("aggregate1", 2, 1), ), ctx: context.Background(), }, @@ -797,7 +800,7 @@ func TestProjectionHandler_prepareExecuteBulk(t *testing.T) { shouldPush: initTimer(), }, args: args{ - update: testUpdate(t, 4, nil), + update: testUpdate(t, 2, nil), query: testQuery( eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). AddQuery(). @@ -807,8 +810,7 @@ func TestProjectionHandler_prepareExecuteBulk(t *testing.T) { nil, ), reduce: testReduce( - newTestStatement(1, 0), - newTestStatement(2, 1), + newTestStatement("aggregate1", 0, 1), ), ctx: context.Background(), }, @@ -838,22 +840,22 @@ func TestProjectionHandler_prepareExecuteBulk(t *testing.T) { } func testUpdate(t *testing.T, expectedStmtCount int, returnedErr error) Update { - return func(ctx context.Context, stmts []Statement, reduce Reduce) ([]Statement, error) { + return func(ctx context.Context, stmts []*Statement, reduce Reduce) ([]*Statement, error) { if expectedStmtCount != len(stmts) { t.Errorf("expected %d stmts got %d", expectedStmtCount, len(stmts)) } - return []Statement{}, returnedErr + return []*Statement{}, returnedErr } } -func testReduce(stmts ...Statement) Reduce { - return func(event eventstore.EventReader) ([]Statement, error) { +func testReduce(stmts *Statement) Reduce { + return func(event eventstore.EventReader) (*Statement, error) { return stmts, nil } } func testReduceErr(err error) Reduce { - return func(event eventstore.EventReader) ([]Statement, error) { + return func(event eventstore.EventReader) (*Statement, error) { return nil, err } } diff --git a/internal/query/projection/org.go b/internal/query/projection/org.go index 75b3c0a7d7..adb9d7ff39 100644 --- a/internal/query/projection/org.go +++ b/internal/query/projection/org.go @@ -65,29 +65,27 @@ const ( orgNameCol = "name" ) -func (p *OrgProjection) reduceOrgAdded(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgProjection) reduceOrgAdded(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*org.OrgAddedEvent) if !ok { logging.LogWithFields("HANDL-zWCk3", "seq", event.Sequence, "expectedType", org.OrgAddedEventType).Error("was not an event") return nil, errors.ThrowInvalidArgument(nil, "HANDL-uYq4r", "reduce.wrong.event.type") } - return []handler.Statement{ - crdb.NewCreateStatement( - e, - []handler.Column{ - handler.NewCol(orgIDCol, e.Aggregate().ID), - handler.NewCol(orgCreationDateCol, e.CreationDate()), - handler.NewCol(orgChangeDateCol, e.CreationDate()), - handler.NewCol(orgResourceOwnerCol, e.Aggregate().ResourceOwner), - handler.NewCol(orgSequenceCol, e.Sequence()), - handler.NewCol(orgNameCol, e.Name), - handler.NewCol(orgStateCol, domain.OrgStateActive), - }, - ), - }, nil + return crdb.NewCreateStatement( + e, + []handler.Column{ + handler.NewCol(orgIDCol, e.Aggregate().ID), + handler.NewCol(orgCreationDateCol, e.CreationDate()), + handler.NewCol(orgChangeDateCol, e.CreationDate()), + handler.NewCol(orgResourceOwnerCol, e.Aggregate().ResourceOwner), + handler.NewCol(orgSequenceCol, e.Sequence()), + handler.NewCol(orgNameCol, e.Name), + handler.NewCol(orgStateCol, domain.OrgStateActive), + }, + ), nil } -func (p *OrgProjection) reduceOrgChanged(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgProjection) reduceOrgChanged(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*org.OrgChangedEvent) if !ok { logging.LogWithFields("HANDL-q4oq8", "seq", event.Sequence, "expected", org.OrgChangedEventType).Error("wrong event type") @@ -100,76 +98,68 @@ func (p *OrgProjection) reduceOrgChanged(event eventstore.EventReader) ([]handle if e.Name != "" { values = append(values, handler.NewCol(orgNameCol, e.Name)) } - return []handler.Statement{ - crdb.NewUpdateStatement( - e, - values, - []handler.Condition{ - handler.NewCond(orgIDCol, e.Aggregate().ID), - }, - ), - }, nil + return crdb.NewUpdateStatement( + e, + values, + []handler.Condition{ + handler.NewCond(orgIDCol, e.Aggregate().ID), + }, + ), nil } -func (p *OrgProjection) reduceOrgDeactivated(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgProjection) reduceOrgDeactivated(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*org.OrgDeactivatedEvent) if !ok { logging.LogWithFields("HANDL-1gwdc", "seq", event.Sequence, "expectedType", org.OrgDeactivatedEventType).Error("wrong event type") return nil, errors.ThrowInvalidArgument(nil, "HANDL-BApK4", "reduce.wrong.event.type") } - return []handler.Statement{ - crdb.NewUpdateStatement( - e, - []handler.Column{ - handler.NewCol(orgChangeDateCol, e.CreationDate()), - handler.NewCol(orgSequenceCol, e.Sequence()), - handler.NewCol(orgStateCol, domain.OrgStateInactive), - }, - []handler.Condition{ - handler.NewCond(orgIDCol, e.Aggregate().ID), - }, - ), - }, nil + return crdb.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(orgChangeDateCol, e.CreationDate()), + handler.NewCol(orgSequenceCol, e.Sequence()), + handler.NewCol(orgStateCol, domain.OrgStateInactive), + }, + []handler.Condition{ + handler.NewCond(orgIDCol, e.Aggregate().ID), + }, + ), nil } -func (p *OrgProjection) reduceOrgReactivated(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgProjection) reduceOrgReactivated(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*org.OrgReactivatedEvent) if !ok { logging.LogWithFields("HANDL-Vjwiy", "seq", event.Sequence, "expectedType", org.OrgReactivatedEventType).Error("wrong event type") return nil, errors.ThrowInvalidArgument(nil, "HANDL-o37De", "reduce.wrong.event.type") } - return []handler.Statement{ - crdb.NewUpdateStatement( - e, - []handler.Column{ - handler.NewCol(orgChangeDateCol, e.CreationDate()), - handler.NewCol(orgSequenceCol, e.Sequence()), - handler.NewCol(orgStateCol, domain.OrgStateActive), - }, - []handler.Condition{ - handler.NewCond(orgIDCol, e.Aggregate().ID), - }, - ), - }, nil + return crdb.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(orgChangeDateCol, e.CreationDate()), + handler.NewCol(orgSequenceCol, e.Sequence()), + handler.NewCol(orgStateCol, domain.OrgStateActive), + }, + []handler.Condition{ + handler.NewCond(orgIDCol, e.Aggregate().ID), + }, + ), nil } -func (p *OrgProjection) reducePrimaryDomainSet(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgProjection) reducePrimaryDomainSet(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*org.DomainPrimarySetEvent) if !ok { logging.LogWithFields("HANDL-79OhB", "seq", event.Sequence, "expectedType", org.OrgDomainPrimarySetEventType).Error("wrong event type") return nil, errors.ThrowInvalidArgument(nil, "HANDL-4TbKT", "reduce.wrong.event.type") } - return []handler.Statement{ - crdb.NewUpdateStatement( - e, - []handler.Column{ - handler.NewCol(orgChangeDateCol, e.CreationDate()), - handler.NewCol(orgSequenceCol, e.Sequence()), - handler.NewCol(orgDomainCol, e.Domain), - }, - []handler.Condition{ - handler.NewCond(orgIDCol, e.Aggregate().ID), - }, - ), - }, nil + return crdb.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(orgChangeDateCol, e.CreationDate()), + handler.NewCol(orgSequenceCol, e.Sequence()), + handler.NewCol(orgDomainCol, e.Domain), + }, + []handler.Condition{ + handler.NewCond(orgIDCol, e.Aggregate().ID), + }, + ), nil } diff --git a/internal/query/projection/org/owner/projection.go b/internal/query/projection/org/owner/projection.go index 3c0afd54d2..48d68f36de 100644 --- a/internal/query/projection/org/owner/projection.go +++ b/internal/query/projection/org/owner/projection.go @@ -110,7 +110,7 @@ func (p *OrgOwnerProjection) reducers() []handler.AggregateReducer { } } -func (p *OrgOwnerProjection) reduceMemberAdded(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgOwnerProjection) reduceMemberAdded(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*org.MemberAddedEvent) if !ok { logging.LogWithFields("PROJE-kL530", "seq", event.Sequence, "expected", org.MemberAddedEventType).Error("wrong event type") @@ -118,7 +118,7 @@ func (p *OrgOwnerProjection) reduceMemberAdded(event eventstore.EventReader) ([] } if !isOrgOwner(e.Roles) { - return []handler.Statement{crdb.NewNoOpStatement(e)}, nil + return crdb.NewNoOpStatement(e), nil } stmt, err := p.addOwner(e, e.Aggregate().ResourceOwner, e.UserID) @@ -126,10 +126,10 @@ func (p *OrgOwnerProjection) reduceMemberAdded(event eventstore.EventReader) ([] return nil, err } - return []handler.Statement{stmt}, nil + return stmt, nil } -func (p *OrgOwnerProjection) reduceMemberChanged(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgOwnerProjection) reduceMemberChanged(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*org.MemberChangedEvent) if !ok { logging.LogWithFields("PROJE-kL530", "seq", event.Sequence, "expected", org.MemberAddedEventType).Error("wrong event type") @@ -137,7 +137,7 @@ func (p *OrgOwnerProjection) reduceMemberChanged(event eventstore.EventReader) ( } if !isOrgOwner(e.Roles) { - return []handler.Statement{p.deleteOwner(e, e.Aggregate().ID, e.UserID)}, nil + return p.deleteOwner(e, e.Aggregate().ID, e.UserID), nil } stmt, err := p.addOwner(e, e.Aggregate().ResourceOwner, e.UserID) @@ -145,41 +145,39 @@ func (p *OrgOwnerProjection) reduceMemberChanged(event eventstore.EventReader) ( return nil, err } - return []handler.Statement{stmt}, nil + return stmt, nil } -func (p *OrgOwnerProjection) reduceMemberRemoved(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgOwnerProjection) reduceMemberRemoved(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*org.MemberRemovedEvent) if !ok { logging.LogWithFields("PROJE-boIbP", "seq", event.Sequence, "expected", org.MemberRemovedEventType).Error("wrong event type") return nil, errors.ThrowInvalidArgument(nil, "PROJE-pk6TS", "reduce.wrong.event.type") } - return []handler.Statement{p.deleteOwner(e, e.Aggregate().ID, e.UserID)}, nil + return p.deleteOwner(e, e.Aggregate().ID, e.UserID), nil } -func (p *OrgOwnerProjection) reduceHumanEmailChanged(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgOwnerProjection) reduceHumanEmailChanged(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*user.HumanEmailChangedEvent) if !ok { logging.LogWithFields("PROJE-IHFwh", "seq", event.Sequence, "expected", user.HumanEmailChangedType).Error("wrong event type") return nil, errors.ThrowInvalidArgument(nil, "PROJE-jMlwT", "reduce.wrong.event.type") } - return []handler.Statement{ - crdb.NewUpdateStatement( - e, - []handler.Column{ - handler.NewCol(userEmailCol, e.EmailAddress), - }, - []handler.Condition{ - handler.NewCond(userIDCol, e.Aggregate().ID), - }, - crdb.WithTableSuffix(userTableSuffix), - ), - }, nil + return crdb.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(userEmailCol, e.EmailAddress), + }, + []handler.Condition{ + handler.NewCond(userIDCol, e.Aggregate().ID), + }, + crdb.WithTableSuffix(userTableSuffix), + ), nil } -func (p *OrgOwnerProjection) reduceHumanProfileChanged(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgOwnerProjection) reduceHumanProfileChanged(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*user.HumanProfileChangedEvent) if !ok { logging.LogWithFields("PROJE-WqgUS", "seq", event.Sequence, "expected", user.HumanProfileChangedType).Error("wrong event type") @@ -201,41 +199,37 @@ func (p *OrgOwnerProjection) reduceHumanProfileChanged(event eventstore.EventRea } if len(values) == 0 { - return []handler.Statement{crdb.NewNoOpStatement(e)}, nil + return crdb.NewNoOpStatement(e), nil } - return []handler.Statement{ - crdb.NewUpdateStatement( - e, - values, - []handler.Condition{ - handler.NewCond(userIDCol, e.Aggregate().ID), - }, - crdb.WithTableSuffix(userTableSuffix), - ), - }, nil + return crdb.NewUpdateStatement( + e, + values, + []handler.Condition{ + handler.NewCond(userIDCol, e.Aggregate().ID), + }, + crdb.WithTableSuffix(userTableSuffix), + ), nil } -func (p *OrgOwnerProjection) reduceOrgAdded(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgOwnerProjection) reduceOrgAdded(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*org.OrgAddedEvent) if !ok { logging.LogWithFields("PROJE-wbOrL", "seq", event.Sequence, "expected", org.OrgAddedEventType).Error("wrong event type") return nil, errors.ThrowInvalidArgument(nil, "PROJE-pk6TS", "reduce.wrong.event.type") } - return []handler.Statement{ - crdb.NewCreateStatement( - e, - []handler.Column{ - handler.NewCol(orgIDCol, e.Aggregate().ResourceOwner), - handler.NewCol(orgNameCol, e.Name), - handler.NewCol(orgCreationDateCol, e.CreationDate()), - }, - crdb.WithTableSuffix(orgTableSuffix), - ), - }, nil + return crdb.NewCreateStatement( + e, + []handler.Column{ + handler.NewCol(orgIDCol, e.Aggregate().ResourceOwner), + handler.NewCol(orgNameCol, e.Name), + handler.NewCol(orgCreationDateCol, e.CreationDate()), + }, + crdb.WithTableSuffix(orgTableSuffix), + ), nil } -func (p *OrgOwnerProjection) reduceOrgChanged(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *OrgOwnerProjection) reduceOrgChanged(event eventstore.EventReader) (*handler.Statement, error) { e, ok := event.(*org.OrgChangedEvent) if !ok { logging.LogWithFields("PROJE-piy2b", "seq", event.Sequence, "expected", org.OrgChangedEventType).Error("wrong event type") @@ -248,46 +242,42 @@ func (p *OrgOwnerProjection) reduceOrgChanged(event eventstore.EventReader) ([]h } if len(values) == 0 { - return []handler.Statement{crdb.NewNoOpStatement(e)}, nil + return crdb.NewNoOpStatement(e), nil } - return []handler.Statement{ - crdb.NewUpdateStatement( - e, - values, - []handler.Condition{ - handler.NewCond(orgIDCol, e.Aggregate().ResourceOwner), - }, - crdb.WithTableSuffix(orgTableSuffix), - ), - }, nil + return crdb.NewUpdateStatement( + e, + values, + []handler.Condition{ + handler.NewCond(orgIDCol, e.Aggregate().ResourceOwner), + }, + crdb.WithTableSuffix(orgTableSuffix), + ), nil } -func (p *OrgOwnerProjection) reduceOrgRemoved(event eventstore.EventReader) ([]handler.Statement, error) { - e, ok := event.(*org.OrgChangedEvent) +func (p *OrgOwnerProjection) reduceOrgRemoved(event eventstore.EventReader) (*handler.Statement, error) { + e, ok := event.(*org.OrgRemovedEvent) if !ok { logging.LogWithFields("PROJE-F1mHQ", "seq", event.Sequence, "expected", org.OrgRemovedEventType).Error("wrong event type") return nil, errors.ThrowInvalidArgument(nil, "PROJE-9ZR2w", "reduce.wrong.event.type") } - return []handler.Statement{ + return crdb.NewMultiStatement(e, //delete org in org table - crdb.NewDeleteStatement( - e, + crdb.AddDeleteStatement( []handler.Condition{ handler.NewCond(orgIDCol, e.Aggregate().ResourceOwner), }, crdb.WithTableSuffix(orgTableSuffix), ), // delete users of the org - crdb.NewDeleteStatement( - e, + crdb.AddDeleteStatement( []handler.Condition{ handler.NewCond(userOrgIDCol, e.Aggregate().ResourceOwner), }, crdb.WithTableSuffix(userTableSuffix), ), - }, nil + ), nil } func isOrgOwner(roles []string) bool { @@ -299,7 +289,7 @@ func isOrgOwner(roles []string) bool { return false } -func (p *OrgOwnerProjection) deleteOwner(event eventstore.EventReader, orgID, ownerID string) handler.Statement { +func (p *OrgOwnerProjection) deleteOwner(event eventstore.EventReader, orgID, ownerID string) *handler.Statement { return crdb.NewDeleteStatement( event, []handler.Condition{ @@ -310,7 +300,7 @@ func (p *OrgOwnerProjection) deleteOwner(event eventstore.EventReader, orgID, ow ) } -func (p *OrgOwnerProjection) addOwner(event eventstore.EventReader, orgID, userID string) (handler.Statement, error) { +func (p *OrgOwnerProjection) addOwner(event eventstore.EventReader, orgID, userID string) (*handler.Statement, error) { events, err := p.Eventstore.FilterEvents(context.Background(), eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). AddQuery(). @@ -330,12 +320,12 @@ func (p *OrgOwnerProjection) addOwner(event eventstore.EventReader, orgID, userI SequenceLess(event.Sequence()). Builder()) if err != nil { - return handler.Statement{}, err + return nil, err } if len(events) == 0 { logging.LogWithFields("mqd3w", "user", userID, "org", orgID, "seq", event.Sequence()).Warn("no events for user found") - return handler.Statement{}, errors.ThrowInternal(nil, "PROJE-Qk7Tv", "unable to find user events") + return nil, errors.ThrowInternal(nil, "PROJE-Qk7Tv", "unable to find user events") } owner := &OrgOwner{ diff --git a/internal/query/projection/project.go b/internal/query/projection/project.go index f20d1fee48..e4eea357fc 100644 --- a/internal/query/projection/project.go +++ b/internal/query/projection/project.go @@ -66,89 +66,79 @@ const ( projectInactive ) -func (p *ProjectProjection) reduceProjectAdded(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *ProjectProjection) reduceProjectAdded(event eventstore.EventReader) (*handler.Statement, error) { e := event.(*project.ProjectAddedEvent) - return []handler.Statement{ - crdb.NewCreateStatement( - e, - []handler.Column{ - handler.NewCol(projectIDCol, e.Aggregate().ID), - handler.NewCol(projectNameCol, e.Name), - handler.NewCol(projectCreationDateCol, e.CreationDate()), - handler.NewCol(projectChangeDateCol, e.CreationDate()), - handler.NewCol(projectOwnerCol, e.Aggregate().ResourceOwner), - handler.NewCol(projectCreatorCol, e.EditorUser()), - handler.NewCol(projectStateCol, projectActive), - }, - ), - }, nil + return crdb.NewCreateStatement( + e, + []handler.Column{ + handler.NewCol(projectIDCol, e.Aggregate().ID), + handler.NewCol(projectNameCol, e.Name), + handler.NewCol(projectCreationDateCol, e.CreationDate()), + handler.NewCol(projectChangeDateCol, e.CreationDate()), + handler.NewCol(projectOwnerCol, e.Aggregate().ResourceOwner), + handler.NewCol(projectCreatorCol, e.EditorUser()), + handler.NewCol(projectStateCol, projectActive), + }, + ), nil } -func (p *ProjectProjection) reduceProjectChanged(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *ProjectProjection) reduceProjectChanged(event eventstore.EventReader) (*handler.Statement, error) { e := event.(*project.ProjectChangeEvent) if e.Name == nil { - return []handler.Statement{crdb.NewNoOpStatement(e)}, nil + return crdb.NewNoOpStatement(e), nil } - return []handler.Statement{ - crdb.NewUpdateStatement( - e, - []handler.Column{ - handler.NewCol(projectNameCol, e.Name), - handler.NewCol(projectChangeDateCol, e.CreationDate()), - }, - []handler.Condition{ - handler.NewCond(projectIDCol, e.Aggregate().ID), - }, - ), - }, nil + return crdb.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(projectNameCol, e.Name), + handler.NewCol(projectChangeDateCol, e.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(projectIDCol, e.Aggregate().ID), + }, + ), nil } -func (p *ProjectProjection) reduceProjectDeactivated(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *ProjectProjection) reduceProjectDeactivated(event eventstore.EventReader) (*handler.Statement, error) { e := event.(*project.ProjectDeactivatedEvent) - return []handler.Statement{ - crdb.NewUpdateStatement( - e, - []handler.Column{ - handler.NewCol(projectStateCol, projectInactive), - handler.NewCol(projectChangeDateCol, e.CreationDate()), - }, - []handler.Condition{ - handler.NewCond(projectIDCol, e.Aggregate().ID), - }, - ), - }, nil + return crdb.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(projectStateCol, projectInactive), + handler.NewCol(projectChangeDateCol, e.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(projectIDCol, e.Aggregate().ID), + }, + ), nil } -func (p *ProjectProjection) reduceProjectReactivated(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *ProjectProjection) reduceProjectReactivated(event eventstore.EventReader) (*handler.Statement, error) { e := event.(*project.ProjectReactivatedEvent) - return []handler.Statement{ - crdb.NewUpdateStatement( - e, - []handler.Column{ - handler.NewCol(projectStateCol, projectActive), - handler.NewCol(projectChangeDateCol, e.CreationDate()), - }, - []handler.Condition{ - handler.NewCond(projectIDCol, e.Aggregate().ID), - }, - ), - }, nil + return crdb.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(projectStateCol, projectActive), + handler.NewCol(projectChangeDateCol, e.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(projectIDCol, e.Aggregate().ID), + }, + ), nil } -func (p *ProjectProjection) reduceProjectRemoved(event eventstore.EventReader) ([]handler.Statement, error) { +func (p *ProjectProjection) reduceProjectRemoved(event eventstore.EventReader) (*handler.Statement, error) { e := event.(*project.ProjectRemovedEvent) - return []handler.Statement{ - crdb.NewDeleteStatement( - e, - []handler.Condition{ - handler.NewCond(projectIDCol, e.Aggregate().ID), - }, - ), - }, nil + return crdb.NewDeleteStatement( + e, + []handler.Condition{ + handler.NewCond(projectIDCol, e.Aggregate().ID), + }, + ), nil }