zitadel/internal/eventstore/eventstore_test.go

1498 lines
32 KiB
Go
Raw Normal View History

2020-09-24 06:52:10 +00:00
package eventstore
2020-09-30 08:00:05 +00:00
import (
2020-10-21 17:00:41 +00:00
"context"
"encoding/json"
2020-10-21 17:00:41 +00:00
"fmt"
2020-09-30 08:00:05 +00:00
"reflect"
"testing"
"time"
2020-09-30 08:00:05 +00:00
"github.com/jackc/pgx/v5/pgconn"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/api/service"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/zerrors"
2020-09-30 08:00:05 +00:00
)
2020-10-05 17:09:26 +00:00
// testEvent implements the Event interface
2020-09-30 08:00:05 +00:00
type testEvent struct {
2020-11-06 12:47:27 +00:00
BaseEvent
2020-09-30 08:00:05 +00:00
description string
shouldCheckPrevious bool
2020-10-05 20:02:59 +00:00
data func() interface{}
2020-09-30 08:00:05 +00:00
}
func newTestEvent(id, description string, data func() interface{}, checkPrevious bool) *testEvent {
2020-11-06 12:47:27 +00:00
return &testEvent{
description: description,
data: data,
shouldCheckPrevious: checkPrevious,
2020-11-06 16:25:07 +00:00
BaseEvent: *NewBaseEventForPush(
service.WithService(authz.NewMockContext("instanceID", "resourceOwner", "editorUser"), "editorService"),
NewAggregate(authz.NewMockContext("zitadel", "caos", "adlerhurst"), id, "test.aggregate", "v1"),
2020-11-06 16:25:07 +00:00
"test.event",
),
2020-11-06 12:47:27 +00:00
}
}
func (e *testEvent) Payload() interface{} {
2020-10-05 20:02:59 +00:00
return e.data()
2020-10-05 17:09:26 +00:00
}
// UniqueConstraints always returns nil: test events declare no unique
// constraints.
func (e *testEvent) UniqueConstraints() []*UniqueConstraint {
	var none []*UniqueConstraint
	return none
}
// Assets always returns nil: test events carry no assets.
func (e *testEvent) Assets() []*Asset {
	var none []*Asset
	return none
}
func testFilterMapper(event Event) (Event, error) {
2020-11-06 12:47:27 +00:00
if event == nil {
return newTestEvent("testID", "hodor", nil, false), nil
2020-11-06 12:47:27 +00:00
}
return &testEvent{description: "hodor", BaseEvent: *BaseEventFromRepo(event)}, nil
2020-09-30 08:00:05 +00:00
}
func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
type fields struct {
eventMapper map[EventType]eventTypeInterceptors
}
type args struct {
eventType EventType
mapper func(Event) (Event, error)
2020-09-30 08:00:05 +00:00
}
type res struct {
event Event
2020-10-27 14:42:18 +00:00
mapperCount int
2020-09-30 08:00:05 +00:00
}
2020-09-24 06:52:10 +00:00
tests := []struct {
2020-09-30 08:00:05 +00:00
name string
fields fields
args args
res res
}{
{
name: "no event mapper",
args: args{
eventType: "event.type",
mapper: nil,
},
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{},
},
res: res{
2020-10-27 14:42:18 +00:00
mapperCount: 0,
2020-09-30 08:00:05 +00:00
},
},
{
name: "new interceptor",
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{},
},
args: args{
eventType: "event.type",
mapper: testFilterMapper,
},
res: res{
event: newTestEvent("testID", "hodor", nil, false),
2020-10-27 14:42:18 +00:00
mapperCount: 1,
2020-09-30 08:00:05 +00:00
},
},
{
name: "existing interceptor new filter mapper",
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{
"event.type": {},
},
},
args: args{
eventType: "new.event",
mapper: testFilterMapper,
},
res: res{
event: newTestEvent("testID", "hodor", nil, false),
2020-10-27 14:42:18 +00:00
mapperCount: 2,
2020-09-30 08:00:05 +00:00
},
},
{
name: "existing interceptor existing filter mapper",
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{
"event.type": {
eventMapper: func(Event) (Event, error) {
return nil, zerrors.ThrowUnimplemented(nil, "V2-1qPvn", "unimplemented")
2020-09-30 08:00:05 +00:00
},
},
},
},
args: args{
eventType: "new.event",
mapper: testFilterMapper,
},
res: res{
event: newTestEvent("testID", "hodor", nil, false),
2020-10-27 14:42:18 +00:00
mapperCount: 2,
2020-09-30 08:00:05 +00:00
},
},
}
2020-09-24 06:52:10 +00:00
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
eventInterceptors = tt.fields.eventMapper
RegisterFilterEventMapper("test", tt.args.eventType, tt.args.mapper)
if len(eventInterceptors) != tt.res.mapperCount {
t.Errorf("unexpected mapper count: want %d, got %d", tt.res.mapperCount, len(eventInterceptors))
2020-09-30 08:00:05 +00:00
return
}
2020-10-27 14:42:18 +00:00
if tt.res.mapperCount == 0 {
2020-09-30 08:00:05 +00:00
return
}
mapper := eventInterceptors[tt.args.eventType]
2020-10-23 14:16:46 +00:00
event, err := mapper.eventMapper(nil)
2020-09-30 08:00:05 +00:00
if err != nil {
t.Errorf("unexpected error %v", err)
}
if !reflect.DeepEqual(tt.res.event, event) {
2020-11-06 12:47:27 +00:00
t.Errorf("events should be deep equal. \ngot %#v\nwant %#v", event, tt.res.event)
2020-09-30 08:00:05 +00:00
}
2020-09-24 06:52:10 +00:00
})
}
}
2020-10-05 20:02:59 +00:00
func Test_eventData(t *testing.T) {
type args struct {
event Command
2020-10-05 20:02:59 +00:00
}
type res struct {
jsonText []byte
wantErr bool
}
tests := []struct {
name string
args args
res res
}{
{
name: "data as json bytes",
args: args{
2020-11-06 12:47:27 +00:00
event: newTestEvent(
"id",
2020-11-06 12:47:27 +00:00
"hodor",
func() interface{} {
2020-10-05 20:02:59 +00:00
return []byte(`{"piff":"paff"}`)
},
2020-11-06 12:47:27 +00:00
false),
2020-10-05 20:02:59 +00:00
},
res: res{
jsonText: []byte(`{"piff":"paff"}`),
wantErr: false,
},
},
{
name: "data as invalid json bytes",
args: args{
2020-11-06 12:47:27 +00:00
event: newTestEvent(
"id",
2020-11-06 12:47:27 +00:00
"hodor",
func() interface{} {
2020-10-05 20:02:59 +00:00
return []byte(`{"piffpaff"}`)
},
2020-11-06 12:47:27 +00:00
false),
2020-10-05 20:02:59 +00:00
},
res: res{
jsonText: []byte(nil),
wantErr: true,
},
},
{
name: "data as struct",
args: args{
2020-11-06 12:47:27 +00:00
event: newTestEvent(
"id",
2020-11-06 12:47:27 +00:00
"hodor",
func() interface{} {
2020-10-05 20:02:59 +00:00
return struct {
Piff string `json:"piff"`
}{Piff: "paff"}
},
2020-11-06 12:47:27 +00:00
false),
2020-10-05 20:02:59 +00:00
},
res: res{
jsonText: []byte(`{"piff":"paff"}`),
wantErr: false,
},
},
{
name: "data as ptr to struct",
args: args{
2020-11-06 12:47:27 +00:00
event: newTestEvent(
"id",
2020-11-06 12:47:27 +00:00
"hodor",
func() interface{} {
2020-10-05 20:02:59 +00:00
return &struct {
Piff string `json:"piff"`
}{Piff: "paff"}
},
2020-11-06 12:47:27 +00:00
false),
2020-10-05 20:02:59 +00:00
},
res: res{
jsonText: []byte(`{"piff":"paff"}`),
wantErr: false,
},
},
{
name: "no data",
args: args{
2020-11-06 12:47:27 +00:00
event: newTestEvent(
"id",
2020-11-06 12:47:27 +00:00
"hodor",
func() interface{} {
2020-10-05 20:02:59 +00:00
return nil
},
2020-11-06 12:47:27 +00:00
false),
2020-10-05 20:02:59 +00:00
},
res: res{
jsonText: []byte(nil),
wantErr: false,
},
},
{
name: "invalid because primitive",
args: args{
2020-11-06 12:47:27 +00:00
event: newTestEvent(
"id",
2020-11-06 12:47:27 +00:00
"hodor",
func() interface{} {
2020-10-05 20:02:59 +00:00
return ""
},
2020-11-06 12:47:27 +00:00
false),
2020-10-05 20:02:59 +00:00
},
res: res{
jsonText: []byte(nil),
wantErr: true,
},
},
{
name: "invalid because pointer to primitive",
args: args{
2020-11-06 12:47:27 +00:00
event: newTestEvent(
"id",
2020-11-06 12:47:27 +00:00
"hodor",
func() interface{} {
2020-10-05 20:02:59 +00:00
var s string
return &s
},
2020-11-06 12:47:27 +00:00
false),
2020-10-05 20:02:59 +00:00
},
res: res{
jsonText: []byte(nil),
wantErr: true,
},
},
2020-10-14 10:43:31 +00:00
{
name: "invalid because invalid struct for json",
args: args{
2020-11-06 12:47:27 +00:00
event: newTestEvent(
"id",
2020-11-06 12:47:27 +00:00
"hodor",
func() interface{} {
2020-10-14 10:43:31 +00:00
return struct {
Field chan string `json:"field"`
}{}
},
2020-11-06 12:47:27 +00:00
false),
2020-10-14 10:43:31 +00:00
},
res: res{
jsonText: []byte(nil),
wantErr: true,
},
},
2020-10-05 20:02:59 +00:00
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := EventData(tt.args.event)
2020-10-05 20:02:59 +00:00
if (err != nil) != tt.res.wantErr {
t.Errorf("EventData() error = %v, wantErr %v", err, tt.res.wantErr)
2020-10-05 20:02:59 +00:00
return
}
if !reflect.DeepEqual(got, tt.res.jsonText) {
t.Errorf("EventData() = %v, want %v", string(got), string(tt.res.jsonText))
2020-10-05 20:02:59 +00:00
}
})
}
}
2020-10-21 17:00:41 +00:00
// Compile-time check that *testPusher satisfies the Pusher interface.
var _ Pusher = (*testPusher)(nil)
// Client returns nil; the mocked pusher has no database client.
func (repo *testPusher) Client() *database.DB {
	var none *database.DB
	return none
}
// testPusher is a mocked Pusher used by the tests in this file.
type testPusher struct {
	// events are the events the commands passed to Push are compared against.
	events []Event
	// errs are returned by Push one per call, in order, before any
	// comparison takes place.
	errs []error
	// t is used by Push to report mismatches.
	t *testing.T
}
// Health always reports the mocked pusher as healthy.
func (repo *testPusher) Health(ctx context.Context) error {
	var healthy error
	return healthy
}
refactor(eventstore): move push logic to sql (#8816) # Which Problems Are Solved If many events are written to the same aggregate id it can happen that zitadel [starts to retry the push transaction](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/eventstore.go#L101) because [the locking behaviour](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/v3/sequence.go#L25) during push does compute the wrong sequence because newly committed events are not visible to the transaction. These events impact the current sequence. In cases with high command traffic on a single aggregate id this can have severe impact on general performance of zitadel. Because many connections of the `eventstore pusher` database pool are blocked by each other. # How the Problems Are Solved To improve the performance this locking mechanism was removed and the business logic of push is moved to sql functions which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed. 
# Additional Changes - after a connection is established prefetches the newly added database types - `eventstore.BaseEvent` now returns the correct revision of the event # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
2024-12-04 13:51:40 +00:00
func (repo *testPusher) Push(_ context.Context, _ database.ContextQueryExecuter, commands ...Command) (events []Event, err error) {
if len(repo.errs) != 0 {
err, repo.errs = repo.errs[0], repo.errs[1:]
return nil, err
2020-10-21 17:00:41 +00:00
}
if len(repo.events) != len(commands) {
repo.t.Errorf("length of events unequal want: %d got %d", len(repo.events), len(commands))
return nil, fmt.Errorf("")
2020-10-21 17:00:41 +00:00
}
events = make([]Event, len(commands))
2020-10-21 17:00:41 +00:00
for i := 0; i < len(repo.events); i++ {
2020-10-21 17:00:41 +00:00
var payload []byte
switch p := commands[i].Payload().(type) {
case []byte:
payload = p
case nil:
// don't do anything
default:
payload, err = json.Marshal(p)
if err != nil {
return nil, err
2020-10-21 17:00:41 +00:00
}
}
compareEvents(repo.t, repo.events[i], commands[i])
events[i] = &BaseEvent{
Seq: uint64(i),
Creation: time.Now(),
EventType: commands[i].Type(),
Data: payload,
User: commands[i].Creator(),
Agg: &Aggregate{
Version: commands[i].Aggregate().Version,
ID: commands[i].Aggregate().ID,
Type: commands[i].Aggregate().Type,
ResourceOwner: commands[i].Aggregate().ResourceOwner,
InstanceID: commands[i].Aggregate().InstanceID,
},
}
2020-10-21 17:00:41 +00:00
}
return events, nil
2020-10-21 17:00:41 +00:00
}
type testQuerier struct {
events []Event
sequence float64
instances []string
err error
t *testing.T
2020-10-21 17:00:41 +00:00
}
// Health always reports the mocked querier as healthy.
func (repo *testQuerier) Health(ctx context.Context) error {
	var healthy error
	return healthy
}
func (repo *testQuerier) CreateInstance(ctx context.Context, instance string) error {
2020-10-21 17:00:41 +00:00
return nil
}
func (repo *testQuerier) Filter(ctx context.Context, searchQuery *SearchQueryBuilder) ([]Event, error) {
2020-10-21 17:00:41 +00:00
if repo.err != nil {
2020-10-21 17:29:22 +00:00
return nil, repo.err
2020-10-21 17:00:41 +00:00
}
return repo.events, nil
}
// FilterToReducer passes the configured events to reduce in order and stops
// at the first error reduce returns. If an error is configured on the querier
// it is returned without calling reduce. The search query is ignored.
func (repo *testQuerier) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, reduce Reducer) error {
	if repo.err != nil {
		return repo.err
	}

	stored := repo.events
	for i := range stored {
		err := reduce(stored[i])
		if err != nil {
			return err
		}
	}
	return nil
}
func (repo *testQuerier) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) {
2020-10-21 17:00:41 +00:00
if repo.err != nil {
return 0, repo.err
2020-10-21 17:00:41 +00:00
}
return repo.sequence, nil
}
// InstanceIDs returns the configured instance ids, or the configured error if
// one is set. The query is ignored.
func (repo *testQuerier) InstanceIDs(ctx context.Context, queryFactory *SearchQueryBuilder) ([]string, error) {
	if repo.err == nil {
		return repo.instances, nil
	}
	return nil, repo.err
}
// Client returns nil; the mocked querier has no database client.
func (*testQuerier) Client() *database.DB {
	var none *database.DB
	return none
}
2020-10-21 17:00:41 +00:00
func TestEventstore_Push(t *testing.T) {
type args struct {
events []Command
2020-10-21 17:00:41 +00:00
}
type fields struct {
maxRetries int
pusher *testPusher
eventMapper map[EventType]func(Event) (Event, error)
2020-10-21 17:00:41 +00:00
}
type res struct {
wantErr bool
}
tests := []struct {
name string
args args
fields fields
res res
}{
{
name: "one aggregate one event",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
2020-10-21 17:00:41 +00:00
},
false),
2020-10-21 17:00:41 +00:00
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
refactor(eventstore): move push logic to sql (#8816) # Which Problems Are Solved If many events are written to the same aggregate id it can happen that zitadel [starts to retry the push transaction](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/eventstore.go#L101) because [the locking behaviour](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/v3/sequence.go#L25) during push does compute the wrong sequence because newly committed events are not visible to the transaction. These events impact the current sequence. In cases with high command traffic on a single aggregate id this can have severe impact on general performance of zitadel. Because many connections of the `eventstore pusher` database pool are blocked by each other. # How the Problems Are Solved To improve the performance this locking mechanism was removed and the business logic of push is moved to sql functions which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed. 
# Additional Changes - after a connection is established prefetches the newly added database types - `eventstore.BaseEvent` now returns the correct revision of the event # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
2024-12-04 13:51:40 +00:00
Version: "v1",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
},
},
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
return &testEvent{
BaseEvent: BaseEvent{
Agg: &Aggregate{
Type: e.Aggregate().Type,
},
},
}, nil
},
},
},
},
{
name: "one aggregate one event, retry disabled",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
},
false),
},
},
fields: fields{
maxRetries: 0,
pusher: &testPusher{
2020-10-21 17:00:41 +00:00
t: t,
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
refactor(eventstore): move push logic to sql (#8816) # Which Problems Are Solved If many events are written to the same aggregate id it can happen that zitadel [starts to retry the push transaction](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/eventstore.go#L101) because [the locking behaviour](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/v3/sequence.go#L25) during push does compute the wrong sequence because newly committed events are not visible to the transaction. These events impact the current sequence. In cases with high command traffic on a single aggregate id this can have severe impact on general performance of zitadel. Because many connections of the `eventstore pusher` database pool are blocked by each other. # How the Problems Are Solved To improve the performance this locking mechanism was removed and the business logic of push is moved to sql functions which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed. 
# Additional Changes - after a connection is established prefetches the newly added database types - `eventstore.BaseEvent` now returns the correct revision of the event # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
2024-12-04 13:51:40 +00:00
Version: "v1",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
2020-10-21 17:00:41 +00:00
},
},
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
return &testEvent{
BaseEvent: BaseEvent{
Agg: &Aggregate{
Type: e.Aggregate().Type,
},
},
}, nil
2020-10-21 17:00:41 +00:00
},
},
},
},
{
name: "one aggregate multiple events",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
2020-10-21 17:00:41 +00:00
},
false),
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
},
false),
2020-10-21 17:00:41 +00:00
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
2020-10-21 17:00:41 +00:00
t: t,
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
refactor(eventstore): move push logic to sql (#8816) # Which Problems Are Solved If many events are written to the same aggregate id it can happen that zitadel [starts to retry the push transaction](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/eventstore.go#L101) because [the locking behaviour](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/v3/sequence.go#L25) during push does compute the wrong sequence because newly committed events are not visible to the transaction. These events impact the current sequence. In cases with high command traffic on a single aggregate id this can have severe impact on general performance of zitadel. Because many connections of the `eventstore pusher` database pool are blocked by each other. # How the Problems Are Solved To improve the performance this locking mechanism was removed and the business logic of push is moved to sql functions which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed. 
# Additional Changes - after a connection is established prefetches the newly added database types - `eventstore.BaseEvent` now returns the correct revision of the event # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
2024-12-04 13:51:40 +00:00
Version: "v1",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
2020-10-21 17:00:41 +00:00
},
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
refactor(eventstore): move push logic to sql (#8816) # Which Problems Are Solved If many events are written to the same aggregate id it can happen that zitadel [starts to retry the push transaction](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/eventstore.go#L101) because [the locking behaviour](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/v3/sequence.go#L25) during push does compute the wrong sequence because newly committed events are not visible to the transaction. These events impact the current sequence. In cases with high command traffic on a single aggregate id this can have severe impact on general performance of zitadel. Because many connections of the `eventstore pusher` database pool are blocked by each other. # How the Problems Are Solved To improve the performance this locking mechanism was removed and the business logic of push is moved to sql functions which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed. 
# Additional Changes - after a connection is established prefetches the newly added database types - `eventstore.BaseEvent` now returns the correct revision of the event # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
2024-12-04 13:51:40 +00:00
Version: "v1",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
2020-10-21 17:00:41 +00:00
},
},
2020-10-21 17:00:41 +00:00
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
return &testEvent{
BaseEvent: BaseEvent{
Agg: &Aggregate{
Type: e.Aggregate().Type,
},
},
}, nil
2020-10-21 17:00:41 +00:00
},
},
},
res: res{
wantErr: false,
},
},
{
name: "multiple aggregates",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
2020-10-21 17:00:41 +00:00
},
false),
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
2020-10-21 17:00:41 +00:00
},
false),
newTestEvent(
"2",
"",
func() interface{} {
return []byte(nil)
},
true),
2020-10-21 17:00:41 +00:00
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
2020-10-21 17:00:41 +00:00
t: t,
events: combineEventLists(
[]Event{
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
refactor(eventstore): move push logic to sql (#8816) # Which Problems Are Solved If many events are written to the same aggregate id it can happen that zitadel [starts to retry the push transaction](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/eventstore.go#L101) because [the locking behaviour](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/v3/sequence.go#L25) during push does compute the wrong sequence because newly committed events are not visible to the transaction. These events impact the current sequence. In cases with high command traffic on a single aggregate id this can have severe impact on general performance of zitadel. Because many connections of the `eventstore pusher` database pool are blocked by each other. # How the Problems Are Solved To improve the performance this locking mechanism was removed and the business logic of push is moved to sql functions which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed. 
# Additional Changes - after a connection is established prefetches the newly added database types - `eventstore.BaseEvent` now returns the correct revision of the event # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
2024-12-04 13:51:40 +00:00
Version: "v1",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
2020-10-21 17:00:41 +00:00
},
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
refactor(eventstore): move push logic to sql (#8816) # Which Problems Are Solved If many events are written to the same aggregate id it can happen that zitadel [starts to retry the push transaction](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/eventstore.go#L101) because [the locking behaviour](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/v3/sequence.go#L25) during push does compute the wrong sequence because newly committed events are not visible to the transaction. These events impact the current sequence. In cases with high command traffic on a single aggregate id this can have severe impact on general performance of zitadel. Because many connections of the `eventstore pusher` database pool are blocked by each other. # How the Problems Are Solved To improve the performance this locking mechanism was removed and the business logic of push is moved to sql functions which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed. 
# Additional Changes - after a connection is established prefetches the newly added database types - `eventstore.BaseEvent` now returns the correct revision of the event # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
2024-12-04 13:51:40 +00:00
Version: "v1",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
2020-10-21 17:00:41 +00:00
},
},
[]Event{
&BaseEvent{
Agg: &Aggregate{
ID: "2",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
refactor(eventstore): move push logic to sql (#8816) # Which Problems Are Solved If many events are written to the same aggregate id it can happen that zitadel [starts to retry the push transaction](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/eventstore.go#L101) because [the locking behaviour](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/v3/sequence.go#L25) during push does compute the wrong sequence because newly committed events are not visible to the transaction. These events impact the current sequence. In cases with high command traffic on a single aggregate id this can have severe impact on general performance of zitadel. Because many connections of the `eventstore pusher` database pool are blocked by each other. # How the Problems Are Solved To improve the performance this locking mechanism was removed and the business logic of push is moved to sql functions which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed. 
# Additional Changes - after a connection is established prefetches the newly added database types - `eventstore.BaseEvent` now returns the correct revision of the event # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
2024-12-04 13:51:40 +00:00
Version: "v1",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
2020-10-21 17:00:41 +00:00
},
},
),
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
return &testEvent{
BaseEvent: BaseEvent{
Agg: &Aggregate{
Type: e.Aggregate().Type,
},
},
}, nil
2020-10-21 17:00:41 +00:00
},
},
},
res: res{
wantErr: false,
},
},
{
name: "push fails",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
2020-10-21 17:00:41 +00:00
},
false),
2020-10-21 17:00:41 +00:00
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
errs: []error{zerrors.ThrowInternal(nil, "V2-qaa4S", "test err")},
2020-10-21 17:00:41 +00:00
},
},
res: res{
wantErr: true,
},
},
{
name: "aggreagtes to events mapping fails",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return `{"data":""`
2020-10-21 17:00:41 +00:00
},
false),
2020-10-21 17:00:41 +00:00
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
errs: []error{zerrors.ThrowInternal(nil, "V2-qaa4S", "test err")},
2020-10-21 17:00:41 +00:00
},
},
res: res{
wantErr: true,
},
},
{
name: "retry succeeds",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
},
false),
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
refactor(eventstore): move push logic to sql (#8816) # Which Problems Are Solved If many events are written to the same aggregate id it can happen that zitadel [starts to retry the push transaction](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/eventstore.go#L101) because [the locking behaviour](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/v3/sequence.go#L25) during push does compute the wrong sequence because newly committed events are not visible to the transaction. These events impact the current sequence. In cases with high command traffic on a single aggregate id this can have severe impact on general performance of zitadel. Because many connections of the `eventstore pusher` database pool are blocked by each other. # How the Problems Are Solved To improve the performance this locking mechanism was removed and the business logic of push is moved to sql functions which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed. 
# Additional Changes - after a connection is established prefetches the newly added database types - `eventstore.BaseEvent` now returns the correct revision of the event # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
2024-12-04 13:51:40 +00:00
Version: "v1",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
},
},
errs: []error{
zerrors.ThrowInternal(&pgconn.PgError{
ConstraintName: "events2_pkey",
Code: "23505",
}, "foo-err", "Errors.Internal"),
},
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
return &testEvent{
BaseEvent: BaseEvent{
Agg: &Aggregate{
Type: e.Aggregate().Type,
},
},
}, nil
},
},
},
},
{
name: "retry fails",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
},
false),
},
},
fields: fields{
maxRetries: 1,
pusher: &testPusher{
t: t,
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
refactor(eventstore): move push logic to sql (#8816) # Which Problems Are Solved If many events are written to the same aggregate id it can happen that zitadel [starts to retry the push transaction](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/eventstore.go#L101) because [the locking behaviour](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/v3/sequence.go#L25) during push does compute the wrong sequence because newly committed events are not visible to the transaction. These events impact the current sequence. In cases with high command traffic on a single aggregate id this can have severe impact on general performance of zitadel. Because many connections of the `eventstore pusher` database pool are blocked by each other. # How the Problems Are Solved To improve the performance this locking mechanism was removed and the business logic of push is moved to sql functions which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed. 
# Additional Changes - after a connection is established prefetches the newly added database types - `eventstore.BaseEvent` now returns the correct revision of the event # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
2024-12-04 13:51:40 +00:00
Version: "v1",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
},
},
errs: []error{
zerrors.ThrowInternal(&pgconn.PgError{
ConstraintName: "events2_pkey",
Code: "23505",
}, "foo-err", "Errors.Internal"),
zerrors.ThrowInternal(&pgconn.PgError{
ConstraintName: "events2_pkey",
Code: "23505",
}, "foo-err", "Errors.Internal"),
},
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
return &testEvent{
BaseEvent: BaseEvent{
Agg: &Aggregate{
Type: e.Aggregate().Type,
},
},
}, nil
},
},
},
res: res{wantErr: true},
},
{
name: "retry disabled",
args: args{
events: []Command{
newTestEvent(
"1",
"",
func() interface{} {
return []byte(nil)
},
false),
},
},
fields: fields{
maxRetries: 0,
pusher: &testPusher{
t: t,
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "1",
Type: "test.aggregate",
ResourceOwner: "caos",
InstanceID: "zitadel",
refactor(eventstore): move push logic to sql (#8816) # Which Problems Are Solved If many events are written to the same aggregate id it can happen that zitadel [starts to retry the push transaction](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/eventstore.go#L101) because [the locking behaviour](https://github.com/zitadel/zitadel/blob/48ffc902cc90237d693e7104fc742ee927478da7/internal/eventstore/v3/sequence.go#L25) during push does compute the wrong sequence because newly committed events are not visible to the transaction. These events impact the current sequence. In cases with high command traffic on a single aggregate id this can have severe impact on general performance of zitadel. Because many connections of the `eventstore pusher` database pool are blocked by each other. # How the Problems Are Solved To improve the performance this locking mechanism was removed and the business logic of push is moved to sql functions which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed. 
# Additional Changes - after a connection is established prefetches the newly added database types - `eventstore.BaseEvent` now returns the correct revision of the event # Additional Context - part of https://github.com/zitadel/zitadel/issues/8931 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
2024-12-04 13:51:40 +00:00
Version: "v1",
},
Data: []byte(nil),
User: "editorUser",
EventType: "test.event",
},
},
errs: []error{
zerrors.ThrowInternal(&pgconn.PgError{
ConstraintName: "events2_pkey",
Code: "23505",
}, "foo-err", "Errors.Internal"),
},
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
return &testEvent{
BaseEvent: BaseEvent{
Agg: &Aggregate{
Type: e.Aggregate().Type,
},
},
}, nil
},
},
},
res: res{wantErr: true},
},
2020-10-21 17:00:41 +00:00
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
eventInterceptors = map[EventType]eventTypeInterceptors{}
2020-10-21 17:00:41 +00:00
es := &Eventstore{
maxRetries: tt.fields.maxRetries,
pusher: tt.fields.pusher,
2020-10-21 17:00:41 +00:00
}
for eventType, mapper := range tt.fields.eventMapper {
RegisterFilterEventMapper("test", eventType, mapper)
2020-10-27 14:42:18 +00:00
}
if len(eventInterceptors) != len(tt.fields.eventMapper) {
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(eventInterceptors))
2020-10-27 14:42:18 +00:00
t.FailNow()
2020-10-21 17:00:41 +00:00
}
_, err := es.Push(context.Background(), tt.args.events...)
2020-10-21 17:00:41 +00:00
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
}
})
}
}
2020-10-21 17:29:22 +00:00
func TestEventstore_FilterEvents(t *testing.T) {
type args struct {
2020-12-01 13:44:19 +00:00
query *SearchQueryBuilder
2020-10-21 17:29:22 +00:00
}
type fields struct {
repo *testQuerier
eventMapper map[EventType]func(Event) (Event, error)
2020-10-21 17:29:22 +00:00
}
type res struct {
wantErr bool
}
tests := []struct {
name string
args args
fields fields
res res
}{
{
name: "no events",
args: args{
2020-12-01 13:44:19 +00:00
query: &SearchQueryBuilder{
columns: ColumnsEvent,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
aggregateTypes: []AggregateType{"no.aggregates"},
},
},
2020-10-21 17:29:22 +00:00
},
},
fields: fields{
repo: &testQuerier{
events: []Event{},
2020-10-21 17:29:22 +00:00
t: t,
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
2020-10-21 17:29:22 +00:00
return &testEvent{}, nil
},
},
},
res: res{
wantErr: false,
},
},
{
name: "repo error",
args: args{
2020-12-01 13:44:19 +00:00
query: &SearchQueryBuilder{
columns: ColumnsEvent,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
aggregateTypes: []AggregateType{"no.aggregates"},
},
},
2020-10-21 17:29:22 +00:00
},
},
fields: fields{
repo: &testQuerier{
2020-10-21 17:29:22 +00:00
t: t,
err: zerrors.ThrowInternal(nil, "V2-RfkBa", "test err"),
2020-10-21 17:29:22 +00:00
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
2020-10-21 17:29:22 +00:00
return &testEvent{}, nil
},
},
},
res: res{
wantErr: true,
},
},
{
name: "found events",
args: args{
2020-12-01 13:44:19 +00:00
query: &SearchQueryBuilder{
columns: ColumnsEvent,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
aggregateTypes: []AggregateType{"test.aggregate"},
},
},
2020-10-21 17:29:22 +00:00
},
},
fields: fields{
repo: &testQuerier{
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "test.aggregate",
},
EventType: "test.event",
2020-10-21 17:29:22 +00:00
},
},
t: t,
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
2020-10-21 17:29:22 +00:00
return &testEvent{}, nil
},
},
},
res: res{
wantErr: false,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
eventInterceptors = map[EventType]eventTypeInterceptors{}
2020-10-21 17:29:22 +00:00
es := &Eventstore{
querier: tt.fields.repo,
2020-10-21 17:29:22 +00:00
}
2020-10-27 14:42:18 +00:00
2020-10-21 17:29:22 +00:00
for eventType, mapper := range tt.fields.eventMapper {
RegisterFilterEventMapper("test", eventType, mapper)
2020-10-27 14:42:18 +00:00
}
if len(eventInterceptors) != len(tt.fields.eventMapper) {
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(eventInterceptors))
2020-10-27 14:42:18 +00:00
t.FailNow()
2020-10-21 17:29:22 +00:00
}
_, err := es.Filter(context.Background(), tt.args.query)
2020-10-21 17:29:22 +00:00
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
}
})
}
}
func TestEventstore_LatestSequence(t *testing.T) {
2020-10-21 17:29:22 +00:00
type args struct {
2020-12-01 13:44:19 +00:00
query *SearchQueryBuilder
2020-10-21 17:29:22 +00:00
}
type fields struct {
repo *testQuerier
2020-10-21 17:29:22 +00:00
}
type res struct {
wantErr bool
}
tests := []struct {
name string
args args
fields fields
res res
}{
{
name: "no events",
args: args{
2020-12-01 13:44:19 +00:00
query: &SearchQueryBuilder{
columns: ColumnsMaxSequence,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
aggregateTypes: []AggregateType{"no.aggregates"},
},
},
2020-10-21 17:29:22 +00:00
},
},
fields: fields{
repo: &testQuerier{
events: []Event{},
2020-10-21 17:29:22 +00:00
t: t,
},
},
res: res{
wantErr: false,
},
},
{
name: "repo error",
args: args{
2020-12-01 13:44:19 +00:00
query: &SearchQueryBuilder{
columns: ColumnsMaxSequence,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
aggregateTypes: []AggregateType{"no.aggregates"},
},
},
2020-10-21 17:29:22 +00:00
},
},
fields: fields{
repo: &testQuerier{
2020-10-21 17:29:22 +00:00
t: t,
err: zerrors.ThrowInternal(nil, "V2-RfkBa", "test err"),
2020-10-21 17:29:22 +00:00
},
},
res: res{
wantErr: true,
},
},
{
name: "found events",
args: args{
2020-12-01 13:44:19 +00:00
query: &SearchQueryBuilder{
columns: ColumnsMaxSequence,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
aggregateTypes: []AggregateType{"test.aggregate"},
},
},
2020-10-21 17:29:22 +00:00
},
},
fields: fields{
repo: &testQuerier{
// sequence: time.Now(),
t: t,
2020-10-21 17:29:22 +00:00
},
},
res: res{
wantErr: false,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
es := &Eventstore{
querier: tt.fields.repo,
2020-10-21 17:29:22 +00:00
}
_, err := es.LatestSequence(context.Background(), tt.args.query)
2020-10-21 17:29:22 +00:00
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
}
})
}
}
2020-10-21 17:45:23 +00:00
type testReducer struct {
t *testing.T
events []Event
2020-10-21 17:45:23 +00:00
expectedLength int
err error
}
// Reduce reports a test error when the number of collected events differs
// from expectedLength and then returns the configured err (nil if unset).
func (r *testReducer) Reduce() error {
	r.t.Helper()
	if got := len(r.events); got != r.expectedLength {
		r.t.Errorf("wrong amount of append events wanted: %d, got %d", r.expectedLength, got)
	}
	return r.err
}
func (r *testReducer) AppendEvents(e ...Event) {
2020-10-21 17:45:23 +00:00
r.events = append(r.events, e...)
}
func TestEventstore_FilterToReducer(t *testing.T) {
type args struct {
2020-12-01 13:44:19 +00:00
query *SearchQueryBuilder
2020-10-21 17:45:23 +00:00
readModel reducer
}
type fields struct {
repo *testQuerier
eventMapper map[EventType]func(Event) (Event, error)
2020-10-21 17:45:23 +00:00
}
type res struct {
wantErr bool
}
tests := []struct {
name string
args args
fields fields
res res
}{
{
name: "no events",
args: args{
2020-12-01 13:44:19 +00:00
query: &SearchQueryBuilder{
columns: ColumnsEvent,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
aggregateTypes: []AggregateType{"no.aggregates"},
},
},
2020-10-21 17:45:23 +00:00
},
readModel: &testReducer{
t: t,
expectedLength: 0,
},
},
fields: fields{
repo: &testQuerier{
events: []Event{},
2020-10-21 17:45:23 +00:00
t: t,
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
2020-10-21 17:45:23 +00:00
return &testEvent{}, nil
},
},
},
res: res{
wantErr: false,
},
},
{
name: "repo error",
args: args{
2020-12-01 13:44:19 +00:00
query: &SearchQueryBuilder{
columns: ColumnsEvent,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
aggregateTypes: []AggregateType{"no.aggregates"},
},
},
2020-10-21 17:45:23 +00:00
},
readModel: &testReducer{
t: t,
expectedLength: 0,
},
},
fields: fields{
repo: &testQuerier{
2020-10-21 17:45:23 +00:00
t: t,
err: zerrors.ThrowInternal(nil, "V2-RfkBa", "test err"),
2020-10-21 17:45:23 +00:00
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
2020-10-21 17:45:23 +00:00
return &testEvent{}, nil
},
},
},
res: res{
wantErr: true,
},
},
{
name: "found events",
args: args{
query: &SearchQueryBuilder{
columns: ColumnsEvent,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
aggregateTypes: []AggregateType{"test.aggregate"},
},
},
},
2020-10-21 17:45:23 +00:00
readModel: &testReducer{
t: t,
expectedLength: 1,
},
},
fields: fields{
repo: &testQuerier{
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "test.aggregate",
},
EventType: "test.event",
2020-10-21 17:45:23 +00:00
},
},
t: t,
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
2020-10-21 17:45:23 +00:00
return &testEvent{}, nil
},
},
},
},
{
name: "append in reducer fails",
args: args{
2020-12-01 13:44:19 +00:00
query: &SearchQueryBuilder{
columns: ColumnsEvent,
queries: []*SearchQuery{
{
builder: &SearchQueryBuilder{},
aggregateTypes: []AggregateType{"test.aggregate"},
},
},
2020-10-21 17:45:23 +00:00
},
readModel: &testReducer{
2020-11-26 08:19:14 +00:00
t: t,
err: zerrors.ThrowInvalidArgument(nil, "V2-W06TG", "test err"),
2020-11-26 08:19:14 +00:00
expectedLength: 1,
2020-10-21 17:45:23 +00:00
},
},
fields: fields{
repo: &testQuerier{
events: []Event{
&BaseEvent{
Agg: &Aggregate{
ID: "test.aggregate",
},
EventType: "test.event",
2020-10-21 17:45:23 +00:00
},
},
t: t,
},
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(e Event) (Event, error) {
2020-10-21 17:45:23 +00:00
return &testEvent{}, nil
},
},
},
res: res{
wantErr: true,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
es := &Eventstore{
querier: tt.fields.repo,
2020-10-21 17:45:23 +00:00
}
for eventType, mapper := range tt.fields.eventMapper {
RegisterFilterEventMapper("test", eventType, mapper)
2020-10-27 14:42:18 +00:00
}
if len(eventInterceptors) != len(tt.fields.eventMapper) {
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(eventInterceptors))
2020-10-27 14:42:18 +00:00
t.FailNow()
2020-10-21 17:45:23 +00:00
}
err := es.FilterToReducer(context.Background(), tt.args.query, tt.args.readModel)
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
}
})
}
}
func combineEventLists(lists ...[]Event) []Event {
events := []Event{}
2020-10-21 17:00:41 +00:00
for _, list := range lists {
events = append(events, list...)
}
return events
}
func compareEvents(t *testing.T, want Event, got Command) {
2020-10-21 17:00:41 +00:00
t.Helper()
if want.Aggregate().ID != got.Aggregate().ID {
t.Errorf("wrong aggregateID got %q want %q", got.Aggregate().ID, want.Aggregate().ID)
2020-10-21 17:00:41 +00:00
}
if want.Aggregate().Type != got.Aggregate().Type {
t.Errorf("wrong aggregateType got %q want %q", got.Aggregate().Type, want.Aggregate().Type)
2020-10-21 17:00:41 +00:00
}
if !reflect.DeepEqual(want.DataAsBytes(), got.Payload()) {
t.Errorf("wrong data got %s want %s", string(got.Payload().([]byte)), string(want.DataAsBytes()))
2020-10-21 17:00:41 +00:00
}
if want.Creator() != got.Creator() {
t.Errorf("wrong creator got %q want %q", got.Creator(), want.Creator())
2020-10-21 17:00:41 +00:00
}
if want.Aggregate().ResourceOwner != got.Aggregate().ResourceOwner {
t.Errorf("wrong resource owner got %q want %q", got.Aggregate().ResourceOwner, want.Aggregate().ResourceOwner)
2020-10-21 17:00:41 +00:00
}
if want.Type() != got.Type() {
t.Errorf("wrong event type got %q want %q", got.Type(), want.Type())
2020-10-21 17:00:41 +00:00
}
if want.Revision() != got.Revision() {
t.Errorf("wrong version got %q want %q", got.Revision(), want.Revision())
2020-10-21 17:00:41 +00:00
}
}
2020-10-21 17:29:22 +00:00
func TestEventstore_mapEvents(t *testing.T) {
type fields struct {
eventMapper map[EventType]func(Event) (Event, error)
2020-10-21 17:29:22 +00:00
}
type args struct {
events []Event
2020-10-21 17:29:22 +00:00
}
type res struct {
events []Event
2020-10-21 17:29:22 +00:00
wantErr bool
}
tests := []struct {
name string
fields fields
args args
res res
}{
{
name: "mapping failed",
args: args{
events: []Event{
&BaseEvent{
EventType: "test.event",
2020-10-21 17:29:22 +00:00
},
},
},
fields: fields{
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(Event) (Event, error) {
return nil, zerrors.ThrowInternal(nil, "V2-8FbQk", "test err")
2020-10-21 17:29:22 +00:00
},
},
},
res: res{
wantErr: true,
},
},
{
name: "mapping succeeded",
args: args{
events: []Event{
&BaseEvent{
EventType: "test.event",
2020-10-21 17:29:22 +00:00
},
},
},
fields: fields{
eventMapper: map[EventType]func(Event) (Event, error){
"test.event": func(Event) (Event, error) {
2020-10-21 17:29:22 +00:00
return &testEvent{}, nil
},
},
},
res: res{
events: []Event{
2020-10-21 17:29:22 +00:00
&testEvent{},
},
wantErr: false,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
es := &Eventstore{}
2020-10-21 17:29:22 +00:00
for eventType, mapper := range tt.fields.eventMapper {
RegisterFilterEventMapper("test", eventType, mapper)
2020-10-27 14:42:18 +00:00
}
if len(eventInterceptors) != len(tt.fields.eventMapper) {
t.Errorf("register event mapper failed expected mapper amount: %d, got: %d", len(tt.fields.eventMapper), len(eventInterceptors))
2020-10-27 14:42:18 +00:00
t.FailNow()
2020-10-21 17:29:22 +00:00
}
gotMappedEvents, err := es.mapEvents(tt.args.events)
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.mapEvents() error = %v, wantErr %v", err, tt.res.wantErr)
return
}
if !reflect.DeepEqual(gotMappedEvents, tt.res.events) {
t.Errorf("Eventstore.mapEvents() = %v, want %v", gotMappedEvents, tt.res.events)
}
})
}
}