fix: pubsub (#1122)

* start sub

* start implement subsciptions

* start subscription

* implementation for member done

* admin done

* fix: tests

* extend handlers

* prepary notification

* no errors in adminapi

* changed current sequence in all packages

* ignore mocks

* works

* subscriptions as singleton

* tests

* refactor: rename function scope var

* fix: process ALL previous sequences

* fix: spooler and pubsub

* handler check

* fix: process events until all done

* fix break on query err

* fix: handler

* fix: process sequence or return error

* check aggregate id

* fix: log only in error case

* fix tests

* fix: handlers

* fix: spooler

* fix: spooler

* fix: tests

* fix: continue

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Silvan
2020-12-21 18:42:34 +01:00
committed by GitHub
parent dd5e4acd24
commit 3118a99c1e
49 changed files with 256 additions and 193 deletions

View File

@@ -15,6 +15,8 @@ type ObjectRoot struct {
func (o *ObjectRoot) AppendEvent(event *Event) {
if o.AggregateID == "" {
o.AggregateID = event.AggregateID
} else if o.AggregateID != event.AggregateID {
return
}
if o.ResourceOwner == "" {
o.ResourceOwner = event.ResourceOwner

View File

@@ -70,7 +70,7 @@ func (q *SearchQuery) ResourceOwnerFilter(resourceOwner string) *SearchQuery {
func (q *SearchQuery) setFilter(filter *Filter) *SearchQuery {
for i, f := range q.Filters {
if f.field == filter.field {
if f.field == filter.field && f.field != Field_LatestSequence {
q.Filters[i] = filter
return q
}

View File

@@ -9,6 +9,10 @@ import (
"github.com/caos/zitadel/internal/eventstore/models"
)
const (
eventLimit = 10000
)
type Handler interface {
ViewModel() string
EventQuery() (*models.SearchQuery, error)
@@ -29,28 +33,47 @@ func ReduceEvent(handler Handler, event *models.Event) {
logging.Log("HANDL-BmpkC").WithError(err).Warn("unable to get current sequence")
return
}
if event.PreviousSequence > currentSequence {
searchQuery := models.NewSearchQuery().
AggregateTypeFilter(handler.AggregateTypes()...).
SequenceBetween(currentSequence, event.PreviousSequence)
events, err := handler.Eventstore().FilterEvents(context.Background(), searchQuery)
searchQuery := models.NewSearchQuery().
AggregateTypeFilter(handler.AggregateTypes()...).
SequenceBetween(currentSequence, event.Sequence).
SetLimit(eventLimit)
unprocessedEvents, err := handler.Eventstore().FilterEvents(context.Background(), searchQuery)
if err != nil {
logging.LogWithFields("HANDL-L6YH1", "seq", event.Sequence).Warn("filter failed")
return
}
processedSequences := map[models.AggregateType]uint64{}
for _, unprocessedEvent := range unprocessedEvents {
currentSequence, err := handler.CurrentSequence(unprocessedEvent)
if err != nil {
logging.LogWithFields("HANDL-L6YH1", "seq", event.Sequence).Warn("filter failed")
logging.Log("HANDL-BmpkC").WithError(err).Warn("unable to get current sequence")
return
}
for _, previousEvent := range events {
//if other process already updated view
//TODO: correct?
if event.PreviousSequence > previousEvent.Sequence {
continue
_, ok := processedSequences[unprocessedEvent.AggregateType]
if !ok {
processedSequences[unprocessedEvent.AggregateType] = currentSequence
}
if processedSequences[unprocessedEvent.AggregateType] != currentSequence {
if currentSequence < processedSequences[unprocessedEvent.AggregateType] {
logging.LogWithFields("QUERY-DOYVN",
"processed", processedSequences[unprocessedEvent.AggregateType],
"current", currentSequence,
"view", handler.ViewModel()).
Warn("sequence not matching")
}
err = handler.Reduce(previousEvent)
logging.LogWithFields("HANDL-V42TI", "seq", previousEvent.Sequence).OnError(err).Warn("reduce failed")
return
}
} else if event.PreviousSequence > 0 && event.PreviousSequence < currentSequence {
logging.LogWithFields("HANDL-w9Bdy", "previousSeq", event.PreviousSequence, "currentSeq", currentSequence).Debug("already processed")
err = handler.Reduce(unprocessedEvent)
logging.LogWithFields("HANDL-V42TI", "seq", unprocessedEvent.Sequence).OnError(err).Warn("reduce failed")
processedSequences[unprocessedEvent.AggregateType] = unprocessedEvent.Sequence
}
if len(unprocessedEvents) == eventLimit {
logging.LogWithFields("QUERY-BSqe9", "seq", event.Sequence).Warn("didnt process event")
return
}
err = handler.Reduce(event)

View File

@@ -4,6 +4,7 @@ import (
"context"
"strconv"
"sync"
"time"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/eventstore"
@@ -11,8 +12,6 @@ import (
"github.com/caos/zitadel/internal/eventstore/query"
"github.com/caos/zitadel/internal/telemetry/tracing"
"github.com/caos/zitadel/internal/view/repository"
"time"
)
type Spooler struct {
@@ -71,14 +70,26 @@ func (s *spooledHandler) load(workerID string) {
hasLocked := s.lock(ctx, errs, workerID)
if <-hasLocked {
events, err := s.query(ctx)
if err != nil {
errs <- err
} else {
errs <- s.process(ctx, events, workerID)
logging.Log("SPOOL-0pV8o").WithField("view", s.ViewModel()).WithField("worker", workerID).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("process done")
for {
events, err := s.query(ctx)
if err != nil {
errs <- err
break
}
err = s.process(ctx, events, workerID)
if err != nil {
errs <- err
break
}
if uint64(len(events)) < s.QueryLimit() {
// no more events to process
// stop chan
if ctx.Err() == nil {
errs <- nil
}
break
}
}
}
<-ctx.Done()
}
@@ -92,14 +103,19 @@ func (s *spooledHandler) awaitError(cancel func(), errs chan error, workerID str
}
func (s *spooledHandler) process(ctx context.Context, events []*models.Event, workerID string) error {
for _, event := range events {
for i, event := range events {
select {
case <-ctx.Done():
logging.LogWithFields("SPOOL-FTKwH", "view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).Debug("context canceled")
return nil
default:
if err := s.Reduce(event); err != nil {
return s.OnError(event, err)
err = s.OnError(event, err)
if err == nil {
continue
}
time.Sleep(100 * time.Millisecond)
return s.process(ctx, events[i:], workerID)
}
}
}
@@ -167,7 +183,8 @@ func (s *spooledHandler) lock(ctx context.Context, errs chan<- error, workerID s
func HandleError(event *models.Event, failedErr error,
latestFailedEvent func(sequence uint64) (*repository.FailedEvent, error),
processFailedEvent func(*repository.FailedEvent) error,
processSequence func(*models.Event) error, errorCountUntilSkip uint64) error {
processSequence func(*models.Event) error,
errorCountUntilSkip uint64) error {
failedEvent, err := latestFailedEvent(event.Sequence)
if err != nil {
return err
@@ -181,7 +198,7 @@ func HandleError(event *models.Event, failedErr error,
if errorCountUntilSkip <= failedEvent.FailureCount {
return processSequence(event)
}
return nil
return failedErr
}
func HandleSuccess(updateSpoolerRunTimestamp func() error) error {

View File

@@ -22,6 +22,7 @@ type testHandler struct {
queryError error
viewModel string
bulkLimit uint64
maxErrCount int
}
func (h *testHandler) AggregateTypes() []models.AggregateType {
@@ -50,6 +51,10 @@ func (h *testHandler) Reduce(*models.Event) error {
return h.processError
}
func (h *testHandler) OnError(event *models.Event, err error) error {
if h.maxErrCount == 2 {
return nil
}
h.maxErrCount++
return err
}
func (h *testHandler) OnSuccess() error {
@@ -93,17 +98,18 @@ func (es *eventstoreStub) LatestSequence(ctx context.Context, in *models.SearchQ
func TestSpooler_process(t *testing.T) {
type fields struct {
currentHandler query.Handler
currentHandler *testHandler
}
type args struct {
timeout time.Duration
events []*models.Event
}
tests := []struct {
name string
fields fields
args args
wantErr bool
name string
fields fields
args args
wantErr bool
wantRetries int
}{
{
name: "process all events",
@@ -135,7 +141,8 @@ func TestSpooler_process(t *testing.T) {
args: args{
events: []*models.Event{{}, {}},
},
wantErr: true,
wantErr: false,
wantRetries: 2,
},
}
for _, tt := range tests {
@@ -154,6 +161,9 @@ func TestSpooler_process(t *testing.T) {
if err := s.process(ctx, tt.args.events, "test"); (err != nil) != tt.wantErr {
t.Errorf("Spooler.process() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.fields.currentHandler.maxErrCount != tt.wantRetries {
t.Errorf("Spooler.process() wrong retry count got: %d want %d", tt.fields.currentHandler.maxErrCount, tt.wantRetries)
}
elapsed := time.Since(start).Round(1 * time.Second)
if tt.args.timeout != 0 && elapsed != tt.args.timeout {
@@ -222,14 +232,14 @@ func TestSpooler_load(t *testing.T) {
{
"lock exists",
fields{
currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second},
currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("lock already exists"), 2000*time.Millisecond),
},
},
{
"lock fails",
fields{
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second},
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("fail"), 2000*time.Millisecond),
eventstore: &eventstoreStub{events: []*models.Event{{}}},
},
@@ -237,7 +247,7 @@ func TestSpooler_load(t *testing.T) {
{
"query fails",
fields{
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second},
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond),
eventstore: &eventstoreStub{err: fmt.Errorf("fail")},
},
@@ -245,8 +255,8 @@ func TestSpooler_load(t *testing.T) {
{
"process event fails",
fields{
currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 500 * time.Millisecond},
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 1000*time.Millisecond),
currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 500 * time.Millisecond, bulkLimit: 10},
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 1000*time.Millisecond).expectRenew(t, nil, 1000*time.Millisecond),
eventstore: &eventstoreStub{events: []*models.Event{{}}},
},
},
@@ -433,6 +443,7 @@ func TestHandleError(t *testing.T) {
},
res: res{
shouldProcessSequence: false,
wantErr: true,
},
},
}