zitadel/internal/eventstore/subscription.go
Silvan 5349d96ce4
fix(eventstore): sub queries (#1805)
* sub queries

* fix: tests

* add builder to tests

* new search query

* rename searchquerybuilder to builder

* remove comment from code

* test with multiple queries

* add filters test

* fix(contibute): listing

* add validate module

* fix: search queries

* remove unused event type in query

* ignore query if error in marshal

* go mod tidy

* update privacy policy query

* update queries

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
2021-07-06 13:55:57 +02:00

96 lines
2.1 KiB
Go

package eventstore
import (
"sync"
v1 "github.com/caos/zitadel/internal/eventstore/v1"
"github.com/caos/zitadel/internal/eventstore/v1/models"
)
var (
subscriptions = map[AggregateType][]*Subscription{}
subsMutext sync.Mutex
)
type Subscription struct {
Events chan EventReader
aggregates []AggregateType
}
func Subscribe(aggregates ...AggregateType) *Subscription {
events := make(chan EventReader, 100)
sub := &Subscription{
Events: events,
aggregates: aggregates,
}
subsMutext.Lock()
defer subsMutext.Unlock()
for _, aggregate := range aggregates {
_, ok := subscriptions[aggregate]
if !ok {
subscriptions[aggregate] = make([]*Subscription, 0, 1)
}
subscriptions[aggregate] = append(subscriptions[aggregate], sub)
}
return sub
}
func notify(events []EventReader) {
go v1.Notify(MapEventsToV1Events(events))
subsMutext.Lock()
defer subsMutext.Unlock()
for _, event := range events {
subs, ok := subscriptions[event.Aggregate().Typ]
if !ok {
continue
}
for _, sub := range subs {
sub.Events <- event
}
}
}
func (s *Subscription) Unsubscribe() {
subsMutext.Lock()
defer subsMutext.Unlock()
for _, aggregate := range s.aggregates {
subs, ok := subscriptions[aggregate]
if !ok {
continue
}
for i := len(subs) - 1; i >= 0; i-- {
if subs[i] == s {
subs[i] = subs[len(subs)-1]
subs[len(subs)-1] = nil
subs = subs[:len(subs)-1]
}
}
}
close(s.Events)
}
func MapEventsToV1Events(events []EventReader) []*models.Event {
v1Events := make([]*models.Event, len(events))
for i, event := range events {
v1Events[i] = mapEventToV1Event(event)
}
return v1Events
}
func mapEventToV1Event(event EventReader) *models.Event {
return &models.Event{
Sequence: event.Sequence(),
CreationDate: event.CreationDate(),
Type: models.EventType(event.Type()),
AggregateType: models.AggregateType(event.Aggregate().Typ),
AggregateID: event.Aggregate().ID,
ResourceOwner: event.Aggregate().ResourceOwner,
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Data: event.DataAsBytes(),
}
}