zitadel/internal/eventstore/eventstore.go

358 lines
10 KiB
Go
Raw Normal View History

package eventstore
import (
"context"
"database/sql"
"encoding/json"
"reflect"
"sort"
"sync"
"time"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore/repository"
)
2022-10-26 15:06:48 +02:00
// Eventstore abstracts all functions needed to store valid events
// and filters the stored events
type Eventstore struct {
repo repository.Repository
interceptorMutex sync.Mutex
eventInterceptors map[EventType]eventTypeInterceptors
eventTypes []string
aggregateTypes []string
PushTimeout time.Duration
instancesMu sync.Mutex
instances []string
lastInstancesQuery time.Time
}
type eventTypeInterceptors struct {
eventMapper func(*repository.Event) (Event, error)
}
func NewEventstore(config *Config) *Eventstore {
return &Eventstore{
repo: config.repo,
eventInterceptors: map[EventType]eventTypeInterceptors{},
interceptorMutex: sync.Mutex{},
PushTimeout: config.PushTimeout,
}
}
2022-10-26 15:06:48 +02:00
// Health checks if the eventstore can properly work
// It checks if the repository can serve load
func (es *Eventstore) Health(ctx context.Context) error {
return es.repo.Health(ctx)
}
2022-10-26 15:06:48 +02:00
// Push pushes the events in a single transaction
// an event needs at least an aggregate
func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error) {
feat: handle instance from context (#3382) * commander * commander * selber! * move to packages * fix(errors): implement Is interface * test: command * test: commands * add init steps * setup tenant * add default step yaml * possibility to set password * merge v2 into v2-commander * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: search query builder can filter events in memory * fix: filters for add member * fix(setup): add `ExternalSecure` to config * chore: name iam to instance * fix: matching * remove unsued func * base url * base url * test(command): filter funcs * test: commands * fix: rename orgiampolicy to domain policy * start from init * commands * config * fix indexes and add constraints * fixes * fix: merge conflicts * fix: protos * fix: md files * setup * add deprecated org iam policy again * typo * fix search query * fix filter * Apply suggestions from code review * remove custom org from org setup * add todos for verification * change apps creation * simplify package structure * fix error * move preparation helper for tests * fix unique constraints * fix config mapping in setup * fix error handling in encryption_keys.go * fix projection config * fix query from old views to projection * fix setup of mgmt api * set iam project and fix instance projection * fix tokens view * fix steps.yaml and defaults.yaml * fix projections * change instance context to interface * instance interceptors and additional events in setup * cleanup * tests for interceptors * fix label policy * add todo * single api endpoint in environment.json Co-authored-by: adlerhurst <silvan.reusser@gmail.com> Co-authored-by: fabi <fabienne.gerschwiler@gmail.com>
2022-03-29 11:53:19 +02:00
events, constraints, err := commandsToRepository(authz.GetInstance(ctx).InstanceID(), cmds)
if err != nil {
return nil, err
}
if es.PushTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, es.PushTimeout)
defer cancel()
}
err = es.repo.Push(ctx, events, constraints...)
if err != nil {
return nil, err
}
eventReaders, err := es.mapEvents(events)
if err != nil {
return nil, err
}
go notify(eventReaders)
return eventReaders, nil
}
func (es *Eventstore) NewInstance(ctx context.Context, instanceID string) error {
err := es.repo.CreateInstance(ctx, instanceID)
if err != nil {
return err
}
es.instancesMu.Lock()
es.instances = append(es.instances, instanceID)
es.instancesMu.Unlock()
return nil
}
func (es *Eventstore) EventTypes() []string {
return es.eventTypes
}
func (es *Eventstore) AggregateTypes() []string {
return es.aggregateTypes
}
func commandsToRepository(instanceID string, cmds []Command) (events []*repository.Event, constraints []*repository.UniqueConstraint, err error) {
events = make([]*repository.Event, len(cmds))
for i, cmd := range cmds {
data, err := EventData(cmd)
if err != nil {
return nil, nil, err
}
if cmd.Aggregate().ID == "" {
2021-12-17 11:11:57 +01:00
return nil, nil, errors.ThrowInvalidArgument(nil, "V2-Afdfe", "aggregate id must not be empty")
}
if cmd.Aggregate().Type == "" {
2021-12-17 11:11:57 +01:00
return nil, nil, errors.ThrowInvalidArgument(nil, "V2-Dfg32", "aggregate type must not be empty")
}
if cmd.Type() == "" {
2021-12-17 11:11:57 +01:00
return nil, nil, errors.ThrowInvalidArgument(nil, "V2-Drg34", "event type must not be empty")
}
if cmd.Aggregate().Version == "" {
2021-12-17 11:11:57 +01:00
return nil, nil, errors.ThrowInvalidArgument(nil, "V2-Dgfg4", "aggregate version must not be empty")
}
events[i] = &repository.Event{
AggregateID: cmd.Aggregate().ID,
AggregateType: repository.AggregateType(cmd.Aggregate().Type),
ResourceOwner: sql.NullString{String: cmd.Aggregate().ResourceOwner, Valid: cmd.Aggregate().ResourceOwner != ""},
InstanceID: instanceID,
EditorService: cmd.EditorService(),
EditorUser: cmd.EditorUser(),
Type: repository.EventType(cmd.Type()),
Version: repository.Version(cmd.Aggregate().Version),
Data: data,
}
if len(cmd.UniqueConstraints()) > 0 {
feat(cli): setup (#3267) * commander * commander * selber! * move to packages * fix(errors): implement Is interface * test: command * test: commands * add init steps * setup tenant * add default step yaml * possibility to set password * merge v2 into v2-commander * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: search query builder can filter events in memory * fix: filters for add member * fix(setup): add `ExternalSecure` to config * chore: name iam to instance * fix: matching * remove unsued func * base url * base url * test(command): filter funcs * test: commands * fix: rename orgiampolicy to domain policy * start from init * commands * config * fix indexes and add constraints * fixes * fix: merge conflicts * fix: protos * fix: md files * setup * add deprecated org iam policy again * typo * fix search query * fix filter * Apply suggestions from code review * remove custom org from org setup * add todos for verification * change apps creation * simplify package structure * fix error * move preparation helper for tests * fix unique constraints * fix config mapping in setup * fix error handling in encryption_keys.go * fix projection config * fix query from old views to projection * fix setup of mgmt api * set iam project and fix instance projection * imports Co-authored-by: Livio Amstutz <livio.a@gmail.com> Co-authored-by: fabi <fabienne.gerschwiler@gmail.com>
2022-03-28 10:05:09 +02:00
constraints = append(constraints, uniqueConstraintsToRepository(instanceID, cmd.UniqueConstraints())...)
}
}
return events, constraints, nil
}
feat(cli): setup (#3267) * commander * commander * selber! * move to packages * fix(errors): implement Is interface * test: command * test: commands * add init steps * setup tenant * add default step yaml * possibility to set password * merge v2 into v2-commander * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: search query builder can filter events in memory * fix: filters for add member * fix(setup): add `ExternalSecure` to config * chore: name iam to instance * fix: matching * remove unsued func * base url * base url * test(command): filter funcs * test: commands * fix: rename orgiampolicy to domain policy * start from init * commands * config * fix indexes and add constraints * fixes * fix: merge conflicts * fix: protos * fix: md files * setup * add deprecated org iam policy again * typo * fix search query * fix filter * Apply suggestions from code review * remove custom org from org setup * add todos for verification * change apps creation * simplify package structure * fix error * move preparation helper for tests * fix unique constraints * fix config mapping in setup * fix error handling in encryption_keys.go * fix projection config * fix query from old views to projection * fix setup of mgmt api * set iam project and fix instance projection * imports Co-authored-by: Livio Amstutz <livio.a@gmail.com> Co-authored-by: fabi <fabienne.gerschwiler@gmail.com>
2022-03-28 10:05:09 +02:00
func uniqueConstraintsToRepository(instanceID string, constraints []*EventUniqueConstraint) (uniqueConstraints []*repository.UniqueConstraint) {
uniqueConstraints = make([]*repository.UniqueConstraint, len(constraints))
for i, constraint := range constraints {
var id string
if !constraint.IsGlobal {
id = instanceID
}
uniqueConstraints[i] = &repository.UniqueConstraint{
UniqueType: constraint.UniqueType,
UniqueField: constraint.UniqueField,
InstanceID: id,
Action: uniqueConstraintActionToRepository(constraint.Action),
ErrorMessage: constraint.ErrorMessage,
}
}
return uniqueConstraints
}
2022-10-26 15:06:48 +02:00
// Filter filters the stored events based on the searchQuery
// and maps the events to the defined event structs
func (es *Eventstore) Filter(ctx context.Context, queryFactory *SearchQueryBuilder) ([]Event, error) {
feat: handle instance from context (#3382) * commander * commander * selber! * move to packages * fix(errors): implement Is interface * test: command * test: commands * add init steps * setup tenant * add default step yaml * possibility to set password * merge v2 into v2-commander * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: search query builder can filter events in memory * fix: filters for add member * fix(setup): add `ExternalSecure` to config * chore: name iam to instance * fix: matching * remove unsued func * base url * base url * test(command): filter funcs * test: commands * fix: rename orgiampolicy to domain policy * start from init * commands * config * fix indexes and add constraints * fixes * fix: merge conflicts * fix: protos * fix: md files * setup * add deprecated org iam policy again * typo * fix search query * fix filter * Apply suggestions from code review * remove custom org from org setup * add todos for verification * change apps creation * simplify package structure * fix error * move preparation helper for tests * fix unique constraints * fix config mapping in setup * fix error handling in encryption_keys.go * fix projection config * fix query from old views to projection * fix setup of mgmt api * set iam project and fix instance projection * fix tokens view * fix steps.yaml and defaults.yaml * fix projections * change instance context to interface * instance interceptors and additional events in setup * cleanup * tests for interceptors * fix label policy * add todo * single api endpoint in environment.json Co-authored-by: adlerhurst <silvan.reusser@gmail.com> Co-authored-by: fabi <fabienne.gerschwiler@gmail.com>
2022-03-29 11:53:19 +02:00
query, err := queryFactory.build(authz.GetInstance(ctx).InstanceID())
if err != nil {
return nil, err
}
events, err := es.repo.Filter(ctx, query)
if err != nil {
return nil, err
}
return es.mapEvents(events)
}
func (es *Eventstore) mapEvents(events []*repository.Event) (mappedEvents []Event, err error) {
mappedEvents = make([]Event, len(events))
es.interceptorMutex.Lock()
defer es.interceptorMutex.Unlock()
for i, event := range events {
interceptors, ok := es.eventInterceptors[EventType(event.Type)]
if !ok || interceptors.eventMapper == nil {
mappedEvents[i] = BaseEventFromRepo(event)
//TODO: return error if unable to map event
continue
// return nil, errors.ThrowPreconditionFailed(nil, "V2-usujB", "event mapper not defined")
}
mappedEvents[i], err = interceptors.eventMapper(event)
if err != nil {
return nil, err
}
}
return mappedEvents, nil
}
type reducer interface {
//Reduce handles the events of the internal events list
// it only appends the newly added events
Reduce() error
//AppendEvents appends the passed events to an internal list of events
AppendEvents(...Event)
}
2022-10-26 15:06:48 +02:00
// FilterToReducer filters the events based on the search query, appends all events to the reducer and calls it's reduce function
func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r reducer) error {
events, err := es.Filter(ctx, searchQuery)
if err != nil {
return err
}
r.AppendEvents(events...)
return r.Reduce()
}
2022-10-26 15:06:48 +02:00
// LatestSequence filters the latest sequence for the given search query
func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (uint64, error) {
feat: handle instance from context (#3382) * commander * commander * selber! * move to packages * fix(errors): implement Is interface * test: command * test: commands * add init steps * setup tenant * add default step yaml * possibility to set password * merge v2 into v2-commander * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: rename iam command side to instance * fix: search query builder can filter events in memory * fix: filters for add member * fix(setup): add `ExternalSecure` to config * chore: name iam to instance * fix: matching * remove unsued func * base url * base url * test(command): filter funcs * test: commands * fix: rename orgiampolicy to domain policy * start from init * commands * config * fix indexes and add constraints * fixes * fix: merge conflicts * fix: protos * fix: md files * setup * add deprecated org iam policy again * typo * fix search query * fix filter * Apply suggestions from code review * remove custom org from org setup * add todos for verification * change apps creation * simplify package structure * fix error * move preparation helper for tests * fix unique constraints * fix config mapping in setup * fix error handling in encryption_keys.go * fix projection config * fix query from old views to projection * fix setup of mgmt api * set iam project and fix instance projection * fix tokens view * fix steps.yaml and defaults.yaml * fix projections * change instance context to interface * instance interceptors and additional events in setup * cleanup * tests for interceptors * fix label policy * add todo * single api endpoint in environment.json Co-authored-by: adlerhurst <silvan.reusser@gmail.com> Co-authored-by: fabi <fabienne.gerschwiler@gmail.com>
2022-03-29 11:53:19 +02:00
query, err := queryFactory.build(authz.GetInstance(ctx).InstanceID())
if err != nil {
return 0, err
}
return es.repo.LatestSequence(ctx, query)
}
2022-10-26 15:06:48 +02:00
// InstanceIDs returns the instance ids found by the search query
// forceDBCall forces to query the database, the instance ids are not cached
func (es *Eventstore) InstanceIDs(ctx context.Context, maxAge time.Duration, forceDBCall bool, queryFactory *SearchQueryBuilder) ([]string, error) {
es.instancesMu.Lock()
defer es.instancesMu.Unlock()
if !forceDBCall && time.Since(es.lastInstancesQuery) <= maxAge {
return es.instances, nil
}
query, err := queryFactory.build(authz.GetInstance(ctx).InstanceID())
if err != nil {
return nil, err
}
instances, err := es.repo.InstanceIDs(ctx, query)
if err != nil {
return nil, err
}
if !forceDBCall {
es.instances = instances
es.lastInstancesQuery = time.Now()
}
return instances, nil
}
type QueryReducer interface {
reducer
//Query returns the SearchQueryFactory for the events needed in reducer
Query() *SearchQueryBuilder
}
Project commands (#26) * feat: eventstore repository * fix: remove gorm * version * feat: pkg * feat: add some files for project * feat: eventstore without eventstore-lib * rename files * gnueg * fix: key json * fix: add object * fix: change imports * fix: internal models * fix: some imports * fix: global model * fix: add some functions on repo * feat(eventstore): sdk * fix(eventstore): search query * fix(eventstore): rename app to eventstore * delete empty test * remove unused func * merge master * fix(eventstore): tests * fix(models): delete unused struct * fix: some funcitons * feat(eventstore): implemented push events * fix: move project eventstore to project package * fix: change project eventstore funcs * feat(eventstore): overwrite context data * fix: change project eventstore * fix: add project repo to mgmt server * feat(types): SQL-config * fix: commented code * feat(eventstore): options to overwrite editor * feat: auth interceptor and cockroach migrations * fix: migrations * fix: fix filter * fix: not found on getbyid * fix: add sequence * fix: add some tests * fix(eventstore): nullable sequence * fix: add some tests * merge * fix: add some tests * fix(migrations): correct statements for sequence * fix: add some tests * fix: add some tests * fix: changes from mr * Update internal/eventstore/models/field.go Co-Authored-By: livio-a <livio.a@gmail.com> * fix(eventstore): code quality * fix: add types to aggregate/Event-types * fix(eventstore): rename modifier* to editor* * fix(eventstore): delete editor_org * fix(migrations): remove editor_org field, rename modifier_* to editor_* * fix: generate files * fix(eventstore): tests * fix(eventstore): rename modifier to editor * fix(migrations): add cluster migration, fix(migrations): fix typo of host in clean clsuter * fix(eventstore): move health * fix(eventstore): AggregateTypeFilter aggregateType as param * code quality * feat: start implementing project members * feat: remove member funcs * feat: remove member model * feat: remove member events * feat: remove member repo model * fix: better error func testing * Update docs/local.md Co-Authored-By: Silvan <silvan.reusser@gmail.com> * Update docs/local.md Co-Authored-By: Silvan <silvan.reusser@gmail.com> * fix: mr requests * fix: md file Co-authored-by: adlerhurst <silvan.reusser@gmail.com> Co-authored-by: livio-a <livio.a@gmail.com>
2020-04-07 13:23:04 +02:00
2022-10-26 15:06:48 +02:00
// FilterToQueryReducer filters the events based on the search query of the query function,
// appends all events to the reducer and calls it's reduce function
func (es *Eventstore) FilterToQueryReducer(ctx context.Context, r QueryReducer) error {
events, err := es.Filter(ctx, r.Query())
if err != nil {
return err
}
r.AppendEvents(events...)
return r.Reduce()
}
2022-10-26 15:06:48 +02:00
// RegisterFilterEventMapper registers a function for mapping an eventstore event to an event
func (es *Eventstore) RegisterFilterEventMapper(aggregateType AggregateType, eventType EventType, mapper func(*repository.Event) (Event, error)) *Eventstore {
if mapper == nil || eventType == "" {
return es
}
es.interceptorMutex.Lock()
defer es.interceptorMutex.Unlock()
es.appendEventType(eventType)
es.appendAggregateType(aggregateType)
interceptor := es.eventInterceptors[eventType]
interceptor.eventMapper = mapper
es.eventInterceptors[eventType] = interceptor
return es
}
func (es *Eventstore) appendEventType(typ EventType) {
i := sort.SearchStrings(es.eventTypes, string(typ))
if i < len(es.eventTypes) && es.eventTypes[i] == string(typ) {
return
}
es.eventTypes = append(es.eventTypes[:i], append([]string{string(typ)}, es.eventTypes[i:]...)...)
}
func (es *Eventstore) appendAggregateType(typ AggregateType) {
i := sort.SearchStrings(es.aggregateTypes, string(typ))
if len(es.aggregateTypes) > i && es.aggregateTypes[i] == string(typ) {
return
}
es.aggregateTypes = append(es.aggregateTypes[:i], append([]string{string(typ)}, es.aggregateTypes[i:]...)...)
}
func EventData(event Command) ([]byte, error) {
switch data := event.Data().(type) {
case nil:
return nil, nil
case []byte:
if json.Valid(data) {
return data, nil
}
return nil, errors.ThrowInvalidArgument(nil, "V2-6SbbS", "data bytes are not json")
}
dataType := reflect.TypeOf(event.Data())
if dataType.Kind() == reflect.Ptr {
dataType = dataType.Elem()
}
if dataType.Kind() == reflect.Struct {
dataBytes, err := json.Marshal(event.Data())
if err != nil {
return nil, errors.ThrowInvalidArgument(err, "V2-xG87M", "could not marshal data")
}
return dataBytes, nil
}
return nil, errors.ThrowInvalidArgument(nil, "V2-91NRm", "wrong type of event data")
Project commands (#26) * feat: eventstore repository * fix: remove gorm * version * feat: pkg * feat: add some files for project * feat: eventstore without eventstore-lib * rename files * gnueg * fix: key json * fix: add object * fix: change imports * fix: internal models * fix: some imports * fix: global model * fix: add some functions on repo * feat(eventstore): sdk * fix(eventstore): search query * fix(eventstore): rename app to eventstore * delete empty test * remove unused func * merge master * fix(eventstore): tests * fix(models): delete unused struct * fix: some funcitons * feat(eventstore): implemented push events * fix: move project eventstore to project package * fix: change project eventstore funcs * feat(eventstore): overwrite context data * fix: change project eventstore * fix: add project repo to mgmt server * feat(types): SQL-config * fix: commented code * feat(eventstore): options to overwrite editor * feat: auth interceptor and cockroach migrations * fix: migrations * fix: fix filter * fix: not found on getbyid * fix: add sequence * fix: add some tests * fix(eventstore): nullable sequence * fix: add some tests * merge * fix: add some tests * fix(migrations): correct statements for sequence * fix: add some tests * fix: add some tests * fix: changes from mr * Update internal/eventstore/models/field.go Co-Authored-By: livio-a <livio.a@gmail.com> * fix(eventstore): code quality * fix: add types to aggregate/Event-types * fix(eventstore): rename modifier* to editor* * fix(eventstore): delete editor_org * fix(migrations): remove editor_org field, rename modifier_* to editor_* * fix: generate files * fix(eventstore): tests * fix(eventstore): rename modifier to editor * fix(migrations): add cluster migration, fix(migrations): fix typo of host in clean clsuter * fix(eventstore): move health * fix(eventstore): AggregateTypeFilter aggregateType as param * code quality * feat: start implementing project members * feat: remove member funcs * feat: remove member model * feat: remove member events * feat: remove member repo model * fix: better error func testing * Update docs/local.md Co-Authored-By: Silvan <silvan.reusser@gmail.com> * Update docs/local.md Co-Authored-By: Silvan <silvan.reusser@gmail.com> * fix: mr requests * fix: md file Co-authored-by: adlerhurst <silvan.reusser@gmail.com> Co-authored-by: livio-a <livio.a@gmail.com>
2020-04-07 13:23:04 +02:00
}
func uniqueConstraintActionToRepository(action UniqueConstraintAction) repository.UniqueConstraintAction {
switch action {
case UniqueConstraintAdd:
return repository.UniqueConstraintAdd
case UniqueConstraintRemove:
return repository.UniqueConstraintRemoved
2022-10-26 15:06:48 +02:00
case UniqueConstraintInstanceRemove:
return repository.UniqueConstraintInstanceRemoved
default:
return repository.UniqueConstraintAdd
}
}
type BaseEventSetter[T any] interface {
Event
SetBaseEvent(*BaseEvent)
*T
}
func GenericEventMapper[T any, PT BaseEventSetter[T]](event *repository.Event) (Event, error) {
e := PT(new(T))
e.SetBaseEvent(BaseEventFromRepo(event))
if len(event.Data) == 0 {
return e, nil
}
err := json.Unmarshal(event.Data, e)
if err != nil {
return nil, errors.ThrowInternal(err, "V2-Thai6", "unable to unmarshal event")
}
return e, nil
}