From e8babf1048e93070b003261a923ea470e9925268 Mon Sep 17 00:00:00 2001 From: Livio Spring Date: Tue, 22 Nov 2022 07:36:48 +0100 Subject: [PATCH] fix: reduce load on view tables (#4716) * fix: reduce load on view tables * create prerelease * linting: pass context to view handlers * fix error handling of refresh token handler * fix: improve processing of successful instanceIDs on views * fix revert intended change in .golangci.yaml * fix: set timeout for processInstances in spooler * fix: reduce update to active tokens on profile change * change token expiration query to db now() * remove branch from .releaserc.js --- .releaserc.js | 1 - cmd/defaults.yaml | 4 +- cmd/start/start.go | 4 +- .../eventsourcing/handler/handler.go | 5 +- .../eventsourcing/handler/styling.go | 24 ++--- .../repository/eventsourcing/repository.go | 5 +- .../eventsourcing/spooler/spooler.go | 12 +-- .../repository/eventsourcing/view/sequence.go | 14 +-- .../repository/eventsourcing/view/styling.go | 8 +- .../eventsourcing/handler/handler.go | 24 ++--- .../eventsourcing/handler/idp_config.go | 20 +++-- .../eventsourcing/handler/idp_providers.go | 19 ++-- .../handler/org_project_mapping.go | 35 +++++--- .../eventsourcing/handler/refresh_token.go | 24 ++--- .../repository/eventsourcing/handler/token.go | 34 +++---- .../repository/eventsourcing/handler/user.go | 21 ++--- .../handler/user_external_idps.go | 15 ++-- .../eventsourcing/handler/user_session.go | 22 ++--- .../repository/eventsourcing/repository.go | 5 +- .../eventsourcing/spooler/spooler.go | 12 +-- .../eventsourcing/view/external_idps.go | 8 +- .../eventsourcing/view/idp_configs.go | 8 +- .../eventsourcing/view/idp_providers.go | 8 +- .../eventsourcing/view/org_project_mapping.go | 8 +- .../eventsourcing/view/refresh_token.go | 8 +- .../repository/eventsourcing/view/sequence.go | 8 +- .../repository/eventsourcing/view/token.go | 8 +- .../repository/eventsourcing/view/user.go | 8 +- .../eventsourcing/view/user_session.go | 8 +- .../repository/eventsourcing/view/sequence.go | 20 ----- .../repository/eventsourcing/view/token.go | 12 --- internal/eventstore/v1/query/handler.go | 10 +-- internal/eventstore/v1/spooler/config.go | 3 + internal/eventstore/v1/spooler/spooler.go | 88 ++++++++++++++++--- .../eventstore/v1/spooler/spooler_test.go | 11 +-- internal/user/repository/view/token_view.go | 7 +- internal/view/repository/sequence.go | 2 +- 37 files changed, 295 insertions(+), 238 deletions(-) diff --git a/.releaserc.js b/.releaserc.js index f0238e0dc9..74da457461 100644 --- a/.releaserc.js +++ b/.releaserc.js @@ -2,7 +2,6 @@ module.exports = { branches: [ {name: 'main'}, {name: '1.87.x', range: '1.87.x', channel: '1.87.x'}, - {name: 'startup-times', prerelease: 'beta'} ], plugins: [ "@semantic-release/commit-analyzer" diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 1ce991644d..76c6def756 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -160,7 +160,7 @@ Auth: SearchLimit: 1000 Spooler: ConcurrentWorkers: 1 - ConcurrentInstances: 10 + ConcurrentInstances: 1 BulkLimit: 10000 FailureCountUntilSkip: 5 @@ -168,7 +168,7 @@ Admin: SearchLimit: 1000 Spooler: ConcurrentWorkers: 1 - ConcurrentInstances: 10 + ConcurrentInstances: 1 BulkLimit: 10000 FailureCountUntilSkip: 5 diff --git a/cmd/start/start.go b/cmd/start/start.go index 3ce7d759ac..486d551689 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -173,11 +173,11 @@ func startAPIs(ctx context.Context, router *mux.Router, commands *command.Comman return err } apis := api.New(config.Port, router, queries, verifier, config.InternalAuthZ, config.ExternalSecure, tlsConfig, config.HTTP2HostHeader, config.HTTP1HostHeader) - authRepo, err := auth_es.Start(config.Auth, config.SystemDefaults, commands, queries, dbClient, keys.OIDC, keys.User) + authRepo, err := auth_es.Start(ctx, config.Auth, config.SystemDefaults, commands, queries, dbClient, eventstore, keys.OIDC, keys.User) if err != nil { return fmt.Errorf("error starting auth repo: %w", err) } - adminRepo, err := admin_es.Start(config.Admin, store, dbClient) + adminRepo, err := admin_es.Start(ctx, config.Admin, store, dbClient, eventstore) if err != nil { return fmt.Errorf("error starting admin repo: %w", err) } diff --git a/internal/admin/repository/eventsourcing/handler/handler.go b/internal/admin/repository/eventsourcing/handler/handler.go index 4a89030a09..e95e0232e3 100644 --- a/internal/admin/repository/eventsourcing/handler/handler.go +++ b/internal/admin/repository/eventsourcing/handler/handler.go @@ -1,6 +1,7 @@ package handler import ( + "context" "time" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" @@ -28,10 +29,10 @@ func (h *handler) Eventstore() v1.Eventstore { return h.es } -func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, static static.Storage) []query.Handler { +func Register(ctx context.Context, configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, static static.Storage) []query.Handler { handlers := []query.Handler{} if static != nil { - handlers = append(handlers, newStyling( + handlers = append(handlers, newStyling(ctx, handler{view, bulkLimit, configs.cycleDuration("Styling"), errorCount, es}, static)) } diff --git a/internal/admin/repository/eventsourcing/handler/styling.go b/internal/admin/repository/eventsourcing/handler/styling.go index 028e116cc0..8a8553b528 100644 --- a/internal/admin/repository/eventsourcing/handler/styling.go +++ b/internal/admin/repository/eventsourcing/handler/styling.go @@ -34,21 +34,21 @@ type Styling struct { subscription *v1.Subscription } -func newStyling(handler handler, static static.Storage) *Styling { +func newStyling(ctx context.Context, handler handler, static static.Storage) *Styling { h := &Styling{ handler: handler, static: static, } - h.subscribe() + h.subscribe(ctx) return h } -func (m *Styling) subscribe() { +func (m *Styling) subscribe(ctx context.Context) { m.subscription = m.es.Subscribe(m.AggregateTypes()...) go func() { for event := range m.subscription.Events { - query.ReduceEvent(m, event) + query.ReduceEvent(ctx, m, event) } }() } @@ -73,15 +73,15 @@ func (m *Styling) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (m *Styling) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { - sequences, err := m.view.GetLatestStylingSequences(instanceIDs...) +func (m *Styling) EventQuery(instanceIDs []string) (*models.SearchQuery, error) { + sequences, err := m.view.GetLatestStylingSequences(instanceIDs) if err != nil { return nil, err } searchQuery := models.NewSearchQuery() - for _, sequence := range sequences { + for _, instanceID := range instanceIDs { var seq uint64 - for _, instanceID := range instanceIDs { + for _, sequence := range sequences { if sequence.InstanceID == instanceID { seq = sequence.CurrentSequence break @@ -90,7 +90,7 @@ func (m *Styling) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) searchQuery.AddQuery(). AggregateTypeFilter(m.AggregateTypes()...). LatestSequenceFilter(seq). - InstanceIDFilter(sequence.InstanceID) + InstanceIDFilter(instanceID) } return searchQuery, nil } @@ -166,12 +166,12 @@ func (m *Styling) processLabelPolicy(event *models.Event) (err error) { } func (m *Styling) OnError(event *models.Event, err error) error { - logging.LogWithFields("SPOOL-2m9fs", "id", event.AggregateID).WithError(err).Warn("something went wrong in label policy handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in label policy handler") return spooler.HandleError(event, err, m.view.GetLatestStylingFailedEvent, m.view.ProcessedStylingFailedEvent, m.view.ProcessedStylingSequence, m.errorCountUntilSkip) } -func (m *Styling) OnSuccess() error { - return spooler.HandleSuccess(m.view.UpdateStylingSpoolerRunTimestamp) +func (m *Styling) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(m.view.UpdateStylingSpoolerRunTimestamp, instanceIDs) } func (m *Styling) generateStylingFile(policy *iam_model.LabelPolicyView) error { diff --git a/internal/admin/repository/eventsourcing/repository.go b/internal/admin/repository/eventsourcing/repository.go index 0d8167cca8..63bcb6b15f 100644 --- a/internal/admin/repository/eventsourcing/repository.go +++ b/internal/admin/repository/eventsourcing/repository.go @@ -7,6 +7,7 @@ import ( "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/eventstore" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/spooler" admin_view "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" + eventstore2 "github.com/zitadel/zitadel/internal/eventstore" v1 "github.com/zitadel/zitadel/internal/eventstore/v1" es_spol "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" "github.com/zitadel/zitadel/internal/static" @@ -22,7 +23,7 @@ type EsRepository struct { eventstore.AdministratorRepo } -func Start(conf Config, static static.Storage, dbClient *sql.DB) (*EsRepository, error) { +func Start(ctx context.Context, conf Config, static static.Storage, dbClient *sql.DB, esV2 *eventstore2.Eventstore) (*EsRepository, error) { es, err := v1.Start(dbClient) if err != nil { return nil, err @@ -32,7 +33,7 @@ func Start(conf Config, static static.Storage, dbClient *sql.DB) (*EsRepository, return nil, err } - spool := spooler.StartSpooler(conf.Spooler, es, view, dbClient, static) + spool := spooler.StartSpooler(ctx, conf.Spooler, es, esV2, view, dbClient, static) return &EsRepository{ spooler: spool, diff --git a/internal/admin/repository/eventsourcing/spooler/spooler.go b/internal/admin/repository/eventsourcing/spooler/spooler.go index 6ab695fd75..98542b309f 100644 --- a/internal/admin/repository/eventsourcing/spooler/spooler.go +++ b/internal/admin/repository/eventsourcing/spooler/spooler.go @@ -1,14 +1,15 @@ package spooler import ( + "context" "database/sql" - v1 "github.com/zitadel/zitadel/internal/eventstore/v1" - "github.com/zitadel/zitadel/internal/static" - "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/handler" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" + "github.com/zitadel/zitadel/internal/eventstore" + v1 "github.com/zitadel/zitadel/internal/eventstore/v1" "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" + "github.com/zitadel/zitadel/internal/static" ) type SpoolerConfig struct { @@ -19,13 +20,14 @@ type SpoolerConfig struct { Handlers handler.Configs } -func StartSpooler(c SpoolerConfig, es v1.Eventstore, view *view.View, sql *sql.DB, static static.Storage) *spooler.Spooler { +func StartSpooler(ctx context.Context, c SpoolerConfig, es v1.Eventstore, esV2 *eventstore.Eventstore, view *view.View, sql *sql.DB, static static.Storage) *spooler.Spooler { spoolerConfig := spooler.Config{ Eventstore: es, + EventstoreV2: esV2, Locker: &locker{dbClient: sql}, ConcurrentWorkers: c.ConcurrentWorkers, ConcurrentInstances: c.ConcurrentInstances, - ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, static), + ViewHandlers: handler.Register(ctx, c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, static), } spool := spoolerConfig.New() spool.Start() diff --git a/internal/admin/repository/eventsourcing/view/sequence.go b/internal/admin/repository/eventsourcing/view/sequence.go index 868d9633b3..4985fbef48 100644 --- a/internal/admin/repository/eventsourcing/view/sequence.go +++ b/internal/admin/repository/eventsourcing/view/sequence.go @@ -19,16 +19,16 @@ func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentS return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) } -func (v *View) latestSequences(viewName string, instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs...) +func (v *View) latestSequences(viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) { + return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs) } func (v *View) AllCurrentSequences(db, instanceID string) ([]*repository.CurrentSequence, error) { return repository.AllCurrentSequences(v.Db, db+".current_sequences", instanceID) } -func (v *View) updateSpoolerRunSequence(viewName string) error { - currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName) +func (v *View) updateSpoolerRunSequence(viewName string, instanceIDs []string) error { + currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs) if err != nil { return err } @@ -41,12 +41,6 @@ func (v *View) updateSpoolerRunSequence(viewName string) error { return repository.UpdateCurrentSequences(v.Db, sequencesTable, currentSequences) } -func (v *View) GetCurrentSequence(db, viewName string) ([]*repository.CurrentSequence, error) { - sequenceTable := db + ".current_sequences" - fullView := db + "." + viewName - return repository.LatestSequences(v.Db, sequenceTable, fullView) -} - func (v *View) ClearView(db, viewName string) error { truncateView := db + "." + viewName sequenceTable := db + ".current_sequences" diff --git a/internal/admin/repository/eventsourcing/view/styling.go b/internal/admin/repository/eventsourcing/view/styling.go index e0b3aa24a4..b688a5544b 100644 --- a/internal/admin/repository/eventsourcing/view/styling.go +++ b/internal/admin/repository/eventsourcing/view/styling.go @@ -35,16 +35,16 @@ func (v *View) GetLatestStylingSequence(instanceID string) (*global_view.Current return v.latestSequence(stylingTyble, instanceID) } -func (v *View) GetLatestStylingSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { - return v.latestSequences(stylingTyble, instanceIDs...) +func (v *View) GetLatestStylingSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) { + return v.latestSequences(stylingTyble, instanceIDs) } func (v *View) ProcessedStylingSequence(event *models.Event) error { return v.saveCurrentSequence(stylingTyble, event) } -func (v *View) UpdateStylingSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(stylingTyble) +func (v *View) UpdateStylingSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(stylingTyble, instanceIDs) } func (v *View) GetLatestStylingFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/handler/handler.go b/internal/auth/repository/eventsourcing/handler/handler.go index 59f50554d5..6eb3a836c6 100644 --- a/internal/auth/repository/eventsourcing/handler/handler.go +++ b/internal/auth/repository/eventsourcing/handler/handler.go @@ -33,24 +33,24 @@ func (h *handler) Eventstore() v1.Eventstore { return h.es } -func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, systemDefaults sd.SystemDefaults, queries *query2.Queries) []query.Handler { +func Register(ctx context.Context, configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, systemDefaults sd.SystemDefaults, queries *query2.Queries) []query.Handler { return []query.Handler{ - newUser( + newUser(ctx, handler{view, bulkLimit, configs.cycleDuration("User"), errorCount, es}, queries), - newUserSession( + newUserSession(ctx, handler{view, bulkLimit, configs.cycleDuration("UserSession"), errorCount, es}, queries), - newToken( + newToken(ctx, handler{view, bulkLimit, configs.cycleDuration("Token"), errorCount, es}), - newIDPConfig( + newIDPConfig(ctx, handler{view, bulkLimit, configs.cycleDuration("IDPConfig"), errorCount, es}), - newIDPProvider( + newIDPProvider(ctx, handler{view, bulkLimit, configs.cycleDuration("IDPProvider"), errorCount, es}, systemDefaults, queries), - newExternalIDP( + newExternalIDP(ctx, handler{view, bulkLimit, configs.cycleDuration("ExternalIDP"), errorCount, es}, systemDefaults, queries), - newRefreshToken(handler{view, bulkLimit, configs.cycleDuration("RefreshToken"), errorCount, es}), - newOrgProjectMapping(handler{view, bulkLimit, configs.cycleDuration("OrgProjectMapping"), errorCount, es}), + newRefreshToken(ctx, handler{view, bulkLimit, configs.cycleDuration("RefreshToken"), errorCount, es}), + newOrgProjectMapping(ctx, handler{view, bulkLimit, configs.cycleDuration("OrgProjectMapping"), errorCount, es}), } } @@ -80,9 +80,9 @@ func withInstanceID(ctx context.Context, instanceID string) context.Context { func newSearchQuery(sequences []*repository.CurrentSequence, aggregateTypes []models.AggregateType, instanceIDs []string) *models.SearchQuery { searchQuery := models.NewSearchQuery() - for _, sequence := range sequences { + for _, instanceID := range instanceIDs { var seq uint64 - for _, instanceID := range instanceIDs { + for _, sequence := range sequences { if sequence.InstanceID == instanceID { seq = sequence.CurrentSequence break @@ -91,7 +91,7 @@ func newSearchQuery(sequences []*repository.CurrentSequence, aggregateTypes []mo searchQuery.AddQuery(). AggregateTypeFilter(aggregateTypes...). LatestSequenceFilter(seq). - InstanceIDFilter(sequence.InstanceID) + InstanceIDFilter(instanceID) } return searchQuery } diff --git a/internal/auth/repository/eventsourcing/handler/idp_config.go b/internal/auth/repository/eventsourcing/handler/idp_config.go index de0315d85e..c2157f1df2 100644 --- a/internal/auth/repository/eventsourcing/handler/idp_config.go +++ b/internal/auth/repository/eventsourcing/handler/idp_config.go @@ -1,6 +1,8 @@ package handler import ( + "context" + "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/eventstore" @@ -23,21 +25,21 @@ type IDPConfig struct { subscription *v1.Subscription } -func newIDPConfig(h handler) *IDPConfig { +func newIDPConfig(ctx context.Context, h handler) *IDPConfig { idpConfig := &IDPConfig{ handler: h, } - idpConfig.subscribe() + idpConfig.subscribe(ctx) return idpConfig } -func (i *IDPConfig) subscribe() { +func (i *IDPConfig) subscribe(ctx context.Context) { i.subscription = i.es.Subscribe(i.AggregateTypes()...) go func() { for event := range i.subscription.Events { - query.ReduceEvent(i, event) + query.ReduceEvent(ctx, i, event) } }() } @@ -62,8 +64,8 @@ func (i *IDPConfig) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (i *IDPConfig) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { - sequences, err := i.view.GetLatestIDPConfigSequences(instanceIDs...) +func (i *IDPConfig) EventQuery(instanceIDs []string) (*models.SearchQuery, error) { + sequences, err := i.view.GetLatestIDPConfigSequences(instanceIDs) if err != nil { return nil, err } @@ -129,10 +131,10 @@ func (i *IDPConfig) processIdpConfig(providerType iam_model.IDPProviderType, eve } func (i *IDPConfig) OnError(event *models.Event, err error) error { - logging.LogWithFields("SPOOL-Ejf8s", "id", event.AggregateID).WithError(err).Warn("something went wrong in idp config handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in idp config handler") return spooler.HandleError(event, err, i.view.GetLatestIDPConfigFailedEvent, i.view.ProcessedIDPConfigFailedEvent, i.view.ProcessedIDPConfigSequence, i.errorCountUntilSkip) } -func (i *IDPConfig) OnSuccess() error { - return spooler.HandleSuccess(i.view.UpdateIDPConfigSpoolerRunTimestamp) +func (i *IDPConfig) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(i.view.UpdateIDPConfigSpoolerRunTimestamp, instanceIDs) } diff --git a/internal/auth/repository/eventsourcing/handler/idp_providers.go b/internal/auth/repository/eventsourcing/handler/idp_providers.go index 7d71340bad..35bec063d4 100644 --- a/internal/auth/repository/eventsourcing/handler/idp_providers.go +++ b/internal/auth/repository/eventsourcing/handler/idp_providers.go @@ -32,6 +32,7 @@ type IDPProvider struct { } func newIDPProvider( + ctx context.Context, h handler, defaults systemdefaults.SystemDefaults, queries *query2.Queries, @@ -42,16 +43,16 @@ func newIDPProvider( queries: queries, } - idpProvider.subscribe() + idpProvider.subscribe(ctx) return idpProvider } -func (i *IDPProvider) subscribe() { +func (i *IDPProvider) subscribe(ctx context.Context) { i.subscription = i.es.Subscribe(i.AggregateTypes()...) go func() { for event := range i.subscription.Events { - query.ReduceEvent(i, event) + query.ReduceEvent(ctx, i, event) } }() } @@ -76,8 +77,8 @@ func (i *IDPProvider) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (i *IDPProvider) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { - sequences, err := i.view.GetLatestIDPProviderSequences(instanceIDs...) +func (i *IDPProvider) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := i.view.GetLatestIDPProviderSequences(instanceIDs) if err != nil { return nil, err } @@ -188,14 +189,14 @@ func (i *IDPProvider) OnError(event *es_models.Event, err error) error { return spooler.HandleError(event, err, i.view.GetLatestIDPProviderFailedEvent, i.view.ProcessedIDPProviderFailedEvent, i.view.ProcessedIDPProviderSequence, i.errorCountUntilSkip) } -func (i *IDPProvider) OnSuccess() error { - return spooler.HandleSuccess(i.view.UpdateIDPProviderSpoolerRunTimestamp) +func (i *IDPProvider) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(i.view.UpdateIDPProviderSpoolerRunTimestamp, instanceIDs) } func (i *IDPProvider) getOrgIDPConfig(instanceID, aggregateID, idpConfigID string) (*query2.IDP, error) { return i.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, aggregateID) } -func (u *IDPProvider) getDefaultIDPConfig(instanceID, idpConfigID string) (*query2.IDP, error) { - return u.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, instanceID) +func (i *IDPProvider) getDefaultIDPConfig(instanceID, idpConfigID string) (*query2.IDP, error) { + return i.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, instanceID) } diff --git a/internal/auth/repository/eventsourcing/handler/org_project_mapping.go b/internal/auth/repository/eventsourcing/handler/org_project_mapping.go index 4148292829..7236c8a115 100644 --- a/internal/auth/repository/eventsourcing/handler/org_project_mapping.go +++ b/internal/auth/repository/eventsourcing/handler/org_project_mapping.go @@ -1,6 +1,8 @@ package handler import ( + "context" + "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/eventstore" @@ -23,22 +25,23 @@ type OrgProjectMapping struct { } func newOrgProjectMapping( + ctx context.Context, handler handler, ) *OrgProjectMapping { h := &OrgProjectMapping{ handler: handler, } - h.subscribe() + h.subscribe(ctx) return h } -func (k *OrgProjectMapping) subscribe() { - k.subscription = k.es.Subscribe(k.AggregateTypes()...) +func (p *OrgProjectMapping) subscribe(ctx context.Context) { + p.subscription = p.es.Subscribe(p.AggregateTypes()...) go func() { - for event := range k.subscription.Events { - query.ReduceEvent(k, event) + for event := range p.subscription.Events { + query.ReduceEvent(ctx, p, event) } }() } @@ -63,8 +66,8 @@ func (p *OrgProjectMapping) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (p *OrgProjectMapping) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { - sequences, err := p.view.GetLatestOrgProjectMappingSequences(instanceIDs...) +func (p *OrgProjectMapping) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := p.view.GetLatestOrgProjectMappingSequences(instanceIDs) if err != nil { return nil, err } @@ -85,15 +88,21 @@ func (p *OrgProjectMapping) Reduce(event *es_models.Event) (err error) { } case project.GrantAddedType: projectGrant := new(view_model.ProjectGrant) - projectGrant.SetData(event) + err := projectGrant.SetData(event) + if err != nil { + return err + } mapping.OrgID = projectGrant.GrantedOrgID mapping.ProjectID = event.AggregateID mapping.ProjectGrantID = projectGrant.GrantID mapping.InstanceID = event.InstanceID case project.GrantRemovedType: projectGrant := new(view_model.ProjectGrant) - projectGrant.SetData(event) - err := p.view.DeleteOrgProjectMappingsByProjectGrantID(event.AggregateID, event.InstanceID) + err := projectGrant.SetData(event) + if err != nil { + return err + } + err = p.view.DeleteOrgProjectMappingsByProjectGrantID(event.AggregateID, event.InstanceID) if err == nil { return p.view.ProcessedOrgProjectMappingSequence(event) } @@ -109,10 +118,10 @@ func (p *OrgProjectMapping) Reduce(event *es_models.Event) (err error) { } func (p *OrgProjectMapping) OnError(event *es_models.Event, err error) error { - logging.LogWithFields("SPOOL-2k0fS", "id", event.AggregateID).WithError(err).Warn("something went wrong in org project mapping handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in org project mapping handler") return spooler.HandleError(event, err, p.view.GetLatestOrgProjectMappingFailedEvent, p.view.ProcessedOrgProjectMappingFailedEvent, p.view.ProcessedOrgProjectMappingSequence, p.errorCountUntilSkip) } -func (p *OrgProjectMapping) OnSuccess() error { - return spooler.HandleSuccess(p.view.UpdateOrgProjectMappingSpoolerRunTimestamp) +func (p *OrgProjectMapping) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(p.view.UpdateOrgProjectMappingSpoolerRunTimestamp, instanceIDs) } diff --git a/internal/auth/repository/eventsourcing/handler/refresh_token.go b/internal/auth/repository/eventsourcing/handler/refresh_token.go index 7dce7d7c34..99499f73b6 100644 --- a/internal/auth/repository/eventsourcing/handler/refresh_token.go +++ b/internal/auth/repository/eventsourcing/handler/refresh_token.go @@ -1,6 +1,7 @@ package handler import ( + "context" "encoding/json" "github.com/zitadel/logging" @@ -27,22 +28,23 @@ type RefreshToken struct { } func newRefreshToken( + ctx context.Context, handler handler, ) *RefreshToken { h := &RefreshToken{ handler: handler, } - h.subscribe() + h.subscribe(ctx) return h } -func (t *RefreshToken) subscribe() { +func (t *RefreshToken) subscribe(ctx context.Context) { t.subscription = t.es.Subscribe(t.AggregateTypes()...) go func() { for event := range t.subscription.Events { - query.ReduceEvent(t, event) + query.ReduceEvent(ctx, t, event) } }() } @@ -67,8 +69,8 @@ func (t *RefreshToken) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (t *RefreshToken) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { - sequences, err := t.view.GetLatestRefreshTokenSequences(instanceIDs...) +func (t *RefreshToken) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := t.view.GetLatestRefreshTokenSequences(instanceIDs) if err != nil { return nil, err } @@ -87,7 +89,7 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) { case user.HumanRefreshTokenRenewedType: e := new(user.HumanRefreshTokenRenewedEvent) if err := json.Unmarshal(event.Data, e); err != nil { - logging.Log("EVEN-DBbn4").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return caos_errs.ThrowInternal(nil, "MODEL-BHn75", "could not unmarshal data") } token, err := t.view.RefreshTokenByID(e.TokenID, event.InstanceID) @@ -102,7 +104,7 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) { case user.HumanRefreshTokenRemovedType: e := new(user.HumanRefreshTokenRemovedEvent) if err := json.Unmarshal(event.Data, e); err != nil { - logging.Log("EVEN-BDbh3").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return caos_errs.ThrowInternal(nil, "MODEL-Bz653", "could not unmarshal data") } return t.view.DeleteRefreshToken(e.TokenID, event.InstanceID, event) @@ -118,10 +120,10 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) { } func (t *RefreshToken) OnError(event *es_models.Event, err error) error { - logging.LogWithFields("SPOOL-3jkl4", "id", event.AggregateID).WithError(err).Warn("something went wrong in token handler") - return spooler.HandleError(event, err, t.view.GetLatestTokenFailedEvent, t.view.ProcessedTokenFailedEvent, t.view.ProcessedTokenSequence, t.errorCountUntilSkip) + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in token handler") + return spooler.HandleError(event, err, t.view.GetLatestRefreshTokenFailedEvent, t.view.ProcessedRefreshTokenFailedEvent, t.view.ProcessedRefreshTokenSequence, t.errorCountUntilSkip) } -func (t *RefreshToken) OnSuccess() error { - return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp) +func (t *RefreshToken) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(t.view.UpdateRefreshTokenSpoolerRunTimestamp, instanceIDs) } diff --git a/internal/auth/repository/eventsourcing/handler/token.go b/internal/auth/repository/eventsourcing/handler/token.go index e93ef7b14e..22c531ccb4 100644 --- a/internal/auth/repository/eventsourcing/handler/token.go +++ b/internal/auth/repository/eventsourcing/handler/token.go @@ -33,22 +33,23 @@ type Token struct { } func newToken( + ctx context.Context, handler handler, ) *Token { h := &Token{ handler: handler, } - h.subscribe() + h.subscribe(ctx) return h } -func (t *Token) subscribe() { +func (t *Token) subscribe(ctx context.Context) { t.subscription = t.es.Subscribe(t.AggregateTypes()...) go func() { for event := range t.subscription.Events { - query.ReduceEvent(t, event) + query.ReduceEvent(ctx, t, event) } }() } @@ -65,16 +66,16 @@ func (_ *Token) AggregateTypes() []es_models.AggregateType { return []es_models.AggregateType{user.AggregateType, project.AggregateType, instance.AggregateType} } -func (p *Token) CurrentSequence(instanceID string) (uint64, error) { - sequence, err := p.view.GetLatestTokenSequence(instanceID) +func (t *Token) CurrentSequence(instanceID string) (uint64, error) { + sequence, err := t.view.GetLatestTokenSequence(instanceID) if err != nil { return 0, err } return sequence.CurrentSequence, nil } -func (t *Token) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { - sequences, err := t.view.GetLatestTokenSequences(instanceIDs...) +func (t *Token) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := t.view.GetLatestTokenSequences(instanceIDs) if err != nil { return nil, err } @@ -94,7 +95,10 @@ func (t *Token) Reduce(event *es_models.Event) (err error) { case user.UserV1ProfileChangedType, user.HumanProfileChangedType: user := new(view_model.UserView) - user.AppendEvent(event) + err := user.AppendEvent(event) + if err != nil { + return err + } tokens, err := t.view.TokensByUserID(event.AggregateID, event.InstanceID) if err != nil { return err @@ -153,14 +157,14 @@ func (t *Token) Reduce(event *es_models.Event) (err error) { } func (t *Token) OnError(event *es_models.Event, err error) error { - logging.LogWithFields("SPOOL-3jkl4", "id", event.AggregateID).WithError(err).Warn("something went wrong in token handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in token handler") return spooler.HandleError(event, err, t.view.GetLatestTokenFailedEvent, t.view.ProcessedTokenFailedEvent, t.view.ProcessedTokenSequence, t.errorCountUntilSkip) } func agentIDFromSession(event *es_models.Event) (string, error) { session := make(map[string]interface{}) if err := json.Unmarshal(event.Data, &session); err != nil { - logging.Log("EVEN-s3bq9").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return "", caos_errs.ThrowInternal(nil, "MODEL-sd325", "could not unmarshal data") } return session["userAgentID"].(string), nil @@ -169,7 +173,7 @@ func agentIDFromSession(event *es_models.Event) (string, error) { func applicationFromSession(event *es_models.Event) (*project_es_model.Application, error) { application := new(project_es_model.Application) if err := json.Unmarshal(event.Data, &application); err != nil { - logging.Log("EVEN-GRE2q").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return nil, caos_errs.ThrowInternal(nil, "MODEL-Hrw1q", "could not unmarshal data") } return application, nil @@ -178,7 +182,7 @@ func applicationFromSession(event *es_models.Event) (*project_es_model.Applicati func tokenIDFromRemovedEvent(event *es_models.Event) (string, error) { removed := make(map[string]interface{}) if err := json.Unmarshal(event.Data, &removed); err != nil { - logging.Log("EVEN-Sdff3").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return "", caos_errs.ThrowInternal(nil, "MODEL-Sff32", "could not unmarshal data") } return removed["tokenId"].(string), nil @@ -187,14 +191,14 @@ func tokenIDFromRemovedEvent(event *es_models.Event) (string, error) { func refreshTokenIDFromRemovedEvent(event *es_models.Event) (string, error) { removed := make(map[string]interface{}) if err := json.Unmarshal(event.Data, &removed); err != nil { - logging.Log("EVEN-Ff23g").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return "", caos_errs.ThrowInternal(nil, "MODEL-Dfb3w", "could not unmarshal data") } return removed["tokenId"].(string), nil } -func (t *Token) OnSuccess() error { - return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp) +func (t *Token) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp, instanceIDs) } func (t *Token) getProjectByID(ctx context.Context, projID, instanceID string) (*proj_model.Project, error) { diff --git a/internal/auth/repository/eventsourcing/handler/user.go b/internal/auth/repository/eventsourcing/handler/user.go index e17ecffc9c..34130219be 100644 --- a/internal/auth/repository/eventsourcing/handler/user.go +++ b/internal/auth/repository/eventsourcing/handler/user.go @@ -34,6 +34,7 @@ type User struct { } func newUser( + ctx context.Context, handler handler, queries *query2.Queries, ) *User { @@ -42,16 +43,16 @@ func newUser( queries: queries, } - h.subscribe() + h.subscribe(ctx) return h } -func (k *User) subscribe() { - k.subscription = k.es.Subscribe(k.AggregateTypes()...) +func (u *User) subscribe(ctx context.Context) { + u.subscription = u.es.Subscribe(u.AggregateTypes()...) go func() { - for event := range k.subscription.Events { - query.ReduceEvent(k, event) + for event := range u.subscription.Events { + query.ReduceEvent(ctx, u, event) } }() } @@ -75,8 +76,8 @@ func (u *User) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (u *User) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { - sequences, err := u.view.GetLatestUserSequences(instanceIDs...) +func (u *User) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := u.view.GetLatestUserSequences(instanceIDs) if err != nil { return nil, err } @@ -275,12 +276,12 @@ func (u *User) fillPreferredLoginNamesOnOrgUsers(event *es_models.Event) error { } func (u *User) OnError(event *es_models.Event, err error) error { - logging.LogWithFields("SPOOL-is8aAWima", "id", event.AggregateID).WithError(err).Warn("something went wrong in user handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in user handler") return spooler.HandleError(event, err, u.view.GetLatestUserFailedEvent, u.view.ProcessedUserFailedEvent, u.view.ProcessedUserSequence, u.errorCountUntilSkip) } -func (u *User) OnSuccess() error { - return spooler.HandleSuccess(u.view.UpdateUserSpoolerRunTimestamp) +func (u *User) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(u.view.UpdateUserSpoolerRunTimestamp, instanceIDs) } func (u *User) getOrgByID(ctx context.Context, orgID, instanceID string) (*org_model.Org, error) { diff --git a/internal/auth/repository/eventsourcing/handler/user_external_idps.go b/internal/auth/repository/eventsourcing/handler/user_external_idps.go index 101c9e05f0..08e5a5ccef 100644 --- a/internal/auth/repository/eventsourcing/handler/user_external_idps.go +++ b/internal/auth/repository/eventsourcing/handler/user_external_idps.go @@ -33,6 +33,7 @@ type ExternalIDP struct { } func newExternalIDP( + ctx context.Context, handler handler, defaults systemdefaults.SystemDefaults, queries *query2.Queries, @@ -43,16 +44,16 @@ func newExternalIDP( queries: queries, } - h.subscribe() + h.subscribe(ctx) return h } -func (i *ExternalIDP) subscribe() { +func (i *ExternalIDP) subscribe(ctx context.Context) { i.subscription = i.es.Subscribe(i.AggregateTypes()...) go func() { for event := range i.subscription.Events { - query.ReduceEvent(i, event) + query.ReduceEvent(ctx, i, event) } }() } @@ -77,8 +78,8 @@ func (i *ExternalIDP) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (i *ExternalIDP) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { - sequences, err := i.view.GetLatestExternalIDPSequences(instanceIDs...) +func (i *ExternalIDP) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := i.view.GetLatestExternalIDPSequences(instanceIDs) if err != nil { return nil, err } @@ -178,8 +179,8 @@ func (i *ExternalIDP) OnError(event *es_models.Event, err error) error { return spooler.HandleError(event, err, i.view.GetLatestExternalIDPFailedEvent, i.view.ProcessedExternalIDPFailedEvent, i.view.ProcessedExternalIDPSequence, i.errorCountUntilSkip) } -func (i *ExternalIDP) OnSuccess() error { - return spooler.HandleSuccess(i.view.UpdateExternalIDPSpoolerRunTimestamp) +func (i *ExternalIDP) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(i.view.UpdateExternalIDPSpoolerRunTimestamp, instanceIDs) } func (i *ExternalIDP) getOrgIDPConfig(instanceID, aggregateID, idpConfigID string) (*query2.IDP, error) { diff --git a/internal/auth/repository/eventsourcing/handler/user_session.go b/internal/auth/repository/eventsourcing/handler/user_session.go index 2d81f189fb..f04105cbc0 100644 --- a/internal/auth/repository/eventsourcing/handler/user_session.go +++ b/internal/auth/repository/eventsourcing/handler/user_session.go @@ -33,22 +33,22 @@ type UserSession struct { queries *query2.Queries } -func newUserSession(handler handler, queries *query2.Queries) *UserSession { +func newUserSession(ctx context.Context, handler handler, queries *query2.Queries) *UserSession { h := &UserSession{ handler: handler, queries: queries, } - h.subscribe() + h.subscribe(ctx) return h } -func (k *UserSession) subscribe() { - k.subscription = k.es.Subscribe(k.AggregateTypes()...) +func (u *UserSession) subscribe(ctx context.Context) { + u.subscription = u.es.Subscribe(u.AggregateTypes()...) go func() { - for event := range k.subscription.Events { - query.ReduceEvent(k, event) + for event := range u.subscription.Events { + query.ReduceEvent(ctx, u, event) } }() } @@ -73,8 +73,8 @@ func (u *UserSession) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (u *UserSession) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { - sequences, err := u.view.GetLatestUserSessionSequences(instanceIDs...) +func (u *UserSession) EventQuery(instanceIDs []string) (*models.SearchQuery, error) { + sequences, err := u.view.GetLatestUserSessionSequences(instanceIDs) if err != nil { return nil, err } @@ -162,12 +162,12 @@ func (u *UserSession) Reduce(event *models.Event) (err error) { } func (u *UserSession) OnError(event *models.Event, err error) error { - logging.LogWithFields("SPOOL-sdfw3s", "id", event.AggregateID).WithError(err).Warn("something went wrong in user session handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in user session handler") return spooler.HandleError(event, err, u.view.GetLatestUserSessionFailedEvent, u.view.ProcessedUserSessionFailedEvent, u.view.ProcessedUserSessionSequence, u.errorCountUntilSkip) } -func (u *UserSession) OnSuccess() error { - return spooler.HandleSuccess(u.view.UpdateUserSessionSpoolerRunTimestamp) +func (u *UserSession) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(u.view.UpdateUserSessionSpoolerRunTimestamp, instanceIDs) } func (u *UserSession) updateSession(session *view_model.UserSessionView, event *models.Event) error { diff --git a/internal/auth/repository/eventsourcing/repository.go b/internal/auth/repository/eventsourcing/repository.go index 4efdd041d0..a523108622 100644 --- a/internal/auth/repository/eventsourcing/repository.go +++ b/internal/auth/repository/eventsourcing/repository.go @@ -11,6 +11,7 @@ import ( "github.com/zitadel/zitadel/internal/command" sd "github.com/zitadel/zitadel/internal/config/systemdefaults" "github.com/zitadel/zitadel/internal/crypto" + eventstore2 "github.com/zitadel/zitadel/internal/eventstore" v1 "github.com/zitadel/zitadel/internal/eventstore/v1" es_spol "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" "github.com/zitadel/zitadel/internal/id" @@ -33,7 +34,7 @@ type EsRepository struct { eventstore.OrgRepository } -func Start(conf Config, systemDefaults sd.SystemDefaults, command *command.Commands, queries *query.Queries, dbClient *sql.DB, oidcEncryption crypto.EncryptionAlgorithm, userEncryption crypto.EncryptionAlgorithm) (*EsRepository, error) { +func Start(ctx context.Context, conf Config, systemDefaults sd.SystemDefaults, command *command.Commands, queries *query.Queries, dbClient *sql.DB, esV2 *eventstore2.Eventstore, oidcEncryption crypto.EncryptionAlgorithm, userEncryption crypto.EncryptionAlgorithm) (*EsRepository, error) { es, err := v1.Start(dbClient) if err != nil { return nil, err @@ -47,7 +48,7 @@ func Start(conf Config, systemDefaults sd.SystemDefaults, command *command.Comma authReq := cache.Start(dbClient) - spool := spooler.StartSpooler(conf.Spooler, es, view, dbClient, systemDefaults, queries) + spool := spooler.StartSpooler(ctx, conf.Spooler, es, esV2, view, dbClient, systemDefaults, queries) userRepo := eventstore.UserRepo{ SearchLimit: conf.SearchLimit, diff --git a/internal/auth/repository/eventsourcing/spooler/spooler.go b/internal/auth/repository/eventsourcing/spooler/spooler.go index a45c53a870..63185e2b0d 100644 --- a/internal/auth/repository/eventsourcing/spooler/spooler.go +++ b/internal/auth/repository/eventsourcing/spooler/spooler.go @@ -1,15 +1,16 @@ package spooler import ( + "context" "database/sql" - v1 "github.com/zitadel/zitadel/internal/eventstore/v1" - "github.com/zitadel/zitadel/internal/query" - "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/handler" "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view" sd "github.com/zitadel/zitadel/internal/config/systemdefaults" + "github.com/zitadel/zitadel/internal/eventstore" + v1 "github.com/zitadel/zitadel/internal/eventstore/v1" "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" + "github.com/zitadel/zitadel/internal/query" ) type SpoolerConfig struct { @@ -20,13 +21,14 @@ type SpoolerConfig struct { Handlers handler.Configs } -func StartSpooler(c SpoolerConfig, es v1.Eventstore, view *view.View, client *sql.DB, systemDefaults sd.SystemDefaults, queries *query.Queries) *spooler.Spooler { +func StartSpooler(ctx context.Context, c SpoolerConfig, es v1.Eventstore, esV2 *eventstore.Eventstore, view *view.View, client *sql.DB, systemDefaults sd.SystemDefaults, queries *query.Queries) *spooler.Spooler { spoolerConfig := spooler.Config{ Eventstore: es, + EventstoreV2: esV2, Locker: &locker{dbClient: client}, ConcurrentWorkers: c.ConcurrentWorkers, ConcurrentInstances: c.ConcurrentInstances, - ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, systemDefaults, queries), + ViewHandlers: handler.Register(ctx, c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, systemDefaults, queries), } spool := spoolerConfig.New() spool.Start() diff --git a/internal/auth/repository/eventsourcing/view/external_idps.go b/internal/auth/repository/eventsourcing/view/external_idps.go index 5b7506c18c..103666e69d 100644 --- a/internal/auth/repository/eventsourcing/view/external_idps.go +++ b/internal/auth/repository/eventsourcing/view/external_idps.go @@ -68,16 +68,16 @@ func (v *View) GetLatestExternalIDPSequence(instanceID string) (*global_view.Cur return v.latestSequence(externalIDPTable, instanceID) } -func (v *View) GetLatestExternalIDPSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { - return v.latestSequences(externalIDPTable, instanceIDs...) +func (v *View) GetLatestExternalIDPSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) { + return v.latestSequences(externalIDPTable, instanceIDs) } func (v *View) ProcessedExternalIDPSequence(event *models.Event) error { return v.saveCurrentSequence(externalIDPTable, event) } -func (v *View) UpdateExternalIDPSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(externalIDPTable) +func (v *View) UpdateExternalIDPSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(externalIDPTable, instanceIDs) } func (v *View) GetLatestExternalIDPFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/idp_configs.go b/internal/auth/repository/eventsourcing/view/idp_configs.go index 15d2f965fa..8b55d0b040 100644 --- a/internal/auth/repository/eventsourcing/view/idp_configs.go +++ b/internal/auth/repository/eventsourcing/view/idp_configs.go @@ -53,16 +53,16 @@ func (v *View) GetLatestIDPConfigSequence(instanceID string) (*global_view.Curre return v.latestSequence(idpConfigTable, instanceID) } -func (v *View) GetLatestIDPConfigSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { - return v.latestSequences(idpConfigTable, instanceIDs...) +func (v *View) GetLatestIDPConfigSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) { + return v.latestSequences(idpConfigTable, instanceIDs) } func (v *View) ProcessedIDPConfigSequence(event *models.Event) error { return v.saveCurrentSequence(idpConfigTable, event) } -func (v *View) UpdateIDPConfigSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(idpConfigTable) +func (v *View) UpdateIDPConfigSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(idpConfigTable, instanceIDs) } func (v *View) GetLatestIDPConfigFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/idp_providers.go b/internal/auth/repository/eventsourcing/view/idp_providers.go index ff96e56344..7b3fbe83ae 100644 --- a/internal/auth/repository/eventsourcing/view/idp_providers.go +++ b/internal/auth/repository/eventsourcing/view/idp_providers.go @@ -73,16 +73,16 @@ func (v *View) GetLatestIDPProviderSequence(instanceID string) (*global_view.Cur return v.latestSequence(idpProviderTable, instanceID) } -func (v *View) GetLatestIDPProviderSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { - return v.latestSequences(idpProviderTable, instanceIDs...) +func (v *View) GetLatestIDPProviderSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) { + return v.latestSequences(idpProviderTable, instanceIDs) } func (v *View) ProcessedIDPProviderSequence(event *models.Event) error { return v.saveCurrentSequence(idpProviderTable, event) } -func (v *View) UpdateIDPProviderSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(idpProviderTable) +func (v *View) UpdateIDPProviderSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(idpProviderTable, instanceIDs) } func (v *View) GetLatestIDPProviderFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/org_project_mapping.go b/internal/auth/repository/eventsourcing/view/org_project_mapping.go index 6c6ed4e579..4ccac5dccd 100644 --- a/internal/auth/repository/eventsourcing/view/org_project_mapping.go +++ b/internal/auth/repository/eventsourcing/view/org_project_mapping.go @@ -52,16 +52,16 @@ func (v *View) GetLatestOrgProjectMappingSequence(instanceID string) (*repositor return v.latestSequence(orgPrgojectMappingTable, instanceID) } -func (v *View) GetLatestOrgProjectMappingSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return v.latestSequences(orgPrgojectMappingTable, instanceIDs...) +func (v *View) GetLatestOrgProjectMappingSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { + return v.latestSequences(orgPrgojectMappingTable, instanceIDs) } func (v *View) ProcessedOrgProjectMappingSequence(event *models.Event) error { return v.saveCurrentSequence(orgPrgojectMappingTable, event) } -func (v *View) UpdateOrgProjectMappingSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(orgPrgojectMappingTable) +func (v *View) UpdateOrgProjectMappingSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(orgPrgojectMappingTable, instanceIDs) } func (v *View) GetLatestOrgProjectMappingFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/refresh_token.go b/internal/auth/repository/eventsourcing/view/refresh_token.go index 638c5d44ba..5f7bd50401 100644 --- a/internal/auth/repository/eventsourcing/view/refresh_token.go +++ b/internal/auth/repository/eventsourcing/view/refresh_token.go @@ -77,16 +77,16 @@ func (v *View) GetLatestRefreshTokenSequence(instanceID string) (*repository.Cur return v.latestSequence(refreshTokenTable, instanceID) } -func (v *View) GetLatestRefreshTokenSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return v.latestSequences(refreshTokenTable, instanceIDs...) +func (v *View) GetLatestRefreshTokenSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { + return v.latestSequences(refreshTokenTable, instanceIDs) } func (v *View) ProcessedRefreshTokenSequence(event *models.Event) error { return v.saveCurrentSequence(refreshTokenTable, event) } -func (v *View) UpdateRefreshTokenSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(refreshTokenTable) +func (v *View) UpdateRefreshTokenSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(refreshTokenTable, instanceIDs) } func (v *View) GetLatestRefreshTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/sequence.go b/internal/auth/repository/eventsourcing/view/sequence.go index 831a3e9aa2..c1e3b0b4e2 100644 --- a/internal/auth/repository/eventsourcing/view/sequence.go +++ b/internal/auth/repository/eventsourcing/view/sequence.go @@ -19,12 +19,12 @@ func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentS return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) } -func (v *View) latestSequences(viewName string, instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs...) +func (v *View) latestSequences(viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) { + return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs) } -func (v *View) updateSpoolerRunSequence(viewName string) error { - currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName) +func (v *View) updateSpoolerRunSequence(viewName string, instanceIDs []string) error { + currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs) if err != nil { return err } diff --git a/internal/auth/repository/eventsourcing/view/token.go b/internal/auth/repository/eventsourcing/view/token.go index ffd6f2845e..dfc05a51a1 100644 --- a/internal/auth/repository/eventsourcing/view/token.go +++ b/internal/auth/repository/eventsourcing/view/token.go @@ -88,16 +88,16 @@ func (v *View) GetLatestTokenSequence(instanceID string) (*repository.CurrentSeq return v.latestSequence(tokenTable, instanceID) } -func (v *View) GetLatestTokenSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return v.latestSequences(tokenTable, instanceIDs...) +func (v *View) GetLatestTokenSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { + return v.latestSequences(tokenTable, instanceIDs) } func (v *View) ProcessedTokenSequence(event *models.Event) error { return v.saveCurrentSequence(tokenTable, event) } -func (v *View) UpdateTokenSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(tokenTable) +func (v *View) UpdateTokenSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(tokenTable, instanceIDs) } func (v *View) GetLatestTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/user.go b/internal/auth/repository/eventsourcing/view/user.go index 300e697654..8ff62f5579 100644 --- a/internal/auth/repository/eventsourcing/view/user.go +++ b/internal/auth/repository/eventsourcing/view/user.go @@ -193,16 +193,16 @@ func (v *View) GetLatestUserSequence(instanceID string) (*repository.CurrentSequ return v.latestSequence(userTable, instanceID) } -func (v *View) GetLatestUserSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return v.latestSequences(userTable, instanceIDs...) +func (v *View) GetLatestUserSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { + return v.latestSequences(userTable, instanceIDs) } func (v *View) ProcessedUserSequence(event *models.Event) error { return v.saveCurrentSequence(userTable, event) } -func (v *View) UpdateUserSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(userTable) +func (v *View) UpdateUserSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(userTable, instanceIDs) } func (v *View) GetLatestUserFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/user_session.go b/internal/auth/repository/eventsourcing/view/user_session.go index 1947ee6e6d..8a8d71bf03 100644 --- a/internal/auth/repository/eventsourcing/view/user_session.go +++ b/internal/auth/repository/eventsourcing/view/user_session.go @@ -68,16 +68,16 @@ func (v *View) GetLatestUserSessionSequence(instanceID string) (*repository.Curr return v.latestSequence(userSessionTable, instanceID) } -func (v *View) GetLatestUserSessionSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return v.latestSequences(userSessionTable, instanceIDs...) +func (v *View) GetLatestUserSessionSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { + return v.latestSequences(userSessionTable, instanceIDs) } func (v *View) ProcessedUserSessionSequence(event *models.Event) error { return v.saveCurrentSequence(userSessionTable, event) } -func (v *View) UpdateUserSessionSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(userSessionTable) +func (v *View) UpdateUserSessionSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(userSessionTable, instanceIDs) } func (v *View) GetLatestUserSessionFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { diff --git a/internal/authz/repository/eventsourcing/view/sequence.go b/internal/authz/repository/eventsourcing/view/sequence.go index 8f1c720e6f..6810e420d6 100644 --- a/internal/authz/repository/eventsourcing/view/sequence.go +++ b/internal/authz/repository/eventsourcing/view/sequence.go @@ -1,8 +1,6 @@ package view import ( - "time" - "github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/view/repository" ) @@ -18,21 +16,3 @@ func (v *View) saveCurrentSequence(viewName string, event *models.Event) error { func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentSequence, error) { return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) } - -func (v *View) latestSequences(viewName string) ([]*repository.CurrentSequence, error) { - return repository.LatestSequences(v.Db, sequencesTable, viewName) -} - -func (v *View) updateSpoolerRunSequence(viewName string) error { - currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName) - if err != nil { - return err - } - for _, currentSequence := range currentSequences { - if currentSequence.ViewName == "" { - currentSequence.ViewName = viewName - } - currentSequence.LastSuccessfulSpoolerRun = time.Now() - } - return repository.UpdateCurrentSequences(v.Db, sequencesTable, currentSequences) -} diff --git a/internal/authz/repository/eventsourcing/view/token.go b/internal/authz/repository/eventsourcing/view/token.go index 2c8aead649..486d72008d 100644 --- a/internal/authz/repository/eventsourcing/view/token.go +++ b/internal/authz/repository/eventsourcing/view/token.go @@ -47,15 +47,3 @@ func (v *View) GetLatestTokenSequence(instanceID string) (*repository.CurrentSeq func (v *View) ProcessedTokenSequence(event *models.Event) error { return v.saveCurrentSequence(tokenTable, event) } - -func (v *View) UpdateTokenSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(tokenTable) -} - -func (v *View) GetLatestTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { - return v.latestFailedEvent(tokenTable, instanceID, sequence) -} - -func (v *View) ProcessedTokenFailedEvent(failedEvent *repository.FailedEvent) error { - return v.saveFailedEvent(failedEvent) -} diff --git a/internal/eventstore/v1/query/handler.go b/internal/eventstore/v1/query/handler.go index 1aa2a27781..7e126a8f2a 100755 --- a/internal/eventstore/v1/query/handler.go +++ b/internal/eventstore/v1/query/handler.go @@ -17,10 +17,10 @@ const ( type Handler interface { ViewModel() string - EventQuery(instanceIDs ...string) (*models.SearchQuery, error) + EventQuery(instanceIDs []string) (*models.SearchQuery, error) Reduce(*models.Event) error OnError(event *models.Event, err error) error - OnSuccess() error + OnSuccess(instanceIDs []string) error MinimumCycleDuration() time.Duration LockDuration() time.Duration QueryLimit() uint64 @@ -32,7 +32,7 @@ type Handler interface { Subscription() *v1.Subscription } -func ReduceEvent(handler Handler, event *models.Event) { +func ReduceEvent(ctx context.Context, handler Handler, event *models.Event) { defer func() { err := recover() @@ -42,7 +42,7 @@ func ReduceEvent(handler Handler, event *models.Event) { "cause", err, "stack", string(debug.Stack()), "sequence", event.Sequence, - "instnace", event.InstanceID, + "instance", event.InstanceID, ).Error("reduce panicked") } }() @@ -60,7 +60,7 @@ func ReduceEvent(handler Handler, event *models.Event) { SearchQuery(). SetLimit(eventLimit) - unprocessedEvents, err := handler.Eventstore().FilterEvents(context.Background(), searchQuery) + unprocessedEvents, err := handler.Eventstore().FilterEvents(ctx, searchQuery) if err != nil { logging.WithFields("sequence", event.Sequence).Warn("filter failed") return diff --git a/internal/eventstore/v1/spooler/config.go b/internal/eventstore/v1/spooler/config.go index 04f5b7e659..92f2de1299 100644 --- a/internal/eventstore/v1/spooler/config.go +++ b/internal/eventstore/v1/spooler/config.go @@ -5,6 +5,7 @@ import ( "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/eventstore" v1 "github.com/zitadel/zitadel/internal/eventstore/v1" "github.com/zitadel/zitadel/internal/eventstore/v1/query" "github.com/zitadel/zitadel/internal/id" @@ -12,6 +13,7 @@ import ( type Config struct { Eventstore v1.Eventstore + EventstoreV2 *eventstore.Eventstore Locker Locker ViewHandlers []query.Handler ConcurrentWorkers int @@ -31,6 +33,7 @@ func (c *Config) New() *Spooler { handlers: c.ViewHandlers, lockID: lockID, eventstore: c.Eventstore, + esV2: c.EventstoreV2, locker: c.Locker, queue: make(chan *spooledHandler, len(c.ViewHandlers)), workers: c.ConcurrentWorkers, diff --git a/internal/eventstore/v1/spooler/spooler.go b/internal/eventstore/v1/spooler/spooler.go index cd9133fe96..24e72ce5fd 100644 --- a/internal/eventstore/v1/spooler/spooler.go +++ b/internal/eventstore/v1/spooler/spooler.go @@ -9,6 +9,8 @@ import ( "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/handler" v1 "github.com/zitadel/zitadel/internal/eventstore/v1" "github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/query" @@ -16,13 +18,19 @@ import ( "github.com/zitadel/zitadel/internal/view/repository" ) -const systemID = "system" +const ( + systemID = "system" + schedulerSucceeded = eventstore.EventType("system.projections.scheduler.succeeded") + aggregateType = eventstore.AggregateType("system") + aggregateID = "SYSTEM" +) type Spooler struct { handlers []query.Handler locker Locker lockID string eventstore v1.Eventstore + esV2 *eventstore.Eventstore workers int queue chan *spooledHandler concurrentInstances int @@ -37,7 +45,9 @@ type spooledHandler struct { locker Locker queuedAt time.Time eventstore v1.Eventstore + esV2 *eventstore.Eventstore concurrentInstances int + succeededOnce bool } func (s *Spooler) Start() { @@ -57,7 +67,7 @@ func (s *Spooler) Start() { } go func() { for _, handler := range s.handlers { - s.queue <- &spooledHandler{Handler: handler, locker: s.locker, queuedAt: time.Now(), eventstore: s.eventstore, concurrentInstances: s.concurrentInstances} + s.queue <- &spooledHandler{Handler: handler, locker: s.locker, queuedAt: time.Now(), eventstore: s.eventstore, esV2: s.esV2, concurrentInstances: s.concurrentInstances} } }() } @@ -68,6 +78,32 @@ func requeueTask(task *spooledHandler, queue chan<- *spooledHandler) { queue <- task } +func (s *spooledHandler) hasSucceededOnce(ctx context.Context) (bool, error) { + events, err := s.esV2.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). + AddQuery(). + AggregateTypes(aggregateType). + AggregateIDs(aggregateID). + EventTypes(schedulerSucceeded). + EventData(map[string]interface{}{ + "name": s.ViewModel(), + }). + Builder(), + ) + return len(events) > 0 && err == nil, err +} + +func (s *spooledHandler) setSucceededOnce(ctx context.Context) error { + _, err := s.esV2.Push(ctx, &handler.ProjectionSucceededEvent{ + BaseEvent: *eventstore.NewBaseEventForPush(ctx, + eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"), + schedulerSucceeded, + ), + Name: s.ViewModel(), + }) + s.succeededOnce = err == nil + return err +} + func (s *spooledHandler) load(workerID string) { errs := make(chan error) defer func() { @@ -86,8 +122,24 @@ func (s *spooledHandler) load(workerID string) { hasLocked := s.lock(ctx, errs, workerID) if <-hasLocked { + if !s.succeededOnce { + var err error + s.succeededOnce, err = s.hasSucceededOnce(ctx) + if err != nil { + logging.WithFields("view", s.ViewModel()).OnError(err).Warn("initial lock failed for first schedule") + errs <- err + return + } + } + + instanceIDQuery := models.NewSearchQuery().SetColumn(models.Columns_InstanceIDs).AddQuery().ExcludedInstanceIDsFilter("") for { - ids, err := s.eventstore.InstanceIDs(ctx, models.NewSearchQuery().SetColumn(models.Columns_InstanceIDs).AddQuery().ExcludedInstanceIDsFilter("").SearchQuery()) + if s.succeededOnce { + // since we have at least one successful run, we can restrict it to events not older than + // twice the requeue time (just to be sure not to miss an event) + instanceIDQuery = instanceIDQuery.CreationDateNewerFilter(time.Now().Add(-2 * s.MinimumCycleDuration())) + } + ids, err := s.eventstore.InstanceIDs(ctx, instanceIDQuery.SearchQuery()) if err != nil { errs <- err break @@ -97,12 +149,16 @@ func (s *spooledHandler) load(workerID string) { if max > len(ids) { max = len(ids) } - err = s.processInstances(ctx, workerID, ids[i:max]...) + err = s.processInstances(ctx, workerID, ids[i:max]) if err != nil { errs <- err } } if ctx.Err() == nil { + if !s.succeededOnce { + err = s.setSucceededOnce(ctx) + logging.WithFields("view", s.ViewModel()).OnError(err).Warn("unable to push first schedule succeeded") + } errs <- nil } break @@ -111,16 +167,20 @@ func (s *spooledHandler) load(workerID string) { <-ctx.Done() } -func (s *spooledHandler) processInstances(ctx context.Context, workerID string, ids ...string) error { +func (s *spooledHandler) processInstances(ctx context.Context, workerID string, ids []string) error { for { - events, err := s.query(ctx, ids...) + processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + events, err := s.query(processCtx, ids) if err != nil { + cancel() return err } if len(events) == 0 { + cancel() return nil } - err = s.process(ctx, events, workerID) + err = s.process(processCtx, events, workerID, ids) + cancel() if err != nil { return err } @@ -139,7 +199,7 @@ func (s *spooledHandler) awaitError(cancel func(), errs chan error, workerID str } } -func (s *spooledHandler) process(ctx context.Context, events []*models.Event, workerID string) error { +func (s *spooledHandler) process(ctx context.Context, events []*models.Event, workerID string, instanceIDs []string) error { for i, event := range events { select { case <-ctx.Done(): @@ -152,17 +212,17 @@ func (s *spooledHandler) process(ctx context.Context, events []*models.Event, wo continue } time.Sleep(100 * time.Millisecond) - return s.process(ctx, events[i:], workerID) + return s.process(ctx, events[i:], workerID, instanceIDs) } } } - err := s.OnSuccess() + err := s.OnSuccess(instanceIDs) logging.WithFields("view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).OnError(err).Warn("could not process on success func") return err } -func (s *spooledHandler) query(ctx context.Context, instanceIDs ...string) ([]*models.Event, error) { - query, err := s.EventQuery(instanceIDs...) +func (s *spooledHandler) query(ctx context.Context, instanceIDs []string) ([]*models.Event, error) { + query, err := s.EventQuery(instanceIDs) if err != nil { return nil, err } @@ -227,6 +287,6 @@ func HandleError(event *models.Event, failedErr error, return failedErr } -func HandleSuccess(updateSpoolerRunTimestamp func() error) error { - return updateSpoolerRunTimestamp() +func HandleSuccess(updateSpoolerRunTimestamp func([]string) error, instanceIDs []string) error { + return updateSpoolerRunTimestamp(instanceIDs) } diff --git a/internal/eventstore/v1/spooler/spooler_test.go b/internal/eventstore/v1/spooler/spooler_test.go index 315a16a2a5..9aa0c75431 100644 --- a/internal/eventstore/v1/spooler/spooler_test.go +++ b/internal/eventstore/v1/spooler/spooler_test.go @@ -51,7 +51,7 @@ func (h *testHandler) Subscription() *v1.Subscription { return nil } -func (h *testHandler) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { +func (h *testHandler) EventQuery(instanceIDs []string) (*models.SearchQuery, error) { if h.queryError != nil { return nil, h.queryError } @@ -71,7 +71,7 @@ func (h *testHandler) OnError(event *models.Event, err error) error { return err } -func (h *testHandler) OnSuccess() error { +func (h *testHandler) OnSuccess([]string) error { return nil } @@ -127,8 +127,9 @@ func TestSpooler_process(t *testing.T) { currentHandler *testHandler } type args struct { - timeout time.Duration - events []*models.Event + timeout time.Duration + events []*models.Event + instanceIDs []string } tests := []struct { name string @@ -184,7 +185,7 @@ func TestSpooler_process(t *testing.T) { start = time.Now() } - if err := s.process(ctx, tt.args.events, "test"); (err != nil) != tt.wantErr { + if err := s.process(ctx, tt.args.events, "test", tt.args.instanceIDs); (err != nil) != tt.wantErr { t.Errorf("Spooler.process() error = %v, wantErr %v", err, tt.wantErr) } if tt.fields.currentHandler.maxErrCount != tt.wantRetries { diff --git a/internal/user/repository/view/token_view.go b/internal/user/repository/view/token_view.go index 405b66dc6e..e24a8f671a 100644 --- a/internal/user/repository/view/token_view.go +++ b/internal/user/repository/view/token_view.go @@ -36,8 +36,13 @@ func TokensByUserID(db *gorm.DB, table, userID, instanceID string) ([]*usr_model Method: domain.SearchMethodEquals, Value: instanceID, } + expirationQuery := &model.TokenSearchQuery{ + Key: model.TokenSearchKeyExpiration, + Method: domain.SearchMethodGreaterThan, + Value: "now()", + } query := repository.PrepareSearchQuery(table, usr_model.TokenSearchRequest{ - Queries: []*model.TokenSearchQuery{userIDQuery, instanceIDQuery}, + Queries: []*model.TokenSearchQuery{userIDQuery, instanceIDQuery, expirationQuery}, }) _, err := query(db, &tokens) return tokens, err diff --git a/internal/view/repository/sequence.go b/internal/view/repository/sequence.go index 2f1a2e1b25..5ccb7a36f7 100644 --- a/internal/view/repository/sequence.go +++ b/internal/view/repository/sequence.go @@ -169,7 +169,7 @@ func LatestSequence(db *gorm.DB, table, viewName, instanceID string) (*CurrentSe return nil, caos_errs.ThrowInternalf(err, "VIEW-9LyCB", "unable to get latest sequence of %s", viewName) } -func LatestSequences(db *gorm.DB, table, viewName string, instanceIDs ...string) ([]*CurrentSequence, error) { +func LatestSequences(db *gorm.DB, table, viewName string, instanceIDs []string) ([]*CurrentSequence, error) { searchQueries := []sequenceSearchQuery{ {key: sequenceSearchKey(SequenceSearchKeyViewName), value: viewName, method: domain.SearchMethodEquals}, }