diff --git a/internal/admin/repository/eventsourcing/handler/handler.go b/internal/admin/repository/eventsourcing/handler/handler.go index 76584b55b0..b38e890e66 100644 --- a/internal/admin/repository/eventsourcing/handler/handler.go +++ b/internal/admin/repository/eventsourcing/handler/handler.go @@ -2,9 +2,11 @@ package handler import ( "context" + "errors" "fmt" "time" + "github.com/jackc/pgx/v5/pgconn" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" @@ -63,9 +65,17 @@ func Start(ctx context.Context) { func ProjectInstance(ctx context.Context) error { for i, projection := range projections { logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting admin projection") - _, err := projection.Trigger(ctx) - if err != nil { - return err + for { + _, err := projection.Trigger(ctx) + if err == nil { + break + } + var pgErr *pgconn.PgError + errors.As(err, &pgErr) + if pgErr.Code != database.PgUniqueConstraintErrorCode { + return err + } + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("admin projection failed because of unique constraint, retrying") } logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("admin projection done") } diff --git a/internal/auth/repository/eventsourcing/handler/handler.go b/internal/auth/repository/eventsourcing/handler/handler.go index 74a27a8312..0c151bb412 100644 --- a/internal/auth/repository/eventsourcing/handler/handler.go +++ b/internal/auth/repository/eventsourcing/handler/handler.go @@ -2,9 +2,11 @@ package handler import ( "context" + "errors" "fmt" "time" + "github.com/jackc/pgx/v5/pgconn" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" @@ -78,9 +80,17 @@ func Projections() []*handler2.Handler { func ProjectInstance(ctx context.Context) error { for i, projection := range projections { logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting auth projection") - _, err := projection.Trigger(ctx) - if err != nil { - return err + for { + _, err := projection.Trigger(ctx) + if err == nil { + break + } + var pgErr *pgconn.PgError + errors.As(err, &pgErr) + if pgErr.Code != database.PgUniqueConstraintErrorCode { + return err + } + logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("auth projection failed because of unique constraint, retrying") } logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("auth projection done") } diff --git a/internal/database/database.go b/internal/database/database.go index e254edadc1..29bbb657c2 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -64,6 +64,10 @@ func CloseTransaction(tx Tx, err error) error { return commitErr } +const ( + PgUniqueConstraintErrorCode = "23505" +) + type Config struct { Dialects map[string]interface{} `mapstructure:",remain"` connector dialect.Connector diff --git a/internal/eventstore/handler/v2/field_handler.go b/internal/eventstore/handler/v2/field_handler.go index f780b83cc6..3c25731c83 100644 --- a/internal/eventstore/handler/v2/field_handler.go +++ b/internal/eventstore/handler/v2/field_handler.go @@ -185,7 +185,7 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState fillFieldsEvents := make([]eventstore.FillFieldsEvent, len(events)) highestPosition := events[len(events)-1].Position() for i, event := range events { - if event.Position() == highestPosition { + if event.Position().Equal(highestPosition) { offset++ } fillFieldsEvents[i] = event.(eventstore.FillFieldsEvent) @@ -202,7 +202,7 @@ func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state) position = event.Position() } offset++ - if event.Position() == currentState.position && + if event.Position().Equal(currentState.position) && event.Aggregate().ID == currentState.aggregateID && event.Aggregate().Type == currentState.aggregateType && event.Sequence() == currentState.sequence { diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index d6419c71ed..3e2a92bbc7 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -563,7 +563,10 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add currentState.sequence = statements[lastProcessedIndex].Sequence currentState.eventTimestamp = statements[lastProcessedIndex].CreationDate - err = h.setState(tx, currentState) + setStateErr := h.setState(tx, currentState) + if setStateErr != nil { + err = setStateErr + } return additionalIteration, err } diff --git a/internal/eventstore/handler/v2/statement.go b/internal/eventstore/handler/v2/statement.go index b185c58221..5024c8c945 100644 --- a/internal/eventstore/handler/v2/statement.go +++ b/internal/eventstore/handler/v2/statement.go @@ -53,7 +53,7 @@ func (h *Handler) eventsToStatements(tx *sql.Tx, events []eventstore.Event, curr return statements, err } offset++ - if previousPosition != event.Position() { + if !previousPosition.Equal(event.Position()) { // offset is 1 because we want to skip this event offset = 1 } diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index f10cc1d25e..e26fc00a50 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -2,8 +2,10 @@ package projection import ( "context" + "errors" "fmt" + "github.com/jackc/pgx/v5/pgconn" "github.com/zitadel/logging" internal_authz "github.com/zitadel/zitadel/internal/api/authz" @@ -209,10 +211,19 @@ func Start(ctx context.Context) { func ProjectInstance(ctx context.Context) error { for i, projection := range projections { logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting projection") - _, err := projection.Trigger(ctx) - if err != nil { - return err + for { + _, err := projection.Trigger(ctx) + if err == nil { + break + } + var pgErr *pgconn.PgError + errors.As(err, &pgErr) + if pgErr.Code != database.PgUniqueConstraintErrorCode { + return err + } + logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("projection failed because of unique constraint, retrying") } + logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("projection done") } return nil } @@ -220,11 +231,19 @@ func ProjectInstance(ctx context.Context) error { func ProjectInstanceFields(ctx context.Context) error { for i, fieldProjection := range fields { logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("starting fields projection") - err := fieldProjection.Trigger(ctx) - if err != nil { - return err + for { + err := fieldProjection.Trigger(ctx) + if err == nil { + break + } + var pgErr *pgconn.PgError + errors.As(err, &pgErr) + if pgErr.Code != database.PgUniqueConstraintErrorCode { + return err + } + logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("fields projection failed because of unique constraint, retrying") } - logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).Info("fields projection done") + logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("fields projection done") } return nil }