2020-04-06 06:42:21 +02:00
|
|
|
package eventstore
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2024-02-23 10:29:10 +02:00
|
|
|
"errors"
|
2023-01-16 12:30:03 +01:00
|
|
|
"sort"
|
2021-02-23 15:13:04 +01:00
|
|
|
"sync"
|
2022-12-15 10:40:13 +01:00
|
|
|
"time"
|
2020-04-06 06:42:21 +02:00
|
|
|
|
2024-03-27 14:48:22 +01:00
|
|
|
"github.com/jackc/pgx/v5/pgconn"
|
2024-02-23 10:29:10 +02:00
|
|
|
"github.com/zitadel/logging"
|
|
|
|
|
2022-04-27 01:01:45 +02:00
|
|
|
"github.com/zitadel/zitadel/internal/api/authz"
|
2020-04-06 06:42:21 +02:00
|
|
|
)
|
|
|
|
|
2022-10-26 15:06:48 +02:00
|
|
|
// Eventstore abstracts all functions needed to store valid events
|
2021-02-23 15:13:04 +01:00
|
|
|
// and filters the stored events
|
|
|
|
type Eventstore struct {
|
2024-01-25 17:28:20 +01:00
|
|
|
PushTimeout time.Duration
|
2024-02-23 10:29:10 +02:00
|
|
|
maxRetries int
|
2023-09-06 16:34:07 +02:00
|
|
|
|
2023-10-19 12:19:10 +02:00
|
|
|
pusher Pusher
|
|
|
|
querier Querier
|
|
|
|
|
|
|
|
instances []string
|
|
|
|
lastInstanceQuery time.Time
|
|
|
|
instancesMu sync.Mutex
|
2020-04-06 06:42:21 +02:00
|
|
|
}
|
|
|
|
|
2024-01-25 17:28:20 +01:00
|
|
|
var (
|
|
|
|
eventInterceptors map[EventType]eventTypeInterceptors
|
|
|
|
eventTypes []string
|
|
|
|
aggregateTypes []string
|
2024-03-05 16:44:51 +01:00
|
|
|
eventTypeMapping = map[EventType]AggregateType{}
|
2024-01-25 17:28:20 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
// RegisterFilterEventMapper registers a function for mapping an eventstore event to an event
|
|
|
|
func RegisterFilterEventMapper(aggregateType AggregateType, eventType EventType, mapper func(Event) (Event, error)) {
|
|
|
|
if mapper == nil || eventType == "" {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
appendEventType(eventType)
|
|
|
|
appendAggregateType(aggregateType)
|
|
|
|
|
|
|
|
if eventInterceptors == nil {
|
|
|
|
eventInterceptors = make(map[EventType]eventTypeInterceptors)
|
|
|
|
}
|
2024-03-05 16:44:51 +01:00
|
|
|
|
2024-01-25 17:28:20 +01:00
|
|
|
interceptor := eventInterceptors[eventType]
|
|
|
|
interceptor.eventMapper = mapper
|
|
|
|
eventInterceptors[eventType] = interceptor
|
2024-03-05 16:44:51 +01:00
|
|
|
eventTypeMapping[eventType] = aggregateType
|
2024-01-25 17:28:20 +01:00
|
|
|
}
|
|
|
|
|
2021-02-23 15:13:04 +01:00
|
|
|
type eventTypeInterceptors struct {
|
2023-10-19 12:19:10 +02:00
|
|
|
eventMapper func(Event) (Event, error)
|
2021-02-23 15:13:04 +01:00
|
|
|
}
|
2020-04-06 06:42:21 +02:00
|
|
|
|
2022-12-15 10:40:13 +01:00
|
|
|
func NewEventstore(config *Config) *Eventstore {
|
2021-02-23 15:13:04 +01:00
|
|
|
return &Eventstore{
|
2024-01-25 17:28:20 +01:00
|
|
|
PushTimeout: config.PushTimeout,
|
2024-02-23 10:29:10 +02:00
|
|
|
maxRetries: int(config.MaxRetries),
|
2023-10-19 12:19:10 +02:00
|
|
|
|
|
|
|
pusher: config.Pusher,
|
|
|
|
querier: config.Querier,
|
|
|
|
|
|
|
|
instancesMu: sync.Mutex{},
|
2021-02-23 15:13:04 +01:00
|
|
|
}
|
|
|
|
}
|
2020-11-12 22:50:01 +01:00
|
|
|
|
2022-10-26 15:06:48 +02:00
|
|
|
// Health checks if the eventstore can properly work
|
2021-02-23 15:13:04 +01:00
|
|
|
// It checks if the repository can serve load
|
|
|
|
func (es *Eventstore) Health(ctx context.Context) error {
|
2023-10-19 12:19:10 +02:00
|
|
|
if err := es.pusher.Health(ctx); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return es.querier.Health(ctx)
|
2020-04-06 06:42:21 +02:00
|
|
|
}
|
|
|
|
|
2022-10-26 15:06:48 +02:00
|
|
|
// Push pushes the events in a single transaction
|
2021-02-23 15:13:04 +01:00
|
|
|
// an event needs at least an aggregate
|
2022-01-03 09:19:07 +01:00
|
|
|
func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error) {
|
2022-12-15 10:40:13 +01:00
|
|
|
if es.PushTimeout > 0 {
|
|
|
|
var cancel func()
|
|
|
|
ctx, cancel = context.WithTimeout(ctx, es.PushTimeout)
|
|
|
|
defer cancel()
|
|
|
|
}
|
2024-02-23 10:29:10 +02:00
|
|
|
var (
|
|
|
|
events []Event
|
|
|
|
err error
|
|
|
|
)
|
|
|
|
|
|
|
|
// Retry when there is a collision of the sequence as part of the primary key.
|
|
|
|
// "duplicate key value violates unique constraint \"events2_pkey\" (SQLSTATE 23505)"
|
|
|
|
// https://github.com/zitadel/zitadel/issues/7202
|
|
|
|
retry:
|
|
|
|
for i := 0; i <= es.maxRetries; i++ {
|
|
|
|
events, err = es.pusher.Push(ctx, cmds...)
|
|
|
|
var pgErr *pgconn.PgError
|
|
|
|
if !errors.As(err, &pgErr) || pgErr.ConstraintName != "events2_pkey" || pgErr.SQLState() != "23505" {
|
|
|
|
break retry
|
|
|
|
}
|
|
|
|
logging.WithError(err).Info("eventstore push retry")
|
|
|
|
}
|
2021-02-23 15:13:04 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-02-24 13:24:33 +01:00
|
|
|
|
2023-10-19 12:19:10 +02:00
|
|
|
mappedEvents, err := es.mapEvents(events)
|
2021-02-24 13:24:33 +01:00
|
|
|
if err != nil {
|
2023-10-19 12:19:10 +02:00
|
|
|
return mappedEvents, err
|
2021-02-24 13:24:33 +01:00
|
|
|
}
|
2023-10-19 12:19:10 +02:00
|
|
|
es.notify(mappedEvents)
|
|
|
|
return mappedEvents, nil
|
2022-04-13 07:42:48 +02:00
|
|
|
}
|
|
|
|
|
2024-03-05 16:44:51 +01:00
|
|
|
func AggregateTypeFromEventType(typ EventType) AggregateType {
|
|
|
|
return eventTypeMapping[typ]
|
|
|
|
}
|
|
|
|
|
2023-01-16 12:30:03 +01:00
|
|
|
func (es *Eventstore) EventTypes() []string {
|
2024-01-25 17:28:20 +01:00
|
|
|
return eventTypes
|
2023-01-16 12:30:03 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (es *Eventstore) AggregateTypes() []string {
|
2024-01-25 17:28:20 +01:00
|
|
|
return aggregateTypes
|
2023-01-16 12:30:03 +01:00
|
|
|
}
|
|
|
|
|
2022-10-26 15:06:48 +02:00
|
|
|
// Filter filters the stored events based on the searchQuery
|
2021-02-23 15:13:04 +01:00
|
|
|
// and maps the events to the defined event structs
|
2023-10-19 18:21:31 +03:00
|
|
|
//
|
|
|
|
// Deprecated: Use [FilterToQueryReducer] instead to avoid allocations.
|
|
|
|
func (es *Eventstore) Filter(ctx context.Context, searchQuery *SearchQueryBuilder) ([]Event, error) {
|
|
|
|
events := make([]Event, 0, searchQuery.GetLimit())
|
|
|
|
searchQuery.ensureInstanceID(ctx)
|
|
|
|
err := es.querier.FilterToReducer(ctx, searchQuery, func(event Event) error {
|
|
|
|
event, err := es.mapEvent(event)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
events = append(events, event)
|
|
|
|
return nil
|
|
|
|
})
|
2021-02-23 15:13:04 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2023-10-19 18:21:31 +03:00
|
|
|
return events, nil
|
2020-04-06 06:42:21 +02:00
|
|
|
}
|
|
|
|
|
2023-10-19 12:19:10 +02:00
|
|
|
func (es *Eventstore) mapEvents(events []Event) (mappedEvents []Event, err error) {
|
2022-01-03 09:19:07 +01:00
|
|
|
mappedEvents = make([]Event, len(events))
|
2021-02-23 15:13:04 +01:00
|
|
|
for i, event := range events {
|
2023-10-19 18:21:31 +03:00
|
|
|
mappedEvents[i], err = es.mapEventLocked(event)
|
2021-02-23 15:13:04 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2020-04-06 06:42:21 +02:00
|
|
|
}
|
|
|
|
}
|
2021-02-23 15:13:04 +01:00
|
|
|
return mappedEvents, nil
|
|
|
|
}
|
|
|
|
|
2023-10-19 12:19:10 +02:00
|
|
|
func (es *Eventstore) mapEvent(event Event) (Event, error) {
|
2023-10-19 18:21:31 +03:00
|
|
|
return es.mapEventLocked(event)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (es *Eventstore) mapEventLocked(event Event) (Event, error) {
|
2024-01-25 17:28:20 +01:00
|
|
|
interceptors, ok := eventInterceptors[event.Type()]
|
2023-10-19 12:19:10 +02:00
|
|
|
if !ok || interceptors.eventMapper == nil {
|
|
|
|
return BaseEventFromRepo(event), nil
|
|
|
|
}
|
|
|
|
return interceptors.eventMapper(event)
|
|
|
|
}
|
|
|
|
|
2023-10-19 18:21:31 +03:00
|
|
|
// TODO: refactor so we can change to the following interface:
|
|
|
|
/*
|
|
|
|
type reducer interface {
|
|
|
|
// Reduce applies an event on the object.
|
|
|
|
Reduce(Event) error
|
|
|
|
}
|
|
|
|
*/
|
|
|
|
|
2021-02-23 15:13:04 +01:00
|
|
|
type reducer interface {
|
|
|
|
//Reduce handles the events of the internal events list
|
|
|
|
// it only appends the newly added events
|
|
|
|
Reduce() error
|
|
|
|
//AppendEvents appends the passed events to an internal list of events
|
2022-01-03 09:19:07 +01:00
|
|
|
AppendEvents(...Event)
|
2021-02-23 15:13:04 +01:00
|
|
|
}
|
|
|
|
|
2022-10-26 15:06:48 +02:00
|
|
|
// FilterToReducer filters the events based on the search query, appends all events to the reducer and calls it's reduce function
|
2021-02-23 15:13:04 +01:00
|
|
|
func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r reducer) error {
|
2023-10-19 18:21:31 +03:00
|
|
|
searchQuery.ensureInstanceID(ctx)
|
|
|
|
return es.querier.FilterToReducer(ctx, searchQuery, func(event Event) error {
|
|
|
|
event, err := es.mapEvent(event)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
r.AppendEvents(event)
|
|
|
|
return r.Reduce()
|
|
|
|
})
|
2020-04-06 06:42:21 +02:00
|
|
|
}
|
|
|
|
|
2022-10-26 15:06:48 +02:00
|
|
|
// LatestSequence filters the latest sequence for the given search query
|
2023-10-19 12:19:10 +02:00
|
|
|
func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) {
|
|
|
|
queryFactory.InstanceID(authz.GetInstance(ctx).InstanceID())
|
|
|
|
|
|
|
|
return es.querier.LatestSequence(ctx, queryFactory)
|
2020-07-28 09:42:21 +02:00
|
|
|
}
|
|
|
|
|
2022-10-26 15:06:48 +02:00
|
|
|
// InstanceIDs returns the instance ids found by the search query
|
2023-09-06 16:34:07 +02:00
|
|
|
// forceDBCall forces to query the database, the instance ids are not cached
|
|
|
|
func (es *Eventstore) InstanceIDs(ctx context.Context, maxAge time.Duration, forceDBCall bool, queryFactory *SearchQueryBuilder) ([]string, error) {
|
|
|
|
es.instancesMu.Lock()
|
|
|
|
defer es.instancesMu.Unlock()
|
|
|
|
|
2023-10-19 12:19:10 +02:00
|
|
|
if !forceDBCall && time.Since(es.lastInstanceQuery) <= maxAge {
|
2023-09-06 16:34:07 +02:00
|
|
|
return es.instances, nil
|
|
|
|
}
|
|
|
|
|
2023-10-19 12:19:10 +02:00
|
|
|
instances, err := es.querier.InstanceIDs(ctx, queryFactory)
|
2023-09-06 16:34:07 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2023-10-19 12:19:10 +02:00
|
|
|
|
2023-09-06 16:34:07 +02:00
|
|
|
if !forceDBCall {
|
|
|
|
es.instances = instances
|
2023-10-19 12:19:10 +02:00
|
|
|
es.lastInstanceQuery = time.Now()
|
2023-09-06 16:34:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
return instances, nil
|
2022-07-22 12:08:39 +02:00
|
|
|
}
|
|
|
|
|
2022-04-12 16:20:17 +02:00
|
|
|
type QueryReducer interface {
|
2021-02-23 15:13:04 +01:00
|
|
|
reducer
|
|
|
|
//Query returns the SearchQueryFactory for the events needed in reducer
|
|
|
|
Query() *SearchQueryBuilder
|
2020-04-06 06:42:21 +02:00
|
|
|
}
|
2020-04-07 13:23:04 +02:00
|
|
|
|
2022-10-26 15:06:48 +02:00
|
|
|
// FilterToQueryReducer filters the events based on the search query of the query function,
|
2021-02-23 15:13:04 +01:00
|
|
|
// appends all events to the reducer and calls it's reduce function
|
2022-04-12 16:20:17 +02:00
|
|
|
func (es *Eventstore) FilterToQueryReducer(ctx context.Context, r QueryReducer) error {
|
2023-10-19 18:21:31 +03:00
|
|
|
return es.FilterToReducer(ctx, r.Query(), r)
|
2021-02-23 15:13:04 +01:00
|
|
|
}
|
|
|
|
|
2023-10-19 18:21:31 +03:00
|
|
|
// Reducer is the callback a Querier calls for every event returned from the storage.
type Reducer func(event Event) error
|
|
|
|
|
2023-10-19 12:19:10 +02:00
|
|
|
type Querier interface {
|
|
|
|
// Health checks if the connection to the storage is available
|
|
|
|
Health(ctx context.Context) error
|
2023-10-19 18:21:31 +03:00
|
|
|
// FilterToReducer calls r for every event returned from the storage
|
|
|
|
FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r Reducer) error
|
2023-10-19 12:19:10 +02:00
|
|
|
// LatestSequence returns the latest sequence found by the search query
|
|
|
|
LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error)
|
|
|
|
// InstanceIDs returns the instance ids found by the search query
|
|
|
|
InstanceIDs(ctx context.Context, queryFactory *SearchQueryBuilder) ([]string, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
type Pusher interface {
|
|
|
|
// Health checks if the connection to the storage is available
|
|
|
|
Health(ctx context.Context) error
|
|
|
|
// Push stores the actions
|
|
|
|
Push(ctx context.Context, commands ...Command) (_ []Event, err error)
|
|
|
|
}
|
|
|
|
|
2024-01-25 17:28:20 +01:00
|
|
|
func appendEventType(typ EventType) {
|
|
|
|
i := sort.SearchStrings(eventTypes, string(typ))
|
|
|
|
if i < len(eventTypes) && eventTypes[i] == string(typ) {
|
2023-01-16 12:30:03 +01:00
|
|
|
return
|
|
|
|
}
|
2024-01-25 17:28:20 +01:00
|
|
|
eventTypes = append(eventTypes[:i], append([]string{string(typ)}, eventTypes[i:]...)...)
|
2023-01-16 12:30:03 +01:00
|
|
|
}
|
|
|
|
|
2024-01-25 17:28:20 +01:00
|
|
|
func appendAggregateType(typ AggregateType) {
|
|
|
|
i := sort.SearchStrings(aggregateTypes, string(typ))
|
|
|
|
if len(aggregateTypes) > i && aggregateTypes[i] == string(typ) {
|
2023-01-16 12:30:03 +01:00
|
|
|
return
|
|
|
|
}
|
2024-01-25 17:28:20 +01:00
|
|
|
aggregateTypes = append(aggregateTypes[:i], append([]string{string(typ)}, aggregateTypes[i:]...)...)
|
2023-01-16 12:30:03 +01:00
|
|
|
}
|