mirror of https://github.com/zitadel/zitadel.git
Merge branch 'master' into new-eventstore
# Conflicts:
#	go.mod
#	internal/admin/repository/eventsourcing/eventstore/iam.go
#	internal/authz/repository/eventsourcing/repository.go
#	internal/eventstore/eventstore.go
#	internal/setup/config.go
#	pkg/grpc/management/mock/management.proto.mock.go
@@ -16,6 +16,7 @@ type Eventstore interface {
 	FilterEvents(ctx context.Context, searchQuery *models.SearchQuery) (events []*models.Event, err error)
 	LatestSequence(ctx context.Context, searchQuery *models.SearchQueryFactory) (uint64, error)
 	V2() *es_v2.Eventstore
+	Subscribe(aggregates ...models.AggregateType) *Subscription
 }

 var _ Eventstore = (*eventstore)(nil)
@@ -46,6 +47,8 @@ func (es *eventstore) PushAggregates(ctx context.Context, aggregates ...*models.
 	if err != nil {
 		return err
 	}
+
+	go notify(aggregates)
 	return nil
 }
@@ -6,6 +6,7 @@ package mock

 import (
 	context "context"
 	eventstore "github.com/caos/zitadel/internal/eventstore"
+	models "github.com/caos/zitadel/internal/eventstore/models"
 	gomock "github.com/golang/mock/gomock"
 	reflect "reflect"
@@ -110,3 +111,21 @@ func (mr *MockEventstoreMockRecorder) PushAggregates(arg0 interface{}, arg1 ...i
 	varargs := append([]interface{}{arg0}, arg1...)
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushAggregates", reflect.TypeOf((*MockEventstore)(nil).PushAggregates), varargs...)
 }
+
+// Subscribe mocks base method
+func (m *MockEventstore) Subscribe(arg0 ...models.AggregateType) *eventstore.Subscription {
+	m.ctrl.T.Helper()
+	varargs := []interface{}{}
+	for _, a := range arg0 {
+		varargs = append(varargs, a)
+	}
+	ret := m.ctrl.Call(m, "Subscribe", varargs...)
+	ret0, _ := ret[0].(*eventstore.Subscription)
+	return ret0
+}
+
+// Subscribe indicates an expected call of Subscribe
+func (mr *MockEventstoreMockRecorder) Subscribe(arg0 ...interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockEventstore)(nil).Subscribe), arg0...)
+}
@@ -27,6 +27,10 @@ type Aggregate struct {
 	Precondition *precondition
 }

+func (a *Aggregate) Type() AggregateType {
+	return a.typ
+}
+
 type precondition struct {
 	Query      *SearchQuery
 	Validation func(...*Event) error
@@ -15,6 +15,8 @@ type ObjectRoot struct {
 func (o *ObjectRoot) AppendEvent(event *Event) {
 	if o.AggregateID == "" {
 		o.AggregateID = event.AggregateID
+	} else if o.AggregateID != event.AggregateID {
+		return
 	}
 	if o.ResourceOwner == "" {
 		o.ResourceOwner = event.ResourceOwner
@@ -11,7 +11,8 @@ type SearchQueryFactory struct {
 	desc           bool
 	aggregateTypes []AggregateType
 	aggregateIDs   []string
-	eventSequence  uint64
+	sequenceFrom   uint64
+	sequenceTo     uint64
 	eventTypes     []EventType
 	resourceOwner  string
 }
@@ -51,7 +52,11 @@ func FactoryFromSearchQuery(query *SearchQuery) *SearchQueryFactory {
 			factory = factory.AggregateIDs(aggregateIDs...)
 		}
 	case Field_LatestSequence:
-		factory = factory.SequenceGreater(filter.value.(uint64))
+		if filter.operation == Operation_Greater {
+			factory = factory.SequenceGreater(filter.value.(uint64))
+		} else {
+			factory = factory.SequenceLess(filter.value.(uint64))
+		}
 	case Field_ResourceOwner:
 		factory = factory.ResourceOwner(filter.value.(string))
 	case Field_EventType:
@@ -82,7 +87,12 @@ func (factory *SearchQueryFactory) Limit(limit uint64) *SearchQueryFactory {
 }

 func (factory *SearchQueryFactory) SequenceGreater(sequence uint64) *SearchQueryFactory {
-	factory.eventSequence = sequence
+	factory.sequenceFrom = sequence
 	return factory
 }

+func (factory *SearchQueryFactory) SequenceLess(sequence uint64) *SearchQueryFactory {
+	factory.sequenceTo = sequence
+	return factory
+}
+
@@ -128,7 +138,8 @@ func (factory *SearchQueryFactory) Build() (*searchQuery, error) {

 	for _, f := range []func() *Filter{
 		factory.aggregateIDFilter,
-		factory.eventSequenceFilter,
+		factory.sequenceFromFilter,
+		factory.sequenceToFilter,
 		factory.eventTypeFilter,
 		factory.resourceOwnerFilter,
 	} {
@@ -172,15 +183,26 @@ func (factory *SearchQueryFactory) aggregateTypeFilter() *Filter {
 	return NewFilter(Field_AggregateType, factory.aggregateTypes, Operation_In)
 }

-func (factory *SearchQueryFactory) eventSequenceFilter() *Filter {
-	if factory.eventSequence == 0 {
+func (factory *SearchQueryFactory) sequenceFromFilter() *Filter {
+	if factory.sequenceFrom == 0 {
 		return nil
 	}
 	sortOrder := Operation_Greater
 	if factory.desc {
 		sortOrder = Operation_Less
 	}
-	return NewFilter(Field_LatestSequence, factory.eventSequence, sortOrder)
+	return NewFilter(Field_LatestSequence, factory.sequenceFrom, sortOrder)
+}
+
+func (factory *SearchQueryFactory) sequenceToFilter() *Filter {
+	if factory.sequenceTo == 0 {
+		return nil
+	}
+	sortOrder := Operation_Less
+	if factory.desc {
+		sortOrder = Operation_Greater
+	}
+	return NewFilter(Field_LatestSequence, factory.sequenceTo, sortOrder)
 }

 func (factory *SearchQueryFactory) resourceOwnerFilter() *Filter {
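Under these changes a bounded query carries two Field_LatestSequence filters, and a descending factory flips both comparison operators. A minimal sketch of how a caller might build such a range (assuming the package's NewSearchQueryFactory constructor; the values are illustrative):

// fetch events with 100 < sequence < 200
factory := NewSearchQueryFactory(AggregateType("user")).
	SequenceGreater(100).
	SequenceLess(200)
query, err := factory.Build()
if err != nil {
	// handle an invalid factory configuration
}
// Build() now emits both NewFilter(Field_LatestSequence, 100, Operation_Greater)
// and NewFilter(Field_LatestSequence, 200, Operation_Less); a descending
// factory would swap the two operators.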
@@ -58,13 +58,19 @@ func (q *SearchQuery) LatestSequenceFilter(sequence uint64) *SearchQuery {
 	return q.setFilter(NewFilter(Field_LatestSequence, sequence, sortOrder))
 }

+func (q *SearchQuery) SequenceBetween(from, to uint64) *SearchQuery {
+	q.setFilter(NewFilter(Field_LatestSequence, from, Operation_Greater))
+	q.setFilter(NewFilter(Field_LatestSequence, to, Operation_Less))
+	return q
+}
+
 func (q *SearchQuery) ResourceOwnerFilter(resourceOwner string) *SearchQuery {
 	return q.setFilter(NewFilter(Field_ResourceOwner, resourceOwner, Operation_Equals))
 }

 func (q *SearchQuery) setFilter(filter *Filter) *SearchQuery {
 	for i, f := range q.Filters {
-		if f.field == filter.field {
+		if f.field == filter.field && f.field != Field_LatestSequence {
 			q.Filters[i] = filter
 			return q
 		}
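The setFilter exemption is what makes SequenceBetween work: setFilter previously replaced any existing filter on the same field, so the upper bound would have overwritten the lower one. Illustrated with the constructors above:

q := NewSearchQuery().SequenceBetween(10, 20)
// q.Filters now holds two Field_LatestSequence entries:
//   NewFilter(Field_LatestSequence, 10, Operation_Greater)
//   NewFilter(Field_LatestSequence, 20, Operation_Less)
// without the f.field != Field_LatestSequence exemption, the second
// setFilter call would replace the first, leaving only the upper bound.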
@@ -103,7 +103,7 @@ func TestSearchQueryFactorySetters(t *testing.T) {
 			setters: []func(*SearchQueryFactory) *SearchQueryFactory{testSetSequence(90)},
 		},
 		res: &SearchQueryFactory{
-			eventSequence: 90,
+			sequenceFrom: 90,
 		},
 	},
 	{
@@ -1,11 +1,18 @@
 package query

 import (
+	"context"
 	"time"

 	"github.com/caos/logging"
+	"github.com/caos/zitadel/internal/eventstore"
 	"github.com/caos/zitadel/internal/eventstore/models"
 )

+const (
+	eventLimit = 10000
+)
+
 type Handler interface {
 	ViewModel() string
 	EventQuery() (*models.SearchQuery, error)
@@ -13,5 +20,63 @@ type Handler interface {
 	OnError(event *models.Event, err error) error
 	OnSuccess() error
 	MinimumCycleDuration() time.Duration
+	LockDuration() time.Duration
 	QueryLimit() uint64
+
+	AggregateTypes() []models.AggregateType
+	CurrentSequence(*models.Event) (uint64, error)
+	Eventstore() eventstore.Eventstore
 }
+
+func ReduceEvent(handler Handler, event *models.Event) {
+	currentSequence, err := handler.CurrentSequence(event)
+	if err != nil {
+		logging.Log("HANDL-BmpkC").WithError(err).Warn("unable to get current sequence")
+		return
+	}
+
+	searchQuery := models.NewSearchQuery().
+		AggregateTypeFilter(handler.AggregateTypes()...).
+		SequenceBetween(currentSequence, event.Sequence).
+		SetLimit(eventLimit)
+
+	unprocessedEvents, err := handler.Eventstore().FilterEvents(context.Background(), searchQuery)
+	if err != nil {
+		logging.LogWithFields("HANDL-L6YH1", "seq", event.Sequence).Warn("filter failed")
+		return
+	}
+
+	processedSequences := map[models.AggregateType]uint64{}
+
+	for _, unprocessedEvent := range unprocessedEvents {
+		currentSequence, err := handler.CurrentSequence(unprocessedEvent)
+		if err != nil {
+			logging.Log("HANDL-BmpkC").WithError(err).Warn("unable to get current sequence")
+			return
+		}
+		_, ok := processedSequences[unprocessedEvent.AggregateType]
+		if !ok {
+			processedSequences[unprocessedEvent.AggregateType] = currentSequence
+		}
+		if processedSequences[unprocessedEvent.AggregateType] != currentSequence {
+			if currentSequence < processedSequences[unprocessedEvent.AggregateType] {
+				logging.LogWithFields("QUERY-DOYVN",
+					"processed", processedSequences[unprocessedEvent.AggregateType],
+					"current", currentSequence,
+					"view", handler.ViewModel()).
+					Warn("sequence not matching")
+			}
+			return
+		}
+
+		err = handler.Reduce(unprocessedEvent)
+		logging.LogWithFields("HANDL-V42TI", "seq", unprocessedEvent.Sequence).OnError(err).Warn("reduce failed")
+		processedSequences[unprocessedEvent.AggregateType] = unprocessedEvent.Sequence
+	}
+	if len(unprocessedEvents) == eventLimit {
+		logging.LogWithFields("QUERY-BSqe9", "seq", event.Sequence).Warn("didnt process event")
+		return
+	}
+	err = handler.Reduce(event)
+	logging.LogWithFields("HANDL-wQDL2", "seq", event.Sequence).OnError(err).Warn("reduce failed")
+}
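ReduceEvent is the catch-up path: before reducing the event that triggered the call, it replays every stored event between the view's current sequence and that event's sequence, and bails out when another worker already advanced the sequence or when a full eventLimit page suggests more events are still pending. A hypothetical wiring of a subscription to this function, to show the intended call pattern (es, handler, and the goroutine are illustrative, not part of this diff):

// assumes es is an eventstore.Eventstore and handler implements query.Handler
sub := es.Subscribe(handler.AggregateTypes()...)
go func() {
	for event := range sub.Events {
		query.ReduceEvent(handler, event)
	}
}()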
@@ -1,6 +1,7 @@
 package spooler

 import (
+	"math/rand"
 	"os"

 	"github.com/caos/logging"
@@ -23,6 +24,11 @@ func (c *Config) New() *Spooler {
 		logging.Log("SPOOL-bdO56").OnError(err).Panic("unable to generate lockID")
 	}

+	//shuffle the handlers for better balance when running multiple pods
+	rand.Shuffle(len(c.ViewHandlers), func(i, j int) {
+		c.ViewHandlers[i], c.ViewHandlers[j] = c.ViewHandlers[j], c.ViewHandlers[i]
+	})
+
 	return &Spooler{
 		handlers: c.ViewHandlers,
 		lockID:   lockID,
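The shuffle exists because every pod builds the handler slice in the same order, so without it the pods would all race for locks on the same views first. One caveat worth noting: before Go 1.20 the global math/rand source defaults to seed 1, so identically started pods would produce the identical shuffle unless the source is seeded at startup, e.g.:

// hypothetical startup seeding so each pod shuffles differently
rand.Seed(time.Now().UnixNano())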
@@ -4,6 +4,7 @@ import (
 	"context"
 	"strconv"
 	"sync"
+	"time"

 	"github.com/caos/logging"
 	"github.com/caos/zitadel/internal/eventstore"
@@ -11,8 +12,6 @@ import (
 	"github.com/caos/zitadel/internal/eventstore/query"
 	"github.com/caos/zitadel/internal/telemetry/tracing"
 	"github.com/caos/zitadel/internal/view/repository"
-
-	"time"
 )

 type Spooler struct {
@@ -71,14 +70,26 @@ func (s *spooledHandler) load(workerID string) {
 	hasLocked := s.lock(ctx, errs, workerID)

 	if <-hasLocked {
-		events, err := s.query(ctx)
-		if err != nil {
-			errs <- err
-		} else {
-			errs <- s.process(ctx, events, workerID)
-			logging.Log("SPOOL-0pV8o").WithField("view", s.ViewModel()).WithField("worker", workerID).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("process done")
+		for {
+			events, err := s.query(ctx)
+			if err != nil {
+				errs <- err
+				break
+			}
+			err = s.process(ctx, events, workerID)
+			if err != nil {
+				errs <- err
+				break
+			}
+			if uint64(len(events)) < s.QueryLimit() {
+				// no more events to process
+				// stop chan
+				if ctx.Err() == nil {
+					errs <- nil
+				}
+				break
+			}
 		}
 	}
 	<-ctx.Done()
 }
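The loop turns the old single query-then-process pass into a drain: it keeps pulling pages until one comes back smaller than QueryLimit(), which means the view has caught up, and only reports success on errs while the context is still live. The pattern in isolation, with hypothetical fetch and handle stand-ins:

for {
	batch, err := fetch(limit) // hypothetical page query
	if err != nil {
		return err
	}
	if err := handle(batch); err != nil { // hypothetical processing
		return err
	}
	if uint64(len(batch)) < limit {
		return nil // last page was not full: caught up
	}
}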
@@ -92,14 +103,19 @@ func (s *spooledHandler) awaitError(cancel func(), errs chan error, workerID str
 }

 func (s *spooledHandler) process(ctx context.Context, events []*models.Event, workerID string) error {
-	for _, event := range events {
+	for i, event := range events {
 		select {
 		case <-ctx.Done():
 			logging.LogWithFields("SPOOL-FTKwH", "view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).Debug("context canceled")
 			return nil
 		default:
 			if err := s.Reduce(event); err != nil {
-				return s.OnError(event, err)
+				err = s.OnError(event, err)
+				if err == nil {
+					continue
+				}
+				time.Sleep(100 * time.Millisecond)
+				return s.process(ctx, events[i:], workerID)
 			}
 		}
 	}
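The rewritten error branch gives OnError the final say: a nil return skips the event and continues, anything else backs off 100ms and reprocesses events[i:], retrying the failing event in place instead of aborting the batch. An illustrative trace for two events where Reduce always fails and OnError, like the testHandler below, returns the error twice before swallowing it:

// process([e0, e1]) -> Reduce(e0) fails, OnError returns err
//   -> sleep 100ms -> process([e0, e1]) -> Reduce(e0) fails, OnError returns err
//   -> sleep 100ms -> process([e0, e1]) -> Reduce(e0) fails, OnError returns nil
//   -> continue with e1; maxErrCount is now 2, matching wantRetries in the test.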
@@ -144,12 +160,12 @@ func (s *spooledHandler) lock(ctx context.Context, errs chan<- error, workerID s
 		case <-ctx.Done():
 			return
 		case <-renewTimer:
-			err := s.locker.Renew(workerID, s.ViewModel(), s.MinimumCycleDuration()*2)
+			err := s.locker.Renew(workerID, s.ViewModel(), s.LockDuration())
 			firstLock.Do(func() {
 				locked <- err == nil
 			})
 			if err == nil {
-				renewTimer = time.After(s.MinimumCycleDuration())
+				renewTimer = time.After(s.LockDuration())
 				continue
 			}
@@ -167,7 +183,8 @@ func (s *spooledHandler) lock(ctx context.Context, errs chan<- error, workerID s
 func HandleError(event *models.Event, failedErr error,
 	latestFailedEvent func(sequence uint64) (*repository.FailedEvent, error),
 	processFailedEvent func(*repository.FailedEvent) error,
-	processSequence func(uint64, time.Time) error, errorCountUntilSkip uint64) error {
+	processSequence func(*models.Event) error,
+	errorCountUntilSkip uint64) error {
 	failedEvent, err := latestFailedEvent(event.Sequence)
 	if err != nil {
 		return err
@@ -179,9 +196,9 @@ func HandleError(event *models.Event, failedErr error,
 		return err
 	}
 	if errorCountUntilSkip <= failedEvent.FailureCount {
-		return processSequence(event.Sequence, event.CreationDate)
+		return processSequence(event)
 	}
-	return nil
+	return failedErr
 }

 func HandleSuccess(updateSpoolerRunTimestamp func() error) error {
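Two behavioral changes here: processSequence now receives the whole *models.Event instead of (sequence, creationDate), and an event that has not yet reached errorCountUntilSkip propagates failedErr rather than nil, which is what allows process() above to retry it. A sketch of how a view handler might delegate its OnError to this helper (the view methods are hypothetical stand-ins for the usual pattern):

func (h *myHandler) OnError(event *models.Event, err error) error {
	return spooler.HandleError(event, err,
		h.view.GetLatestFailedEvent, // hypothetical: lookup by sequence
		h.view.ProcessedFailedEvent, // hypothetical: bump the failure count
		h.view.ProcessedSequence,    // hypothetical: now takes the *models.Event
		h.errorCountUntilSkip)
}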
@@ -22,30 +22,57 @@ type testHandler struct {
 	queryError   error
 	viewModel    string
+	bulkLimit    uint64
+	maxErrCount  int
 }

+func (h *testHandler) AggregateTypes() []models.AggregateType {
+	return nil
+}
+
+func (h *testHandler) CurrentSequence(event *models.Event) (uint64, error) {
+	return 0, nil
+}
+
+func (h *testHandler) Eventstore() eventstore.Eventstore {
+	return nil
+}
+
 func (h *testHandler) ViewModel() string {
 	return h.viewModel
 }

 func (h *testHandler) EventQuery() (*models.SearchQuery, error) {
 	if h.queryError != nil {
 		return nil, h.queryError
 	}
 	return &models.SearchQuery{}, nil
 }

 func (h *testHandler) Reduce(*models.Event) error {
 	<-time.After(h.processSleep)
 	return h.processError
 }

 func (h *testHandler) OnError(event *models.Event, err error) error {
+	if h.maxErrCount == 2 {
+		return nil
+	}
+	h.maxErrCount++
 	return err
 }

 func (h *testHandler) OnSuccess() error {
 	return nil
 }

 func (h *testHandler) MinimumCycleDuration() time.Duration {
 	return h.cycleDuration
 }

+func (h *testHandler) LockDuration() time.Duration {
+	return h.cycleDuration / 2
+}
+
 func (h *testHandler) QueryLimit() uint64 {
 	return h.bulkLimit
 }
@@ -55,6 +82,8 @@ type eventstoreStub struct {
 	err error
 }

+func (es *eventstoreStub) Subscribe(...models.AggregateType) *eventstore.Subscription { return nil }
+
 func (es *eventstoreStub) Health(ctx context.Context) error {
 	return nil
 }
@@ -79,17 +108,18 @@ func (es *eventstoreStub) LatestSequence(ctx context.Context, in *models.SearchQ

 func TestSpooler_process(t *testing.T) {
 	type fields struct {
-		currentHandler query.Handler
+		currentHandler *testHandler
 	}
 	type args struct {
 		timeout time.Duration
 		events  []*models.Event
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		wantErr bool
+		name        string
+		fields      fields
+		args        args
+		wantErr     bool
+		wantRetries int
 	}{
 		{
 			name: "process all events",
@@ -121,7 +151,8 @@ func TestSpooler_process(t *testing.T) {
 			args: args{
 				events: []*models.Event{{}, {}},
 			},
-			wantErr: true,
+			wantErr:     false,
+			wantRetries: 2,
 		},
 	}
 	for _, tt := range tests {
@@ -140,6 +171,9 @@ func TestSpooler_process(t *testing.T) {
 			if err := s.process(ctx, tt.args.events, "test"); (err != nil) != tt.wantErr {
 				t.Errorf("Spooler.process() error = %v, wantErr %v", err, tt.wantErr)
 			}
+			if tt.fields.currentHandler.maxErrCount != tt.wantRetries {
+				t.Errorf("Spooler.process() wrong retry count got: %d want %d", tt.fields.currentHandler.maxErrCount, tt.wantRetries)
+			}

 			elapsed := time.Since(start).Round(1 * time.Second)
 			if tt.args.timeout != 0 && elapsed != tt.args.timeout {
@@ -208,31 +242,31 @@ func TestSpooler_load(t *testing.T) {
 		{
 			"lock exists",
 			fields{
-				currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second},
-				locker:         newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("lock already exists"), 2000*time.Millisecond),
+				currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView1", cycleDuration: 1 * time.Second, bulkLimit: 10},
+				locker:         newTestLocker(t, "testID", "testView1").expectRenew(t, fmt.Errorf("lock already exists"), 500*time.Millisecond),
 			},
 		},
 		{
 			"lock fails",
 			fields{
-				currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second},
-				locker:         newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("fail"), 2000*time.Millisecond),
+				currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView2", cycleDuration: 1 * time.Second, bulkLimit: 10},
+				locker:         newTestLocker(t, "testID", "testView2").expectRenew(t, fmt.Errorf("fail"), 500*time.Millisecond),
 				eventstore:     &eventstoreStub{events: []*models.Event{{}}},
 			},
 		},
 		{
 			"query fails",
 			fields{
-				currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second},
-				locker:         newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond),
+				currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView3", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second, bulkLimit: 10},
+				locker:         newTestLocker(t, "testID", "testView3").expectRenew(t, nil, 500*time.Millisecond),
 				eventstore:     &eventstoreStub{err: fmt.Errorf("fail")},
 			},
 		},
 		{
 			"process event fails",
 			fields{
-				currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 500 * time.Millisecond},
-				locker:         newTestLocker(t, "testID", "testView").expectRenew(t, nil, 1000*time.Millisecond),
+				currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView4", cycleDuration: 500 * time.Millisecond, bulkLimit: 10},
+				locker:         newTestLocker(t, "testID", "testView4").expectRenew(t, nil, 250*time.Millisecond),
 				eventstore:     &eventstoreStub{events: []*models.Event{{}}},
 			},
 		},
@@ -268,7 +302,7 @@ func TestSpooler_lock(t *testing.T) {
 			"renew correct",
 			fields{
 				currentHandler: &testHandler{cycleDuration: 1 * time.Second, viewModel: "testView"},
-				locker:         newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond),
+				locker:         newTestLocker(t, "testID", "testView").expectRenew(t, nil, 500*time.Millisecond),
 				expectsErr:     false,
 			},
 			args{
@@ -279,7 +313,7 @@ func TestSpooler_lock(t *testing.T) {
 			"renew fails",
 			fields{
 				currentHandler: &testHandler{cycleDuration: 900 * time.Millisecond, viewModel: "testView"},
-				locker:         newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("renew failed"), 1800*time.Millisecond),
+				locker:         newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("renew failed"), 450*time.Millisecond),
 				expectsErr:     true,
 			},
 			args{
@@ -333,13 +367,15 @@ func newTestLocker(t *testing.T, lockerID, viewName string) *testLocker {
 }

 func (l *testLocker) expectRenew(t *testing.T, err error, waitTime time.Duration) *testLocker {
+	t.Helper()
 	l.mock.EXPECT().Renew(gomock.Any(), l.viewName, gomock.Any()).DoAndReturn(
 		func(_, _ string, gotten time.Duration) error {
+			t.Helper()
 			if waitTime-gotten != 0 {
 				t.Errorf("expected waittime %v got %v", waitTime, gotten)
 			}
 			return err
-		}).Times(1)
+		}).MinTimes(1).MaxTimes(3)

 	return l
 }
@@ -419,6 +455,7 @@ func TestHandleError(t *testing.T) {
 			},
 			res: res{
 				shouldProcessSequence: false,
+				wantErr:               true,
 			},
 		},
 	}
@@ -432,7 +469,7 @@ func TestHandleError(t *testing.T) {
 				func(*repository.FailedEvent) error {
 					return nil
 				},
-				func(uint64, time.Time) error {
+				func(*models.Event) error {
 					processedSequence = true
 					return nil
 				},
internal/eventstore/subscription.go (new file, 73 lines)
@@ -0,0 +1,73 @@
+package eventstore
+
+import (
+	"sync"
+
+	"github.com/caos/zitadel/internal/eventstore/models"
+)
+
+var (
+	subscriptions map[models.AggregateType][]*Subscription = map[models.AggregateType][]*Subscription{}
+	subsMutext    sync.Mutex
+)
+
+type Subscription struct {
+	Events     chan *models.Event
+	aggregates []models.AggregateType
+}
+
+func (es *eventstore) Subscribe(aggregates ...models.AggregateType) *Subscription {
+	events := make(chan *models.Event, 100)
+	sub := &Subscription{
+		Events:     events,
+		aggregates: aggregates,
+	}
+
+	subsMutext.Lock()
+	defer subsMutext.Unlock()
+
+	for _, aggregate := range aggregates {
+		_, ok := subscriptions[aggregate]
+		if !ok {
+			subscriptions[aggregate] = make([]*Subscription, 0, 1)
+		}
+		subscriptions[aggregate] = append(subscriptions[aggregate], sub)
+	}
+
+	return sub
+}
+
+func notify(aggregates []*models.Aggregate) {
+	subsMutext.Lock()
+	defer subsMutext.Unlock()
+	for _, aggregate := range aggregates {
+		subs, ok := subscriptions[aggregate.Type()]
+		if !ok {
+			continue
+		}
+		for _, sub := range subs {
+			for _, event := range aggregate.Events {
+				sub.Events <- event
+			}
+		}
+	}
+}
+
+func (s *Subscription) Unsubscribe() {
+	subsMutext.Lock()
+	defer subsMutext.Unlock()
+	for _, aggregate := range s.aggregates {
+		subs, ok := subscriptions[aggregate]
+		if !ok {
+			continue
+		}
+		for i := len(subs) - 1; i >= 0; i-- {
+			if subs[i] == s {
+				subs[i] = subs[len(subs)-1]
+				subs[len(subs)-1] = nil
+				subs = subs[:len(subs)-1]
+			}
+		}
+	}
+	close(s.Events)
+}
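A minimal usage sketch for the new subscription API, assuming an eventstore instance es and a hypothetical handle function:

sub := es.Subscribe(models.AggregateType("user"))
defer sub.Unsubscribe() // removes the subscription and closes sub.Events

for event := range sub.Events {
	handle(event) // hypothetical consumer
}

One design consequence visible in notify: pushed events are written synchronously into each subscriber's channel, which is buffered at 100, so a consumer that stops reading will eventually block the notify goroutine while it holds subsMutext, stalling every other Subscribe, Unsubscribe, and notify call.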