fix: reduce load on view tables (#4716)

* fix: reduce load on view tables

* create prerelease

* linting: pass context to view handlers

* fix error handling of refresh token handler

* fix: improve processing of successful instanceIDs on views

* fix revert intended change in .golangci.yaml

* fix: set timeout for processInstances in spooler

* fix: reduce update to active tokens on profile change

* change token expiration query to db now()

* remove branch from .releaserc.js
This commit is contained in:
Livio Spring 2022-11-22 07:36:48 +01:00 committed by GitHub
parent 29441ce4b6
commit e8babf1048
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 295 additions and 238 deletions

View File

@ -2,7 +2,6 @@ module.exports = {
branches: [
{name: 'main'},
{name: '1.87.x', range: '1.87.x', channel: '1.87.x'},
{name: 'startup-times', prerelease: 'beta'}
],
plugins: [
"@semantic-release/commit-analyzer"

View File

@ -160,7 +160,7 @@ Auth:
SearchLimit: 1000
Spooler:
ConcurrentWorkers: 1
ConcurrentInstances: 10
ConcurrentInstances: 1
BulkLimit: 10000
FailureCountUntilSkip: 5
@ -168,7 +168,7 @@ Admin:
SearchLimit: 1000
Spooler:
ConcurrentWorkers: 1
ConcurrentInstances: 10
ConcurrentInstances: 1
BulkLimit: 10000
FailureCountUntilSkip: 5

View File

@ -173,11 +173,11 @@ func startAPIs(ctx context.Context, router *mux.Router, commands *command.Comman
return err
}
apis := api.New(config.Port, router, queries, verifier, config.InternalAuthZ, config.ExternalSecure, tlsConfig, config.HTTP2HostHeader, config.HTTP1HostHeader)
authRepo, err := auth_es.Start(config.Auth, config.SystemDefaults, commands, queries, dbClient, keys.OIDC, keys.User)
authRepo, err := auth_es.Start(ctx, config.Auth, config.SystemDefaults, commands, queries, dbClient, eventstore, keys.OIDC, keys.User)
if err != nil {
return fmt.Errorf("error starting auth repo: %w", err)
}
adminRepo, err := admin_es.Start(config.Admin, store, dbClient)
adminRepo, err := admin_es.Start(ctx, config.Admin, store, dbClient, eventstore)
if err != nil {
return fmt.Errorf("error starting admin repo: %w", err)
}

View File

@ -1,6 +1,7 @@
package handler
import (
"context"
"time"
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
@ -28,10 +29,10 @@ func (h *handler) Eventstore() v1.Eventstore {
return h.es
}
func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, static static.Storage) []query.Handler {
func Register(ctx context.Context, configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, static static.Storage) []query.Handler {
handlers := []query.Handler{}
if static != nil {
handlers = append(handlers, newStyling(
handlers = append(handlers, newStyling(ctx,
handler{view, bulkLimit, configs.cycleDuration("Styling"), errorCount, es},
static))
}

View File

@ -34,21 +34,21 @@ type Styling struct {
subscription *v1.Subscription
}
func newStyling(handler handler, static static.Storage) *Styling {
func newStyling(ctx context.Context, handler handler, static static.Storage) *Styling {
h := &Styling{
handler: handler,
static: static,
}
h.subscribe()
h.subscribe(ctx)
return h
}
func (m *Styling) subscribe() {
func (m *Styling) subscribe(ctx context.Context) {
m.subscription = m.es.Subscribe(m.AggregateTypes()...)
go func() {
for event := range m.subscription.Events {
query.ReduceEvent(m, event)
query.ReduceEvent(ctx, m, event)
}
}()
}
@ -73,15 +73,15 @@ func (m *Styling) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil
}
func (m *Styling) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) {
sequences, err := m.view.GetLatestStylingSequences(instanceIDs...)
func (m *Styling) EventQuery(instanceIDs []string) (*models.SearchQuery, error) {
sequences, err := m.view.GetLatestStylingSequences(instanceIDs)
if err != nil {
return nil, err
}
searchQuery := models.NewSearchQuery()
for _, sequence := range sequences {
var seq uint64
for _, instanceID := range instanceIDs {
var seq uint64
for _, sequence := range sequences {
if sequence.InstanceID == instanceID {
seq = sequence.CurrentSequence
break
@ -90,7 +90,7 @@ func (m *Styling) EventQuery(instanceIDs ...string) (*models.SearchQuery, error)
searchQuery.AddQuery().
AggregateTypeFilter(m.AggregateTypes()...).
LatestSequenceFilter(seq).
InstanceIDFilter(sequence.InstanceID)
InstanceIDFilter(instanceID)
}
return searchQuery, nil
}
@ -166,12 +166,12 @@ func (m *Styling) processLabelPolicy(event *models.Event) (err error) {
}
func (m *Styling) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-2m9fs", "id", event.AggregateID).WithError(err).Warn("something went wrong in label policy handler")
logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in label policy handler")
return spooler.HandleError(event, err, m.view.GetLatestStylingFailedEvent, m.view.ProcessedStylingFailedEvent, m.view.ProcessedStylingSequence, m.errorCountUntilSkip)
}
func (m *Styling) OnSuccess() error {
return spooler.HandleSuccess(m.view.UpdateStylingSpoolerRunTimestamp)
func (m *Styling) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(m.view.UpdateStylingSpoolerRunTimestamp, instanceIDs)
}
func (m *Styling) generateStylingFile(policy *iam_model.LabelPolicyView) error {

View File

@ -7,6 +7,7 @@ import (
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/eventstore"
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/spooler"
admin_view "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
eventstore2 "github.com/zitadel/zitadel/internal/eventstore"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
es_spol "github.com/zitadel/zitadel/internal/eventstore/v1/spooler"
"github.com/zitadel/zitadel/internal/static"
@ -22,7 +23,7 @@ type EsRepository struct {
eventstore.AdministratorRepo
}
func Start(conf Config, static static.Storage, dbClient *sql.DB) (*EsRepository, error) {
func Start(ctx context.Context, conf Config, static static.Storage, dbClient *sql.DB, esV2 *eventstore2.Eventstore) (*EsRepository, error) {
es, err := v1.Start(dbClient)
if err != nil {
return nil, err
@ -32,7 +33,7 @@ func Start(conf Config, static static.Storage, dbClient *sql.DB) (*EsRepository,
return nil, err
}
spool := spooler.StartSpooler(conf.Spooler, es, view, dbClient, static)
spool := spooler.StartSpooler(ctx, conf.Spooler, es, esV2, view, dbClient, static)
return &EsRepository{
spooler: spool,

View File

@ -1,14 +1,15 @@
package spooler
import (
"context"
"database/sql"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/static"
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/handler"
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
"github.com/zitadel/zitadel/internal/eventstore"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/eventstore/v1/spooler"
"github.com/zitadel/zitadel/internal/static"
)
type SpoolerConfig struct {
@ -19,13 +20,14 @@ type SpoolerConfig struct {
Handlers handler.Configs
}
func StartSpooler(c SpoolerConfig, es v1.Eventstore, view *view.View, sql *sql.DB, static static.Storage) *spooler.Spooler {
func StartSpooler(ctx context.Context, c SpoolerConfig, es v1.Eventstore, esV2 *eventstore.Eventstore, view *view.View, sql *sql.DB, static static.Storage) *spooler.Spooler {
spoolerConfig := spooler.Config{
Eventstore: es,
EventstoreV2: esV2,
Locker: &locker{dbClient: sql},
ConcurrentWorkers: c.ConcurrentWorkers,
ConcurrentInstances: c.ConcurrentInstances,
ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, static),
ViewHandlers: handler.Register(ctx, c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, static),
}
spool := spoolerConfig.New()
spool.Start()

View File

@ -19,16 +19,16 @@ func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentS
return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID)
}
func (v *View) latestSequences(viewName string, instanceIDs ...string) ([]*repository.CurrentSequence, error) {
return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs...)
func (v *View) latestSequences(viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) {
return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs)
}
func (v *View) AllCurrentSequences(db, instanceID string) ([]*repository.CurrentSequence, error) {
return repository.AllCurrentSequences(v.Db, db+".current_sequences", instanceID)
}
func (v *View) updateSpoolerRunSequence(viewName string) error {
currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName)
func (v *View) updateSpoolerRunSequence(viewName string, instanceIDs []string) error {
currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs)
if err != nil {
return err
}
@ -41,12 +41,6 @@ func (v *View) updateSpoolerRunSequence(viewName string) error {
return repository.UpdateCurrentSequences(v.Db, sequencesTable, currentSequences)
}
func (v *View) GetCurrentSequence(db, viewName string) ([]*repository.CurrentSequence, error) {
sequenceTable := db + ".current_sequences"
fullView := db + "." + viewName
return repository.LatestSequences(v.Db, sequenceTable, fullView)
}
func (v *View) ClearView(db, viewName string) error {
truncateView := db + "." + viewName
sequenceTable := db + ".current_sequences"

View File

@ -35,16 +35,16 @@ func (v *View) GetLatestStylingSequence(instanceID string) (*global_view.Current
return v.latestSequence(stylingTyble, instanceID)
}
func (v *View) GetLatestStylingSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(stylingTyble, instanceIDs...)
func (v *View) GetLatestStylingSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(stylingTyble, instanceIDs)
}
func (v *View) ProcessedStylingSequence(event *models.Event) error {
return v.saveCurrentSequence(stylingTyble, event)
}
func (v *View) UpdateStylingSpoolerRunTimestamp() error {
return v.updateSpoolerRunSequence(stylingTyble)
func (v *View) UpdateStylingSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(stylingTyble, instanceIDs)
}
func (v *View) GetLatestStylingFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) {

View File

@ -33,24 +33,24 @@ func (h *handler) Eventstore() v1.Eventstore {
return h.es
}
func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, systemDefaults sd.SystemDefaults, queries *query2.Queries) []query.Handler {
func Register(ctx context.Context, configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, systemDefaults sd.SystemDefaults, queries *query2.Queries) []query.Handler {
return []query.Handler{
newUser(
newUser(ctx,
handler{view, bulkLimit, configs.cycleDuration("User"), errorCount, es}, queries),
newUserSession(
newUserSession(ctx,
handler{view, bulkLimit, configs.cycleDuration("UserSession"), errorCount, es}, queries),
newToken(
newToken(ctx,
handler{view, bulkLimit, configs.cycleDuration("Token"), errorCount, es}),
newIDPConfig(
newIDPConfig(ctx,
handler{view, bulkLimit, configs.cycleDuration("IDPConfig"), errorCount, es}),
newIDPProvider(
newIDPProvider(ctx,
handler{view, bulkLimit, configs.cycleDuration("IDPProvider"), errorCount, es},
systemDefaults, queries),
newExternalIDP(
newExternalIDP(ctx,
handler{view, bulkLimit, configs.cycleDuration("ExternalIDP"), errorCount, es},
systemDefaults, queries),
newRefreshToken(handler{view, bulkLimit, configs.cycleDuration("RefreshToken"), errorCount, es}),
newOrgProjectMapping(handler{view, bulkLimit, configs.cycleDuration("OrgProjectMapping"), errorCount, es}),
newRefreshToken(ctx, handler{view, bulkLimit, configs.cycleDuration("RefreshToken"), errorCount, es}),
newOrgProjectMapping(ctx, handler{view, bulkLimit, configs.cycleDuration("OrgProjectMapping"), errorCount, es}),
}
}
@ -80,9 +80,9 @@ func withInstanceID(ctx context.Context, instanceID string) context.Context {
func newSearchQuery(sequences []*repository.CurrentSequence, aggregateTypes []models.AggregateType, instanceIDs []string) *models.SearchQuery {
searchQuery := models.NewSearchQuery()
for _, sequence := range sequences {
var seq uint64
for _, instanceID := range instanceIDs {
var seq uint64
for _, sequence := range sequences {
if sequence.InstanceID == instanceID {
seq = sequence.CurrentSequence
break
@ -91,7 +91,7 @@ func newSearchQuery(sequences []*repository.CurrentSequence, aggregateTypes []mo
searchQuery.AddQuery().
AggregateTypeFilter(aggregateTypes...).
LatestSequenceFilter(seq).
InstanceIDFilter(sequence.InstanceID)
InstanceIDFilter(instanceID)
}
return searchQuery
}

View File

@ -1,6 +1,8 @@
package handler
import (
"context"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore"
@ -23,21 +25,21 @@ type IDPConfig struct {
subscription *v1.Subscription
}
func newIDPConfig(h handler) *IDPConfig {
func newIDPConfig(ctx context.Context, h handler) *IDPConfig {
idpConfig := &IDPConfig{
handler: h,
}
idpConfig.subscribe()
idpConfig.subscribe(ctx)
return idpConfig
}
func (i *IDPConfig) subscribe() {
func (i *IDPConfig) subscribe(ctx context.Context) {
i.subscription = i.es.Subscribe(i.AggregateTypes()...)
go func() {
for event := range i.subscription.Events {
query.ReduceEvent(i, event)
query.ReduceEvent(ctx, i, event)
}
}()
}
@ -62,8 +64,8 @@ func (i *IDPConfig) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil
}
func (i *IDPConfig) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) {
sequences, err := i.view.GetLatestIDPConfigSequences(instanceIDs...)
func (i *IDPConfig) EventQuery(instanceIDs []string) (*models.SearchQuery, error) {
sequences, err := i.view.GetLatestIDPConfigSequences(instanceIDs)
if err != nil {
return nil, err
}
@ -129,10 +131,10 @@ func (i *IDPConfig) processIdpConfig(providerType iam_model.IDPProviderType, eve
}
func (i *IDPConfig) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-Ejf8s", "id", event.AggregateID).WithError(err).Warn("something went wrong in idp config handler")
logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in idp config handler")
return spooler.HandleError(event, err, i.view.GetLatestIDPConfigFailedEvent, i.view.ProcessedIDPConfigFailedEvent, i.view.ProcessedIDPConfigSequence, i.errorCountUntilSkip)
}
func (i *IDPConfig) OnSuccess() error {
return spooler.HandleSuccess(i.view.UpdateIDPConfigSpoolerRunTimestamp)
func (i *IDPConfig) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(i.view.UpdateIDPConfigSpoolerRunTimestamp, instanceIDs)
}

View File

@ -32,6 +32,7 @@ type IDPProvider struct {
}
func newIDPProvider(
ctx context.Context,
h handler,
defaults systemdefaults.SystemDefaults,
queries *query2.Queries,
@ -42,16 +43,16 @@ func newIDPProvider(
queries: queries,
}
idpProvider.subscribe()
idpProvider.subscribe(ctx)
return idpProvider
}
func (i *IDPProvider) subscribe() {
func (i *IDPProvider) subscribe(ctx context.Context) {
i.subscription = i.es.Subscribe(i.AggregateTypes()...)
go func() {
for event := range i.subscription.Events {
query.ReduceEvent(i, event)
query.ReduceEvent(ctx, i, event)
}
}()
}
@ -76,8 +77,8 @@ func (i *IDPProvider) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil
}
func (i *IDPProvider) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) {
sequences, err := i.view.GetLatestIDPProviderSequences(instanceIDs...)
func (i *IDPProvider) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := i.view.GetLatestIDPProviderSequences(instanceIDs)
if err != nil {
return nil, err
}
@ -188,14 +189,14 @@ func (i *IDPProvider) OnError(event *es_models.Event, err error) error {
return spooler.HandleError(event, err, i.view.GetLatestIDPProviderFailedEvent, i.view.ProcessedIDPProviderFailedEvent, i.view.ProcessedIDPProviderSequence, i.errorCountUntilSkip)
}
func (i *IDPProvider) OnSuccess() error {
return spooler.HandleSuccess(i.view.UpdateIDPProviderSpoolerRunTimestamp)
func (i *IDPProvider) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(i.view.UpdateIDPProviderSpoolerRunTimestamp, instanceIDs)
}
func (i *IDPProvider) getOrgIDPConfig(instanceID, aggregateID, idpConfigID string) (*query2.IDP, error) {
return i.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, aggregateID)
}
func (u *IDPProvider) getDefaultIDPConfig(instanceID, idpConfigID string) (*query2.IDP, error) {
return u.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, instanceID)
func (i *IDPProvider) getDefaultIDPConfig(instanceID, idpConfigID string) (*query2.IDP, error) {
return i.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, instanceID)
}

View File

@ -1,6 +1,8 @@
package handler
import (
"context"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore"
@ -23,22 +25,23 @@ type OrgProjectMapping struct {
}
func newOrgProjectMapping(
ctx context.Context,
handler handler,
) *OrgProjectMapping {
h := &OrgProjectMapping{
handler: handler,
}
h.subscribe()
h.subscribe(ctx)
return h
}
func (k *OrgProjectMapping) subscribe() {
k.subscription = k.es.Subscribe(k.AggregateTypes()...)
func (p *OrgProjectMapping) subscribe(ctx context.Context) {
p.subscription = p.es.Subscribe(p.AggregateTypes()...)
go func() {
for event := range k.subscription.Events {
query.ReduceEvent(k, event)
for event := range p.subscription.Events {
query.ReduceEvent(ctx, p, event)
}
}()
}
@ -63,8 +66,8 @@ func (p *OrgProjectMapping) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil
}
func (p *OrgProjectMapping) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) {
sequences, err := p.view.GetLatestOrgProjectMappingSequences(instanceIDs...)
func (p *OrgProjectMapping) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := p.view.GetLatestOrgProjectMappingSequences(instanceIDs)
if err != nil {
return nil, err
}
@ -85,15 +88,21 @@ func (p *OrgProjectMapping) Reduce(event *es_models.Event) (err error) {
}
case project.GrantAddedType:
projectGrant := new(view_model.ProjectGrant)
projectGrant.SetData(event)
err := projectGrant.SetData(event)
if err != nil {
return err
}
mapping.OrgID = projectGrant.GrantedOrgID
mapping.ProjectID = event.AggregateID
mapping.ProjectGrantID = projectGrant.GrantID
mapping.InstanceID = event.InstanceID
case project.GrantRemovedType:
projectGrant := new(view_model.ProjectGrant)
projectGrant.SetData(event)
err := p.view.DeleteOrgProjectMappingsByProjectGrantID(event.AggregateID, event.InstanceID)
err := projectGrant.SetData(event)
if err != nil {
return err
}
err = p.view.DeleteOrgProjectMappingsByProjectGrantID(event.AggregateID, event.InstanceID)
if err == nil {
return p.view.ProcessedOrgProjectMappingSequence(event)
}
@ -109,10 +118,10 @@ func (p *OrgProjectMapping) Reduce(event *es_models.Event) (err error) {
}
func (p *OrgProjectMapping) OnError(event *es_models.Event, err error) error {
logging.LogWithFields("SPOOL-2k0fS", "id", event.AggregateID).WithError(err).Warn("something went wrong in org project mapping handler")
logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in org project mapping handler")
return spooler.HandleError(event, err, p.view.GetLatestOrgProjectMappingFailedEvent, p.view.ProcessedOrgProjectMappingFailedEvent, p.view.ProcessedOrgProjectMappingSequence, p.errorCountUntilSkip)
}
func (p *OrgProjectMapping) OnSuccess() error {
return spooler.HandleSuccess(p.view.UpdateOrgProjectMappingSpoolerRunTimestamp)
func (p *OrgProjectMapping) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(p.view.UpdateOrgProjectMappingSpoolerRunTimestamp, instanceIDs)
}

View File

@ -1,6 +1,7 @@
package handler
import (
"context"
"encoding/json"
"github.com/zitadel/logging"
@ -27,22 +28,23 @@ type RefreshToken struct {
}
func newRefreshToken(
ctx context.Context,
handler handler,
) *RefreshToken {
h := &RefreshToken{
handler: handler,
}
h.subscribe()
h.subscribe(ctx)
return h
}
func (t *RefreshToken) subscribe() {
func (t *RefreshToken) subscribe(ctx context.Context) {
t.subscription = t.es.Subscribe(t.AggregateTypes()...)
go func() {
for event := range t.subscription.Events {
query.ReduceEvent(t, event)
query.ReduceEvent(ctx, t, event)
}
}()
}
@ -67,8 +69,8 @@ func (t *RefreshToken) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil
}
func (t *RefreshToken) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) {
sequences, err := t.view.GetLatestRefreshTokenSequences(instanceIDs...)
func (t *RefreshToken) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := t.view.GetLatestRefreshTokenSequences(instanceIDs)
if err != nil {
return nil, err
}
@ -87,7 +89,7 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) {
case user.HumanRefreshTokenRenewedType:
e := new(user.HumanRefreshTokenRenewedEvent)
if err := json.Unmarshal(event.Data, e); err != nil {
logging.Log("EVEN-DBbn4").WithError(err).Error("could not unmarshal event data")
logging.WithError(err).Error("could not unmarshal event data")
return caos_errs.ThrowInternal(nil, "MODEL-BHn75", "could not unmarshal data")
}
token, err := t.view.RefreshTokenByID(e.TokenID, event.InstanceID)
@ -102,7 +104,7 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) {
case user.HumanRefreshTokenRemovedType:
e := new(user.HumanRefreshTokenRemovedEvent)
if err := json.Unmarshal(event.Data, e); err != nil {
logging.Log("EVEN-BDbh3").WithError(err).Error("could not unmarshal event data")
logging.WithError(err).Error("could not unmarshal event data")
return caos_errs.ThrowInternal(nil, "MODEL-Bz653", "could not unmarshal data")
}
return t.view.DeleteRefreshToken(e.TokenID, event.InstanceID, event)
@ -118,10 +120,10 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) {
}
func (t *RefreshToken) OnError(event *es_models.Event, err error) error {
logging.LogWithFields("SPOOL-3jkl4", "id", event.AggregateID).WithError(err).Warn("something went wrong in token handler")
return spooler.HandleError(event, err, t.view.GetLatestTokenFailedEvent, t.view.ProcessedTokenFailedEvent, t.view.ProcessedTokenSequence, t.errorCountUntilSkip)
logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in token handler")
return spooler.HandleError(event, err, t.view.GetLatestRefreshTokenFailedEvent, t.view.ProcessedRefreshTokenFailedEvent, t.view.ProcessedRefreshTokenSequence, t.errorCountUntilSkip)
}
func (t *RefreshToken) OnSuccess() error {
return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp)
func (t *RefreshToken) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(t.view.UpdateRefreshTokenSpoolerRunTimestamp, instanceIDs)
}

View File

@ -33,22 +33,23 @@ type Token struct {
}
func newToken(
ctx context.Context,
handler handler,
) *Token {
h := &Token{
handler: handler,
}
h.subscribe()
h.subscribe(ctx)
return h
}
func (t *Token) subscribe() {
func (t *Token) subscribe(ctx context.Context) {
t.subscription = t.es.Subscribe(t.AggregateTypes()...)
go func() {
for event := range t.subscription.Events {
query.ReduceEvent(t, event)
query.ReduceEvent(ctx, t, event)
}
}()
}
@ -65,16 +66,16 @@ func (_ *Token) AggregateTypes() []es_models.AggregateType {
return []es_models.AggregateType{user.AggregateType, project.AggregateType, instance.AggregateType}
}
func (p *Token) CurrentSequence(instanceID string) (uint64, error) {
sequence, err := p.view.GetLatestTokenSequence(instanceID)
func (t *Token) CurrentSequence(instanceID string) (uint64, error) {
sequence, err := t.view.GetLatestTokenSequence(instanceID)
if err != nil {
return 0, err
}
return sequence.CurrentSequence, nil
}
func (t *Token) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) {
sequences, err := t.view.GetLatestTokenSequences(instanceIDs...)
func (t *Token) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := t.view.GetLatestTokenSequences(instanceIDs)
if err != nil {
return nil, err
}
@ -94,7 +95,10 @@ func (t *Token) Reduce(event *es_models.Event) (err error) {
case user.UserV1ProfileChangedType,
user.HumanProfileChangedType:
user := new(view_model.UserView)
user.AppendEvent(event)
err := user.AppendEvent(event)
if err != nil {
return err
}
tokens, err := t.view.TokensByUserID(event.AggregateID, event.InstanceID)
if err != nil {
return err
@ -153,14 +157,14 @@ func (t *Token) Reduce(event *es_models.Event) (err error) {
}
func (t *Token) OnError(event *es_models.Event, err error) error {
logging.LogWithFields("SPOOL-3jkl4", "id", event.AggregateID).WithError(err).Warn("something went wrong in token handler")
logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in token handler")
return spooler.HandleError(event, err, t.view.GetLatestTokenFailedEvent, t.view.ProcessedTokenFailedEvent, t.view.ProcessedTokenSequence, t.errorCountUntilSkip)
}
func agentIDFromSession(event *es_models.Event) (string, error) {
session := make(map[string]interface{})
if err := json.Unmarshal(event.Data, &session); err != nil {
logging.Log("EVEN-s3bq9").WithError(err).Error("could not unmarshal event data")
logging.WithError(err).Error("could not unmarshal event data")
return "", caos_errs.ThrowInternal(nil, "MODEL-sd325", "could not unmarshal data")
}
return session["userAgentID"].(string), nil
@ -169,7 +173,7 @@ func agentIDFromSession(event *es_models.Event) (string, error) {
func applicationFromSession(event *es_models.Event) (*project_es_model.Application, error) {
application := new(project_es_model.Application)
if err := json.Unmarshal(event.Data, &application); err != nil {
logging.Log("EVEN-GRE2q").WithError(err).Error("could not unmarshal event data")
logging.WithError(err).Error("could not unmarshal event data")
return nil, caos_errs.ThrowInternal(nil, "MODEL-Hrw1q", "could not unmarshal data")
}
return application, nil
@ -178,7 +182,7 @@ func applicationFromSession(event *es_models.Event) (*project_es_model.Applicati
func tokenIDFromRemovedEvent(event *es_models.Event) (string, error) {
removed := make(map[string]interface{})
if err := json.Unmarshal(event.Data, &removed); err != nil {
logging.Log("EVEN-Sdff3").WithError(err).Error("could not unmarshal event data")
logging.WithError(err).Error("could not unmarshal event data")
return "", caos_errs.ThrowInternal(nil, "MODEL-Sff32", "could not unmarshal data")
}
return removed["tokenId"].(string), nil
@ -187,14 +191,14 @@ func tokenIDFromRemovedEvent(event *es_models.Event) (string, error) {
func refreshTokenIDFromRemovedEvent(event *es_models.Event) (string, error) {
removed := make(map[string]interface{})
if err := json.Unmarshal(event.Data, &removed); err != nil {
logging.Log("EVEN-Ff23g").WithError(err).Error("could not unmarshal event data")
logging.WithError(err).Error("could not unmarshal event data")
return "", caos_errs.ThrowInternal(nil, "MODEL-Dfb3w", "could not unmarshal data")
}
return removed["tokenId"].(string), nil
}
func (t *Token) OnSuccess() error {
return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp)
func (t *Token) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp, instanceIDs)
}
func (t *Token) getProjectByID(ctx context.Context, projID, instanceID string) (*proj_model.Project, error) {

View File

@ -34,6 +34,7 @@ type User struct {
}
func newUser(
ctx context.Context,
handler handler,
queries *query2.Queries,
) *User {
@ -42,16 +43,16 @@ func newUser(
queries: queries,
}
h.subscribe()
h.subscribe(ctx)
return h
}
func (k *User) subscribe() {
k.subscription = k.es.Subscribe(k.AggregateTypes()...)
func (u *User) subscribe(ctx context.Context) {
u.subscription = u.es.Subscribe(u.AggregateTypes()...)
go func() {
for event := range k.subscription.Events {
query.ReduceEvent(k, event)
for event := range u.subscription.Events {
query.ReduceEvent(ctx, u, event)
}
}()
}
@ -75,8 +76,8 @@ func (u *User) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil
}
func (u *User) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) {
sequences, err := u.view.GetLatestUserSequences(instanceIDs...)
func (u *User) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := u.view.GetLatestUserSequences(instanceIDs)
if err != nil {
return nil, err
}
@ -275,12 +276,12 @@ func (u *User) fillPreferredLoginNamesOnOrgUsers(event *es_models.Event) error {
}
func (u *User) OnError(event *es_models.Event, err error) error {
logging.LogWithFields("SPOOL-is8aAWima", "id", event.AggregateID).WithError(err).Warn("something went wrong in user handler")
logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in user handler")
return spooler.HandleError(event, err, u.view.GetLatestUserFailedEvent, u.view.ProcessedUserFailedEvent, u.view.ProcessedUserSequence, u.errorCountUntilSkip)
}
func (u *User) OnSuccess() error {
return spooler.HandleSuccess(u.view.UpdateUserSpoolerRunTimestamp)
func (u *User) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(u.view.UpdateUserSpoolerRunTimestamp, instanceIDs)
}
func (u *User) getOrgByID(ctx context.Context, orgID, instanceID string) (*org_model.Org, error) {

View File

@ -33,6 +33,7 @@ type ExternalIDP struct {
}
func newExternalIDP(
ctx context.Context,
handler handler,
defaults systemdefaults.SystemDefaults,
queries *query2.Queries,
@ -43,16 +44,16 @@ func newExternalIDP(
queries: queries,
}
h.subscribe()
h.subscribe(ctx)
return h
}
func (i *ExternalIDP) subscribe() {
func (i *ExternalIDP) subscribe(ctx context.Context) {
i.subscription = i.es.Subscribe(i.AggregateTypes()...)
go func() {
for event := range i.subscription.Events {
query.ReduceEvent(i, event)
query.ReduceEvent(ctx, i, event)
}
}()
}
@ -77,8 +78,8 @@ func (i *ExternalIDP) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil
}
func (i *ExternalIDP) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) {
sequences, err := i.view.GetLatestExternalIDPSequences(instanceIDs...)
func (i *ExternalIDP) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := i.view.GetLatestExternalIDPSequences(instanceIDs)
if err != nil {
return nil, err
}
@ -178,8 +179,8 @@ func (i *ExternalIDP) OnError(event *es_models.Event, err error) error {
return spooler.HandleError(event, err, i.view.GetLatestExternalIDPFailedEvent, i.view.ProcessedExternalIDPFailedEvent, i.view.ProcessedExternalIDPSequence, i.errorCountUntilSkip)
}
func (i *ExternalIDP) OnSuccess() error {
return spooler.HandleSuccess(i.view.UpdateExternalIDPSpoolerRunTimestamp)
func (i *ExternalIDP) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(i.view.UpdateExternalIDPSpoolerRunTimestamp, instanceIDs)
}
func (i *ExternalIDP) getOrgIDPConfig(instanceID, aggregateID, idpConfigID string) (*query2.IDP, error) {

View File

@ -33,22 +33,22 @@ type UserSession struct {
queries *query2.Queries
}
func newUserSession(handler handler, queries *query2.Queries) *UserSession {
func newUserSession(ctx context.Context, handler handler, queries *query2.Queries) *UserSession {
h := &UserSession{
handler: handler,
queries: queries,
}
h.subscribe()
h.subscribe(ctx)
return h
}
func (k *UserSession) subscribe() {
k.subscription = k.es.Subscribe(k.AggregateTypes()...)
func (u *UserSession) subscribe(ctx context.Context) {
u.subscription = u.es.Subscribe(u.AggregateTypes()...)
go func() {
for event := range k.subscription.Events {
query.ReduceEvent(k, event)
for event := range u.subscription.Events {
query.ReduceEvent(ctx, u, event)
}
}()
}
@ -73,8 +73,8 @@ func (u *UserSession) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil
}
func (u *UserSession) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) {
sequences, err := u.view.GetLatestUserSessionSequences(instanceIDs...)
func (u *UserSession) EventQuery(instanceIDs []string) (*models.SearchQuery, error) {
sequences, err := u.view.GetLatestUserSessionSequences(instanceIDs)
if err != nil {
return nil, err
}
@ -162,12 +162,12 @@ func (u *UserSession) Reduce(event *models.Event) (err error) {
}
func (u *UserSession) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-sdfw3s", "id", event.AggregateID).WithError(err).Warn("something went wrong in user session handler")
logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in user session handler")
return spooler.HandleError(event, err, u.view.GetLatestUserSessionFailedEvent, u.view.ProcessedUserSessionFailedEvent, u.view.ProcessedUserSessionSequence, u.errorCountUntilSkip)
}
func (u *UserSession) OnSuccess() error {
return spooler.HandleSuccess(u.view.UpdateUserSessionSpoolerRunTimestamp)
func (u *UserSession) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(u.view.UpdateUserSessionSpoolerRunTimestamp, instanceIDs)
}
func (u *UserSession) updateSession(session *view_model.UserSessionView, event *models.Event) error {

View File

@ -11,6 +11,7 @@ import (
"github.com/zitadel/zitadel/internal/command"
sd "github.com/zitadel/zitadel/internal/config/systemdefaults"
"github.com/zitadel/zitadel/internal/crypto"
eventstore2 "github.com/zitadel/zitadel/internal/eventstore"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
es_spol "github.com/zitadel/zitadel/internal/eventstore/v1/spooler"
"github.com/zitadel/zitadel/internal/id"
@ -33,7 +34,7 @@ type EsRepository struct {
eventstore.OrgRepository
}
func Start(conf Config, systemDefaults sd.SystemDefaults, command *command.Commands, queries *query.Queries, dbClient *sql.DB, oidcEncryption crypto.EncryptionAlgorithm, userEncryption crypto.EncryptionAlgorithm) (*EsRepository, error) {
func Start(ctx context.Context, conf Config, systemDefaults sd.SystemDefaults, command *command.Commands, queries *query.Queries, dbClient *sql.DB, esV2 *eventstore2.Eventstore, oidcEncryption crypto.EncryptionAlgorithm, userEncryption crypto.EncryptionAlgorithm) (*EsRepository, error) {
es, err := v1.Start(dbClient)
if err != nil {
return nil, err
@ -47,7 +48,7 @@ func Start(conf Config, systemDefaults sd.SystemDefaults, command *command.Comma
authReq := cache.Start(dbClient)
spool := spooler.StartSpooler(conf.Spooler, es, view, dbClient, systemDefaults, queries)
spool := spooler.StartSpooler(ctx, conf.Spooler, es, esV2, view, dbClient, systemDefaults, queries)
userRepo := eventstore.UserRepo{
SearchLimit: conf.SearchLimit,

View File

@ -1,15 +1,16 @@
package spooler
import (
"context"
"database/sql"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/handler"
"github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view"
sd "github.com/zitadel/zitadel/internal/config/systemdefaults"
"github.com/zitadel/zitadel/internal/eventstore"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/eventstore/v1/spooler"
"github.com/zitadel/zitadel/internal/query"
)
type SpoolerConfig struct {
@ -20,13 +21,14 @@ type SpoolerConfig struct {
Handlers handler.Configs
}
func StartSpooler(c SpoolerConfig, es v1.Eventstore, view *view.View, client *sql.DB, systemDefaults sd.SystemDefaults, queries *query.Queries) *spooler.Spooler {
func StartSpooler(ctx context.Context, c SpoolerConfig, es v1.Eventstore, esV2 *eventstore.Eventstore, view *view.View, client *sql.DB, systemDefaults sd.SystemDefaults, queries *query.Queries) *spooler.Spooler {
spoolerConfig := spooler.Config{
Eventstore: es,
EventstoreV2: esV2,
Locker: &locker{dbClient: client},
ConcurrentWorkers: c.ConcurrentWorkers,
ConcurrentInstances: c.ConcurrentInstances,
ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, systemDefaults, queries),
ViewHandlers: handler.Register(ctx, c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, systemDefaults, queries),
}
spool := spoolerConfig.New()
spool.Start()

View File

@ -68,16 +68,16 @@ func (v *View) GetLatestExternalIDPSequence(instanceID string) (*global_view.Cur
return v.latestSequence(externalIDPTable, instanceID)
}
func (v *View) GetLatestExternalIDPSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(externalIDPTable, instanceIDs...)
func (v *View) GetLatestExternalIDPSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(externalIDPTable, instanceIDs)
}
func (v *View) ProcessedExternalIDPSequence(event *models.Event) error {
return v.saveCurrentSequence(externalIDPTable, event)
}
func (v *View) UpdateExternalIDPSpoolerRunTimestamp() error {
return v.updateSpoolerRunSequence(externalIDPTable)
func (v *View) UpdateExternalIDPSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(externalIDPTable, instanceIDs)
}
func (v *View) GetLatestExternalIDPFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) {

View File

@ -53,16 +53,16 @@ func (v *View) GetLatestIDPConfigSequence(instanceID string) (*global_view.Curre
return v.latestSequence(idpConfigTable, instanceID)
}
func (v *View) GetLatestIDPConfigSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(idpConfigTable, instanceIDs...)
func (v *View) GetLatestIDPConfigSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(idpConfigTable, instanceIDs)
}
func (v *View) ProcessedIDPConfigSequence(event *models.Event) error {
return v.saveCurrentSequence(idpConfigTable, event)
}
func (v *View) UpdateIDPConfigSpoolerRunTimestamp() error {
return v.updateSpoolerRunSequence(idpConfigTable)
func (v *View) UpdateIDPConfigSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(idpConfigTable, instanceIDs)
}
func (v *View) GetLatestIDPConfigFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) {

View File

@ -73,16 +73,16 @@ func (v *View) GetLatestIDPProviderSequence(instanceID string) (*global_view.Cur
return v.latestSequence(idpProviderTable, instanceID)
}
func (v *View) GetLatestIDPProviderSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(idpProviderTable, instanceIDs...)
func (v *View) GetLatestIDPProviderSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(idpProviderTable, instanceIDs)
}
func (v *View) ProcessedIDPProviderSequence(event *models.Event) error {
return v.saveCurrentSequence(idpProviderTable, event)
}
func (v *View) UpdateIDPProviderSpoolerRunTimestamp() error {
return v.updateSpoolerRunSequence(idpProviderTable)
func (v *View) UpdateIDPProviderSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(idpProviderTable, instanceIDs)
}
func (v *View) GetLatestIDPProviderFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) {

View File

@ -52,16 +52,16 @@ func (v *View) GetLatestOrgProjectMappingSequence(instanceID string) (*repositor
return v.latestSequence(orgPrgojectMappingTable, instanceID)
}
func (v *View) GetLatestOrgProjectMappingSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(orgPrgojectMappingTable, instanceIDs...)
func (v *View) GetLatestOrgProjectMappingSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(orgPrgojectMappingTable, instanceIDs)
}
func (v *View) ProcessedOrgProjectMappingSequence(event *models.Event) error {
return v.saveCurrentSequence(orgPrgojectMappingTable, event)
}
func (v *View) UpdateOrgProjectMappingSpoolerRunTimestamp() error {
return v.updateSpoolerRunSequence(orgPrgojectMappingTable)
func (v *View) UpdateOrgProjectMappingSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(orgPrgojectMappingTable, instanceIDs)
}
func (v *View) GetLatestOrgProjectMappingFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {

View File

@ -77,16 +77,16 @@ func (v *View) GetLatestRefreshTokenSequence(instanceID string) (*repository.Cur
return v.latestSequence(refreshTokenTable, instanceID)
}
func (v *View) GetLatestRefreshTokenSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(refreshTokenTable, instanceIDs...)
func (v *View) GetLatestRefreshTokenSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(refreshTokenTable, instanceIDs)
}
func (v *View) ProcessedRefreshTokenSequence(event *models.Event) error {
return v.saveCurrentSequence(refreshTokenTable, event)
}
func (v *View) UpdateRefreshTokenSpoolerRunTimestamp() error {
return v.updateSpoolerRunSequence(refreshTokenTable)
func (v *View) UpdateRefreshTokenSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(refreshTokenTable, instanceIDs)
}
func (v *View) GetLatestRefreshTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {

View File

@ -19,12 +19,12 @@ func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentS
return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID)
}
func (v *View) latestSequences(viewName string, instanceIDs ...string) ([]*repository.CurrentSequence, error) {
return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs...)
func (v *View) latestSequences(viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) {
return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs)
}
func (v *View) updateSpoolerRunSequence(viewName string) error {
currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName)
func (v *View) updateSpoolerRunSequence(viewName string, instanceIDs []string) error {
currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs)
if err != nil {
return err
}

View File

@ -88,16 +88,16 @@ func (v *View) GetLatestTokenSequence(instanceID string) (*repository.CurrentSeq
return v.latestSequence(tokenTable, instanceID)
}
func (v *View) GetLatestTokenSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(tokenTable, instanceIDs...)
func (v *View) GetLatestTokenSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(tokenTable, instanceIDs)
}
func (v *View) ProcessedTokenSequence(event *models.Event) error {
return v.saveCurrentSequence(tokenTable, event)
}
func (v *View) UpdateTokenSpoolerRunTimestamp() error {
return v.updateSpoolerRunSequence(tokenTable)
func (v *View) UpdateTokenSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(tokenTable, instanceIDs)
}
func (v *View) GetLatestTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {

View File

@ -193,16 +193,16 @@ func (v *View) GetLatestUserSequence(instanceID string) (*repository.CurrentSequ
return v.latestSequence(userTable, instanceID)
}
func (v *View) GetLatestUserSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(userTable, instanceIDs...)
func (v *View) GetLatestUserSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(userTable, instanceIDs)
}
func (v *View) ProcessedUserSequence(event *models.Event) error {
return v.saveCurrentSequence(userTable, event)
}
func (v *View) UpdateUserSpoolerRunTimestamp() error {
return v.updateSpoolerRunSequence(userTable)
func (v *View) UpdateUserSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(userTable, instanceIDs)
}
func (v *View) GetLatestUserFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {

View File

@ -68,16 +68,16 @@ func (v *View) GetLatestUserSessionSequence(instanceID string) (*repository.Curr
return v.latestSequence(userSessionTable, instanceID)
}
func (v *View) GetLatestUserSessionSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(userSessionTable, instanceIDs...)
func (v *View) GetLatestUserSessionSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(userSessionTable, instanceIDs)
}
func (v *View) ProcessedUserSessionSequence(event *models.Event) error {
return v.saveCurrentSequence(userSessionTable, event)
}
func (v *View) UpdateUserSessionSpoolerRunTimestamp() error {
return v.updateSpoolerRunSequence(userSessionTable)
func (v *View) UpdateUserSessionSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(userSessionTable, instanceIDs)
}
func (v *View) GetLatestUserSessionFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {

View File

@ -1,8 +1,6 @@
package view
import (
"time"
"github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/view/repository"
)
@ -18,21 +16,3 @@ func (v *View) saveCurrentSequence(viewName string, event *models.Event) error {
func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentSequence, error) {
return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID)
}
func (v *View) latestSequences(viewName string) ([]*repository.CurrentSequence, error) {
return repository.LatestSequences(v.Db, sequencesTable, viewName)
}
func (v *View) updateSpoolerRunSequence(viewName string) error {
currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName)
if err != nil {
return err
}
for _, currentSequence := range currentSequences {
if currentSequence.ViewName == "" {
currentSequence.ViewName = viewName
}
currentSequence.LastSuccessfulSpoolerRun = time.Now()
}
return repository.UpdateCurrentSequences(v.Db, sequencesTable, currentSequences)
}

View File

@ -47,15 +47,3 @@ func (v *View) GetLatestTokenSequence(instanceID string) (*repository.CurrentSeq
func (v *View) ProcessedTokenSequence(event *models.Event) error {
return v.saveCurrentSequence(tokenTable, event)
}
func (v *View) UpdateTokenSpoolerRunTimestamp() error {
return v.updateSpoolerRunSequence(tokenTable)
}
func (v *View) GetLatestTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {
return v.latestFailedEvent(tokenTable, instanceID, sequence)
}
func (v *View) ProcessedTokenFailedEvent(failedEvent *repository.FailedEvent) error {
return v.saveFailedEvent(failedEvent)
}

View File

@ -17,10 +17,10 @@ const (
type Handler interface {
ViewModel() string
EventQuery(instanceIDs ...string) (*models.SearchQuery, error)
EventQuery(instanceIDs []string) (*models.SearchQuery, error)
Reduce(*models.Event) error
OnError(event *models.Event, err error) error
OnSuccess() error
OnSuccess(instanceIDs []string) error
MinimumCycleDuration() time.Duration
LockDuration() time.Duration
QueryLimit() uint64
@ -32,7 +32,7 @@ type Handler interface {
Subscription() *v1.Subscription
}
func ReduceEvent(handler Handler, event *models.Event) {
func ReduceEvent(ctx context.Context, handler Handler, event *models.Event) {
defer func() {
err := recover()
@ -42,7 +42,7 @@ func ReduceEvent(handler Handler, event *models.Event) {
"cause", err,
"stack", string(debug.Stack()),
"sequence", event.Sequence,
"instnace", event.InstanceID,
"instance", event.InstanceID,
).Error("reduce panicked")
}
}()
@ -60,7 +60,7 @@ func ReduceEvent(handler Handler, event *models.Event) {
SearchQuery().
SetLimit(eventLimit)
unprocessedEvents, err := handler.Eventstore().FilterEvents(context.Background(), searchQuery)
unprocessedEvents, err := handler.Eventstore().FilterEvents(ctx, searchQuery)
if err != nil {
logging.WithFields("sequence", event.Sequence).Warn("filter failed")
return

View File

@ -5,6 +5,7 @@ import (
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/eventstore/v1/query"
"github.com/zitadel/zitadel/internal/id"
@ -12,6 +13,7 @@ import (
type Config struct {
Eventstore v1.Eventstore
EventstoreV2 *eventstore.Eventstore
Locker Locker
ViewHandlers []query.Handler
ConcurrentWorkers int
@ -31,6 +33,7 @@ func (c *Config) New() *Spooler {
handlers: c.ViewHandlers,
lockID: lockID,
eventstore: c.Eventstore,
esV2: c.EventstoreV2,
locker: c.Locker,
queue: make(chan *spooledHandler, len(c.ViewHandlers)),
workers: c.ConcurrentWorkers,

View File

@ -9,6 +9,8 @@ import (
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/eventstore/v1/query"
@ -16,13 +18,19 @@ import (
"github.com/zitadel/zitadel/internal/view/repository"
)
const systemID = "system"
const (
systemID = "system"
schedulerSucceeded = eventstore.EventType("system.projections.scheduler.succeeded")
aggregateType = eventstore.AggregateType("system")
aggregateID = "SYSTEM"
)
type Spooler struct {
handlers []query.Handler
locker Locker
lockID string
eventstore v1.Eventstore
esV2 *eventstore.Eventstore
workers int
queue chan *spooledHandler
concurrentInstances int
@ -37,7 +45,9 @@ type spooledHandler struct {
locker Locker
queuedAt time.Time
eventstore v1.Eventstore
esV2 *eventstore.Eventstore
concurrentInstances int
succeededOnce bool
}
func (s *Spooler) Start() {
@ -57,7 +67,7 @@ func (s *Spooler) Start() {
}
go func() {
for _, handler := range s.handlers {
s.queue <- &spooledHandler{Handler: handler, locker: s.locker, queuedAt: time.Now(), eventstore: s.eventstore, concurrentInstances: s.concurrentInstances}
s.queue <- &spooledHandler{Handler: handler, locker: s.locker, queuedAt: time.Now(), eventstore: s.eventstore, esV2: s.esV2, concurrentInstances: s.concurrentInstances}
}
}()
}
@ -68,6 +78,32 @@ func requeueTask(task *spooledHandler, queue chan<- *spooledHandler) {
queue <- task
}
func (s *spooledHandler) hasSucceededOnce(ctx context.Context) (bool, error) {
events, err := s.esV2.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AddQuery().
AggregateTypes(aggregateType).
AggregateIDs(aggregateID).
EventTypes(schedulerSucceeded).
EventData(map[string]interface{}{
"name": s.ViewModel(),
}).
Builder(),
)
return len(events) > 0 && err == nil, err
}
func (s *spooledHandler) setSucceededOnce(ctx context.Context) error {
_, err := s.esV2.Push(ctx, &handler.ProjectionSucceededEvent{
BaseEvent: *eventstore.NewBaseEventForPush(ctx,
eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"),
schedulerSucceeded,
),
Name: s.ViewModel(),
})
s.succeededOnce = err == nil
return err
}
func (s *spooledHandler) load(workerID string) {
errs := make(chan error)
defer func() {
@ -86,8 +122,24 @@ func (s *spooledHandler) load(workerID string) {
hasLocked := s.lock(ctx, errs, workerID)
if <-hasLocked {
if !s.succeededOnce {
var err error
s.succeededOnce, err = s.hasSucceededOnce(ctx)
if err != nil {
logging.WithFields("view", s.ViewModel()).OnError(err).Warn("initial lock failed for first schedule")
errs <- err
return
}
}
instanceIDQuery := models.NewSearchQuery().SetColumn(models.Columns_InstanceIDs).AddQuery().ExcludedInstanceIDsFilter("")
for {
ids, err := s.eventstore.InstanceIDs(ctx, models.NewSearchQuery().SetColumn(models.Columns_InstanceIDs).AddQuery().ExcludedInstanceIDsFilter("").SearchQuery())
if s.succeededOnce {
// since we have at least one successful run, we can restrict it to events not older than
// twice the requeue time (just to be sure not to miss an event)
instanceIDQuery = instanceIDQuery.CreationDateNewerFilter(time.Now().Add(-2 * s.MinimumCycleDuration()))
}
ids, err := s.eventstore.InstanceIDs(ctx, instanceIDQuery.SearchQuery())
if err != nil {
errs <- err
break
@ -97,12 +149,16 @@ func (s *spooledHandler) load(workerID string) {
if max > len(ids) {
max = len(ids)
}
err = s.processInstances(ctx, workerID, ids[i:max]...)
err = s.processInstances(ctx, workerID, ids[i:max])
if err != nil {
errs <- err
}
}
if ctx.Err() == nil {
if !s.succeededOnce {
err = s.setSucceededOnce(ctx)
logging.WithFields("view", s.ViewModel()).OnError(err).Warn("unable to push first schedule succeeded")
}
errs <- nil
}
break
@ -111,16 +167,20 @@ func (s *spooledHandler) load(workerID string) {
<-ctx.Done()
}
func (s *spooledHandler) processInstances(ctx context.Context, workerID string, ids ...string) error {
func (s *spooledHandler) processInstances(ctx context.Context, workerID string, ids []string) error {
for {
events, err := s.query(ctx, ids...)
processCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
events, err := s.query(processCtx, ids)
if err != nil {
cancel()
return err
}
if len(events) == 0 {
cancel()
return nil
}
err = s.process(ctx, events, workerID)
err = s.process(processCtx, events, workerID, ids)
cancel()
if err != nil {
return err
}
@ -139,7 +199,7 @@ func (s *spooledHandler) awaitError(cancel func(), errs chan error, workerID str
}
}
func (s *spooledHandler) process(ctx context.Context, events []*models.Event, workerID string) error {
func (s *spooledHandler) process(ctx context.Context, events []*models.Event, workerID string, instanceIDs []string) error {
for i, event := range events {
select {
case <-ctx.Done():
@ -152,17 +212,17 @@ func (s *spooledHandler) process(ctx context.Context, events []*models.Event, wo
continue
}
time.Sleep(100 * time.Millisecond)
return s.process(ctx, events[i:], workerID)
return s.process(ctx, events[i:], workerID, instanceIDs)
}
}
}
err := s.OnSuccess()
err := s.OnSuccess(instanceIDs)
logging.WithFields("view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).OnError(err).Warn("could not process on success func")
return err
}
func (s *spooledHandler) query(ctx context.Context, instanceIDs ...string) ([]*models.Event, error) {
query, err := s.EventQuery(instanceIDs...)
func (s *spooledHandler) query(ctx context.Context, instanceIDs []string) ([]*models.Event, error) {
query, err := s.EventQuery(instanceIDs)
if err != nil {
return nil, err
}
@ -227,6 +287,6 @@ func HandleError(event *models.Event, failedErr error,
return failedErr
}
func HandleSuccess(updateSpoolerRunTimestamp func() error) error {
return updateSpoolerRunTimestamp()
func HandleSuccess(updateSpoolerRunTimestamp func([]string) error, instanceIDs []string) error {
return updateSpoolerRunTimestamp(instanceIDs)
}

View File

@ -51,7 +51,7 @@ func (h *testHandler) Subscription() *v1.Subscription {
return nil
}
func (h *testHandler) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) {
func (h *testHandler) EventQuery(instanceIDs []string) (*models.SearchQuery, error) {
if h.queryError != nil {
return nil, h.queryError
}
@ -71,7 +71,7 @@ func (h *testHandler) OnError(event *models.Event, err error) error {
return err
}
func (h *testHandler) OnSuccess() error {
func (h *testHandler) OnSuccess([]string) error {
return nil
}
@ -129,6 +129,7 @@ func TestSpooler_process(t *testing.T) {
type args struct {
timeout time.Duration
events []*models.Event
instanceIDs []string
}
tests := []struct {
name string
@ -184,7 +185,7 @@ func TestSpooler_process(t *testing.T) {
start = time.Now()
}
if err := s.process(ctx, tt.args.events, "test"); (err != nil) != tt.wantErr {
if err := s.process(ctx, tt.args.events, "test", tt.args.instanceIDs); (err != nil) != tt.wantErr {
t.Errorf("Spooler.process() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.fields.currentHandler.maxErrCount != tt.wantRetries {

View File

@ -36,8 +36,13 @@ func TokensByUserID(db *gorm.DB, table, userID, instanceID string) ([]*usr_model
Method: domain.SearchMethodEquals,
Value: instanceID,
}
expirationQuery := &model.TokenSearchQuery{
Key: model.TokenSearchKeyExpiration,
Method: domain.SearchMethodGreaterThan,
Value: "now()",
}
query := repository.PrepareSearchQuery(table, usr_model.TokenSearchRequest{
Queries: []*model.TokenSearchQuery{userIDQuery, instanceIDQuery},
Queries: []*model.TokenSearchQuery{userIDQuery, instanceIDQuery, expirationQuery},
})
_, err := query(db, &tokens)
return tokens, err

View File

@ -169,7 +169,7 @@ func LatestSequence(db *gorm.DB, table, viewName, instanceID string) (*CurrentSe
return nil, caos_errs.ThrowInternalf(err, "VIEW-9LyCB", "unable to get latest sequence of %s", viewName)
}
func LatestSequences(db *gorm.DB, table, viewName string, instanceIDs ...string) ([]*CurrentSequence, error) {
func LatestSequences(db *gorm.DB, table, viewName string, instanceIDs []string) ([]*CurrentSequence, error) {
searchQueries := []sequenceSearchQuery{
{key: sequenceSearchKey(SequenceSearchKeyViewName), value: viewName, method: domain.SearchMethodEquals},
}