fix: handle sequence correctly in subscription (#1209)

This commit is contained in:
Livio Amstutz
2021-01-27 10:35:01 +01:00
committed by GitHub
parent 6f6d59f380
commit 1d472bac51
129 changed files with 349 additions and 371 deletions

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/models"
)
@@ -24,12 +25,12 @@ type Handler interface {
QueryLimit() uint64
AggregateTypes() []models.AggregateType
CurrentSequence(*models.Event) (uint64, error)
CurrentSequence() (uint64, error)
Eventstore() eventstore.Eventstore
}
func ReduceEvent(handler Handler, event *models.Event) {
currentSequence, err := handler.CurrentSequence(event)
currentSequence, err := handler.CurrentSequence()
if err != nil {
logging.Log("HANDL-BmpkC").WithError(err).Warn("unable to get current sequence")
return
@@ -46,32 +47,23 @@ func ReduceEvent(handler Handler, event *models.Event) {
return
}
processedSequences := map[models.AggregateType]uint64{}
for _, unprocessedEvent := range unprocessedEvents {
currentSequence, err := handler.CurrentSequence(unprocessedEvent)
currentSequence, err := handler.CurrentSequence()
if err != nil {
logging.Log("HANDL-BmpkC").WithError(err).Warn("unable to get current sequence")
return
}
_, ok := processedSequences[unprocessedEvent.AggregateType]
if !ok {
processedSequences[unprocessedEvent.AggregateType] = currentSequence
}
if processedSequences[unprocessedEvent.AggregateType] != currentSequence {
if currentSequence < processedSequences[unprocessedEvent.AggregateType] {
logging.LogWithFields("QUERY-DOYVN",
"processed", processedSequences[unprocessedEvent.AggregateType],
"current", currentSequence,
"view", handler.ViewModel()).
Warn("sequence not matching")
}
if unprocessedEvent.Sequence < currentSequence {
logging.LogWithFields("QUERY-DOYVN",
"unprocessed", unprocessedEvent.Sequence,
"current", currentSequence,
"view", handler.ViewModel()).
Warn("sequence not matching")
return
}
err = handler.Reduce(unprocessedEvent)
logging.LogWithFields("HANDL-V42TI", "seq", unprocessedEvent.Sequence).OnError(err).Warn("reduce failed")
processedSequences[unprocessedEvent.AggregateType] = unprocessedEvent.Sequence
}
if len(unprocessedEvents) == eventLimit {
logging.LogWithFields("QUERY-BSqe9", "seq", event.Sequence).Warn("didnt process event")