mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-12 06:07:33 +00:00
fix(eventstore): improve insert statement (#408)
This commit is contained in:
@@ -27,9 +27,8 @@ var (
|
|||||||
expectedInsertStatement = regexp.MustCompile(`INSERT INTO eventstore\.events ` +
|
expectedInsertStatement = regexp.MustCompile(`INSERT INTO eventstore\.events ` +
|
||||||
`\(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence\) ` +
|
`\(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence\) ` +
|
||||||
`SELECT \$1, \$2, \$3, \$4, COALESCE\(\$5, now\(\)\), \$6, \$7, \$8, \$9, \$10 ` +
|
`SELECT \$1, \$2, \$3, \$4, COALESCE\(\$5, now\(\)\), \$6, \$7, \$8, \$9, \$10 ` +
|
||||||
`WHERE EXISTS \(SELECT 1 WHERE ` +
|
`WHERE EXISTS \(` +
|
||||||
`EXISTS \(SELECT 1 FROM eventstore\.events WHERE event_sequence = COALESCE\(\$11, 0\) AND aggregate_type = \$12 AND aggregate_id = \$13\) OR ` +
|
`SELECT 1 FROM eventstore\.events WHERE aggregate_type = \$11 AND aggregate_id = \$12 HAVING MAX\(event_sequence\) = \$13 OR \(\$14::BIGINT IS NULL AND COUNT\(\*\) = 0\)\) ` +
|
||||||
`NOT EXISTS \(SELECT 1 FROM eventstore\.events WHERE aggregate_type = \$14 AND aggregate_id = \$15\) AND COALESCE\(\$16, 0\) = 0\) ` +
|
|
||||||
`RETURNING id, event_sequence, creation_date`).String()
|
`RETURNING id, event_sequence, creation_date`).String()
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -103,8 +102,7 @@ func (db *dbMock) expectInsertEvent(e *models.Event, returnedID string, returned
|
|||||||
db.mock.ExpectQuery(expectedInsertStatement).
|
db.mock.ExpectQuery(expectedInsertStatement).
|
||||||
WithArgs(
|
WithArgs(
|
||||||
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, Sequence(e.PreviousSequence),
|
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, Sequence(e.PreviousSequence),
|
||||||
Sequence(e.PreviousSequence), e.AggregateType, e.AggregateID,
|
e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence), Sequence(e.PreviousSequence),
|
||||||
e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence),
|
|
||||||
).
|
).
|
||||||
WillReturnRows(
|
WillReturnRows(
|
||||||
sqlmock.NewRows([]string{"id", "event_sequence", "creation_date"}).
|
sqlmock.NewRows([]string{"id", "event_sequence", "creation_date"}).
|
||||||
@@ -118,8 +116,7 @@ func (db *dbMock) expectInsertEventError(e *models.Event) *dbMock {
|
|||||||
db.mock.ExpectQuery(expectedInsertStatement).
|
db.mock.ExpectQuery(expectedInsertStatement).
|
||||||
WithArgs(
|
WithArgs(
|
||||||
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, Sequence(e.PreviousSequence),
|
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, Sequence(e.PreviousSequence),
|
||||||
Sequence(e.PreviousSequence), e.AggregateType, e.AggregateID,
|
e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence), Sequence(e.PreviousSequence),
|
||||||
e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence),
|
|
||||||
).
|
).
|
||||||
WillReturnError(sql.ErrTxDone)
|
WillReturnError(sql.ErrTxDone)
|
||||||
|
|
||||||
|
@@ -11,15 +11,14 @@ import (
|
|||||||
"github.com/cockroachdb/cockroach-go/v2/crdb"
|
"github.com/cockroachdb/cockroach-go/v2/crdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
const insertStmt = "INSERT INTO eventstore.events " +
|
const (
|
||||||
|
insertStmt = "INSERT INTO eventstore.events " +
|
||||||
"(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence) " +
|
"(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence) " +
|
||||||
"SELECT $1, $2, $3, $4, COALESCE($5, now()), $6, $7, $8, $9, $10 " +
|
"SELECT $1, $2, $3, $4, COALESCE($5, now()), $6, $7, $8, $9, $10 " +
|
||||||
"WHERE EXISTS (SELECT 1 WHERE " +
|
"WHERE EXISTS (" +
|
||||||
// exactly one event of requested aggregate must have the given previous sequence as sequence (last inserted event)
|
"SELECT 1 FROM eventstore.events WHERE aggregate_type = $11 AND aggregate_id = $12 HAVING MAX(event_sequence) = $13 OR ($14::BIGINT IS NULL AND COUNT(*) = 0)) " +
|
||||||
"EXISTS (SELECT 1 FROM eventstore.events WHERE event_sequence = COALESCE($11, 0) AND aggregate_type = $12 AND aggregate_id = $13) OR " +
|
|
||||||
// if previous sequence = 0, no events must exist for the requested aggregate
|
|
||||||
"NOT EXISTS (SELECT 1 FROM eventstore.events WHERE aggregate_type = $14 AND aggregate_id = $15) AND COALESCE($16, 0) = 0) " +
|
|
||||||
"RETURNING id, event_sequence, creation_date"
|
"RETURNING id, event_sequence, creation_date"
|
||||||
|
)
|
||||||
|
|
||||||
func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) (err error) {
|
func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) (err error) {
|
||||||
err = crdb.ExecuteTx(ctx, db.client, nil, func(tx *sql.Tx) error {
|
err = crdb.ExecuteTx(ctx, db.client, nil, func(tx *sql.Tx) error {
|
||||||
@@ -70,8 +69,7 @@ func precondtion(tx *sql.Tx, aggregate *models.Aggregate) error {
|
|||||||
func insertEvents(stmt *sql.Stmt, previousSequence Sequence, events []*models.Event) error {
|
func insertEvents(stmt *sql.Stmt, previousSequence Sequence, events []*models.Event) error {
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
rows, err := stmt.Query(event.Type, event.AggregateType, event.AggregateID, event.AggregateVersion, event.CreationDate, Data(event.Data), event.EditorUser, event.EditorService, event.ResourceOwner, previousSequence,
|
rows, err := stmt.Query(event.Type, event.AggregateType, event.AggregateID, event.AggregateVersion, event.CreationDate, Data(event.Data), event.EditorUser, event.EditorService, event.ResourceOwner, previousSequence,
|
||||||
previousSequence, event.AggregateType, event.AggregateID,
|
event.AggregateType, event.AggregateID, previousSequence, previousSequence)
|
||||||
event.AggregateType, event.AggregateID, previousSequence)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logging.Log("SQL-EXA0q").WithError(err).Info("query failed")
|
logging.Log("SQL-EXA0q").WithError(err).Info("query failed")
|
||||||
|
Reference in New Issue
Block a user