mirror of
https://github.com/zitadel/zitadel.git
synced 2025-01-12 04:13:40 +00:00
bb756482c7
* document analytics config * rework configuration and docs * describe HandleActiveInstances better * describe active instances on quotas better * only projected events are considered * cleanup * describe changes at runtime * push milestones * stop tracking events * calculate and push 4 in 6 milestones * reduce milestone pushed * remove docs * fix scheduled pseudo event projection * push 5 in 6 milestones * push 6 in 6 milestones * ignore client ids * fix text array contains * push human readable milestone type * statement unit tests * improve dev and db performance * organize imports * cleanup * organize imports * test projection * check rows.Err() * test search query * pass linting * review * test 4 milestones * simplify milestone by instance ids query * use type NamespacedCondition * cleanup * lint * lint * dont overwrite original error * no opt-in in examples * cleanup * prerelease * enable request headers * make limit configurable * review fixes * only requeue special handlers secondly * include integration tests * Revert "include integration tests" This reverts commit 96db9504ecdb4e73451f09554fd749cd7c27341f. * pass reducers * test handlers * fix unit test * feat: increment version * lint * remove prerelease * fix integration tests
1811 lines
42 KiB
Go
1811 lines
42 KiB
Go
package crdb
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/DATA-DOG/go-sqlmock"
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
"github.com/zitadel/zitadel/internal/database"
|
|
"github.com/zitadel/zitadel/internal/eventstore"
|
|
"github.com/zitadel/zitadel/internal/eventstore/handler"
|
|
"github.com/zitadel/zitadel/internal/eventstore/repository"
|
|
es_repo_mock "github.com/zitadel/zitadel/internal/eventstore/repository/mock"
|
|
"github.com/zitadel/zitadel/internal/id"
|
|
"github.com/zitadel/zitadel/internal/repository/pseudo"
|
|
)
|
|
|
|
var (
|
|
errFilter = errors.New("filter err")
|
|
errReduce = errors.New("reduce err")
|
|
)
|
|
|
|
var _ eventstore.Event = &testEvent{}
|
|
|
|
type testEvent struct {
|
|
eventstore.BaseEvent
|
|
sequence uint64
|
|
previousSequence uint64
|
|
aggregateType eventstore.AggregateType
|
|
instanceID string
|
|
}
|
|
|
|
func (e *testEvent) Sequence() uint64 {
|
|
return e.sequence
|
|
}
|
|
|
|
func (e *testEvent) Aggregate() eventstore.Aggregate {
|
|
return eventstore.Aggregate{
|
|
Type: e.aggregateType,
|
|
InstanceID: e.instanceID,
|
|
}
|
|
}
|
|
|
|
func (e *testEvent) PreviousAggregateTypeSequence() uint64 {
|
|
return e.previousSequence
|
|
}
|
|
|
|
func TestProjectionHandler_SearchQuery(t *testing.T) {
|
|
type want struct {
|
|
SearchQueryBuilder *eventstore.SearchQueryBuilder
|
|
limit uint64
|
|
isErr func(error) bool
|
|
expectations []mockExpectation
|
|
}
|
|
type fields struct {
|
|
sequenceTable string
|
|
projectionName string
|
|
reducers []handler.AggregateReducer
|
|
bulkLimit uint64
|
|
}
|
|
type args struct {
|
|
instanceIDs []string
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
args args
|
|
want want
|
|
}{
|
|
{
|
|
name: "error in current sequence",
|
|
fields: fields{
|
|
sequenceTable: "my_sequences",
|
|
projectionName: "my_projection",
|
|
reducers: failingAggregateReducers("testAgg"),
|
|
bulkLimit: 5,
|
|
},
|
|
args: args{
|
|
instanceIDs: []string{"instanceID1"},
|
|
},
|
|
want: want{
|
|
limit: 0,
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, sql.ErrTxDone)
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectCurrentSequenceErr("my_sequences", "my_projection", []string{"instanceID1"}, sql.ErrTxDone),
|
|
},
|
|
SearchQueryBuilder: nil,
|
|
},
|
|
},
|
|
{
|
|
name: "only aggregates",
|
|
fields: fields{
|
|
sequenceTable: "my_sequences",
|
|
projectionName: "my_projection",
|
|
reducers: failingAggregateReducers("testAgg"),
|
|
bulkLimit: 5,
|
|
},
|
|
args: args{
|
|
instanceIDs: []string{"instanceID1"},
|
|
},
|
|
want: want{
|
|
limit: 5,
|
|
isErr: func(err error) bool {
|
|
return err == nil
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectCurrentSequence("my_sequences", "my_projection", 5, "testAgg", []string{"instanceID1"}),
|
|
},
|
|
SearchQueryBuilder: eventstore.
|
|
NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
|
AllowTimeTravel().
|
|
AddQuery().
|
|
AggregateTypes("testAgg").
|
|
SequenceGreater(5).
|
|
InstanceID("instanceID1").
|
|
Builder().
|
|
Limit(5),
|
|
},
|
|
},
|
|
{
|
|
name: "multiple instances",
|
|
fields: fields{
|
|
sequenceTable: "my_sequences",
|
|
projectionName: "my_projection",
|
|
reducers: failingAggregateReducers("testAgg"),
|
|
bulkLimit: 5,
|
|
},
|
|
args: args{
|
|
instanceIDs: []string{"instanceID1", "instanceID2"},
|
|
},
|
|
want: want{
|
|
limit: 5,
|
|
isErr: func(err error) bool {
|
|
return err == nil
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectCurrentSequence("my_sequences", "my_projection", 5, "testAgg", []string{"instanceID1", "instanceID2"}),
|
|
},
|
|
SearchQueryBuilder: eventstore.
|
|
NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
|
AllowTimeTravel().
|
|
AddQuery().
|
|
AggregateTypes("testAgg").
|
|
SequenceGreater(5).
|
|
InstanceID("instanceID1").
|
|
Or().
|
|
AggregateTypes("testAgg").
|
|
SequenceGreater(5).
|
|
InstanceID("instanceID2").
|
|
Builder().
|
|
Limit(5),
|
|
},
|
|
},
|
|
{
|
|
name: "scheduled pseudo event",
|
|
fields: fields{
|
|
sequenceTable: "my_sequences",
|
|
projectionName: "my_projection",
|
|
reducers: []handler.AggregateReducer{{
|
|
Aggregate: pseudo.AggregateType,
|
|
EventRedusers: []handler.EventReducer{
|
|
{
|
|
Event: pseudo.ScheduledEventType,
|
|
Reduce: testReduceErr(errors.New("should not be called")),
|
|
},
|
|
},
|
|
}},
|
|
bulkLimit: 5,
|
|
},
|
|
args: args{
|
|
instanceIDs: []string{"instanceID1", "instanceID2"},
|
|
},
|
|
want: want{
|
|
limit: 1,
|
|
isErr: func(err error) bool {
|
|
return err == nil
|
|
},
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
client, mock, err := sqlmock.New()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
id.Configure(&id.Config{Identification: id.Identification{PrivateIp: id.PrivateIp{Enabled: true}}})
|
|
h := NewStatementHandler(context.Background(), StatementHandlerConfig{
|
|
ProjectionHandlerConfig: handler.ProjectionHandlerConfig{
|
|
ProjectionName: tt.fields.projectionName,
|
|
},
|
|
SequenceTable: tt.fields.sequenceTable,
|
|
BulkLimit: tt.fields.bulkLimit,
|
|
Client: &database.DB{
|
|
DB: client,
|
|
},
|
|
Reducers: tt.fields.reducers,
|
|
})
|
|
|
|
for _, expectation := range tt.want.expectations {
|
|
expectation(mock)
|
|
}
|
|
|
|
query, limit, err := h.searchQuery(context.Background(), tt.args.instanceIDs)
|
|
if !tt.want.isErr(err) {
|
|
t.Errorf("ProjectionHandler.prepareBulkStmts() error = %v", err)
|
|
return
|
|
}
|
|
if !reflect.DeepEqual(query, tt.want.SearchQueryBuilder) {
|
|
t.Errorf("unexpected query: expected %v, got %v", tt.want.SearchQueryBuilder, query)
|
|
}
|
|
if tt.want.limit != limit {
|
|
t.Errorf("unexpected limit: got: %d want %d", limit, tt.want.limit)
|
|
}
|
|
|
|
mock.MatchExpectationsInOrder(true)
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
t.Errorf("expectations not met: %v", err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestStatementHandler_Update(t *testing.T) {
|
|
type fields struct {
|
|
eventstore *eventstore.Eventstore
|
|
aggregates []eventstore.AggregateType
|
|
}
|
|
type want struct {
|
|
expectations []mockExpectation
|
|
isErr func(error) bool
|
|
stmtsLen int
|
|
}
|
|
type args struct {
|
|
ctx context.Context
|
|
stmts []*handler.Statement
|
|
reduce handler.Reduce
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
want want
|
|
args args
|
|
}{
|
|
{
|
|
name: "begin fails",
|
|
args: args{
|
|
ctx: context.Background(),
|
|
stmts: []*handler.Statement{
|
|
NewNoOpStatement(&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 6,
|
|
previousSequence: 0,
|
|
}),
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectBeginErr(sql.ErrConnDone),
|
|
},
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, sql.ErrConnDone)
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "current sequence fails",
|
|
args: args{
|
|
ctx: context.Background(),
|
|
stmts: []*handler.Statement{
|
|
NewNoOpStatement(&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 6,
|
|
previousSequence: 0,
|
|
instanceID: "instanceID",
|
|
}),
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectBegin(),
|
|
expectCurrentSequenceErr("my_sequences", "my_projection", []string{"instanceID"}, sql.ErrTxDone),
|
|
expectRollback(),
|
|
},
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, sql.ErrTxDone)
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "fetch previous fails",
|
|
fields: fields{
|
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
|
es_repo_mock.NewRepo(t).
|
|
ExpectFilterEventsError(errFilter),
|
|
),
|
|
),
|
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
|
},
|
|
args: args{
|
|
ctx: context.Background(),
|
|
stmts: []*handler.Statement{
|
|
NewNoOpStatement(&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 6,
|
|
previousSequence: 0,
|
|
instanceID: "instanceID",
|
|
}),
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectBegin(),
|
|
expectCurrentSequence("my_sequences", "my_projection", 5, "testAgg", []string{"instanceID"}),
|
|
expectRollback(),
|
|
},
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, errFilter)
|
|
},
|
|
stmtsLen: 1,
|
|
},
|
|
},
|
|
{
|
|
name: "no successful stmts",
|
|
fields: fields{
|
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
|
es_repo_mock.NewRepo(t),
|
|
),
|
|
),
|
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
|
},
|
|
args: args{
|
|
ctx: context.Background(),
|
|
stmts: []*handler.Statement{
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "testAgg",
|
|
sequence: 7,
|
|
previousSequence: 6,
|
|
instanceID: "instanceID",
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectBegin(),
|
|
expectCurrentSequence("my_sequences", "my_projection", 5, "testAgg", []string{"instanceID"}),
|
|
expectCommit(),
|
|
},
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, handler.ErrSomeStmtsFailed)
|
|
},
|
|
stmtsLen: 1,
|
|
},
|
|
},
|
|
{
|
|
name: "update current sequence fails",
|
|
fields: fields{
|
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
|
es_repo_mock.NewRepo(t),
|
|
),
|
|
),
|
|
aggregates: []eventstore.AggregateType{"agg"},
|
|
},
|
|
args: args{
|
|
ctx: context.Background(),
|
|
stmts: []*handler.Statement{
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 7,
|
|
previousSequence: 5,
|
|
instanceID: "instanceID",
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectBegin(),
|
|
expectCurrentSequence("my_sequences", "my_projection", 5, "agg", []string{"instanceID"}),
|
|
expectSavePoint(),
|
|
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
|
|
expectSavePointRelease(),
|
|
expectUpdateCurrentSequenceNoRows("my_sequences", "my_projection", 7, "agg", "instanceID"),
|
|
expectRollback(),
|
|
},
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, errSeqNotUpdated)
|
|
},
|
|
stmtsLen: 1,
|
|
},
|
|
},
|
|
{
|
|
name: "commit fails",
|
|
fields: fields{
|
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
|
es_repo_mock.NewRepo(t),
|
|
),
|
|
),
|
|
aggregates: []eventstore.AggregateType{"agg"},
|
|
},
|
|
args: args{
|
|
ctx: context.Background(),
|
|
stmts: []*handler.Statement{
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 7,
|
|
previousSequence: 5,
|
|
instanceID: "instanceID",
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectBegin(),
|
|
expectCurrentSequence("my_sequences", "my_projection", 5, "agg", []string{"instanceID"}),
|
|
expectSavePoint(),
|
|
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
|
|
expectSavePointRelease(),
|
|
expectUpdateCurrentSequence("my_sequences", "my_projection", 7, "agg", "instanceID"),
|
|
expectCommitErr(sql.ErrConnDone),
|
|
},
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, sql.ErrConnDone)
|
|
},
|
|
stmtsLen: 1,
|
|
},
|
|
},
|
|
{
|
|
name: "correct",
|
|
fields: fields{
|
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
|
es_repo_mock.NewRepo(t),
|
|
),
|
|
),
|
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
|
},
|
|
args: args{
|
|
ctx: context.Background(),
|
|
stmts: []*handler.Statement{
|
|
NewNoOpStatement(&testEvent{
|
|
aggregateType: "testAgg",
|
|
sequence: 7,
|
|
previousSequence: 5,
|
|
instanceID: "instanceID",
|
|
}),
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectBegin(),
|
|
expectCurrentSequence("my_sequences", "my_projection", 5, "testAgg", []string{"instanceID"}),
|
|
expectUpdateCurrentSequence("my_sequences", "my_projection", 7, "testAgg", "instanceID"),
|
|
expectCommit(),
|
|
},
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, nil)
|
|
},
|
|
stmtsLen: 0,
|
|
},
|
|
},
|
|
{
|
|
name: "fetch previous stmts no additional stmts",
|
|
fields: fields{
|
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(),
|
|
),
|
|
),
|
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
|
},
|
|
args: args{
|
|
ctx: context.Background(),
|
|
stmts: []*handler.Statement{
|
|
NewNoOpStatement(&testEvent{
|
|
aggregateType: "testAgg",
|
|
sequence: 7,
|
|
previousSequence: 0,
|
|
instanceID: "instanceID",
|
|
}),
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectBegin(),
|
|
expectCurrentSequence("my_sequences", "my_projection", 5, "testAgg", []string{"instanceID"}),
|
|
expectUpdateCurrentSequence("my_sequences", "my_projection", 7, "testAgg", "instanceID"),
|
|
expectCommit(),
|
|
},
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, nil)
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "fetch previous stmts additional events",
|
|
fields: fields{
|
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
|
&repository.Event{
|
|
AggregateType: "testAgg",
|
|
Sequence: 6,
|
|
PreviousAggregateSequence: 5,
|
|
InstanceID: "instanceID",
|
|
},
|
|
),
|
|
),
|
|
),
|
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
|
},
|
|
args: args{
|
|
ctx: context.Background(),
|
|
stmts: []*handler.Statement{
|
|
NewNoOpStatement(&testEvent{
|
|
aggregateType: "testAgg",
|
|
sequence: 7,
|
|
previousSequence: 0,
|
|
instanceID: "instanceID",
|
|
}),
|
|
},
|
|
reduce: testReduce(),
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectBegin(),
|
|
expectCurrentSequence("my_sequences", "my_projection", 5, "testAgg", []string{"instanceID"}),
|
|
expectUpdateCurrentSequence("my_sequences", "my_projection", 7, "testAgg", "instanceID"),
|
|
expectCommit(),
|
|
},
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, nil)
|
|
},
|
|
stmtsLen: 1,
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
client, mock, err := sqlmock.New()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
h := &StatementHandler{
|
|
ProjectionHandler: &handler.ProjectionHandler{
|
|
Handler: handler.Handler{
|
|
Eventstore: tt.fields.eventstore,
|
|
},
|
|
ProjectionName: "my_projection",
|
|
},
|
|
sequenceTable: "my_sequences",
|
|
currentSequenceStmt: fmt.Sprintf(currentSequenceStmtFormat, "my_sequences"),
|
|
updateSequencesBaseStmt: fmt.Sprintf(updateCurrentSequencesStmtFormat, "my_sequences"),
|
|
client: &database.DB{
|
|
DB: client,
|
|
},
|
|
}
|
|
|
|
h.aggregates = tt.fields.aggregates
|
|
|
|
for _, expectation := range tt.want.expectations {
|
|
expectation(mock)
|
|
}
|
|
|
|
index, err := h.Update(tt.args.ctx, tt.args.stmts, tt.args.reduce)
|
|
if !tt.want.isErr(err) {
|
|
t.Errorf("StatementHandler.Update() error = %v", err)
|
|
}
|
|
if err == nil && tt.want.stmtsLen != index {
|
|
t.Errorf("wrong stmts length: want: %d got %d", tt.want.stmtsLen, index)
|
|
}
|
|
|
|
mock.MatchExpectationsInOrder(true)
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
t.Errorf("expectations not met: %v", err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
|
|
type args struct {
|
|
ctx context.Context
|
|
stmtSeq uint64
|
|
sequences currentSequences
|
|
reduce handler.Reduce
|
|
}
|
|
type want struct {
|
|
stmtCount int
|
|
isErr func(error) bool
|
|
}
|
|
type fields struct {
|
|
eventstore *eventstore.Eventstore
|
|
aggregates []eventstore.AggregateType
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
args args
|
|
fields fields
|
|
want want
|
|
}{
|
|
{
|
|
name: "no queries",
|
|
args: args{
|
|
ctx: context.Background(),
|
|
reduce: testReduce(),
|
|
},
|
|
fields: fields{
|
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, nil)
|
|
},
|
|
stmtCount: 0,
|
|
},
|
|
},
|
|
{
|
|
name: "eventstore returns err",
|
|
args: args{
|
|
ctx: context.Background(),
|
|
reduce: testReduce(),
|
|
sequences: currentSequences{
|
|
"testAgg": []*instanceSequence{
|
|
{sequence: 5},
|
|
},
|
|
},
|
|
stmtSeq: 6,
|
|
},
|
|
fields: fields{
|
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
|
es_repo_mock.NewRepo(t).ExpectFilterEventsError(errFilter),
|
|
),
|
|
),
|
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, errFilter)
|
|
},
|
|
stmtCount: 0,
|
|
},
|
|
},
|
|
{
|
|
name: "no events found",
|
|
args: args{
|
|
ctx: context.Background(),
|
|
reduce: testReduce(),
|
|
sequences: currentSequences{
|
|
"testAgg": []*instanceSequence{
|
|
{sequence: 5},
|
|
},
|
|
},
|
|
stmtSeq: 6,
|
|
},
|
|
fields: fields{
|
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(),
|
|
),
|
|
),
|
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return err == nil
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "found events",
|
|
args: args{
|
|
ctx: context.Background(),
|
|
reduce: testReduce(),
|
|
sequences: currentSequences{
|
|
"testAgg": []*instanceSequence{
|
|
{sequence: 5},
|
|
},
|
|
},
|
|
stmtSeq: 10,
|
|
},
|
|
fields: fields{
|
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
|
&repository.Event{
|
|
ID: "id",
|
|
Sequence: 7,
|
|
PreviousAggregateSequence: 0,
|
|
CreationDate: time.Now(),
|
|
Type: "test.added",
|
|
Version: "v1",
|
|
AggregateID: "testid",
|
|
AggregateType: "testAgg",
|
|
},
|
|
&repository.Event{
|
|
ID: "id",
|
|
Sequence: 9,
|
|
PreviousAggregateSequence: 7,
|
|
CreationDate: time.Now(),
|
|
Type: "test.changed",
|
|
Version: "v1",
|
|
AggregateID: "testid",
|
|
AggregateType: "testAgg",
|
|
},
|
|
),
|
|
),
|
|
),
|
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
|
},
|
|
want: want{
|
|
stmtCount: 2,
|
|
isErr: func(err error) bool {
|
|
return err == nil
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "reduce fails",
|
|
args: args{
|
|
ctx: context.Background(),
|
|
reduce: testReduceErr(errReduce),
|
|
sequences: currentSequences{
|
|
"testAgg": []*instanceSequence{
|
|
{sequence: 5},
|
|
},
|
|
},
|
|
stmtSeq: 10,
|
|
},
|
|
fields: fields{
|
|
eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
|
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
|
&repository.Event{
|
|
ID: "id",
|
|
Sequence: 7,
|
|
PreviousAggregateSequence: 0,
|
|
CreationDate: time.Now(),
|
|
Type: "test.added",
|
|
Version: "v1",
|
|
AggregateID: "testid",
|
|
AggregateType: "testAgg",
|
|
},
|
|
),
|
|
),
|
|
),
|
|
aggregates: []eventstore.AggregateType{"testAgg"},
|
|
},
|
|
want: want{
|
|
stmtCount: 0,
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, errReduce)
|
|
},
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
h := &StatementHandler{
|
|
aggregates: tt.fields.aggregates,
|
|
}
|
|
h.ProjectionHandler = &handler.ProjectionHandler{
|
|
Handler: handler.Handler{
|
|
Eventstore: tt.fields.eventstore,
|
|
},
|
|
ProjectionName: "my_projection",
|
|
}
|
|
stmts, err := h.fetchPreviousStmts(tt.args.ctx, nil, tt.args.stmtSeq, "", tt.args.sequences, tt.args.reduce)
|
|
if !tt.want.isErr(err) {
|
|
t.Errorf("ProjectionHandler.prepareBulkStmts() error = %v", err)
|
|
return
|
|
}
|
|
if tt.want.stmtCount != len(stmts) {
|
|
t.Errorf("unexpected length of stmts: got: %d want %d", len(stmts), tt.want.stmtCount)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestStatementHandler_executeStmts(t *testing.T) {
|
|
type fields struct {
|
|
projectionName string
|
|
maxFailureCount uint
|
|
failedEventsTable string
|
|
}
|
|
type args struct {
|
|
stmts []*handler.Statement
|
|
sequences currentSequences
|
|
}
|
|
type want struct {
|
|
expectations []mockExpectation
|
|
idx int
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
args args
|
|
want want
|
|
}{
|
|
{
|
|
name: "already inserted",
|
|
fields: fields{
|
|
projectionName: "my_projection",
|
|
},
|
|
args: args{
|
|
stmts: []*handler.Statement{
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 5,
|
|
previousSequence: 2,
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val1",
|
|
},
|
|
}),
|
|
},
|
|
sequences: currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{sequence: 5},
|
|
},
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{},
|
|
idx: -1,
|
|
},
|
|
},
|
|
{
|
|
name: "previous sequence higher than sequence",
|
|
fields: fields{
|
|
projectionName: "my_projection",
|
|
},
|
|
args: args{
|
|
stmts: []*handler.Statement{
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 5,
|
|
previousSequence: 0,
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col1",
|
|
Value: "val1",
|
|
},
|
|
}),
|
|
NewCreateStatement(
|
|
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 8,
|
|
previousSequence: 7,
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col2",
|
|
Value: "val2",
|
|
},
|
|
}),
|
|
},
|
|
sequences: currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{sequence: 2},
|
|
},
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectSavePoint(),
|
|
expectCreate("my_projection", []string{"col1"}, []string{"$1"}),
|
|
expectSavePointRelease(),
|
|
},
|
|
idx: 0,
|
|
},
|
|
},
|
|
{
|
|
name: "execute fails not continue",
|
|
fields: fields{
|
|
projectionName: "my_projection",
|
|
maxFailureCount: 5,
|
|
failedEventsTable: "failed_events",
|
|
},
|
|
args: args{
|
|
stmts: []*handler.Statement{
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 5,
|
|
previousSequence: 0,
|
|
instanceID: "instanceID",
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 6,
|
|
previousSequence: 5,
|
|
instanceID: "instanceID",
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 7,
|
|
previousSequence: 6,
|
|
instanceID: "instanceID",
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
},
|
|
sequences: currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{sequence: 2},
|
|
},
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectSavePoint(),
|
|
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
|
|
expectSavePointRelease(),
|
|
expectSavePoint(),
|
|
expectCreateErr("my_projection", []string{"col"}, []string{"$1"}, sql.ErrConnDone),
|
|
expectSavePointRollback(),
|
|
expectFailureCount("failed_events", "my_projection", "instanceID", 6, 3),
|
|
expectUpdateFailureCount("failed_events", "my_projection", "instanceID", 6, 4),
|
|
},
|
|
idx: 0,
|
|
},
|
|
},
|
|
{
|
|
name: "execute fails continue",
|
|
fields: fields{
|
|
projectionName: "my_projection",
|
|
maxFailureCount: 5,
|
|
failedEventsTable: "failed_events",
|
|
},
|
|
args: args{
|
|
stmts: []*handler.Statement{
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 5,
|
|
previousSequence: 0,
|
|
instanceID: "instanceID",
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col1",
|
|
Value: "val1",
|
|
},
|
|
}),
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 6,
|
|
previousSequence: 5,
|
|
instanceID: "instanceID",
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col2",
|
|
Value: "val2",
|
|
},
|
|
}),
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 7,
|
|
previousSequence: 6,
|
|
instanceID: "instanceID",
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col3",
|
|
Value: "val3",
|
|
},
|
|
}),
|
|
},
|
|
sequences: currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{sequence: 2},
|
|
},
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectSavePoint(),
|
|
expectCreate("my_projection", []string{"col1"}, []string{"$1"}),
|
|
expectSavePointRelease(),
|
|
expectSavePoint(),
|
|
expectCreateErr("my_projection", []string{"col2"}, []string{"$1"}, sql.ErrConnDone),
|
|
expectSavePointRollback(),
|
|
expectFailureCount("failed_events", "my_projection", "instanceID", 6, 4),
|
|
expectUpdateFailureCount("failed_events", "my_projection", "instanceID", 6, 5),
|
|
expectSavePoint(),
|
|
expectCreate("my_projection", []string{"col3"}, []string{"$1"}),
|
|
expectSavePointRelease(),
|
|
},
|
|
idx: 2,
|
|
},
|
|
},
|
|
{
|
|
name: "correct",
|
|
fields: fields{
|
|
projectionName: "my_projection",
|
|
},
|
|
args: args{
|
|
stmts: []*handler.Statement{
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 5,
|
|
previousSequence: 0,
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 6,
|
|
previousSequence: 5,
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 7,
|
|
previousSequence: 6,
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
NewMultiStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 8,
|
|
previousSequence: 7,
|
|
},
|
|
AddCreateStatement(
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
},
|
|
),
|
|
AddCreateStatement(
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
},
|
|
),
|
|
),
|
|
},
|
|
sequences: currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{sequence: 2},
|
|
},
|
|
},
|
|
},
|
|
want: want{
|
|
expectations: []mockExpectation{
|
|
expectSavePoint(),
|
|
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
|
|
expectSavePointRelease(),
|
|
expectSavePoint(),
|
|
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
|
|
expectSavePointRelease(),
|
|
expectSavePoint(),
|
|
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
|
|
expectSavePointRelease(),
|
|
expectSavePoint(),
|
|
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
|
|
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
|
|
expectSavePointRelease(),
|
|
},
|
|
idx: 3,
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
client, mock, err := sqlmock.New()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
h := NewStatementHandler(
|
|
context.Background(),
|
|
StatementHandlerConfig{
|
|
ProjectionHandlerConfig: handler.ProjectionHandlerConfig{
|
|
HandlerConfig: handler.HandlerConfig{
|
|
Eventstore: nil,
|
|
},
|
|
ProjectionName: tt.fields.projectionName,
|
|
RequeueEvery: 0,
|
|
},
|
|
Client: &database.DB{
|
|
DB: client,
|
|
},
|
|
FailedEventsTable: tt.fields.failedEventsTable,
|
|
MaxFailureCount: tt.fields.maxFailureCount,
|
|
},
|
|
)
|
|
|
|
mock.ExpectBegin()
|
|
|
|
for _, expectation := range tt.want.expectations {
|
|
expectation(mock)
|
|
}
|
|
|
|
mock.ExpectCommit()
|
|
|
|
tx, err := client.Begin()
|
|
if err != nil {
|
|
t.Fatalf("unexpected err in begin: %v", err)
|
|
}
|
|
|
|
idx := h.executeStmts(tx, &tt.args.stmts, tt.args.sequences)
|
|
if idx != tt.want.idx {
|
|
t.Errorf("unexpected index want: %d got %d", tt.want.idx, idx)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
t.Fatalf("unexpected err in commit: %v", err)
|
|
}
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
t.Errorf("expectations not met: %v", err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestStatementHandler_executeStmt(t *testing.T) {
|
|
type fields struct {
|
|
projectionName string
|
|
}
|
|
type args struct {
|
|
stmt *handler.Statement
|
|
}
|
|
type want struct {
|
|
expectations []mockExpectation
|
|
isErr func(error) bool
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
args args
|
|
want want
|
|
}{
|
|
{
|
|
name: "create savepoint fails",
|
|
fields: fields{
|
|
projectionName: "my_projection",
|
|
},
|
|
args: args{
|
|
stmt: NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 1,
|
|
previousSequence: 0,
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, sql.ErrConnDone)
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectSavePointErr(sql.ErrConnDone),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "execute fails",
|
|
fields: fields{
|
|
projectionName: "my_projection",
|
|
},
|
|
args: args{
|
|
stmt: NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 1,
|
|
previousSequence: 0,
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, sql.ErrNoRows)
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectSavePoint(),
|
|
expectCreateErr("my_projection", []string{"col"}, []string{"$1"}, sql.ErrNoRows),
|
|
expectSavePointRollback(),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "rollback savepoint fails",
|
|
fields: fields{
|
|
projectionName: "my_projection",
|
|
},
|
|
args: args{
|
|
stmt: NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 1,
|
|
previousSequence: 0,
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, sql.ErrConnDone)
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectSavePoint(),
|
|
expectCreateErr("my_projection", []string{"col"}, []string{"$1"}, sql.ErrNoRows),
|
|
expectSavePointRollbackErr(sql.ErrConnDone),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "no op",
|
|
fields: fields{
|
|
projectionName: "my_projection",
|
|
},
|
|
args: args{
|
|
stmt: NewNoOpStatement(&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 1,
|
|
previousSequence: 0,
|
|
}),
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return err == nil
|
|
},
|
|
expectations: []mockExpectation{},
|
|
},
|
|
},
|
|
{
|
|
name: "with op",
|
|
fields: fields{
|
|
projectionName: "my_projection",
|
|
},
|
|
args: args{
|
|
stmt: NewCreateStatement(
|
|
&testEvent{
|
|
aggregateType: "agg",
|
|
sequence: 1,
|
|
previousSequence: 0,
|
|
},
|
|
[]handler.Column{
|
|
{
|
|
Name: "col",
|
|
Value: "val",
|
|
},
|
|
}),
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return err == nil
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectSavePoint(),
|
|
expectCreate("my_projection", []string{"col"}, []string{"$1"}),
|
|
expectSavePointRelease(),
|
|
},
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
h := &StatementHandler{
|
|
ProjectionHandler: &handler.ProjectionHandler{
|
|
ProjectionName: tt.fields.projectionName,
|
|
},
|
|
}
|
|
|
|
client, mock, err := sqlmock.New()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
mock.ExpectBegin()
|
|
|
|
for _, expectation := range tt.want.expectations {
|
|
expectation(mock)
|
|
}
|
|
|
|
mock.ExpectCommit()
|
|
|
|
tx, err := client.Begin()
|
|
if err != nil {
|
|
t.Fatalf("unexpected err in begin: %v", err)
|
|
}
|
|
|
|
err = h.executeStmt(tx, tt.args.stmt)
|
|
if !tt.want.isErr(err) {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
|
|
if err = tx.Commit(); err != nil {
|
|
t.Fatalf("unexpected err in begin: %v", err)
|
|
}
|
|
|
|
mock.MatchExpectationsInOrder(true)
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
t.Errorf("expectations not met: %v", err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestStatementHandler_currentSequence(t *testing.T) {
|
|
type fields struct {
|
|
sequenceTable string
|
|
projectionName string
|
|
aggregates []eventstore.AggregateType
|
|
}
|
|
type args struct {
|
|
stmt handler.Statement
|
|
instanceIDs []string
|
|
}
|
|
type want struct {
|
|
expectations []mockExpectation
|
|
isErr func(error) bool
|
|
sequences currentSequences
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
args args
|
|
want want
|
|
}{
|
|
{
|
|
name: "error in query",
|
|
fields: fields{
|
|
sequenceTable: "my_table",
|
|
projectionName: "my_projection",
|
|
},
|
|
args: args{
|
|
stmt: handler.Statement{},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, sql.ErrConnDone)
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectCurrentSequenceErr("my_table", "my_projection", nil, sql.ErrConnDone),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "no rows",
|
|
fields: fields{
|
|
sequenceTable: "my_table",
|
|
projectionName: "my_projection",
|
|
aggregates: []eventstore.AggregateType{"agg"},
|
|
},
|
|
args: args{
|
|
stmt: handler.Statement{},
|
|
instanceIDs: []string{"instanceID"},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, nil)
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectCurrentSequenceNoRows("my_table", "my_projection", []string{"instanceID"}),
|
|
},
|
|
sequences: currentSequences{},
|
|
},
|
|
},
|
|
{
|
|
name: "scan err",
|
|
fields: fields{
|
|
sequenceTable: "my_table",
|
|
projectionName: "my_projection",
|
|
aggregates: []eventstore.AggregateType{"agg"},
|
|
},
|
|
args: args{
|
|
stmt: handler.Statement{},
|
|
instanceIDs: []string{"instanceID"},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, sql.ErrTxDone)
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectCurrentSequenceScanErr("my_table", "my_projection", []string{"instanceID"}),
|
|
},
|
|
sequences: currentSequences{},
|
|
},
|
|
},
|
|
{
|
|
name: "found",
|
|
fields: fields{
|
|
sequenceTable: "my_table",
|
|
projectionName: "my_projection",
|
|
aggregates: []eventstore.AggregateType{"agg"},
|
|
},
|
|
args: args{
|
|
stmt: handler.Statement{},
|
|
instanceIDs: []string{"instanceID"},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, nil)
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectCurrentSequence("my_table", "my_projection", 5, "agg", []string{"instanceID"}),
|
|
},
|
|
sequences: currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{
|
|
sequence: 5,
|
|
instanceID: "instanceID",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "multiple found",
|
|
fields: fields{
|
|
sequenceTable: "my_table",
|
|
projectionName: "my_projection",
|
|
aggregates: []eventstore.AggregateType{"agg"},
|
|
},
|
|
args: args{
|
|
stmt: handler.Statement{},
|
|
instanceIDs: []string{"instanceID1", "instanceID2"},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, nil)
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectCurrentSequence("my_table", "my_projection", 5, "agg", []string{"instanceID1", "instanceID2"}),
|
|
},
|
|
sequences: currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{
|
|
sequence: 5,
|
|
instanceID: "instanceID1",
|
|
},
|
|
{
|
|
sequence: 5,
|
|
instanceID: "instanceID2",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
h := &StatementHandler{
|
|
ProjectionHandler: &handler.ProjectionHandler{
|
|
ProjectionName: tt.fields.projectionName,
|
|
},
|
|
sequenceTable: tt.fields.sequenceTable,
|
|
currentSequenceStmt: fmt.Sprintf(currentSequenceStmtFormat, tt.fields.sequenceTable),
|
|
}
|
|
|
|
h.aggregates = tt.fields.aggregates
|
|
|
|
client, mock, err := sqlmock.New()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
mock.ExpectBegin()
|
|
|
|
for _, expectation := range tt.want.expectations {
|
|
expectation(mock)
|
|
}
|
|
|
|
mock.ExpectCommit()
|
|
|
|
tx, err := client.Begin()
|
|
if err != nil {
|
|
t.Fatalf("unexpected err in begin: %v", err)
|
|
}
|
|
|
|
seq, err := h.currentSequences(context.Background(), tx.QueryContext, tt.args.instanceIDs)
|
|
if !tt.want.isErr(err) {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
|
|
if err = tx.Commit(); err != nil {
|
|
t.Fatalf("unexpected err in commit: %v", err)
|
|
}
|
|
|
|
mock.MatchExpectationsInOrder(true)
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
t.Errorf("expectations not met: %v", err)
|
|
}
|
|
|
|
for _, aggregateType := range tt.fields.aggregates {
|
|
assert.Equal(t, tt.want.sequences[aggregateType], seq[aggregateType])
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestStatementHandler_updateCurrentSequence(t *testing.T) {
|
|
type fields struct {
|
|
sequenceTable string
|
|
projectionName string
|
|
aggregates []eventstore.AggregateType
|
|
}
|
|
type args struct {
|
|
sequences currentSequences
|
|
}
|
|
type want struct {
|
|
expectations []mockExpectation
|
|
isErr func(error) bool
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
args args
|
|
want want
|
|
}{
|
|
{
|
|
name: "update sequence fails",
|
|
fields: fields{
|
|
sequenceTable: "my_table",
|
|
projectionName: "my_projection",
|
|
aggregates: []eventstore.AggregateType{"agg"},
|
|
},
|
|
args: args{
|
|
sequences: currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{
|
|
sequence: 5,
|
|
instanceID: "instanceID",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, sql.ErrConnDone)
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectUpdateCurrentSequenceErr("my_table", "my_projection", 5, sql.ErrConnDone, "agg", "instanceID"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "update sequence returns no rows",
|
|
fields: fields{
|
|
sequenceTable: "my_table",
|
|
projectionName: "my_projection",
|
|
aggregates: []eventstore.AggregateType{"agg"},
|
|
},
|
|
args: args{
|
|
sequences: currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{
|
|
sequence: 5,
|
|
instanceID: "instanceID",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return errors.Is(err, errSeqNotUpdated)
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectUpdateCurrentSequenceNoRows("my_table", "my_projection", 5, "agg", "instanceID"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "correct",
|
|
fields: fields{
|
|
sequenceTable: "my_table",
|
|
projectionName: "my_projection",
|
|
aggregates: []eventstore.AggregateType{"agg"},
|
|
},
|
|
args: args{
|
|
sequences: currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{
|
|
sequence: 5,
|
|
instanceID: "instanceID",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return err == nil
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectUpdateCurrentSequence("my_table", "my_projection", 5, "agg", "instanceID"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "multiple sequences",
|
|
fields: fields{
|
|
sequenceTable: "my_table",
|
|
projectionName: "my_projection",
|
|
aggregates: []eventstore.AggregateType{"agg"},
|
|
},
|
|
args: args{
|
|
sequences: currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{
|
|
sequence: 5,
|
|
instanceID: "instanceID",
|
|
},
|
|
},
|
|
"agg2": []*instanceSequence{
|
|
{
|
|
sequence: 6,
|
|
instanceID: "instanceID",
|
|
},
|
|
{
|
|
sequence: 10,
|
|
instanceID: "instanceID2",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
want: want{
|
|
isErr: func(err error) bool {
|
|
return err == nil
|
|
},
|
|
expectations: []mockExpectation{
|
|
expectUpdateThreeCurrentSequence(t, "my_table", "my_projection", currentSequences{
|
|
"agg": []*instanceSequence{
|
|
{
|
|
sequence: 5,
|
|
instanceID: "instanceID",
|
|
},
|
|
},
|
|
"agg2": []*instanceSequence{
|
|
{
|
|
sequence: 6,
|
|
instanceID: "instanceID",
|
|
},
|
|
{
|
|
sequence: 10,
|
|
instanceID: "instanceID2",
|
|
},
|
|
},
|
|
}),
|
|
},
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
|
|
h := &StatementHandler{
|
|
ProjectionHandler: &handler.ProjectionHandler{
|
|
ProjectionName: tt.fields.projectionName,
|
|
},
|
|
sequenceTable: tt.fields.sequenceTable,
|
|
updateSequencesBaseStmt: fmt.Sprintf(updateCurrentSequencesStmtFormat, tt.fields.sequenceTable),
|
|
}
|
|
|
|
h.aggregates = tt.fields.aggregates
|
|
|
|
client, mock, err := sqlmock.New()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
mock.ExpectBegin()
|
|
for _, expectation := range tt.want.expectations {
|
|
expectation(mock)
|
|
}
|
|
mock.ExpectCommit()
|
|
|
|
tx, err := client.Begin()
|
|
if err != nil {
|
|
t.Fatalf("unexpected error in begin: %v", err)
|
|
}
|
|
|
|
err = h.updateCurrentSequences(tx, tt.args.sequences)
|
|
if !tt.want.isErr(err) {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
|
|
err = tx.Commit()
|
|
if err != nil {
|
|
t.Fatalf("unexpected error in commit: %v", err)
|
|
}
|
|
|
|
mock.MatchExpectationsInOrder(true)
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
t.Errorf("expectations not met: %v", err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func testReduce() handler.Reduce {
|
|
return func(event eventstore.Event) (*handler.Statement, error) {
|
|
return NewNoOpStatement(event), nil
|
|
}
|
|
}
|
|
|
|
func testReduceErr(err error) handler.Reduce {
|
|
return func(event eventstore.Event) (*handler.Statement, error) {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
func failingAggregateReducers(aggregates ...eventstore.AggregateType) []handler.AggregateReducer {
|
|
reducers := make([]handler.AggregateReducer, len(aggregates))
|
|
for idx := range aggregates {
|
|
reducers[idx] = handler.AggregateReducer{
|
|
Aggregate: aggregates[idx],
|
|
EventRedusers: []handler.EventReducer{{
|
|
Event: "any.event",
|
|
Reduce: testReduceErr(errors.New("should not be called")),
|
|
}},
|
|
}
|
|
}
|
|
return reducers
|
|
}
|