mirror of
https://github.com/zitadel/zitadel.git
synced 2025-04-16 11:01:30 +00:00
fix(eventstore): prevent allocation of filtered events (#6749)
* fix(eventstore): prevent allocation of filtered events Directly reduce each event obtained from a sql.Rows scan, so that we do not have to allocate all events in a slice. * reinstate the mutex as RWMutex * scan data directly * add todos * fix(writemodels): add reduce of parent * test: remove comment * update comments --------- Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
This commit is contained in:
parent
459761d99a
commit
ab79855cf0
@ -38,7 +38,7 @@ func (rm *ExistingLabelPoliciesReadModel) Reduce() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return rm.WriteModel.Reduce()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rm *ExistingLabelPoliciesReadModel) Query() *eventstore.SearchQueryBuilder {
|
func (rm *ExistingLabelPoliciesReadModel) Query() *eventstore.SearchQueryBuilder {
|
||||||
|
@ -55,7 +55,7 @@ func (wm *InstanceDomainWriteModel) Reduce() error {
|
|||||||
wm.State = domain.InstanceDomainStateRemoved
|
wm.State = domain.InstanceDomainStateRemoved
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return wm.WriteModel.Reduce()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wm *InstanceDomainWriteModel) Query() *eventstore.SearchQueryBuilder {
|
func (wm *InstanceDomainWriteModel) Query() *eventstore.SearchQueryBuilder {
|
||||||
|
@ -60,7 +60,7 @@ func (wm *InstanceWriteModel) Reduce() error {
|
|||||||
wm.DefaultLanguage = e.Language
|
wm.DefaultLanguage = e.Language
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return wm.WriteModel.Reduce()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wm *InstanceWriteModel) Query() *eventstore.SearchQueryBuilder {
|
func (wm *InstanceWriteModel) Query() *eventstore.SearchQueryBuilder {
|
||||||
|
@ -84,7 +84,7 @@ func (wm *OrgDomainWriteModel) Reduce() error {
|
|||||||
wm.ValidationCode = nil
|
wm.ValidationCode = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return wm.WriteModel.Reduce()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wm *OrgDomainWriteModel) Query() *eventstore.SearchQueryBuilder {
|
func (wm *OrgDomainWriteModel) Query() *eventstore.SearchQueryBuilder {
|
||||||
@ -154,7 +154,7 @@ func (wm *OrgDomainsWriteModel) Reduce() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return wm.WriteModel.Reduce()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wm *OrgDomainsWriteModel) Query() *eventstore.SearchQueryBuilder {
|
func (wm *OrgDomainsWriteModel) Query() *eventstore.SearchQueryBuilder {
|
||||||
@ -216,7 +216,7 @@ func (wm *OrgDomainVerifiedWriteModel) Reduce() error {
|
|||||||
wm.Verified = false
|
wm.Verified = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return wm.WriteModel.Reduce()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wm *OrgDomainVerifiedWriteModel) Query() *eventstore.SearchQueryBuilder {
|
func (wm *OrgDomainVerifiedWriteModel) Query() *eventstore.SearchQueryBuilder {
|
||||||
|
@ -25,6 +25,8 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// PrepareCommands checks the passed validations and if ok creates the commands
|
// PrepareCommands checks the passed validations and if ok creates the commands
|
||||||
|
//
|
||||||
|
// Deprecated: filter causes unneeded allocation. Use [eventstore.FilterToQueryReducer] instead.
|
||||||
func PrepareCommands(ctx context.Context, filter FilterToQueryReducer, validations ...Validation) (cmds []eventstore.Command, err error) {
|
func PrepareCommands(ctx context.Context, filter FilterToQueryReducer, validations ...Validation) (cmds []eventstore.Command, err error) {
|
||||||
commanders, err := validate(validations)
|
commanders, err := validate(validations)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -79,7 +79,7 @@ func (wm *quotaWriteModel) Reduce() error {
|
|||||||
}
|
}
|
||||||
// wm.WriteModel.Reduce() sets the aggregateID to the first event's aggregateID, but we need the last one
|
// wm.WriteModel.Reduce() sets the aggregateID to the first event's aggregateID, but we need the last one
|
||||||
wm.AggregateID = wm.rollingAggregateID
|
wm.AggregateID = wm.rollingAggregateID
|
||||||
return nil
|
return wm.WriteModel.Reduce()
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewChanges returns all changes that need to be applied to the aggregate.
|
// NewChanges returns all changes that need to be applied to the aggregate.
|
||||||
|
@ -94,7 +94,7 @@ func (wm *SystemConfigWriteModel) Reduce() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return wm.WriteModel.Reduce()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wm *SystemConfigWriteModel) Query() *eventstore.SearchQueryBuilder {
|
func (wm *SystemConfigWriteModel) Query() *eventstore.SearchQueryBuilder {
|
||||||
|
@ -183,7 +183,7 @@ func (rm *UniqueConstraintReadModel) Reduce() error {
|
|||||||
rm.removeUniqueConstraint(e.Aggregate().ID, e.UserID, member.UniqueMember)
|
rm.removeUniqueConstraint(e.Aggregate().ID, e.UserID, member.UniqueMember)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return rm.WriteModel.Reduce()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rm *UniqueConstraintReadModel) Query() *eventstore.SearchQueryBuilder {
|
func (rm *UniqueConstraintReadModel) Query() *eventstore.SearchQueryBuilder {
|
||||||
|
@ -12,7 +12,9 @@ import (
|
|||||||
// Eventstore abstracts all functions needed to store valid events
|
// Eventstore abstracts all functions needed to store valid events
|
||||||
// and filters the stored events
|
// and filters the stored events
|
||||||
type Eventstore struct {
|
type Eventstore struct {
|
||||||
interceptorMutex sync.Mutex
|
// TODO: get rid of this mutex,
|
||||||
|
// or if we scale to >4vCPU use a sync.Map
|
||||||
|
interceptorMutex sync.RWMutex
|
||||||
eventInterceptors map[EventType]eventTypeInterceptors
|
eventInterceptors map[EventType]eventTypeInterceptors
|
||||||
eventTypes []string
|
eventTypes []string
|
||||||
aggregateTypes []string
|
aggregateTypes []string
|
||||||
@ -33,7 +35,6 @@ type eventTypeInterceptors struct {
|
|||||||
func NewEventstore(config *Config) *Eventstore {
|
func NewEventstore(config *Config) *Eventstore {
|
||||||
return &Eventstore{
|
return &Eventstore{
|
||||||
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
||||||
interceptorMutex: sync.Mutex{},
|
|
||||||
PushTimeout: config.PushTimeout,
|
PushTimeout: config.PushTimeout,
|
||||||
|
|
||||||
pusher: config.Pusher,
|
pusher: config.Pusher,
|
||||||
@ -83,28 +84,33 @@ func (es *Eventstore) AggregateTypes() []string {
|
|||||||
|
|
||||||
// Filter filters the stored events based on the searchQuery
|
// Filter filters the stored events based on the searchQuery
|
||||||
// and maps the events to the defined event structs
|
// and maps the events to the defined event structs
|
||||||
func (es *Eventstore) Filter(ctx context.Context, queryFactory *SearchQueryBuilder) ([]Event, error) {
|
//
|
||||||
// make sure that the instance id is always set
|
// Deprecated: Use [FilterToQueryReducer] instead to avoid allocations.
|
||||||
if queryFactory.instanceID == nil && authz.GetInstance(ctx).InstanceID() != "" {
|
func (es *Eventstore) Filter(ctx context.Context, searchQuery *SearchQueryBuilder) ([]Event, error) {
|
||||||
queryFactory.InstanceID(authz.GetInstance(ctx).InstanceID())
|
events := make([]Event, 0, searchQuery.GetLimit())
|
||||||
|
searchQuery.ensureInstanceID(ctx)
|
||||||
|
err := es.querier.FilterToReducer(ctx, searchQuery, func(event Event) error {
|
||||||
|
event, err := es.mapEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
events = append(events, event)
|
||||||
events, err := es.querier.Filter(ctx, queryFactory)
|
return nil
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
return events, nil
|
||||||
return es.mapEvents(events)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (es *Eventstore) mapEvents(events []Event) (mappedEvents []Event, err error) {
|
func (es *Eventstore) mapEvents(events []Event) (mappedEvents []Event, err error) {
|
||||||
mappedEvents = make([]Event, len(events))
|
mappedEvents = make([]Event, len(events))
|
||||||
|
|
||||||
es.interceptorMutex.Lock()
|
es.interceptorMutex.RLock()
|
||||||
defer es.interceptorMutex.Unlock()
|
defer es.interceptorMutex.RUnlock()
|
||||||
|
|
||||||
for i, event := range events {
|
for i, event := range events {
|
||||||
mappedEvents[i], err = es.mapEvent(event)
|
mappedEvents[i], err = es.mapEventLocked(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -114,6 +120,12 @@ func (es *Eventstore) mapEvents(events []Event) (mappedEvents []Event, err error
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (es *Eventstore) mapEvent(event Event) (Event, error) {
|
func (es *Eventstore) mapEvent(event Event) (Event, error) {
|
||||||
|
es.interceptorMutex.RLock()
|
||||||
|
defer es.interceptorMutex.RUnlock()
|
||||||
|
return es.mapEventLocked(event)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (es *Eventstore) mapEventLocked(event Event) (Event, error) {
|
||||||
interceptors, ok := es.eventInterceptors[event.Type()]
|
interceptors, ok := es.eventInterceptors[event.Type()]
|
||||||
if !ok || interceptors.eventMapper == nil {
|
if !ok || interceptors.eventMapper == nil {
|
||||||
return BaseEventFromRepo(event), nil
|
return BaseEventFromRepo(event), nil
|
||||||
@ -121,6 +133,14 @@ func (es *Eventstore) mapEvent(event Event) (Event, error) {
|
|||||||
return interceptors.eventMapper(event)
|
return interceptors.eventMapper(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: refactor so we can change to the following interface:
|
||||||
|
/*
|
||||||
|
type reducer interface {
|
||||||
|
// Reduce applies an event on the object.
|
||||||
|
Reduce(Event) error
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
type reducer interface {
|
type reducer interface {
|
||||||
//Reduce handles the events of the internal events list
|
//Reduce handles the events of the internal events list
|
||||||
// it only appends the newly added events
|
// it only appends the newly added events
|
||||||
@ -131,14 +151,15 @@ type reducer interface {
|
|||||||
|
|
||||||
// FilterToReducer filters the events based on the search query, appends all events to the reducer and calls it's reduce function
|
// FilterToReducer filters the events based on the search query, appends all events to the reducer and calls it's reduce function
|
||||||
func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r reducer) error {
|
func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r reducer) error {
|
||||||
events, err := es.Filter(ctx, searchQuery)
|
searchQuery.ensureInstanceID(ctx)
|
||||||
|
return es.querier.FilterToReducer(ctx, searchQuery, func(event Event) error {
|
||||||
|
event, err := es.mapEvent(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
r.AppendEvents(event)
|
||||||
r.AppendEvents(events...)
|
|
||||||
|
|
||||||
return r.Reduce()
|
return r.Reduce()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// LatestSequence filters the latest sequence for the given search query
|
// LatestSequence filters the latest sequence for the given search query
|
||||||
@ -180,13 +201,7 @@ type QueryReducer interface {
|
|||||||
// FilterToQueryReducer filters the events based on the search query of the query function,
|
// FilterToQueryReducer filters the events based on the search query of the query function,
|
||||||
// appends all events to the reducer and calls it's reduce function
|
// appends all events to the reducer and calls it's reduce function
|
||||||
func (es *Eventstore) FilterToQueryReducer(ctx context.Context, r QueryReducer) error {
|
func (es *Eventstore) FilterToQueryReducer(ctx context.Context, r QueryReducer) error {
|
||||||
events, err := es.Filter(ctx, r.Query())
|
return es.FilterToReducer(ctx, r.Query(), r)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
r.AppendEvents(events...)
|
|
||||||
|
|
||||||
return r.Reduce()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterFilterEventMapper registers a function for mapping an eventstore event to an event
|
// RegisterFilterEventMapper registers a function for mapping an eventstore event to an event
|
||||||
@ -207,11 +222,13 @@ func (es *Eventstore) RegisterFilterEventMapper(aggregateType AggregateType, eve
|
|||||||
return es
|
return es
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Reducer func(event Event) error
|
||||||
|
|
||||||
type Querier interface {
|
type Querier interface {
|
||||||
// Health checks if the connection to the storage is available
|
// Health checks if the connection to the storage is available
|
||||||
Health(ctx context.Context) error
|
Health(ctx context.Context) error
|
||||||
// Filter returns all events matching the given search query
|
// FilterToReducer calls r for every event returned from the storage
|
||||||
Filter(ctx context.Context, searchQuery *SearchQueryBuilder) (events []Event, err error)
|
FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r Reducer) error
|
||||||
// LatestSequence returns the latest sequence found by the search query
|
// LatestSequence returns the latest sequence found by the search query
|
||||||
LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error)
|
LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error)
|
||||||
// InstanceIDs returns the instance ids found by the search query
|
// InstanceIDs returns the instance ids found by the search query
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -402,6 +401,7 @@ func (repo *testQuerier) Health(ctx context.Context) error {
|
|||||||
func (repo *testQuerier) CreateInstance(ctx context.Context, instance string) error {
|
func (repo *testQuerier) CreateInstance(ctx context.Context, instance string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *testQuerier) Filter(ctx context.Context, searchQuery *SearchQueryBuilder) ([]Event, error) {
|
func (repo *testQuerier) Filter(ctx context.Context, searchQuery *SearchQueryBuilder) ([]Event, error) {
|
||||||
if repo.err != nil {
|
if repo.err != nil {
|
||||||
return nil, repo.err
|
return nil, repo.err
|
||||||
@ -409,6 +409,18 @@ func (repo *testQuerier) Filter(ctx context.Context, searchQuery *SearchQueryBui
|
|||||||
return repo.events, nil
|
return repo.events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (repo *testQuerier) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, reduce Reducer) error {
|
||||||
|
if repo.err != nil {
|
||||||
|
return repo.err
|
||||||
|
}
|
||||||
|
for _, event := range repo.events {
|
||||||
|
if err := reduce(event); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (repo *testQuerier) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) {
|
func (repo *testQuerier) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (float64, error) {
|
||||||
if repo.err != nil {
|
if repo.err != nil {
|
||||||
return 0, repo.err
|
return 0, repo.err
|
||||||
@ -684,7 +696,6 @@ func TestEventstore_Push(t *testing.T) {
|
|||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
es := &Eventstore{
|
es := &Eventstore{
|
||||||
pusher: tt.fields.pusher,
|
pusher: tt.fields.pusher,
|
||||||
interceptorMutex: sync.Mutex{},
|
|
||||||
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
||||||
}
|
}
|
||||||
for eventType, mapper := range tt.fields.eventMapper {
|
for eventType, mapper := range tt.fields.eventMapper {
|
||||||
@ -816,7 +827,6 @@ func TestEventstore_FilterEvents(t *testing.T) {
|
|||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
es := &Eventstore{
|
es := &Eventstore{
|
||||||
querier: tt.fields.repo,
|
querier: tt.fields.repo,
|
||||||
interceptorMutex: sync.Mutex{},
|
|
||||||
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1121,7 +1131,6 @@ func TestEventstore_FilterToReducer(t *testing.T) {
|
|||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
es := &Eventstore{
|
es := &Eventstore{
|
||||||
querier: tt.fields.repo,
|
querier: tt.fields.repo,
|
||||||
interceptorMutex: sync.Mutex{},
|
|
||||||
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
||||||
}
|
}
|
||||||
for eventType, mapper := range tt.fields.eventMapper {
|
for eventType, mapper := range tt.fields.eventMapper {
|
||||||
@ -1238,7 +1247,6 @@ func TestEventstore_mapEvents(t *testing.T) {
|
|||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
es := &Eventstore{
|
es := &Eventstore{
|
||||||
interceptorMutex: sync.Mutex{},
|
|
||||||
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
eventInterceptors: map[EventType]eventTypeInterceptors{},
|
||||||
}
|
}
|
||||||
for eventType, mapper := range tt.fields.eventMapper {
|
for eventType, mapper := range tt.fields.eventMapper {
|
||||||
|
@ -35,19 +35,18 @@ func (m *MockQuerier) EXPECT() *MockQuerierMockRecorder {
|
|||||||
return m.recorder
|
return m.recorder
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter mocks base method.
|
// FilterToReducer mocks base method.
|
||||||
func (m *MockQuerier) Filter(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder) ([]eventstore.Event, error) {
|
func (m *MockQuerier) FilterToReducer(arg0 context.Context, arg1 *eventstore.SearchQueryBuilder, arg2 eventstore.Reducer) error {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "Filter", arg0, arg1)
|
ret := m.ctrl.Call(m, "FilterToReducer", arg0, arg1, arg2)
|
||||||
ret0, _ := ret[0].([]eventstore.Event)
|
ret0, _ := ret[0].(error)
|
||||||
ret1, _ := ret[1].(error)
|
return ret0
|
||||||
return ret0, ret1
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter indicates an expected call of Filter.
|
// FilterToReducer indicates an expected call of FilterToReducer.
|
||||||
func (mr *MockQuerierMockRecorder) Filter(arg0, arg1 interface{}) *gomock.Call {
|
func (mr *MockQuerierMockRecorder) FilterToReducer(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filter", reflect.TypeOf((*MockQuerier)(nil).Filter), arg0, arg1)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterToReducer", reflect.TypeOf((*MockQuerier)(nil).FilterToReducer), arg0, arg1, arg2)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Health mocks base method.
|
// Health mocks base method.
|
||||||
|
@ -30,21 +30,30 @@ func NewRepo(t *testing.T) *MockRepository {
|
|||||||
func (m *MockRepository) ExpectFilterNoEventsNoError() *MockRepository {
|
func (m *MockRepository) ExpectFilterNoEventsNoError() *MockRepository {
|
||||||
m.MockQuerier.ctrl.T.Helper()
|
m.MockQuerier.ctrl.T.Helper()
|
||||||
|
|
||||||
m.MockQuerier.EXPECT().Filter(gomock.Any(), gomock.Any()).Return(nil, nil)
|
m.MockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockRepository) ExpectFilterEvents(events ...eventstore.Event) *MockRepository {
|
func (m *MockRepository) ExpectFilterEvents(events ...eventstore.Event) *MockRepository {
|
||||||
m.MockQuerier.ctrl.T.Helper()
|
m.MockQuerier.ctrl.T.Helper()
|
||||||
|
|
||||||
m.MockQuerier.EXPECT().Filter(gomock.Any(), gomock.Any()).Return(events, nil)
|
m.MockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
|
||||||
|
func(_ context.Context, _ *eventstore.SearchQueryBuilder, reduce eventstore.Reducer) error {
|
||||||
|
for _, event := range events {
|
||||||
|
if err := reduce(event); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockRepository) ExpectFilterEventsError(err error) *MockRepository {
|
func (m *MockRepository) ExpectFilterEventsError(err error) *MockRepository {
|
||||||
m.MockQuerier.ctrl.T.Helper()
|
m.MockQuerier.ctrl.T.Helper()
|
||||||
|
|
||||||
m.MockQuerier.EXPECT().Filter(gomock.Any(), gomock.Any()).Return(nil, err)
|
m.MockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(err)
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -247,22 +247,18 @@ func (db *CRDB) handleUniqueConstraints(ctx context.Context, tx *sql.Tx, uniqueC
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter returns all events matching the given search query
|
// FilterToReducer finds all events matching the given search query and passes them to the reduce function.
|
||||||
func (crdb *CRDB) Filter(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder) (events []eventstore.Event, err error) {
|
func (crdb *CRDB) FilterToReducer(ctx context.Context, searchQuery *eventstore.SearchQueryBuilder, reduce eventstore.Reducer) error {
|
||||||
events = make([]eventstore.Event, 0, searchQuery.GetLimit())
|
err := query(ctx, crdb, searchQuery, reduce, false)
|
||||||
err = query(ctx, crdb, searchQuery, &events, false)
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
pgErr := new(pgconn.PgError)
|
pgErr := new(pgconn.PgError)
|
||||||
// check events2 not exists
|
// check events2 not exists
|
||||||
if err != nil && errors.As(err, &pgErr) {
|
if errors.As(err, &pgErr) && pgErr.Code == "42P01" {
|
||||||
if pgErr.Code == "42P01" {
|
return query(ctx, crdb, searchQuery, reduce, true)
|
||||||
err = query(ctx, crdb, searchQuery, &events, true)
|
|
||||||
}
|
}
|
||||||
}
|
return err
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return events, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// LatestSequence returns the latest sequence found by the search query
|
// LatestSequence returns the latest sequence found by the search query
|
||||||
|
@ -168,12 +168,11 @@ func instanceIDsScanner(scanner scan, dest interface{}) (err error) {
|
|||||||
|
|
||||||
func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error) {
|
func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error) {
|
||||||
return func(scanner scan, dest interface{}) (err error) {
|
return func(scanner scan, dest interface{}) (err error) {
|
||||||
events, ok := dest.(*[]eventstore.Event)
|
reduce, ok := dest.(eventstore.Reducer)
|
||||||
if !ok {
|
if !ok {
|
||||||
return z_errors.ThrowInvalidArgument(nil, "SQL-4GP6F", "type must be event")
|
return z_errors.ThrowInvalidArgumentf(nil, "SQL-4GP6F", "events scanner: invalid type %T", dest)
|
||||||
}
|
}
|
||||||
event := new(repository.Event)
|
event := new(repository.Event)
|
||||||
data := sql.RawBytes{}
|
|
||||||
position := new(sql.NullFloat64)
|
position := new(sql.NullFloat64)
|
||||||
|
|
||||||
if useV1 {
|
if useV1 {
|
||||||
@ -181,7 +180,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
|
|||||||
&event.CreationDate,
|
&event.CreationDate,
|
||||||
&event.Typ,
|
&event.Typ,
|
||||||
&event.Seq,
|
&event.Seq,
|
||||||
&data,
|
&event.Data,
|
||||||
&event.EditorUser,
|
&event.EditorUser,
|
||||||
&event.ResourceOwner,
|
&event.ResourceOwner,
|
||||||
&event.InstanceID,
|
&event.InstanceID,
|
||||||
@ -196,7 +195,7 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
|
|||||||
&event.Typ,
|
&event.Typ,
|
||||||
&event.Seq,
|
&event.Seq,
|
||||||
position,
|
position,
|
||||||
&data,
|
&event.Data,
|
||||||
&event.EditorUser,
|
&event.EditorUser,
|
||||||
&event.ResourceOwner,
|
&event.ResourceOwner,
|
||||||
&event.InstanceID,
|
&event.InstanceID,
|
||||||
@ -211,14 +210,8 @@ func eventsScanner(useV1 bool) func(scanner scan, dest interface{}) (err error)
|
|||||||
logging.New().WithError(err).Warn("unable to scan row")
|
logging.New().WithError(err).Warn("unable to scan row")
|
||||||
return z_errors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row")
|
return z_errors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row")
|
||||||
}
|
}
|
||||||
|
|
||||||
event.Data = make([]byte, len(data))
|
|
||||||
copy(event.Data, data)
|
|
||||||
event.Pos = position.Float64
|
event.Pos = position.Float64
|
||||||
|
return reduce(event)
|
||||||
*events = append(*events, event)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/DATA-DOG/go-sqlmock"
|
"github.com/DATA-DOG/go-sqlmock"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/zitadel/zitadel/internal/database"
|
"github.com/zitadel/zitadel/internal/database"
|
||||||
"github.com/zitadel/zitadel/internal/database/cockroach"
|
"github.com/zitadel/zitadel/internal/database/cockroach"
|
||||||
@ -74,6 +75,8 @@ func Test_getCondition(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Test_prepareColumns(t *testing.T) {
|
func Test_prepareColumns(t *testing.T) {
|
||||||
|
var reducedEvents []eventstore.Event
|
||||||
|
|
||||||
type fields struct {
|
type fields struct {
|
||||||
dbRow []interface{}
|
dbRow []interface{}
|
||||||
}
|
}
|
||||||
@ -146,13 +149,16 @@ func Test_prepareColumns(t *testing.T) {
|
|||||||
name: "events",
|
name: "events",
|
||||||
args: args{
|
args: args{
|
||||||
columns: eventstore.ColumnsEvent,
|
columns: eventstore.ColumnsEvent,
|
||||||
dest: &[]eventstore.Event{},
|
dest: eventstore.Reducer(func(event eventstore.Event) error {
|
||||||
|
reducedEvents = append(reducedEvents, event)
|
||||||
|
return nil
|
||||||
|
}),
|
||||||
useV1: true,
|
useV1: true,
|
||||||
},
|
},
|
||||||
res: res{
|
res: res{
|
||||||
query: `SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events`,
|
query: `SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events`,
|
||||||
expected: []eventstore.Event{
|
expected: []eventstore.Event{
|
||||||
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Data: make(sql.RawBytes, 0)},
|
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Data: nil},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
@ -163,12 +169,15 @@ func Test_prepareColumns(t *testing.T) {
|
|||||||
name: "events v2",
|
name: "events v2",
|
||||||
args: args{
|
args: args{
|
||||||
columns: eventstore.ColumnsEvent,
|
columns: eventstore.ColumnsEvent,
|
||||||
dest: &[]eventstore.Event{},
|
dest: eventstore.Reducer(func(event eventstore.Event) error {
|
||||||
|
reducedEvents = append(reducedEvents, event)
|
||||||
|
return nil
|
||||||
|
}),
|
||||||
},
|
},
|
||||||
res: res{
|
res: res{
|
||||||
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
|
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
|
||||||
expected: []eventstore.Event{
|
expected: []eventstore.Event{
|
||||||
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 42, Data: make(sql.RawBytes, 0), Version: "v1"},
|
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 42, Data: nil, Version: "v1"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
@ -179,12 +188,15 @@ func Test_prepareColumns(t *testing.T) {
|
|||||||
name: "event null position",
|
name: "event null position",
|
||||||
args: args{
|
args: args{
|
||||||
columns: eventstore.ColumnsEvent,
|
columns: eventstore.ColumnsEvent,
|
||||||
dest: &[]eventstore.Event{},
|
dest: eventstore.Reducer(func(event eventstore.Event) error {
|
||||||
|
reducedEvents = append(reducedEvents, event)
|
||||||
|
return nil
|
||||||
|
}),
|
||||||
},
|
},
|
||||||
res: res{
|
res: res{
|
||||||
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
|
query: `SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2`,
|
||||||
expected: []eventstore.Event{
|
expected: []eventstore.Event{
|
||||||
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 0, Data: make(sql.RawBytes, 0), Version: "v1"},
|
&repository.Event{AggregateID: "hodor", AggregateType: "user", Seq: 5, Pos: 0, Data: nil, Version: "v1"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
@ -207,7 +219,10 @@ func Test_prepareColumns(t *testing.T) {
|
|||||||
name: "event query error",
|
name: "event query error",
|
||||||
args: args{
|
args: args{
|
||||||
columns: eventstore.ColumnsEvent,
|
columns: eventstore.ColumnsEvent,
|
||||||
dest: &[]eventstore.Event{},
|
dest: eventstore.Reducer(func(event eventstore.Event) error {
|
||||||
|
reducedEvents = append(reducedEvents, event)
|
||||||
|
return nil
|
||||||
|
}),
|
||||||
dbErr: sql.ErrConnDone,
|
dbErr: sql.ErrConnDone,
|
||||||
useV1: true,
|
useV1: true,
|
||||||
},
|
},
|
||||||
@ -242,6 +257,12 @@ func Test_prepareColumns(t *testing.T) {
|
|||||||
equalizer.Equal(tt.args.dest.(*sql.NullTime).Time)
|
equalizer.Equal(tt.args.dest.(*sql.NullTime).Time)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if _, ok := tt.args.dest.(eventstore.Reducer); ok {
|
||||||
|
assert.Equal(t, tt.res.expected, reducedEvents)
|
||||||
|
reducedEvents = nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
got := reflect.Indirect(reflect.ValueOf(tt.args.dest)).Interface()
|
got := reflect.Indirect(reflect.ValueOf(tt.args.dest)).Interface()
|
||||||
if !reflect.DeepEqual(got, tt.res.expected) {
|
if !reflect.DeepEqual(got, tt.res.expected) {
|
||||||
t.Errorf("unexpected result from rowScanner \nwant: %+v \ngot: %+v", tt.res.expected, got)
|
t.Errorf("unexpected result from rowScanner \nwant: %+v \ngot: %+v", tt.res.expected, got)
|
||||||
@ -625,7 +646,10 @@ func Test_query_events_with_crdb(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
events := []eventstore.Event{}
|
events := []eventstore.Event{}
|
||||||
if err := query(context.Background(), db, tt.args.searchQuery, &events, true); (err != nil) != tt.wantErr {
|
if err := query(context.Background(), db, tt.args.searchQuery, eventstore.Reducer(func(event eventstore.Event) error {
|
||||||
|
events = append(events, event)
|
||||||
|
return nil
|
||||||
|
}), true); (err != nil) != tt.wantErr {
|
||||||
t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr)
|
t.Errorf("CRDB.query() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -1,9 +1,11 @@
|
|||||||
package eventstore
|
package eventstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/internal/api/authz"
|
||||||
"github.com/zitadel/zitadel/internal/errors"
|
"github.com/zitadel/zitadel/internal/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -82,6 +84,13 @@ func (q SearchQueryBuilder) GetCreationDateAfter() time.Time {
|
|||||||
return q.creationDateAfter
|
return q.creationDateAfter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ensureInstanceID makes sure that the instance id is always set
|
||||||
|
func (b *SearchQueryBuilder) ensureInstanceID(ctx context.Context) {
|
||||||
|
if b.instanceID == nil && authz.GetInstance(ctx).InstanceID() != "" {
|
||||||
|
b.InstanceID(authz.GetInstance(ctx).InstanceID())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type SearchQuery struct {
|
type SearchQuery struct {
|
||||||
builder *SearchQueryBuilder
|
builder *SearchQueryBuilder
|
||||||
aggregateTypes []AggregateType
|
aggregateTypes []AggregateType
|
||||||
|
Loading…
x
Reference in New Issue
Block a user