diff --git a/internal/eventstore/internal/repository/sql/db_mock_test.go b/internal/eventstore/internal/repository/sql/db_mock_test.go index be704c6d1d..328535162f 100644 --- a/internal/eventstore/internal/repository/sql/db_mock_test.go +++ b/internal/eventstore/internal/repository/sql/db_mock_test.go @@ -27,9 +27,8 @@ var ( expectedInsertStatement = regexp.MustCompile(`INSERT INTO eventstore\.events ` + `\(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence\) ` + `SELECT \$1, \$2, \$3, \$4, COALESCE\(\$5, now\(\)\), \$6, \$7, \$8, \$9, \$10 ` + - `WHERE EXISTS \(SELECT 1 WHERE ` + - `EXISTS \(SELECT 1 FROM eventstore\.events WHERE event_sequence = COALESCE\(\$11, 0\) AND aggregate_type = \$12 AND aggregate_id = \$13\) OR ` + - `NOT EXISTS \(SELECT 1 FROM eventstore\.events WHERE aggregate_type = \$14 AND aggregate_id = \$15\) AND COALESCE\(\$16, 0\) = 0\) ` + + `WHERE EXISTS \(` + + `SELECT 1 FROM eventstore\.events WHERE aggregate_type = \$11 AND aggregate_id = \$12 HAVING MAX\(event_sequence\) = \$13 OR \(\$14::BIGINT IS NULL AND COUNT\(\*\) = 0\)\) ` + `RETURNING id, event_sequence, creation_date`).String() ) @@ -103,8 +102,7 @@ func (db *dbMock) expectInsertEvent(e *models.Event, returnedID string, returned db.mock.ExpectQuery(expectedInsertStatement). WithArgs( e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, Sequence(e.PreviousSequence), - Sequence(e.PreviousSequence), e.AggregateType, e.AggregateID, - e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence), + e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence), Sequence(e.PreviousSequence), ). WillReturnRows( sqlmock.NewRows([]string{"id", "event_sequence", "creation_date"}). @@ -118,8 +116,7 @@ func (db *dbMock) expectInsertEventError(e *models.Event) *dbMock { db.mock.ExpectQuery(expectedInsertStatement). WithArgs( e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, Sequence(e.PreviousSequence), - Sequence(e.PreviousSequence), e.AggregateType, e.AggregateID, - e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence), + e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence), Sequence(e.PreviousSequence), ). WillReturnError(sql.ErrTxDone) diff --git a/internal/eventstore/internal/repository/sql/push.go b/internal/eventstore/internal/repository/sql/push.go index cb19ac4d40..c30f02b9b7 100644 --- a/internal/eventstore/internal/repository/sql/push.go +++ b/internal/eventstore/internal/repository/sql/push.go @@ -11,15 +11,14 @@ import ( "github.com/cockroachdb/cockroach-go/v2/crdb" ) -const insertStmt = "INSERT INTO eventstore.events " + - "(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence) " + - "SELECT $1, $2, $3, $4, COALESCE($5, now()), $6, $7, $8, $9, $10 " + - "WHERE EXISTS (SELECT 1 WHERE " + - // exactly one event of requested aggregate must have the given previous sequence as sequence (last inserted event) - "EXISTS (SELECT 1 FROM eventstore.events WHERE event_sequence = COALESCE($11, 0) AND aggregate_type = $12 AND aggregate_id = $13) OR " + - // if previous sequence = 0, no events must exist for the requested aggregate - "NOT EXISTS (SELECT 1 FROM eventstore.events WHERE aggregate_type = $14 AND aggregate_id = $15) AND COALESCE($16, 0) = 0) " + - "RETURNING id, event_sequence, creation_date" +const ( + insertStmt = "INSERT INTO eventstore.events " + + "(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence) " + + "SELECT $1, $2, $3, $4, COALESCE($5, now()), $6, $7, $8, $9, $10 " + + "WHERE EXISTS (" + + "SELECT 1 FROM eventstore.events WHERE aggregate_type = $11 AND aggregate_id = $12 HAVING MAX(event_sequence) = $13 OR ($14::BIGINT IS NULL AND COUNT(*) = 0)) " + + "RETURNING id, event_sequence, creation_date" +) func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) (err error) { err = crdb.ExecuteTx(ctx, db.client, nil, func(tx *sql.Tx) error { @@ -70,8 +69,7 @@ func precondtion(tx *sql.Tx, aggregate *models.Aggregate) error { func insertEvents(stmt *sql.Stmt, previousSequence Sequence, events []*models.Event) error { for _, event := range events { rows, err := stmt.Query(event.Type, event.AggregateType, event.AggregateID, event.AggregateVersion, event.CreationDate, Data(event.Data), event.EditorUser, event.EditorService, event.ResourceOwner, previousSequence, - previousSequence, event.AggregateType, event.AggregateID, - event.AggregateType, event.AggregateID, previousSequence) + event.AggregateType, event.AggregateID, previousSequence, previousSequence) if err != nil { logging.Log("SQL-EXA0q").WithError(err).Info("query failed")