From 046a6b110835b23c1fcbccbfa6e5dde7e3bb45b7 Mon Sep 17 00:00:00 2001 From: Silvan <27845747+adlerhurst@users.noreply.github.com> Date: Mon, 26 May 2025 12:00:16 +0200 Subject: [PATCH] fix(handler): report error correctly (#9921) # Which Problems Are Solved 1. The projection handler reported no error if an error happened but updating the current state was successful. This can lead to skipped projections during setup as soon as the projection has an error but does not correctly report if to the caller. 2. Mirror projections skipped as soon as an error occures, this leads to unprojected projections. 3. Mirror checked position wrongly in some cases # How the Problems Are Solved 1. the error returned by the `Trigger` method will will only be set to the error of updating current states if there occured an error. 2. triggering projections checks for the error type returned and retries if the error had code `23505` 3. Corrected to use the `Equal` method # Additional Changes unify logging on mirror projections --- .../eventsourcing/handler/handler.go | 16 +++++++-- .../eventsourcing/handler/handler.go | 16 +++++++-- internal/database/database.go | 4 +++ .../eventstore/handler/v2/field_handler.go | 4 +-- internal/eventstore/handler/v2/handler.go | 5 ++- internal/eventstore/handler/v2/statement.go | 2 +- internal/query/projection/projection.go | 33 +++++++++++++++---- 7 files changed, 63 insertions(+), 17 deletions(-) diff --git a/internal/admin/repository/eventsourcing/handler/handler.go b/internal/admin/repository/eventsourcing/handler/handler.go index 76584b55b0..b38e890e66 100644 --- a/internal/admin/repository/eventsourcing/handler/handler.go +++ b/internal/admin/repository/eventsourcing/handler/handler.go @@ -2,9 +2,11 @@ package handler import ( "context" + "errors" "fmt" "time" + "github.com/jackc/pgx/v5/pgconn" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" @@ -63,9 +65,17 @@ func Start(ctx context.Context) { func ProjectInstance(ctx context.Context) error { for i, projection := range projections { logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting admin projection") - _, err := projection.Trigger(ctx) - if err != nil { - return err + for { + _, err := projection.Trigger(ctx) + if err == nil { + break + } + var pgErr *pgconn.PgError + errors.As(err, &pgErr) + if pgErr.Code != database.PgUniqueConstraintErrorCode { + return err + } + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("admin projection failed because of unique constraint, retrying") } logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("admin projection done") } diff --git a/internal/auth/repository/eventsourcing/handler/handler.go b/internal/auth/repository/eventsourcing/handler/handler.go index 74a27a8312..0c151bb412 100644 --- a/internal/auth/repository/eventsourcing/handler/handler.go +++ b/internal/auth/repository/eventsourcing/handler/handler.go @@ -2,9 +2,11 @@ package handler import ( "context" + "errors" "fmt" "time" + "github.com/jackc/pgx/v5/pgconn" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" @@ -78,9 +80,17 @@ func Projections() []*handler2.Handler { func ProjectInstance(ctx context.Context) error { for i, projection := range projections { logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting auth projection") - _, err := projection.Trigger(ctx) - if err != nil { - return err + for { + _, err := projection.Trigger(ctx) + if err == nil { + break + } + var pgErr *pgconn.PgError + errors.As(err, &pgErr) + if pgErr.Code != database.PgUniqueConstraintErrorCode { + return err + } + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("auth projection failed because of unique constraint, retrying") } logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("auth projection done") } diff --git a/internal/database/database.go b/internal/database/database.go index b86a9f247c..37e641c0b3 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -64,6 +64,10 @@ func CloseTransaction(tx Tx, err error) error { return commitErr } +const ( + PgUniqueConstraintErrorCode = "23505" +) + type Config struct { Dialects map[string]interface{} `mapstructure:",remain"` EventPushConnRatio float64 diff --git a/internal/eventstore/handler/v2/field_handler.go b/internal/eventstore/handler/v2/field_handler.go index a34e8f8731..c8bf97b37c 100644 --- a/internal/eventstore/handler/v2/field_handler.go +++ b/internal/eventstore/handler/v2/field_handler.go @@ -181,7 +181,7 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState fillFieldsEvents := make([]eventstore.FillFieldsEvent, len(events)) highestPosition := events[len(events)-1].Position() for i, event := range events { - if event.Position() == highestPosition { + if event.Position().Equal(highestPosition) { offset++ } fillFieldsEvents[i] = event.(eventstore.FillFieldsEvent) @@ -198,7 +198,7 @@ func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state) position = event.Position() } offset++ - if event.Position() == currentState.position && + if event.Position().Equal(currentState.position) && event.Aggregate().ID == currentState.aggregateID && event.Aggregate().Type == currentState.aggregateType && event.Sequence() == currentState.sequence { diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index 73754791c2..667d96b2a3 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -563,7 +563,10 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add currentState.sequence = statements[lastProcessedIndex].Sequence currentState.eventTimestamp = statements[lastProcessedIndex].CreationDate - err = h.setState(tx, currentState) + setStateErr := h.setState(tx, currentState) + if setStateErr != nil { + err = setStateErr + } return additionalIteration, err } diff --git a/internal/eventstore/handler/v2/statement.go b/internal/eventstore/handler/v2/statement.go index 8fc686b6fc..9ae1bc01ea 100644 --- a/internal/eventstore/handler/v2/statement.go +++ b/internal/eventstore/handler/v2/statement.go @@ -53,7 +53,7 @@ func (h *Handler) eventsToStatements(tx *sql.Tx, events []eventstore.Event, curr return statements, err } offset++ - if previousPosition != event.Position() { + if !previousPosition.Equal(event.Position()) { // offset is 1 because we want to skip this event offset = 1 } diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index cdba791c29..31e476a1d0 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -2,8 +2,10 @@ package projection import ( "context" + "errors" "fmt" + "github.com/jackc/pgx/v5/pgconn" "github.com/zitadel/logging" internal_authz "github.com/zitadel/zitadel/internal/api/authz" @@ -207,10 +209,19 @@ func Start(ctx context.Context) { func ProjectInstance(ctx context.Context) error { for i, projection := range projections { logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting projection") - _, err := projection.Trigger(ctx) - if err != nil { - return err + for { + _, err := projection.Trigger(ctx) + if err == nil { + break + } + var pgErr *pgconn.PgError + errors.As(err, &pgErr) + if pgErr.Code != database.PgUniqueConstraintErrorCode { + return err + } + logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("projection failed because of unique constraint, retrying") } + logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("projection done") } return nil } @@ -218,11 +229,19 @@ func ProjectInstance(ctx context.Context) error { func ProjectInstanceFields(ctx context.Context) error { for i, fieldProjection := range fields { logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("starting fields projection") - err := fieldProjection.Trigger(ctx) - if err != nil { - return err + for { + err := fieldProjection.Trigger(ctx) + if err == nil { + break + } + var pgErr *pgconn.PgError + errors.As(err, &pgErr) + if pgErr.Code != database.PgUniqueConstraintErrorCode { + return err + } + logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("fields projection failed because of unique constraint, retrying") } - logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).Info("fields projection done") + logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("fields projection done") } return nil }