mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-12 04:57:33 +00:00
fix(event handling): use internal pubsub for view update (#1118)
* start sub * start implement subsciptions * start subscription * implementation for member done * admin done * fix: tests * extend handlers * prepary notification * no errors in adminapi * changed current sequence in all packages * ignore mocks * works * subscriptions as singleton * tests * refactor: rename function scope var
This commit is contained in:
@@ -14,6 +14,7 @@ type Eventstore interface {
|
||||
PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) error
|
||||
FilterEvents(ctx context.Context, searchQuery *models.SearchQuery) (events []*models.Event, err error)
|
||||
LatestSequence(ctx context.Context, searchQuery *models.SearchQueryFactory) (uint64, error)
|
||||
Subscribe(aggregates ...models.AggregateType) *Subscription
|
||||
}
|
||||
|
||||
var _ Eventstore = (*eventstore)(nil)
|
||||
@@ -42,6 +43,8 @@ func (es *eventstore) PushAggregates(ctx context.Context, aggregates ...*models.
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go notify(aggregates)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -6,6 +6,7 @@ package mock
|
||||
|
||||
import (
|
||||
context "context"
|
||||
eventstore "github.com/caos/zitadel/internal/eventstore"
|
||||
models "github.com/caos/zitadel/internal/eventstore/models"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
reflect "reflect"
|
||||
@@ -110,3 +111,21 @@ func (mr *MockEventstoreMockRecorder) PushAggregates(arg0 interface{}, arg1 ...i
|
||||
varargs := append([]interface{}{arg0}, arg1...)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushAggregates", reflect.TypeOf((*MockEventstore)(nil).PushAggregates), varargs...)
|
||||
}
|
||||
|
||||
// Subscribe mocks base method
|
||||
func (m *MockEventstore) Subscribe(arg0 ...models.AggregateType) *eventstore.Subscription {
|
||||
m.ctrl.T.Helper()
|
||||
varargs := []interface{}{}
|
||||
for _, a := range arg0 {
|
||||
varargs = append(varargs, a)
|
||||
}
|
||||
ret := m.ctrl.Call(m, "Subscribe", varargs...)
|
||||
ret0, _ := ret[0].(*eventstore.Subscription)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Subscribe indicates an expected call of Subscribe
|
||||
func (mr *MockEventstoreMockRecorder) Subscribe(arg0 ...interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockEventstore)(nil).Subscribe), arg0...)
|
||||
}
|
||||
|
@@ -27,6 +27,10 @@ type Aggregate struct {
|
||||
Precondition *precondition
|
||||
}
|
||||
|
||||
func (a *Aggregate) Type() AggregateType {
|
||||
return a.typ
|
||||
}
|
||||
|
||||
type precondition struct {
|
||||
Query *SearchQuery
|
||||
Validation func(...*Event) error
|
||||
|
@@ -11,7 +11,8 @@ type SearchQueryFactory struct {
|
||||
desc bool
|
||||
aggregateTypes []AggregateType
|
||||
aggregateIDs []string
|
||||
eventSequence uint64
|
||||
sequenceFrom uint64
|
||||
sequenceTo uint64
|
||||
eventTypes []EventType
|
||||
resourceOwner string
|
||||
}
|
||||
@@ -51,7 +52,11 @@ func FactoryFromSearchQuery(query *SearchQuery) *SearchQueryFactory {
|
||||
factory = factory.AggregateIDs(aggregateIDs...)
|
||||
}
|
||||
case Field_LatestSequence:
|
||||
factory = factory.SequenceGreater(filter.value.(uint64))
|
||||
if filter.operation == Operation_Greater {
|
||||
factory = factory.SequenceGreater(filter.value.(uint64))
|
||||
} else {
|
||||
factory = factory.SequenceLess(filter.value.(uint64))
|
||||
}
|
||||
case Field_ResourceOwner:
|
||||
factory = factory.ResourceOwner(filter.value.(string))
|
||||
case Field_EventType:
|
||||
@@ -82,7 +87,12 @@ func (factory *SearchQueryFactory) Limit(limit uint64) *SearchQueryFactory {
|
||||
}
|
||||
|
||||
func (factory *SearchQueryFactory) SequenceGreater(sequence uint64) *SearchQueryFactory {
|
||||
factory.eventSequence = sequence
|
||||
factory.sequenceFrom = sequence
|
||||
return factory
|
||||
}
|
||||
|
||||
func (factory *SearchQueryFactory) SequenceLess(sequence uint64) *SearchQueryFactory {
|
||||
factory.sequenceTo = sequence
|
||||
return factory
|
||||
}
|
||||
|
||||
@@ -128,7 +138,8 @@ func (factory *SearchQueryFactory) Build() (*searchQuery, error) {
|
||||
|
||||
for _, f := range []func() *Filter{
|
||||
factory.aggregateIDFilter,
|
||||
factory.eventSequenceFilter,
|
||||
factory.sequenceFromFilter,
|
||||
factory.sequenceToFilter,
|
||||
factory.eventTypeFilter,
|
||||
factory.resourceOwnerFilter,
|
||||
} {
|
||||
@@ -172,15 +183,26 @@ func (factory *SearchQueryFactory) aggregateTypeFilter() *Filter {
|
||||
return NewFilter(Field_AggregateType, factory.aggregateTypes, Operation_In)
|
||||
}
|
||||
|
||||
func (factory *SearchQueryFactory) eventSequenceFilter() *Filter {
|
||||
if factory.eventSequence == 0 {
|
||||
func (factory *SearchQueryFactory) sequenceFromFilter() *Filter {
|
||||
if factory.sequenceFrom == 0 {
|
||||
return nil
|
||||
}
|
||||
sortOrder := Operation_Greater
|
||||
if factory.desc {
|
||||
sortOrder = Operation_Less
|
||||
}
|
||||
return NewFilter(Field_LatestSequence, factory.eventSequence, sortOrder)
|
||||
return NewFilter(Field_LatestSequence, factory.sequenceFrom, sortOrder)
|
||||
}
|
||||
|
||||
func (factory *SearchQueryFactory) sequenceToFilter() *Filter {
|
||||
if factory.sequenceTo == 0 {
|
||||
return nil
|
||||
}
|
||||
sortOrder := Operation_Less
|
||||
if factory.desc {
|
||||
sortOrder = Operation_Greater
|
||||
}
|
||||
return NewFilter(Field_LatestSequence, factory.sequenceTo, sortOrder)
|
||||
}
|
||||
|
||||
func (factory *SearchQueryFactory) resourceOwnerFilter() *Filter {
|
||||
|
@@ -58,6 +58,12 @@ func (q *SearchQuery) LatestSequenceFilter(sequence uint64) *SearchQuery {
|
||||
return q.setFilter(NewFilter(Field_LatestSequence, sequence, sortOrder))
|
||||
}
|
||||
|
||||
func (q *SearchQuery) SequenceBetween(from, to uint64) *SearchQuery {
|
||||
q.setFilter(NewFilter(Field_LatestSequence, from, Operation_Greater))
|
||||
q.setFilter(NewFilter(Field_LatestSequence, to, Operation_Less))
|
||||
return q
|
||||
}
|
||||
|
||||
func (q *SearchQuery) ResourceOwnerFilter(resourceOwner string) *SearchQuery {
|
||||
return q.setFilter(NewFilter(Field_ResourceOwner, resourceOwner, Operation_Equals))
|
||||
}
|
||||
|
@@ -103,7 +103,7 @@ func TestSearchQueryFactorySetters(t *testing.T) {
|
||||
setters: []func(*SearchQueryFactory) *SearchQueryFactory{testSetSequence(90)},
|
||||
},
|
||||
res: &SearchQueryFactory{
|
||||
eventSequence: 90,
|
||||
sequenceFrom: 90,
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@@ -1,8 +1,11 @@
|
||||
package query
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/caos/logging"
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
)
|
||||
|
||||
@@ -14,4 +17,42 @@ type Handler interface {
|
||||
OnSuccess() error
|
||||
MinimumCycleDuration() time.Duration
|
||||
QueryLimit() uint64
|
||||
|
||||
AggregateTypes() []models.AggregateType
|
||||
CurrentSequence(*models.Event) (uint64, error)
|
||||
Eventstore() eventstore.Eventstore
|
||||
}
|
||||
|
||||
func ReduceEvent(handler Handler, event *models.Event) {
|
||||
currentSequence, err := handler.CurrentSequence(event)
|
||||
if err != nil {
|
||||
logging.Log("HANDL-BmpkC").WithError(err).Warn("unable to get current sequence")
|
||||
return
|
||||
}
|
||||
if event.PreviousSequence > currentSequence {
|
||||
searchQuery := models.NewSearchQuery().
|
||||
AggregateTypeFilter(handler.AggregateTypes()...).
|
||||
SequenceBetween(currentSequence, event.PreviousSequence)
|
||||
|
||||
events, err := handler.Eventstore().FilterEvents(context.Background(), searchQuery)
|
||||
if err != nil {
|
||||
logging.LogWithFields("HANDL-L6YH1", "seq", event.Sequence).Warn("filter failed")
|
||||
return
|
||||
}
|
||||
for _, previousEvent := range events {
|
||||
//if other process already updated view
|
||||
//TODO: correct?
|
||||
if event.PreviousSequence > previousEvent.Sequence {
|
||||
continue
|
||||
}
|
||||
err = handler.Reduce(previousEvent)
|
||||
logging.LogWithFields("HANDL-V42TI", "seq", previousEvent.Sequence).OnError(err).Warn("reduce failed")
|
||||
return
|
||||
}
|
||||
} else if event.PreviousSequence > 0 && event.PreviousSequence < currentSequence {
|
||||
logging.LogWithFields("HANDL-w9Bdy", "previousSeq", event.PreviousSequence, "currentSeq", currentSequence).Debug("already processed")
|
||||
return
|
||||
}
|
||||
err = handler.Reduce(event)
|
||||
logging.LogWithFields("HANDL-wQDL2", "seq", event.Sequence).OnError(err).Warn("reduce failed")
|
||||
}
|
||||
|
@@ -167,7 +167,7 @@ func (s *spooledHandler) lock(ctx context.Context, errs chan<- error, workerID s
|
||||
func HandleError(event *models.Event, failedErr error,
|
||||
latestFailedEvent func(sequence uint64) (*repository.FailedEvent, error),
|
||||
processFailedEvent func(*repository.FailedEvent) error,
|
||||
processSequence func(uint64, time.Time) error, errorCountUntilSkip uint64) error {
|
||||
processSequence func(*models.Event) error, errorCountUntilSkip uint64) error {
|
||||
failedEvent, err := latestFailedEvent(event.Sequence)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -179,7 +179,7 @@ func HandleError(event *models.Event, failedErr error,
|
||||
return err
|
||||
}
|
||||
if errorCountUntilSkip <= failedEvent.FailureCount {
|
||||
return processSequence(event.Sequence, event.CreationDate)
|
||||
return processSequence(event)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@@ -24,6 +24,18 @@ type testHandler struct {
|
||||
bulkLimit uint64
|
||||
}
|
||||
|
||||
func (h *testHandler) AggregateTypes() []models.AggregateType {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *testHandler) CurrentSequence(event *models.Event) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (h *testHandler) Eventstore() eventstore.Eventstore {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *testHandler) ViewModel() string {
|
||||
return h.viewModel
|
||||
}
|
||||
@@ -55,6 +67,8 @@ type eventstoreStub struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (es *eventstoreStub) Subscribe(...models.AggregateType) *eventstore.Subscription { return nil }
|
||||
|
||||
func (es *eventstoreStub) Health(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
@@ -432,7 +446,7 @@ func TestHandleError(t *testing.T) {
|
||||
func(*repository.FailedEvent) error {
|
||||
return nil
|
||||
},
|
||||
func(uint64, time.Time) error {
|
||||
func(*models.Event) error {
|
||||
processedSequence = true
|
||||
return nil
|
||||
},
|
||||
|
73
internal/eventstore/subscription.go
Normal file
73
internal/eventstore/subscription.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package eventstore
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
)
|
||||
|
||||
var (
|
||||
subscriptions map[models.AggregateType][]*Subscription = map[models.AggregateType][]*Subscription{}
|
||||
subsMutext sync.Mutex
|
||||
)
|
||||
|
||||
type Subscription struct {
|
||||
Events chan *models.Event
|
||||
aggregates []models.AggregateType
|
||||
}
|
||||
|
||||
func (es *eventstore) Subscribe(aggregates ...models.AggregateType) *Subscription {
|
||||
events := make(chan *models.Event, 100)
|
||||
sub := &Subscription{
|
||||
Events: events,
|
||||
aggregates: aggregates,
|
||||
}
|
||||
|
||||
subsMutext.Lock()
|
||||
defer subsMutext.Unlock()
|
||||
|
||||
for _, aggregate := range aggregates {
|
||||
_, ok := subscriptions[aggregate]
|
||||
if !ok {
|
||||
subscriptions[aggregate] = make([]*Subscription, 0, 1)
|
||||
}
|
||||
subscriptions[aggregate] = append(subscriptions[aggregate], sub)
|
||||
}
|
||||
|
||||
return sub
|
||||
}
|
||||
|
||||
func notify(aggregates []*models.Aggregate) {
|
||||
subsMutext.Lock()
|
||||
defer subsMutext.Unlock()
|
||||
for _, aggregate := range aggregates {
|
||||
subs, ok := subscriptions[aggregate.Type()]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for _, sub := range subs {
|
||||
for _, event := range aggregate.Events {
|
||||
sub.Events <- event
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscription) Unsubscribe() {
|
||||
subsMutext.Lock()
|
||||
defer subsMutext.Unlock()
|
||||
for _, aggregate := range s.aggregates {
|
||||
subs, ok := subscriptions[aggregate]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for i := len(subs) - 1; i >= 0; i-- {
|
||||
if subs[i] == s {
|
||||
subs[i] = subs[len(subs)-1]
|
||||
subs[len(subs)-1] = nil
|
||||
subs = subs[:len(subs)-1]
|
||||
}
|
||||
}
|
||||
}
|
||||
close(s.Events)
|
||||
}
|
Reference in New Issue
Block a user