fix(handler): report error correctly (#9925)

# Which Problems Are Solved

1. The projection handler reported no error if an error happened but
updating the current state was successful. This can lead to skipped
projections during setup as soon as the projection has an error but does
not correctly report if to the caller.

2. Mirror projections skipped as soon as an error occures, this leads to
unprojected projections.

3. Mirror checked position wrongly in some cases

# How the Problems Are Solved

1. the error returned by the `Trigger` method will will only be set to
the error of updating current states if there occured an error.

2. triggering projections checks for the error type returned and retries
if the error had code `23505`

3. Corrected to use the `Equal` method

# Additional Changes

unify logging on mirror projections
This commit is contained in:
Silvan
2025-05-26 12:02:30 +02:00
committed by GitHub
parent f3338cdb9a
commit b8955590b4
7 changed files with 63 additions and 17 deletions

View File

@@ -2,9 +2,11 @@ package handler
import (
"context"
"errors"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
@@ -63,9 +65,17 @@ func Start(ctx context.Context) {
func ProjectInstance(ctx context.Context) error {
for i, projection := range projections {
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting admin projection")
_, err := projection.Trigger(ctx)
if err != nil {
return err
for {
_, err := projection.Trigger(ctx)
if err == nil {
break
}
var pgErr *pgconn.PgError
errors.As(err, &pgErr)
if pgErr.Code != database.PgUniqueConstraintErrorCode {
return err
}
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("admin projection failed because of unique constraint, retrying")
}
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("admin projection done")
}

View File

@@ -2,9 +2,11 @@ package handler
import (
"context"
"errors"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
@@ -78,9 +80,17 @@ func Projections() []*handler2.Handler {
func ProjectInstance(ctx context.Context) error {
for i, projection := range projections {
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting auth projection")
_, err := projection.Trigger(ctx)
if err != nil {
return err
for {
_, err := projection.Trigger(ctx)
if err == nil {
break
}
var pgErr *pgconn.PgError
errors.As(err, &pgErr)
if pgErr.Code != database.PgUniqueConstraintErrorCode {
return err
}
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("auth projection failed because of unique constraint, retrying")
}
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("auth projection done")
}

View File

@@ -64,6 +64,10 @@ func CloseTransaction(tx Tx, err error) error {
return commitErr
}
const (
PgUniqueConstraintErrorCode = "23505"
)
type Config struct {
Dialects map[string]interface{} `mapstructure:",remain"`
connector dialect.Connector

View File

@@ -185,7 +185,7 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState
fillFieldsEvents := make([]eventstore.FillFieldsEvent, len(events))
highestPosition := events[len(events)-1].Position()
for i, event := range events {
if event.Position() == highestPosition {
if event.Position().Equal(highestPosition) {
offset++
}
fillFieldsEvents[i] = event.(eventstore.FillFieldsEvent)
@@ -202,7 +202,7 @@ func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state)
position = event.Position()
}
offset++
if event.Position() == currentState.position &&
if event.Position().Equal(currentState.position) &&
event.Aggregate().ID == currentState.aggregateID &&
event.Aggregate().Type == currentState.aggregateType &&
event.Sequence() == currentState.sequence {

View File

@@ -563,7 +563,10 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
currentState.sequence = statements[lastProcessedIndex].Sequence
currentState.eventTimestamp = statements[lastProcessedIndex].CreationDate
err = h.setState(tx, currentState)
setStateErr := h.setState(tx, currentState)
if setStateErr != nil {
err = setStateErr
}
return additionalIteration, err
}

View File

@@ -53,7 +53,7 @@ func (h *Handler) eventsToStatements(tx *sql.Tx, events []eventstore.Event, curr
return statements, err
}
offset++
if previousPosition != event.Position() {
if !previousPosition.Equal(event.Position()) {
// offset is 1 because we want to skip this event
offset = 1
}

View File

@@ -2,8 +2,10 @@ package projection
import (
"context"
"errors"
"fmt"
"github.com/jackc/pgx/v5/pgconn"
"github.com/zitadel/logging"
internal_authz "github.com/zitadel/zitadel/internal/api/authz"
@@ -209,10 +211,19 @@ func Start(ctx context.Context) {
func ProjectInstance(ctx context.Context) error {
for i, projection := range projections {
logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting projection")
_, err := projection.Trigger(ctx)
if err != nil {
return err
for {
_, err := projection.Trigger(ctx)
if err == nil {
break
}
var pgErr *pgconn.PgError
errors.As(err, &pgErr)
if pgErr.Code != database.PgUniqueConstraintErrorCode {
return err
}
logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("projection failed because of unique constraint, retrying")
}
logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("projection done")
}
return nil
}
@@ -220,11 +231,19 @@ func ProjectInstance(ctx context.Context) error {
func ProjectInstanceFields(ctx context.Context) error {
for i, fieldProjection := range fields {
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("starting fields projection")
err := fieldProjection.Trigger(ctx)
if err != nil {
return err
for {
err := fieldProjection.Trigger(ctx)
if err == nil {
break
}
var pgErr *pgconn.PgError
errors.As(err, &pgErr)
if pgErr.Code != database.PgUniqueConstraintErrorCode {
return err
}
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("fields projection failed because of unique constraint, retrying")
}
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).Info("fields projection done")
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("fields projection done")
}
return nil
}