fix: handle multiple statements for a single event in projections (#2313)

* fix: handle multiple statements for a single event in projections

* export func type

* fix test

* Update internal/eventstore/handler/crdb/statement.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* Update internal/eventstore/handler/crdb/statement.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* change to pointers

* add error test case

Co-authored-by: Silvan <silvan.reusser@gmail.com>
This commit is contained in:
Livio Amstutz
2021-09-08 13:54:31 +02:00
committed by GitHub
parent ec6a3a1847
commit 1ac1492fd3
11 changed files with 546 additions and 301 deletions

View File

@@ -20,7 +20,7 @@ const (
") AS failure_count"
)
func (h *StatementHandler) handleFailedStmt(tx *sql.Tx, stmt handler.Statement, execErr error) (shouldContinue bool) {
func (h *StatementHandler) handleFailedStmt(tx *sql.Tx, stmt *handler.Statement, execErr error) (shouldContinue bool) {
failureCount, err := h.failureCount(tx, stmt.Sequence)
if err != nil {
logging.LogWithFields("CRDB-WJaFk", "projection", h.ProjectionName, "seq", stmt.Sequence).WithError(err).Warn("unable to get failure count")

View File

@@ -116,7 +116,7 @@ func (h *StatementHandler) SearchQuery() (*eventstore.SearchQueryBuilder, uint64
}
//Update implements handler.Update
func (h *StatementHandler) Update(ctx context.Context, stmts []handler.Statement, reduce handler.Reduce) (unexecutedStmts []handler.Statement, err error) {
func (h *StatementHandler) Update(ctx context.Context, stmts []*handler.Statement, reduce handler.Reduce) (unexecutedStmts []*handler.Statement, err error) {
tx, err := h.client.BeginTx(ctx, nil)
if err != nil {
return stmts, errors.ThrowInternal(err, "CRDB-e89Gq", "begin failed")
@@ -158,7 +158,7 @@ func (h *StatementHandler) Update(ctx context.Context, stmts []handler.Statement
return stmts, handler.ErrSomeStmtsFailed
}
unexecutedStmts = make([]handler.Statement, len(stmts)-(lastSuccessfulIdx+1))
unexecutedStmts = make([]*handler.Statement, len(stmts)-(lastSuccessfulIdx+1))
copy(unexecutedStmts, stmts[lastSuccessfulIdx+1:])
stmts = nil
@@ -174,7 +174,7 @@ func (h *StatementHandler) fetchPreviousStmts(
stmtSeq uint64,
sequences currentSequences,
reduce handler.Reduce,
) (previousStmts []handler.Statement, err error) {
) (previousStmts []*handler.Statement, err error) {
query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent)
queriesAdded := false
@@ -202,18 +202,18 @@ func (h *StatementHandler) fetchPreviousStmts(
}
for _, event := range events {
stmts, err := reduce(event)
stmt, err := reduce(event)
if err != nil {
return nil, err
}
previousStmts = append(previousStmts, stmts...)
previousStmts = append(previousStmts, stmt)
}
return previousStmts, nil
}
func (h *StatementHandler) executeStmts(
tx *sql.Tx,
stmts []handler.Statement,
stmts []*handler.Statement,
sequences currentSequences,
) int {
@@ -244,7 +244,7 @@ func (h *StatementHandler) executeStmts(
//executeStmt handles sql statements
//an error is returned if the statement could not be inserted properly
func (h *StatementHandler) executeStmt(tx *sql.Tx, stmt handler.Statement) error {
func (h *StatementHandler) executeStmt(tx *sql.Tx, stmt *handler.Statement) error {
if stmt.IsNoop() {
return nil
}

View File

@@ -162,7 +162,7 @@ func TestStatementHandler_Update(t *testing.T) {
}
type args struct {
ctx context.Context
stmts []handler.Statement
stmts []*handler.Statement
reduce handler.Reduce
}
tests := []struct {
@@ -212,7 +212,7 @@ func TestStatementHandler_Update(t *testing.T) {
},
args: args{
ctx: context.Background(),
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewNoOpStatement(&testEvent{
aggregateType: "agg",
sequence: 6,
@@ -242,7 +242,7 @@ func TestStatementHandler_Update(t *testing.T) {
},
args: args{
ctx: context.Background(),
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewCreateStatement(
&testEvent{
aggregateType: "testAgg",
@@ -279,7 +279,7 @@ func TestStatementHandler_Update(t *testing.T) {
},
args: args{
ctx: context.Background(),
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewCreateStatement(
&testEvent{
aggregateType: "agg",
@@ -320,7 +320,7 @@ func TestStatementHandler_Update(t *testing.T) {
},
args: args{
ctx: context.Background(),
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewCreateStatement(
&testEvent{
aggregateType: "agg",
@@ -361,7 +361,7 @@ func TestStatementHandler_Update(t *testing.T) {
},
args: args{
ctx: context.Background(),
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewNoOpStatement(&testEvent{
aggregateType: "testAgg",
sequence: 7,
@@ -392,7 +392,7 @@ func TestStatementHandler_Update(t *testing.T) {
},
args: args{
ctx: context.Background(),
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewNoOpStatement(&testEvent{
aggregateType: "testAgg",
sequence: 7,
@@ -428,7 +428,7 @@ func TestStatementHandler_Update(t *testing.T) {
},
args: args{
ctx: context.Background(),
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewNoOpStatement(&testEvent{
aggregateType: "testAgg",
sequence: 7,
@@ -684,7 +684,7 @@ func TestStatementHandler_executeStmts(t *testing.T) {
failedEventsTable string
}
type args struct {
stmts []handler.Statement
stmts []*handler.Statement
sequences currentSequences
}
type want struct {
@@ -703,7 +703,7 @@ func TestStatementHandler_executeStmts(t *testing.T) {
projectionName: "my_projection",
},
args: args{
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewCreateStatement(
&testEvent{
aggregateType: "agg",
@@ -732,7 +732,7 @@ func TestStatementHandler_executeStmts(t *testing.T) {
projectionName: "my_projection",
},
args: args{
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewCreateStatement(
&testEvent{
aggregateType: "agg",
@@ -780,7 +780,7 @@ func TestStatementHandler_executeStmts(t *testing.T) {
failedEventsTable: "failed_events",
},
args: args{
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewCreateStatement(
&testEvent{
aggregateType: "agg",
@@ -844,7 +844,7 @@ func TestStatementHandler_executeStmts(t *testing.T) {
failedEventsTable: "failed_events",
},
args: args{
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewCreateStatement(
&testEvent{
aggregateType: "agg",
@@ -909,7 +909,7 @@ func TestStatementHandler_executeStmts(t *testing.T) {
projectionName: "my_projection",
},
args: args{
stmts: []handler.Statement{
stmts: []*handler.Statement{
NewCreateStatement(
&testEvent{
aggregateType: "agg",
@@ -946,6 +946,29 @@ func TestStatementHandler_executeStmts(t *testing.T) {
Value: "val",
},
}),
NewMultiStatement(
&testEvent{
aggregateType: "agg",
sequence: 8,
previousSequence: 7,
},
AddCreateStatement(
[]handler.Column{
{
Name: "col",
Value: "val",
},
},
),
AddCreateStatement(
[]handler.Column{
{
Name: "col",
Value: "val",
},
},
),
),
},
sequences: currentSequences{
"agg": 2,
@@ -962,8 +985,12 @@ func TestStatementHandler_executeStmts(t *testing.T) {
expectSavePoint(),
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
expectSavePointRelease(),
expectSavePoint(),
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
expectSavePointRelease(),
},
idx: 2,
idx: 3,
},
},
}
@@ -1025,7 +1052,7 @@ func TestStatementHandler_executeStmt(t *testing.T) {
projectionName string
}
type args struct {
stmt handler.Statement
stmt *handler.Statement
}
type want struct {
expectations []mockExpectation
@@ -1532,16 +1559,14 @@ func TestStatementHandler_updateCurrentSequence(t *testing.T) {
}
}
func testReduce(stmts ...handler.Statement) handler.Reduce {
return func(event eventstore.EventReader) ([]handler.Statement, error) {
return []handler.Statement{
NewNoOpStatement(event),
}, nil
func testReduce() handler.Reduce {
return func(event eventstore.EventReader) (*handler.Statement, error) {
return NewNoOpStatement(event), nil
}
}
func testReduceErr(err error) handler.Reduce {
return func(event eventstore.EventReader) ([]handler.Statement, error) {
return func(event eventstore.EventReader) (*handler.Statement, error) {
return nil, err
}
}

View File

@@ -6,10 +6,10 @@ import (
)
//reduce implements handler.Reduce function
func (h *StatementHandler) reduce(event eventstore.EventReader) ([]handler.Statement, error) {
func (h *StatementHandler) reduce(event eventstore.EventReader) (*handler.Statement, error) {
reduce, ok := h.reduces[event.Type()]
if !ok {
return []handler.Statement{NewNoOpStatement(event)}, nil
return NewNoOpStatement(event), nil
}
return reduce(event)

View File

@@ -23,7 +23,7 @@ func WithTableSuffix(name string) func(*execConfig) {
}
}
func NewCreateStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) handler.Statement {
func NewCreateStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) *handler.Statement {
cols, params, args := columnsToQuery(values)
columnNames := strings.Join(cols, ", ")
valuesPlaceholder := strings.Join(params, ", ")
@@ -40,7 +40,7 @@ func NewCreateStatement(event eventstore.EventReader, values []handler.Column, o
return "INSERT INTO " + config.tableName + " (" + columnNames + ") VALUES (" + valuesPlaceholder + ")"
}
return handler.Statement{
return &handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
PreviousSequence: event.PreviousAggregateTypeSequence(),
@@ -48,7 +48,7 @@ func NewCreateStatement(event eventstore.EventReader, values []handler.Column, o
}
}
func NewUpsertStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) handler.Statement {
func NewUpsertStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) *handler.Statement {
cols, params, args := columnsToQuery(values)
columnNames := strings.Join(cols, ", ")
valuesPlaceholder := strings.Join(params, ", ")
@@ -65,7 +65,7 @@ func NewUpsertStatement(event eventstore.EventReader, values []handler.Column, o
return "UPSERT INTO " + config.tableName + " (" + columnNames + ") VALUES (" + valuesPlaceholder + ")"
}
return handler.Statement{
return &handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
PreviousSequence: event.PreviousAggregateTypeSequence(),
@@ -73,7 +73,7 @@ func NewUpsertStatement(event eventstore.EventReader, values []handler.Column, o
}
}
func NewUpdateStatement(event eventstore.EventReader, values []handler.Column, conditions []handler.Condition, opts ...execOption) handler.Statement {
func NewUpdateStatement(event eventstore.EventReader, values []handler.Column, conditions []handler.Condition, opts ...execOption) *handler.Statement {
cols, params, args := columnsToQuery(values)
wheres, whereArgs := conditionsToWhere(conditions, len(params))
args = append(args, whereArgs...)
@@ -98,7 +98,7 @@ func NewUpdateStatement(event eventstore.EventReader, values []handler.Column, c
return "UPDATE " + config.tableName + " SET (" + columnNames + ") = (" + valuesPlaceholder + ") WHERE " + wheresPlaceholders
}
return handler.Statement{
return &handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
PreviousSequence: event.PreviousAggregateTypeSequence(),
@@ -106,7 +106,7 @@ func NewUpdateStatement(event eventstore.EventReader, values []handler.Column, c
}
}
func NewDeleteStatement(event eventstore.EventReader, conditions []handler.Condition, opts ...execOption) handler.Statement {
func NewDeleteStatement(event eventstore.EventReader, conditions []handler.Condition, opts ...execOption) *handler.Statement {
wheres, args := conditionsToWhere(conditions, 0)
wheresPlaceholders := strings.Join(wheres, " AND ")
@@ -123,7 +123,7 @@ func NewDeleteStatement(event eventstore.EventReader, conditions []handler.Condi
return "DELETE FROM " + config.tableName + " WHERE " + wheresPlaceholders
}
return handler.Statement{
return &handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
PreviousSequence: event.PreviousAggregateTypeSequence(),
@@ -131,14 +131,56 @@ func NewDeleteStatement(event eventstore.EventReader, conditions []handler.Condi
}
}
func NewNoOpStatement(event eventstore.EventReader) handler.Statement {
return handler.Statement{
func NewNoOpStatement(event eventstore.EventReader) *handler.Statement {
return &handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
PreviousSequence: event.PreviousAggregateTypeSequence(),
}
}
func NewMultiStatement(event eventstore.EventReader, opts ...func(eventstore.EventReader) Exec) *handler.Statement {
if len(opts) == 0 {
return NewNoOpStatement(event)
}
execs := make([]Exec, len(opts))
for i, opt := range opts {
execs[i] = opt(event)
}
return &handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
PreviousSequence: event.PreviousAggregateTypeSequence(),
Execute: multiExec(execs),
}
}
type Exec func(ex handler.Executer, projectionName string) error
func AddCreateStatement(columns []handler.Column, opts ...execOption) func(eventstore.EventReader) Exec {
return func(event eventstore.EventReader) Exec {
return NewCreateStatement(event, columns, opts...).Execute
}
}
func AddUpsertStatement(values []handler.Column, opts ...execOption) func(eventstore.EventReader) Exec {
return func(event eventstore.EventReader) Exec {
return NewUpsertStatement(event, values, opts...).Execute
}
}
func AddUpdateStatement(values []handler.Column, conditions []handler.Condition, opts ...execOption) func(eventstore.EventReader) Exec {
return func(event eventstore.EventReader) Exec {
return NewUpdateStatement(event, values, conditions, opts...).Execute
}
}
func AddDeleteStatement(conditions []handler.Condition, opts ...execOption) func(eventstore.EventReader) Exec {
return func(event eventstore.EventReader) Exec {
return NewDeleteStatement(event, conditions, opts...).Execute
}
}
func columnsToQuery(cols []handler.Column) (names []string, parameters []string, values []interface{}) {
names = make([]string, len(cols))
values = make([]interface{}, len(cols))
@@ -166,7 +208,7 @@ func conditionsToWhere(cols []handler.Condition, paramOffset int) (wheres []stri
type query func(config execConfig) string
func exec(config execConfig, q query, opts []execOption) func(ex handler.Executer, projectionName string) error {
func exec(config execConfig, q query, opts []execOption) Exec {
return func(ex handler.Executer, projectionName string) error {
if projectionName == "" {
return handler.ErrNoProjection
@@ -188,3 +230,14 @@ func exec(config execConfig, q query, opts []execOption) func(ex handler.Execute
return nil
}
}
func multiExec(execList []Exec) Exec {
return func(ex handler.Executer, projectionName string) error {
for _, exec := range execList {
if err := exec(ex, projectionName); err != nil {
return err
}
}
return nil
}
}

View File

@@ -11,13 +11,18 @@ import (
)
type wantExecuter struct {
query string
args []interface{}
params []params
i int
t *testing.T
wasExecuted bool
shouldExecute bool
}
type params struct {
query string
args []interface{}
}
var errTestErr = errors.New("some error")
func (ex *wantExecuter) check(t *testing.T) {
@@ -34,12 +39,18 @@ func (ex *wantExecuter) check(t *testing.T) {
func (ex *wantExecuter) Exec(query string, args ...interface{}) (sql.Result, error) {
ex.t.Helper()
ex.wasExecuted = true
if query != ex.query {
ex.t.Errorf("wrong query:\n expected:\n %q\n got:\n %q", ex.query, query)
if ex.i >= len(ex.params) {
ex.t.Errorf("did not expect more exec, but got:\n %q with %q", query, args)
return nil, nil
}
if !reflect.DeepEqual(ex.args, args) {
ex.t.Errorf("wrong args:\n expected:\n %v\n got:\n %v", ex.args, args)
p := ex.params[ex.i]
if query != p.query {
ex.t.Errorf("wrong query:\n expected:\n %q\n got:\n %q", p.query, query)
}
if !reflect.DeepEqual(p.args, args) {
ex.t.Errorf("wrong args:\n expected:\n %v\n got:\n %v", p.args, args)
}
ex.i++
return nil, nil
}
@@ -137,9 +148,13 @@ func TestNewCreateStatement(t *testing.T) {
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
query: "INSERT INTO my_table (col1) VALUES ($1)",
params: []params{
{
query: "INSERT INTO my_table (col1) VALUES ($1)",
args: []interface{}{"val"},
},
},
shouldExecute: true,
args: []interface{}{"val"},
},
isErr: func(err error) bool {
return err == nil
@@ -255,9 +270,13 @@ func TestNewUpsertStatement(t *testing.T) {
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
query: "UPSERT INTO my_table (col1) VALUES ($1)",
params: []params{
{
query: "UPSERT INTO my_table (col1) VALUES ($1)",
args: []interface{}{"val"},
},
},
shouldExecute: true,
args: []interface{}{"val"},
},
isErr: func(err error) bool {
return err == nil
@@ -422,9 +441,13 @@ func TestNewUpdateStatement(t *testing.T) {
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
query: "UPDATE my_table SET (col1) = ($1) WHERE (col2 = $2)",
params: []params{
{
query: "UPDATE my_table SET (col1) = ($1) WHERE (col2 = $2)",
args: []interface{}{"val", 1},
},
},
shouldExecute: true,
args: []interface{}{"val", 1},
},
isErr: func(err error) bool {
return err == nil
@@ -541,9 +564,13 @@ func TestNewDeleteStatement(t *testing.T) {
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
query: "DELETE FROM my_table WHERE (col1 = $1)",
params: []params{
{
query: "DELETE FROM my_table WHERE (col1 = $1)",
args: []interface{}{1},
},
},
shouldExecute: true,
args: []interface{}{1},
},
isErr: func(err error) bool {
return err == nil
@@ -572,7 +599,7 @@ func TestNewNoOpStatement(t *testing.T) {
tests := []struct {
name string
args args
want handler.Statement
want *handler.Statement
}{
{
name: "generate correctly",
@@ -583,7 +610,7 @@ func TestNewNoOpStatement(t *testing.T) {
previousSequence: 3,
},
},
want: handler.Statement{
want: &handler.Statement{
AggregateType: "agg",
Execute: nil,
Sequence: 5,
@@ -600,6 +627,174 @@ func TestNewNoOpStatement(t *testing.T) {
}
}
func TestNewMultiStatement(t *testing.T) {
type args struct {
table string
event *testEvent
execs []func(eventstore.EventReader) Exec
}
type want struct {
table string
aggregateType eventstore.AggregateType
sequence uint64
previousSequence uint64
executer *wantExecuter
isErr func(error) bool
}
tests := []struct {
name string
args args
want want
}{
{
name: "no op",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
execs: nil,
},
want: want{
executer: nil,
},
},
{
name: "no condition",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
execs: []func(eventstore.EventReader) Exec{
AddDeleteStatement(
[]handler.Condition{},
),
AddCreateStatement(
[]handler.Column{
{
Name: "col1",
Value: 1,
},
}),
},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
shouldExecute: false,
},
isErr: func(err error) bool {
return errors.Is(err, handler.ErrNoCondition)
},
},
},
{
name: "correct",
args: args{
table: "my_table",
event: &testEvent{
sequence: 1,
previousSequence: 0,
aggregateType: "agg",
},
execs: []func(eventstore.EventReader) Exec{
AddDeleteStatement(
[]handler.Condition{
{
Name: "col1",
Value: 1,
},
}),
AddCreateStatement(
[]handler.Column{
{
Name: "col1",
Value: 1,
},
}),
AddUpsertStatement(
[]handler.Column{
{
Name: "col1",
Value: 1,
},
}),
AddUpdateStatement(
[]handler.Column{
{
Name: "col1",
Value: 1,
},
},
[]handler.Condition{
{
Name: "col1",
Value: 1,
},
}),
},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
params: []params{
{
query: "DELETE FROM my_table WHERE (col1 = $1)",
args: []interface{}{1},
},
{
query: "INSERT INTO my_table (col1) VALUES ($1)",
args: []interface{}{1},
},
{
query: "UPSERT INTO my_table (col1) VALUES ($1)",
args: []interface{}{1},
},
{
query: "UPDATE my_table SET (col1) = ($1) WHERE (col1 = $2)",
args: []interface{}{1, 1},
},
},
shouldExecute: true,
},
isErr: func(err error) bool {
return err == nil
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stmt := NewMultiStatement(tt.args.event, tt.args.execs...)
if tt.want.executer != nil && stmt.Execute == nil {
t.Error("expected executer, but was nil")
}
if stmt.Execute == nil {
return
}
tt.want.executer.t = t
err := stmt.Execute(tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
tt.want.executer.check(t)
})
}
}
func TestStatement_Execute(t *testing.T) {
type fields struct {
execute func(ex handler.Executer, projectionName string) error