mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 20:37:30 +00:00
fix(handler): report error correctly (#9926)
# Which Problems Are Solved 1. The projection handler reported no error if an error happened but updating the current state was successful. This can lead to skipped projections during setup as soon as the projection has an error but does not correctly report if to the caller. 2. Mirror projections skipped as soon as an error occures, this leads to unprojected projections. 3. Mirror checked position wrongly in some cases # How the Problems Are Solved 1. the error returned by the `Trigger` method will will only be set to the error of updating current states if there occured an error. 2. triggering projections checks for the error type returned and retries if the error had code `23505` 3. Corrected to use the `Equal` method # Additional Changes unify logging on mirror projections
This commit is contained in:
@@ -2,9 +2,11 @@ package handler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
"github.com/zitadel/logging"
|
"github.com/zitadel/logging"
|
||||||
|
|
||||||
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
|
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
|
||||||
@@ -63,9 +65,17 @@ func Start(ctx context.Context) {
|
|||||||
func ProjectInstance(ctx context.Context) error {
|
func ProjectInstance(ctx context.Context) error {
|
||||||
for i, projection := range projections {
|
for i, projection := range projections {
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting admin projection")
|
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting admin projection")
|
||||||
_, err := projection.Trigger(ctx)
|
for {
|
||||||
if err != nil {
|
_, err := projection.Trigger(ctx)
|
||||||
return err
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var pgErr *pgconn.PgError
|
||||||
|
errors.As(err, &pgErr)
|
||||||
|
if pgErr.Code != database.PgUniqueConstraintErrorCode {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("admin projection failed because of unique constraint, retrying")
|
||||||
}
|
}
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("admin projection done")
|
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("admin projection done")
|
||||||
}
|
}
|
||||||
|
@@ -2,9 +2,11 @@ package handler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
"github.com/zitadel/logging"
|
"github.com/zitadel/logging"
|
||||||
|
|
||||||
"github.com/zitadel/zitadel/internal/api/authz"
|
"github.com/zitadel/zitadel/internal/api/authz"
|
||||||
@@ -78,9 +80,17 @@ func Projections() []*handler2.Handler {
|
|||||||
func ProjectInstance(ctx context.Context) error {
|
func ProjectInstance(ctx context.Context) error {
|
||||||
for i, projection := range projections {
|
for i, projection := range projections {
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting auth projection")
|
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting auth projection")
|
||||||
_, err := projection.Trigger(ctx)
|
for {
|
||||||
if err != nil {
|
_, err := projection.Trigger(ctx)
|
||||||
return err
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var pgErr *pgconn.PgError
|
||||||
|
errors.As(err, &pgErr)
|
||||||
|
if pgErr.Code != database.PgUniqueConstraintErrorCode {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("auth projection failed because of unique constraint, retrying")
|
||||||
}
|
}
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("auth projection done")
|
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("auth projection done")
|
||||||
}
|
}
|
||||||
|
@@ -64,6 +64,10 @@ func CloseTransaction(tx Tx, err error) error {
|
|||||||
return commitErr
|
return commitErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
PgUniqueConstraintErrorCode = "23505"
|
||||||
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Dialects map[string]interface{} `mapstructure:",remain"`
|
Dialects map[string]interface{} `mapstructure:",remain"`
|
||||||
connector dialect.Connector
|
connector dialect.Connector
|
||||||
|
@@ -185,7 +185,7 @@ func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState
|
|||||||
fillFieldsEvents := make([]eventstore.FillFieldsEvent, len(events))
|
fillFieldsEvents := make([]eventstore.FillFieldsEvent, len(events))
|
||||||
highestPosition := events[len(events)-1].Position()
|
highestPosition := events[len(events)-1].Position()
|
||||||
for i, event := range events {
|
for i, event := range events {
|
||||||
if event.Position() == highestPosition {
|
if event.Position().Equal(highestPosition) {
|
||||||
offset++
|
offset++
|
||||||
}
|
}
|
||||||
fillFieldsEvents[i] = event.(eventstore.FillFieldsEvent)
|
fillFieldsEvents[i] = event.(eventstore.FillFieldsEvent)
|
||||||
@@ -202,7 +202,7 @@ func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state)
|
|||||||
position = event.Position()
|
position = event.Position()
|
||||||
}
|
}
|
||||||
offset++
|
offset++
|
||||||
if event.Position() == currentState.position &&
|
if event.Position().Equal(currentState.position) &&
|
||||||
event.Aggregate().ID == currentState.aggregateID &&
|
event.Aggregate().ID == currentState.aggregateID &&
|
||||||
event.Aggregate().Type == currentState.aggregateType &&
|
event.Aggregate().Type == currentState.aggregateType &&
|
||||||
event.Sequence() == currentState.sequence {
|
event.Sequence() == currentState.sequence {
|
||||||
|
@@ -567,7 +567,10 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
|
|||||||
currentState.sequence = statements[lastProcessedIndex].Sequence
|
currentState.sequence = statements[lastProcessedIndex].Sequence
|
||||||
currentState.eventTimestamp = statements[lastProcessedIndex].CreationDate
|
currentState.eventTimestamp = statements[lastProcessedIndex].CreationDate
|
||||||
|
|
||||||
err = h.setState(tx, currentState)
|
setStateErr := h.setState(tx, currentState)
|
||||||
|
if setStateErr != nil {
|
||||||
|
err = setStateErr
|
||||||
|
}
|
||||||
|
|
||||||
return additionalIteration, err
|
return additionalIteration, err
|
||||||
}
|
}
|
||||||
|
@@ -53,7 +53,7 @@ func (h *Handler) eventsToStatements(tx *sql.Tx, events []eventstore.Event, curr
|
|||||||
return statements, err
|
return statements, err
|
||||||
}
|
}
|
||||||
offset++
|
offset++
|
||||||
if previousPosition != event.Position() {
|
if !previousPosition.Equal(event.Position()) {
|
||||||
// offset is 1 because we want to skip this event
|
// offset is 1 because we want to skip this event
|
||||||
offset = 1
|
offset = 1
|
||||||
}
|
}
|
||||||
|
@@ -2,8 +2,10 @@ package projection
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
"github.com/zitadel/logging"
|
"github.com/zitadel/logging"
|
||||||
|
|
||||||
internal_authz "github.com/zitadel/zitadel/internal/api/authz"
|
internal_authz "github.com/zitadel/zitadel/internal/api/authz"
|
||||||
@@ -209,10 +211,19 @@ func Start(ctx context.Context) {
|
|||||||
func ProjectInstance(ctx context.Context) error {
|
func ProjectInstance(ctx context.Context) error {
|
||||||
for i, projection := range projections {
|
for i, projection := range projections {
|
||||||
logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting projection")
|
logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting projection")
|
||||||
_, err := projection.Trigger(ctx)
|
for {
|
||||||
if err != nil {
|
_, err := projection.Trigger(ctx)
|
||||||
return err
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var pgErr *pgconn.PgError
|
||||||
|
errors.As(err, &pgErr)
|
||||||
|
if pgErr.Code != database.PgUniqueConstraintErrorCode {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("projection failed because of unique constraint, retrying")
|
||||||
}
|
}
|
||||||
|
logging.WithFields("name", projection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("projection done")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -220,11 +231,19 @@ func ProjectInstance(ctx context.Context) error {
|
|||||||
func ProjectInstanceFields(ctx context.Context) error {
|
func ProjectInstanceFields(ctx context.Context) error {
|
||||||
for i, fieldProjection := range fields {
|
for i, fieldProjection := range fields {
|
||||||
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("starting fields projection")
|
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("starting fields projection")
|
||||||
err := fieldProjection.Trigger(ctx)
|
for {
|
||||||
if err != nil {
|
err := fieldProjection.Trigger(ctx)
|
||||||
return err
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var pgErr *pgconn.PgError
|
||||||
|
errors.As(err, &pgErr)
|
||||||
|
if pgErr.Code != database.PgUniqueConstraintErrorCode {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("fields projection failed because of unique constraint, retrying")
|
||||||
}
|
}
|
||||||
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).Info("fields projection done")
|
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("fields projection done")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user