From edaa41903edd4a480c50a8e595b968fb9fe21613 Mon Sep 17 00:00:00 2001 From: Livio Spring Date: Tue, 19 Dec 2023 13:32:08 +0200 Subject: [PATCH] fix(projections): handle every instance by default and randomize start (#7093) --- cmd/defaults.yaml | 31 ++++++++--------------- internal/eventstore/handler/v2/handler.go | 22 +++++++++++++--- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 7615eb1a26..bff039bdf8 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -205,8 +205,8 @@ Projections: BulkLimit: 200 # ZITADEL_PROJECTIONS_BULKLIMIT # Only instances are projected, for which at least a projection-relevant event exists within the timeframe # from HandleActiveInstances duration in the past until the projection's current time - # Defaults to twice the RequeueEvery duration - HandleActiveInstances: 120s # ZITADEL_PROJECTIONS_HANDLEACTIVEINSTANCES + # If set to 0 (default), every instance is always considered active + HandleActiveInstances: 0s # ZITADEL_PROJECTIONS_HANDLEACTIVEINSTANCES # In the Customizations section, all settings from above can be overwritten for each specific projection Customizations: Projects: @@ -229,8 +229,8 @@ Projections: # In case of failed deliveries, ZITADEL retries to send the data points to the configured endpoints, but only for active instances. # An instance is active, as long as there are projected events on the instance, that are not older than the HandleActiveInstances duration. 
# Delivery guarantee requirements are higher for quota webhooks - # Defaults to 45 days - HandleActiveInstances: 1080h # ZITADEL_PROJECTIONS_CUSTOMIZATIONS_NOTIFICATIONSQUOTAS_HANDLEACTIVEINSTANCES + # If set to 0 (default), every instance is always considered active + HandleActiveInstances: 0s # ZITADEL_PROJECTIONS_CUSTOMIZATIONS_NOTIFICATIONSQUOTAS_HANDLEACTIVEINSTANCES # As quota notification projections don't result in database statements, retries don't have an effect MaxFailureCount: 10 # ZITADEL_PROJECTIONS_CUSTOMIZATIONS_NOTIFICATIONSQUOTAS_MAXFAILURECOUNT # Quota notifications are not so time critical. Setting RequeueEvery every five minutes doesn't annoy the db too much. @@ -244,8 +244,8 @@ Projections: # In case of failed deliveries, ZITADEL retries to send the data points to the configured endpoints, but only for active instances. # An instance is active, as long as there are projected events on the instance, that are not older than the HandleActiveInstances duration. # Telemetry delivery guarantee requirements are a bit higher than normal data projections, as they are not interactively retryable. - # Defaults to 15 days - HandleActiveInstances: 360h # ZITADEL_PROJECTIONS_CUSTOMIZATIONS_TELEMETRY_HANDLEACTIVEINSTANCES + # If set to 0 (default), every instance is always considered active + HandleActiveInstances: 0s # ZITADEL_PROJECTIONS_CUSTOMIZATIONS_TELEMETRY_HANDLEACTIVEINSTANCES # As sending telemetry data doesn't result in database statements, retries don't have any effects MaxFailureCount: 0 # ZITADEL_PROJECTIONS_CUSTOMIZATIONS_TELEMETRY_MAXFAILURECOUNT # Telemetry data synchronization is not time critical. Setting RequeueEvery to 55 minutes doesn't annoy the database too much. 
@@ -263,8 +263,8 @@ Auth: FailureCountUntilSkip: 5 #ZITADEL_AUTH_SPOOLER_FAILURECOUNTUNTILSKIP # Only instance are projected, for which at least a projection relevant event exists withing the timeframe # from HandleActiveInstances duration in the past until the projections current time - # Defaults to twice the RequeueEvery duration - HandleActiveInstances: 120s #ZITADEL_AUTH_SPOOLER_HANDLEACTIVEINSTANCES + # If set to 0 (default), every instance is always considered active + HandleActiveInstances: 0s #ZITADEL_AUTH_SPOOLER_HANDLEACTIVEINSTANCES Admin: # See Projections.BulkLimit @@ -278,8 +278,8 @@ Admin: FailureCountUntilSkip: 5 # Only instance are projected, for which at least a projection relevant event exists withing the timeframe # from HandleActiveInstances duration in the past until the projections current time - # Defaults to twice the RequeueEvery duration - HandleActiveInstances: 120s + # If set to 0 (default), every instance is always considered active + HandleActiveInstances: 0s UserAgentCookie: Name: zitadel.useragent # ZITADEL_USERAGENTCOOKIE_NAME @@ -367,17 +367,6 @@ Console: SharedMaxAge: 168h # ZITADEL_CONSOLE_LONGCACHE_SHAREDMAXAGE InstanceManagementURL: "" # ZITADEL_CONSOLE_INSTANCEMANAGEMENTURL -Notification: - Repository: - Spooler: - # See Projections.TransactionDuration - TransactionDuration: 10s #ZITADEL_NOTIFICATION_REPOSITORY_SPOOLER_TRANSACTIONDURATION - # See Projections.BulkLimit - BulkLimit: 200 #ZITADEL_NOTIFICATION_REPOSITORY_SPOOLER_BULKLIMIT - # See Projections.MaxFailureCount - FailureCountUntilSkip: 5 #ZITADEL_NOTIFICATION_REPOSITORY_SPOOLER_FAILURECOUNTUNTILSKIP - Handlers: - EncryptionKeys: DomainVerification: EncryptionKeyID: "domainVerificationKey" # ZITADEL_ENCRYPTIONKEYS_DOMAINVERIFICATION_ENCRYPTIONKEYID diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index ada12c4611..fb20ebbb9e 100644 --- a/internal/eventstore/handler/v2/handler.go +++ 
b/internal/eventstore/handler/v2/handler.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "math" + "math/rand" "sync" "time" @@ -111,11 +112,19 @@ func (h *Handler) Start(ctx context.Context) { } func (h *Handler) schedule(ctx context.Context) { - // if there was no run before trigger instantly - t := time.NewTimer(0) + // if there was no run before, trigger within half a second + start := randomizeStart(0, 0.5) + t := time.NewTimer(start) didInitialize := h.didProjectionInitialize(ctx) if didInitialize { + if !t.Stop() { + <-t.C + } + // if there was a trigger before, start the projection + // after a second (should generally be after the not-yet-initialized projections) + // and its configured `RequeueEvery` + reset := randomizeStart(1, h.requeueEvery.Seconds()) + t.Reset(reset) - t.Reset(h.requeueEvery) } for { @@ -157,6 +166,11 @@ func (h *Handler) schedule(ctx context.Context) { } } +func randomizeStart(min, maxSeconds float64) time.Duration { + d := min + rand.Float64()*(maxSeconds-min) + return time.Duration(d*1000) * time.Millisecond +} + func (h *Handler) subscribe(ctx context.Context) { queue := make(chan eventstore.Event, 100) subscription := eventstore.SubscribeEventTypes(queue, h.eventTypes) @@ -213,7 +227,7 @@ func (h *Handler) queryInstances(ctx context.Context, didInitialize bool) ([]str AwaitOpenTransactions(). AllowTimeTravel(). ExcludedInstanceID("") - if didInitialize { + if didInitialize && h.handleActiveInstances > 0 { query = query. CreationDateAfter(h.now().Add(-1 * h.handleActiveInstances)) }