mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 21:37:32 +00:00
fix(eventstore): improve pagination of handler filter (#6968)
* fix(setup): add filter_offset to `projections.current_states` * fix(eventstore): allow offset in query * fix(handler): offset for already processed events
This commit is contained in:
@@ -47,7 +47,7 @@ func failureFromStatement(statement *Statement, err error) *failure {
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) handleFailedStmt(tx *sql.Tx, currentState *state, f *failure) (shouldContinue bool) {
|
||||
func (h *Handler) handleFailedStmt(tx *sql.Tx, f *failure) (shouldContinue bool) {
|
||||
failureCount, err := h.failureCount(tx, f)
|
||||
if err != nil {
|
||||
h.logFailure(f).WithError(err).Warn("unable to get failure count")
|
||||
|
@@ -331,7 +331,11 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
|
||||
|
||||
var statements []*Statement
|
||||
statements, additionalIteration, err = h.generateStatements(ctx, tx, currentState)
|
||||
if err != nil || len(statements) == 0 {
|
||||
if err != nil {
|
||||
return additionalIteration, err
|
||||
}
|
||||
if len(statements) == 0 {
|
||||
err = h.setState(tx, currentState)
|
||||
return additionalIteration, err
|
||||
}
|
||||
|
||||
@@ -341,6 +345,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
|
||||
}
|
||||
|
||||
currentState.position = statements[lastProcessedIndex].Position
|
||||
currentState.offset = statements[lastProcessedIndex].offset
|
||||
currentState.aggregateID = statements[lastProcessedIndex].AggregateID
|
||||
currentState.aggregateType = statements[lastProcessedIndex].AggregateType
|
||||
currentState.sequence = statements[lastProcessedIndex].Sequence
|
||||
@@ -365,37 +370,44 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta
|
||||
return nil, false, err
|
||||
}
|
||||
eventAmount := len(events)
|
||||
events = skipPreviouslyReduced(events, currentState)
|
||||
|
||||
if len(events) == 0 {
|
||||
h.updateLastUpdated(ctx, tx, currentState)
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
statements, err := h.eventsToStatements(tx, events, currentState)
|
||||
if len(statements) == 0 {
|
||||
if err != nil || len(statements) == 0 {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
idx := skipPreviouslyReduced(statements, currentState)
|
||||
if idx+1 == len(statements) {
|
||||
currentState.position = statements[len(statements)-1].Position
|
||||
currentState.offset = statements[len(statements)-1].offset
|
||||
currentState.aggregateID = statements[len(statements)-1].AggregateID
|
||||
currentState.aggregateType = statements[len(statements)-1].AggregateType
|
||||
currentState.sequence = statements[len(statements)-1].Sequence
|
||||
currentState.eventTimestamp = statements[len(statements)-1].CreationDate
|
||||
|
||||
return nil, false, nil
|
||||
}
|
||||
statements = statements[idx+1:]
|
||||
|
||||
additionalIteration = eventAmount == int(h.bulkLimit)
|
||||
if len(statements) < len(events) {
|
||||
// retry imediatly if statements failed
|
||||
// retry immediately if statements failed
|
||||
additionalIteration = true
|
||||
}
|
||||
|
||||
return statements, additionalIteration, nil
|
||||
}
|
||||
|
||||
func skipPreviouslyReduced(events []eventstore.Event, currentState *state) []eventstore.Event {
|
||||
for i, event := range events {
|
||||
if event.Position() == currentState.position &&
|
||||
event.Aggregate().ID == currentState.aggregateID &&
|
||||
event.Aggregate().Type == currentState.aggregateType &&
|
||||
event.Sequence() == currentState.sequence {
|
||||
return events[i+1:]
|
||||
func skipPreviouslyReduced(statements []*Statement, currentState *state) int {
|
||||
for i, statement := range statements {
|
||||
if statement.Position == currentState.position &&
|
||||
statement.AggregateID == currentState.aggregateID &&
|
||||
statement.AggregateType == currentState.aggregateType &&
|
||||
statement.Sequence == currentState.sequence {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return events
|
||||
return -1
|
||||
}
|
||||
|
||||
func (h *Handler) executeStatements(ctx context.Context, tx *sql.Tx, currentState *state, statements []*Statement) (lastProcessedIndex int, err error) {
|
||||
@@ -434,7 +446,7 @@ func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, currentState
|
||||
if err = statement.Execute(tx, h.projection.Name()); err != nil {
|
||||
h.log().WithError(err).Error("statement execution failed")
|
||||
|
||||
shouldContinue = h.handleFailedStmt(tx, currentState, failureFromStatement(statement, err))
|
||||
shouldContinue = h.handleFailedStmt(tx, failureFromStatement(statement, err))
|
||||
if shouldContinue {
|
||||
return nil
|
||||
}
|
||||
@@ -454,7 +466,11 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder
|
||||
InstanceID(currentState.instanceID)
|
||||
|
||||
if currentState.position > 0 {
|
||||
// decrease position by 10 because builder.PositionAfter filters for position > and we need position >=
|
||||
builder = builder.PositionAfter(math.Float64frombits(math.Float64bits(currentState.position) - 10))
|
||||
if currentState.offset > 0 {
|
||||
builder = builder.Offset(currentState.offset)
|
||||
}
|
||||
}
|
||||
|
||||
for aggregateType, eventTypes := range h.eventTypes {
|
||||
@@ -469,7 +485,7 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder
|
||||
return builder
|
||||
}
|
||||
|
||||
// ProjectionName returns the name of the unlying projection.
|
||||
// ProjectionName returns the name of the underlying projection.
|
||||
func (h *Handler) ProjectionName() string {
|
||||
return h.projection.Name()
|
||||
}
|
||||
|
@@ -19,6 +19,7 @@ type state struct {
|
||||
aggregateType eventstore.AggregateType
|
||||
aggregateID string
|
||||
sequence uint64
|
||||
offset uint16
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -30,8 +31,6 @@ var (
|
||||
updateStateStmt string
|
||||
//go:embed state_lock.sql
|
||||
lockStateStmt string
|
||||
//go:embed state_set_last_run.sql
|
||||
updateStateLastRunStmt string
|
||||
|
||||
errJustUpdated = errors.New("projection was just updated")
|
||||
)
|
||||
@@ -47,6 +46,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
|
||||
sequence = new(sql.NullInt64)
|
||||
timestamp = new(sql.NullTime)
|
||||
position = new(sql.NullFloat64)
|
||||
offset = new(sql.NullInt16)
|
||||
)
|
||||
|
||||
stateQuery := currentStateStmt
|
||||
@@ -61,6 +61,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
|
||||
sequence,
|
||||
timestamp,
|
||||
position,
|
||||
offset,
|
||||
)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
err = h.lockState(tx, currentState.instanceID)
|
||||
@@ -75,6 +76,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
|
||||
currentState.sequence = uint64(sequence.Int64)
|
||||
currentState.eventTimestamp = timestamp.Time
|
||||
currentState.position = position.Float64
|
||||
currentState.offset = uint16(offset.Int16)
|
||||
return currentState, nil
|
||||
}
|
||||
|
||||
@@ -87,6 +89,7 @@ func (h *Handler) setState(tx *sql.Tx, updatedState *state) error {
|
||||
updatedState.sequence,
|
||||
updatedState.eventTimestamp,
|
||||
updatedState.position,
|
||||
updatedState.offset,
|
||||
)
|
||||
if err != nil {
|
||||
h.log().WithError(err).Debug("unable to update state")
|
||||
@@ -99,11 +102,6 @@ func (h *Handler) setState(tx *sql.Tx, updatedState *state) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Handler) updateLastUpdated(ctx context.Context, tx *sql.Tx, updatedState *state) {
|
||||
_, err := tx.ExecContext(ctx, updateStateLastRunStmt, h.projection.Name(), updatedState.instanceID)
|
||||
h.log().OnError(err).Debug("unable to update last updated")
|
||||
}
|
||||
|
||||
func (h *Handler) lockState(tx *sql.Tx, instanceID string) error {
|
||||
res, err := tx.Exec(lockStateStmt,
|
||||
h.projection.Name(),
|
||||
|
@@ -4,6 +4,7 @@ SELECT
|
||||
, "sequence"
|
||||
, event_date
|
||||
, "position"
|
||||
, filter_offset
|
||||
FROM
|
||||
projections.current_states
|
||||
WHERE
|
||||
|
@@ -4,6 +4,7 @@ SELECT
|
||||
, "sequence"
|
||||
, event_date
|
||||
, "position"
|
||||
, filter_offset
|
||||
FROM
|
||||
projections.current_states
|
||||
WHERE
|
||||
|
@@ -7,6 +7,7 @@ INSERT INTO projections.current_states (
|
||||
, event_date
|
||||
, "position"
|
||||
, last_updated
|
||||
, filter_offset
|
||||
) VALUES (
|
||||
$1
|
||||
, $2
|
||||
@@ -16,6 +17,7 @@ INSERT INTO projections.current_states (
|
||||
, $6
|
||||
, $7
|
||||
, now()
|
||||
, $8
|
||||
) ON CONFLICT (
|
||||
projection_name
|
||||
, instance_id
|
||||
@@ -26,4 +28,5 @@ INSERT INTO projections.current_states (
|
||||
, event_date = $6
|
||||
, "position" = $7
|
||||
, last_updated = statement_timestamp()
|
||||
, filter_offset = $8
|
||||
;
|
@@ -1,2 +0,0 @@
|
||||
UPDATE projections.current_states SET last_updated = now() WHERE projection_name = $1 AND instance_id = $2;
|
||||
|
@@ -217,6 +217,7 @@ func TestHandler_updateLastUpdated(t *testing.T) {
|
||||
uint64(42),
|
||||
mock.AnyType[time.Time]{},
|
||||
float64(42),
|
||||
uint16(0),
|
||||
),
|
||||
mock.WithExecRowsAffected(1),
|
||||
),
|
||||
@@ -388,7 +389,7 @@ func TestHandler_currentState(t *testing.T) {
|
||||
"projection",
|
||||
),
|
||||
mock.WithQueryResult(
|
||||
[]string{"aggregate_id", "aggregate_type", "event_sequence", "event_date", "position"},
|
||||
[]string{"aggregate_id", "aggregate_type", "event_sequence", "event_date", "position", "offset"},
|
||||
[][]driver.Value{
|
||||
{
|
||||
"aggregate id",
|
||||
@@ -396,6 +397,7 @@ func TestHandler_currentState(t *testing.T) {
|
||||
int64(42),
|
||||
testTime,
|
||||
float64(42),
|
||||
uint16(10),
|
||||
},
|
||||
},
|
||||
),
|
||||
@@ -413,6 +415,7 @@ func TestHandler_currentState(t *testing.T) {
|
||||
aggregateType: "aggregate type",
|
||||
aggregateID: "aggregate id",
|
||||
sequence: 42,
|
||||
offset: 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@@ -17,15 +17,26 @@ import (
|
||||
|
||||
func (h *Handler) eventsToStatements(tx *sql.Tx, events []eventstore.Event, currentState *state) (statements []*Statement, err error) {
|
||||
statements = make([]*Statement, 0, len(events))
|
||||
|
||||
previousPosition := currentState.position
|
||||
offset := currentState.offset
|
||||
for _, event := range events {
|
||||
statement, err := h.reduce(event)
|
||||
if err != nil {
|
||||
h.logEvent(event).WithError(err).Error("reduce failed")
|
||||
if shouldContinue := h.handleFailedStmt(tx, currentState, failureFromEvent(event, err)); shouldContinue {
|
||||
if shouldContinue := h.handleFailedStmt(tx, failureFromEvent(event, err)); shouldContinue {
|
||||
continue
|
||||
}
|
||||
return statements, err
|
||||
}
|
||||
offset++
|
||||
if previousPosition != event.Position() {
|
||||
// offset is 1 because we want to skip this event
|
||||
offset = 1
|
||||
}
|
||||
statement.offset = offset
|
||||
statement.Position = event.Position()
|
||||
previousPosition = event.Position()
|
||||
statements = append(statements, statement)
|
||||
}
|
||||
return statements, nil
|
||||
@@ -54,6 +65,8 @@ type Statement struct {
|
||||
CreationDate time.Time
|
||||
InstanceID string
|
||||
|
||||
offset uint16
|
||||
|
||||
Execute Exec
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user