fix: eventstore v2 insert statement

This commit is contained in:
adlerhurst
2020-10-06 20:20:23 +02:00
parent 9342efa834
commit 46a68c15bf
2 changed files with 117 additions and 58 deletions

View File

@@ -30,8 +30,7 @@ const (
" previous_sequence, " + " previous_sequence, " +
" check_previous, " + " check_previous, " +
// variables below are calculated // variables below are calculated
" max_event_seq, " + " max_event_seq " +
" event_count " +
") " + ") " +
" AS( " + " AS( " +
" SELECT " + " SELECT " +
@@ -46,8 +45,7 @@ const (
" $9::VARCHAR, " + " $9::VARCHAR, " +
" $10::BIGINT, " + " $10::BIGINT, " +
" $11::BOOLEAN," + " $11::BOOLEAN," +
" MAX(event_sequence) AS max_event_seq, " + " MAX(event_sequence) AS max_event_seq " +
" COUNT(*) AS event_count " +
" FROM eventstore.events " + " FROM eventstore.events " +
" WHERE " + " WHERE " +
" aggregate_type = $2::VARCHAR " + " aggregate_type = $2::VARCHAR " +
@@ -80,25 +78,23 @@ const (
" ( " + " ( " +
" SELECT " + " SELECT " +
" CASE " + " CASE " +
" WHEN NOT check_previous THEN " + " WHEN NOT check_previous " +
" max_event_seq " + " THEN max_event_seq " +
" ELSE " + " ELSE previous_sequence " +
" previous_sequence " +
" END" + " END" +
" ) " + " ) " +
" FROM input_event " + " FROM input_event " +
" WHERE EXISTS ( " + " WHERE 1 = " +
" SELECT " +
" CASE " + " CASE " +
" WHEN NOT check_previous THEN 1 " + " WHEN NOT check_previous " +
" THEN 1 " +
" ELSE ( " + " ELSE ( " +
" SELECT 1 FROM input_event " + " SELECT 1 FROM input_event " +
" WHERE max_event_seq = previous_sequence OR (previous_sequence IS NULL AND event_count = 0) " + " WHERE (max_event_seq IS NULL AND previous_sequence IS NULL) OR (max_event_seq IS NOT NULL AND max_event_seq = previous_sequence) " +
" ) " + " ) " +
" END " + " END " +
" ) " +
" ) " + " ) " +
"RETURNING id, event_sequence, creation_date " "RETURNING id, event_sequence, previous_sequence, creation_date "
) )
type CRDB struct { type CRDB struct {
@@ -118,9 +114,9 @@ func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error {
return caos_errs.ThrowInternal(err, "SQL-OdXRE", "prepare failed") return caos_errs.ThrowInternal(err, "SQL-OdXRE", "prepare failed")
} }
for _, event := range events { for _, event := range events {
previousSequence := event.PreviousSequence previousSequence := Sequence(event.PreviousSequence)
if event.PreviousEvent != nil { if event.PreviousEvent != nil {
previousSequence = event.PreviousSequence previousSequence = Sequence(event.PreviousEvent.Sequence)
} }
err = stmt.QueryRowContext(ctx, err = stmt.QueryRowContext(ctx,
event.Type, event.Type,
@@ -135,9 +131,11 @@ func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error {
event.EditorUser, event.EditorUser,
event.EditorService, event.EditorService,
event.ResourceOwner, event.ResourceOwner,
Sequence(previousSequence), previousSequence,
event.CheckPreviousSequence, event.CheckPreviousSequence,
).Scan(&event.ID, &event.Sequence, &event.CreationDate) ).Scan(&event.ID, &event.Sequence, &previousSequence, &event.CreationDate)
event.PreviousSequence = uint64(previousSequence)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
@@ -180,7 +178,7 @@ func (db *CRDB) LatestSequence(ctx context.Context, searchQuery *repository.Sear
return uint64(seq), nil return uint64(seq), nil
} }
func (db *CRDB) query(searchQuery *repository.SearchQuery, data interface{}) error { func (db *CRDB) query(searchQuery *repository.SearchQuery, dest interface{}) error {
query, values, rowScanner := buildQuery(db, searchQuery) query, values, rowScanner := buildQuery(db, searchQuery)
if query == "" { if query == "" {
return caos_errs.ThrowInvalidArgument(nil, "SQL-rWeBw", "invalid query factory") return caos_errs.ThrowInvalidArgument(nil, "SQL-rWeBw", "invalid query factory")
@@ -194,7 +192,7 @@ func (db *CRDB) query(searchQuery *repository.SearchQuery, data interface{}) err
defer rows.Close() defer rows.Close()
for rows.Next() { for rows.Next() {
err = rowScanner(rows.Scan, nil) err = rowScanner(rows.Scan, dest)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -48,27 +48,29 @@ func TestMain(m *testing.M) {
func TestInsert(t *testing.T) { func TestInsert(t *testing.T) {
crdb := &CRDB{client: db} crdb := &CRDB{client: db}
e1 := &repository.Event{
AggregateID: "agg.id",
AggregateType: "agg.type",
CheckPreviousSequence: true,
EditorService: "edi.svc",
EditorUser: "edi",
ResourceOwner: "edit",
Type: "type",
Version: "v1",
}
events := []*repository.Event{ events := []*repository.Event{
e1,
{ {
AggregateID: "agg.id", AggregateID: "agg.id",
AggregateType: "agg.type", AggregateType: "agg.type",
CheckPreviousSequence: false, CheckPreviousSequence: true,
EditorService: "edi.svc",
EditorUser: "edi",
ResourceOwner: "edit",
Type: "type",
Version: "v1",
},
{
AggregateID: "agg.id",
AggregateType: "agg.type",
CheckPreviousSequence: false,
EditorService: "edi.svc", EditorService: "edi.svc",
EditorUser: "edi", EditorUser: "edi",
ResourceOwner: "edit", ResourceOwner: "edit",
Type: "type", Type: "type",
Version: "v1", Version: "v1",
CreationDate: time.Now().Add(-2 * time.Second), CreationDate: time.Now().Add(-2 * time.Second),
PreviousEvent: e1,
}, },
{ {
AggregateID: "agg.id", AggregateID: "agg.id",
@@ -81,12 +83,66 @@ func TestInsert(t *testing.T) {
Version: "v1", Version: "v1",
CreationDate: time.Now().Add(-500 * time.Millisecond), CreationDate: time.Now().Add(-500 * time.Millisecond),
}, },
{
AggregateID: "agg.id2",
AggregateType: "agg.type",
CheckPreviousSequence: true,
EditorService: "edi.svc",
EditorUser: "edi",
ResourceOwner: "edit",
Type: "type",
Version: "v1",
CreationDate: time.Now().Add(-500 * time.Millisecond),
},
{
AggregateID: "agg.id3",
AggregateType: "agg.type",
CheckPreviousSequence: false,
EditorService: "edi.svc",
EditorUser: "edi",
ResourceOwner: "edit",
Type: "type",
Version: "v1",
CreationDate: time.Now().Add(-500 * time.Millisecond),
},
{
AggregateID: "agg.id",
AggregateType: "agg.type",
CheckPreviousSequence: false,
EditorService: "edi.svc",
EditorUser: "edi",
ResourceOwner: "edit",
Type: "type",
Version: "v1",
CreationDate: time.Now().Add(-500 * time.Millisecond),
},
// {
// AggregateID: "agg.id4",
// AggregateType: "agg.type",
// CheckPreviousSequence: false,
// EditorService: "edi.svc",
// EditorUser: "edi",
// ResourceOwner: "edit",
// Type: "type",
// Version: "v1",
// CreationDate: time.Now().Add(-500 * time.Millisecond),
// PreviousEvent: e1,
// },
//fail because wrong previous event
// {
// AggregateID: "agg.id2",
// AggregateType: "agg.type",
// CheckPreviousSequence: true,
// EditorService: "edi.svc",
// EditorUser: "edi",
// ResourceOwner: "edit",
// Type: "type",
// Version: "v1",
// CreationDate: time.Now().Add(-500 * time.Millisecond),
// PreviousEvent: e1,
// },
} }
fmt.Println("==============") fmt.Println("==============")
fmt.Println("will insert ts:")
for _, event := range events {
fmt.Printf("%v | %v\n", event.CreationDate, event.ID)
}
err := crdb.Push(context.Background(), events...) err := crdb.Push(context.Background(), events...)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@@ -95,35 +151,14 @@ func TestInsert(t *testing.T) {
fmt.Println("inserted ts:") fmt.Println("inserted ts:")
for _, event := range events { for _, event := range events {
fmt.Printf("%v | %v\n", event.CreationDate, event.ID) fmt.Printf("%+v\n", event)
} }
fmt.Println("==============") fmt.Println("====================")
for _, event := range events {
fmt.Printf("%+v", event)
}
// tx, _ := db.Begin()
// var seq Sequence
// var d time.Time
// row := tx.QueryRow(crdbInsert, "event.type", "aggregate.type", "aggregate.id", repository.Version("v1"), nil, Data(nil), "editor.user", "editor.service", "resource.owner", Sequence(0), false)
// err := row.Scan(&seq, &d)
// row = tx.QueryRow(crdbInsert, "event.type", "aggregate.type", "aggregate.id", repository.Version("v1"), nil, Data(nil), "editor.user", "editor.service", "resource.owner", Sequence(1), true)
// err = row.Scan(&seq, &d)
// row = tx.QueryRow(crdbInsert, "event.type", "aggregate.type", "aggregate.id", repository.Version("v1"), nil, Data(nil), "editor.user", "editor.service", "resource.owner", Sequence(0), false)
// err = row.Scan(&seq, &d)
// tx.Commit()
rows, err := db.Query("select * from eventstore.events order by event_sequence") rows, err := db.Query("select * from eventstore.events order by event_sequence")
defer rows.Close() defer rows.Close()
fmt.Println(err) fmt.Println(err)
fmt.Println(rows.Columns())
for rows.Next() { for rows.Next() {
i := make([]interface{}, 12) i := make([]interface{}, 12)
var id string var id string
@@ -132,7 +167,33 @@ func TestInsert(t *testing.T) {
fmt.Println(i) fmt.Println(i)
} }
fmt.Println("====================")
filtededEvents, err := crdb.Filter(context.Background(), &repository.SearchQuery{
Columns: repository.Columns_Event,
Filters: []*repository.Filter{
{
Field: repository.Field_AggregateType,
Operation: repository.Operation_Equals,
Value: repository.AggregateType("agg.type"),
},
},
})
fmt.Println(err)
for _, event := range filtededEvents {
fmt.Printf("%+v\n", event)
}
fmt.Println("====================")
rows, err = db.Query("select max(event_sequence), count(*) from eventstore.events where aggregate_type = 'agg.type' and aggregate_id = 'agg.id'")
defer rows.Close()
fmt.Println(err)
for rows.Next() {
i := make([]interface{}, 2)
rows.Scan(&i[0], &i[1])
fmt.Println(i)
}
t.Fail() t.Fail()
} }