fix(eventstore): resource owner from previous event

This commit is contained in:
adlerhurst 2020-10-22 18:13:31 +02:00
parent 0f855c86cf
commit b6ed7a396c
2 changed files with 221 additions and 21 deletions

View File

@ -31,25 +31,46 @@ const (
" check_previous, " +
// variables below are calculated
" max_event_seq " +
") " +
" AS( " +
" SELECT " +
" $1::VARCHAR," +
" $2::VARCHAR," +
" $3::VARCHAR," +
" $4::VARCHAR," +
" COALESCE($5::TIMESTAMPTZ, NOW()), " +
" $6::JSONB, " +
" $7::VARCHAR, " +
" $8::VARCHAR, " +
" $9::VARCHAR, " +
" $10::BIGINT, " +
" $11::BOOLEAN," +
" MAX(event_sequence) AS max_event_seq " +
" FROM eventstore.events " +
" WHERE " +
" aggregate_type = $2::VARCHAR " +
" AND aggregate_id = $3::VARCHAR " +
") AS ( " +
" ( " +
//the following select will return no row if no previous event defined
" SELECT " +
" $1::VARCHAR, " +
" $2::VARCHAR, " +
" $3::VARCHAR, " +
" $4::VARCHAR, " +
" COALESCE($5::TIMESTAMPTZ, NOW()), " +
" $6::JSONB, " +
" $7::VARCHAR, " +
" $8::VARCHAR, " +
" resource_owner, " +
" $10::BIGINT, " +
" $11::BOOLEAN," +
" MAX(event_sequence) AS max_event_seq " +
" FROM eventstore.events " +
" WHERE " +
" aggregate_type = $2::VARCHAR " +
" AND aggregate_id = $3::VARCHAR " +
" GROUP BY resource_owner " +
" ) UNION ALL (" +
// if no previous event we use the given data
" VALUES (" +
" $1::VARCHAR, " +
" $2::VARCHAR, " +
" $3::VARCHAR, " +
" $4::VARCHAR, " +
" COALESCE($5::TIMESTAMPTZ, NOW()), " +
" $6::JSONB, " +
" $7::VARCHAR, " +
" $8::VARCHAR, " +
" $9::VARCHAR, " +
" $10::BIGINT, " +
" $11::BOOLEAN, " +
" NULL::BIGINT " +
" ) " +
" ) " +
// ensure only 1 row in input_event
" LIMIT 1 " +
") " +
"INSERT INTO eventstore.events " +
" ( " +
@ -94,7 +115,7 @@ const (
" ) " +
" END " +
" ) " +
"RETURNING id, event_sequence, previous_sequence, creation_date "
"RETURNING id, event_sequence, previous_sequence, creation_date, resource_owner "
)
type CRDB struct {
@ -136,7 +157,7 @@ func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error {
event.ResourceOwner,
previousSequence,
event.CheckPreviousSequence,
).Scan(&event.ID, &event.Sequence, &previousSequence, &event.CreationDate)
).Scan(&event.ID, &event.Sequence, &previousSequence, &event.CreationDate, &event.ResourceOwner)
event.PreviousSequence = uint64(previousSequence)

View File

@ -995,6 +995,185 @@ func TestCRDB_LatestSequence(t *testing.T) {
}
}
func TestCRDB_Push_ResourceOwner(t *testing.T) {
type args struct {
events []*repository.Event
}
type res struct {
resourceOwners []string
}
type fields struct {
aggregateIDs []string
aggregateType string
}
tests := []struct {
name string
args args
res res
fields fields
}{
{
name: "two events of same aggregate same resource owner",
args: args{
events: []*repository.Event{
generateEvent(t, "500", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "500", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
},
},
fields: fields{
aggregateIDs: []string{"500"},
aggregateType: t.Name(),
},
res: res{
resourceOwners: []string{"caos", "caos"},
},
},
{
name: "two events of different aggregate same resource owner",
args: args{
events: []*repository.Event{
generateEvent(t, "501", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "502", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
},
},
fields: fields{
aggregateIDs: []string{"501", "502"},
aggregateType: t.Name(),
},
res: res{
resourceOwners: []string{"caos", "caos"},
},
},
{
name: "two events of different aggregate different resource owner",
args: args{
events: []*repository.Event{
generateEvent(t, "503", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "504", false, 0, func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
},
},
fields: fields{
aggregateIDs: []string{"503", "504"},
aggregateType: t.Name(),
},
res: res{
resourceOwners: []string{"caos", "zitadel"},
},
},
{
name: "events of different aggregate different resource owner",
args: args{
events: combineEventLists(
linkEvents(
generateEvent(t, "505", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "505", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
),
linkEvents(
generateEvent(t, "506", false, 0, func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
generateEvent(t, "506", false, 0, func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
),
),
},
fields: fields{
aggregateIDs: []string{"505", "506"},
aggregateType: t.Name(),
},
res: res{
resourceOwners: []string{"caos", "caos", "zitadel", "zitadel"},
},
},
{
name: "events of different aggregate different resource owner per event",
args: args{
events: combineEventLists(
linkEvents(
generateEvent(t, "507", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "507", false, 0, func(e *repository.Event) { e.ResourceOwner = "ignored" }),
),
linkEvents(
generateEvent(t, "508", false, 0, func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
generateEvent(t, "508", false, 0, func(e *repository.Event) { e.ResourceOwner = "ignored" }),
),
),
},
fields: fields{
aggregateIDs: []string{"507", "508"},
aggregateType: t.Name(),
},
res: res{
resourceOwners: []string{"caos", "caos", "zitadel", "zitadel"},
},
},
{
name: "events of one aggregate different resource owner per event",
args: args{
events: combineEventLists(
linkEvents(
generateEvent(t, "509", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "509", false, 0, func(e *repository.Event) { e.ResourceOwner = "ignored" }),
),
linkEvents(
generateEvent(t, "509", false, 0, func(e *repository.Event) { e.ResourceOwner = "ignored" }),
generateEvent(t, "509", false, 0, func(e *repository.Event) { e.ResourceOwner = "ignored" }),
),
),
},
fields: fields{
aggregateIDs: []string{"509"},
aggregateType: t.Name(),
},
res: res{
resourceOwners: []string{"caos", "caos", "caos", "caos"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
}
if err := db.Push(context.Background(), tt.args.events...); err != nil {
t.Errorf("CRDB.Push() error = %v", err)
}
if len(tt.args.events) != len(tt.res.resourceOwners) {
t.Errorf("length of events (%d) and resource owners (%d) must be equal", len(tt.args.events), len(tt.res.resourceOwners))
return
}
for i, event := range tt.args.events {
if event.ResourceOwner != tt.res.resourceOwners[i] {
t.Errorf("resource owner not expected want: %q got: %q", tt.res.resourceOwners[i], event.ResourceOwner)
}
}
rows, err := testCRDBClient.Query("SELECT resource_owner FROM eventstore.events WHERE aggregate_type = $1 AND aggregate_id = ANY($2) ORDER BY event_sequence", tt.fields.aggregateType, pq.Array(tt.fields.aggregateIDs))
if err != nil {
t.Error("unable to query inserted rows: ", err)
return
}
eventCount := 0
for i := 0; rows.Next(); i++ {
var resourceOwner string
err = rows.Scan(&resourceOwner)
if err != nil {
t.Error("unable to scan row: ", err)
return
}
if resourceOwner != tt.res.resourceOwners[i] {
t.Errorf("unexpected resource owner in queried event. want %q, got: %q", tt.res.resourceOwners[i], resourceOwner)
}
eventCount++
}
if eventCount != len(tt.res.resourceOwners) {
t.Errorf("wrong queried event count: want %d, got %d", len(tt.res.resourceOwners), eventCount)
}
})
}
}
func canceledCtx() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()