2024-05-30 09:35:30 +00:00
package mirror
import (
"context"
"database/sql"
"net/http"
"sync"
"time"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/cmd/encryption"
"github.com/zitadel/zitadel/cmd/key"
"github.com/zitadel/zitadel/cmd/tls"
admin_es "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing"
admin_handler "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/handler"
admin_view "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
internal_authz "github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/api/oidc"
"github.com/zitadel/zitadel/internal/api/ui/login"
auth_es "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing"
auth_handler "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/handler"
auth_view "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view"
"github.com/zitadel/zitadel/internal/authz"
authz_es "github.com/zitadel/zitadel/internal/authz/repository/eventsourcing/eventstore"
2024-11-04 10:44:51 +00:00
"github.com/zitadel/zitadel/internal/cache/connector"
2024-05-30 09:35:30 +00:00
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/config/systemdefaults"
crypto_db "github.com/zitadel/zitadel/internal/crypto/database"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore"
old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql"
new_es "github.com/zitadel/zitadel/internal/eventstore/v3"
"github.com/zitadel/zitadel/internal/i18n"
"github.com/zitadel/zitadel/internal/id"
"github.com/zitadel/zitadel/internal/notification"
"github.com/zitadel/zitadel/internal/notification/handlers"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/query/projection"
static_config "github.com/zitadel/zitadel/internal/static/config"
es_v4 "github.com/zitadel/zitadel/internal/v2/eventstore"
es_v4_pg "github.com/zitadel/zitadel/internal/v2/eventstore/postgres"
"github.com/zitadel/zitadel/internal/webauthn"
)
// projectionsCmd builds the "projections" sub-command.
//
// When executed, the command loads the projections configuration from viper,
// reads the master key from the command and then calls projections with the
// command's context.
func projectionsCmd() *cobra.Command {
	// run is the command body; a failure to read the master key is logged
	// with Fatal before projections is invoked.
	run := func(c *cobra.Command, _ []string) {
		cfg := mustNewProjectionsConfig(viper.GetViper())

		masterKey, err := key.MasterKey(c)
		logging.OnError(err).Fatal("unable to read master key")

		projections(c.Context(), cfg, masterKey)
	}

	command := &cobra.Command{
		Use:   "projections",
		Short: "calls the projections synchronously",
		Run:   run,
	}

	// Attach the flags this command reads.
	migrateProjectionsFlags(command)

	return command
}
type ProjectionsConfig struct {
Destination database . Config
Projections projection . Config
feat(notification): use event worker pool (#8962)
# Which Problems Are Solved
The current handling of notification follows the same pattern as all
other projections:
Created events are handled sequentially (based on "position") by a
handler. During the process, a lot of information is aggregated (user,
texts, templates, ...).
This leads to back pressure on the projection since the handling of
events might take longer than the time before a new event (to be
handled) is created.
# How the Problems Are Solved
- The current user notification handler creates separate notification
events based on the user / session events.
- These events contain all the present and required information
including the userID.
- These notification events get processed by notification workers, which
gather the necessary information (recipient address, texts, templates)
to send out these notifications.
- If a notification fails, a retry event is created based on the current
notification request including the current state of the user (this
prevents race conditions, where a user is changed in the meantime and
the notification already gets the new state).
- The retry event will be handled after a backoff delay. This delay
increases with every attempt.
- If the configured amount of attempts is reached or the message expired
(based on config), a cancel event is created, letting the workers know,
the notification must no longer be handled.
- In case of successful send, a sent event is created for the
notification aggregate and the existing "sent" events for the user /
session object is stored.
- The following is added to the defaults.yaml to allow configuration of
the notification workers:
```yaml
Notifications:
# The amount of workers processing the notification request events.
# If set to 0, no notification request events will be handled. This can be useful when running in
# multi binary / pod setup and allowing only certain executables to process the events.
Workers: 1 # ZITADEL_NOTIFIACATIONS_WORKERS
# The amount of events a single worker will process in a run.
BulkLimit: 10 # ZITADEL_NOTIFIACATIONS_BULKLIMIT
# Time interval between scheduled notifications for request events
RequeueEvery: 2s # ZITADEL_NOTIFIACATIONS_REQUEUEEVERY
# The amount of workers processing the notification retry events.
# If set to 0, no notification retry events will be handled. This can be useful when running in
# multi binary / pod setup and allowing only certain executables to process the events.
RetryWorkers: 1 # ZITADEL_NOTIFIACATIONS_RETRYWORKERS
# Time interval between scheduled notifications for retry events
RetryRequeueEvery: 2s # ZITADEL_NOTIFIACATIONS_RETRYREQUEUEEVERY
# Only instances are projected, for which at least a projection-relevant event exists within the timeframe
# from HandleActiveInstances duration in the past until the projection's current time
# If set to 0 (default), every instance is always considered active
HandleActiveInstances: 0s # ZITADEL_NOTIFIACATIONS_HANDLEACTIVEINSTANCES
# The maximum duration a transaction remains open
# before it spots left folding additional events
# and updates the table.
TransactionDuration: 1m # ZITADEL_NOTIFIACATIONS_TRANSACTIONDURATION
# Automatically cancel the notification after the amount of failed attempts
MaxAttempts: 3 # ZITADEL_NOTIFIACATIONS_MAXATTEMPTS
# Automatically cancel the notification if it cannot be handled within a specific time
MaxTtl: 5m # ZITADEL_NOTIFIACATIONS_MAXTTL
# Failed attempts are retried after a confogired delay (with exponential backoff).
# Set a minimum and maximum delay and a factor for the backoff
MinRetryDelay: 1s # ZITADEL_NOTIFIACATIONS_MINRETRYDELAY
MaxRetryDelay: 20s # ZITADEL_NOTIFIACATIONS_MAXRETRYDELAY
# Any factor below 1 will be set to 1
RetryDelayFactor: 1.5 # ZITADEL_NOTIFIACATIONS_RETRYDELAYFACTOR
```
# Additional Changes
None
# Additional Context
- closes #8931
2024-11-27 15:01:17 +00:00
Notifications handlers . WorkerConfig
2024-05-30 09:35:30 +00:00
EncryptionKeys * encryption . EncryptionKeyConfig
SystemAPIUsers map [ string ] * internal_authz . SystemAPIUser
Eventstore * eventstore . Config
2024-11-04 10:44:51 +00:00
Caches * connector . CachesConfig
2024-05-30 09:35:30 +00:00
Admin admin_es . Config
Auth auth_es . Config
Log * logging . Config
Machine * id . Config
ExternalPort uint16
ExternalDomain string
ExternalSecure bool
InternalAuthZ internal_authz . Config
SystemDefaults systemdefaults . SystemDefaults
Telemetry * handlers . TelemetryPusherConfig
Login login . Config
OIDC oidc . Config
WebAuthNName string
DefaultInstance command . InstanceSetup
AssetStorage static_config . AssetStorageConfig
}
// migrateProjectionsFlags prepares cmd for the projections command by calling
// key.AddMasterKeyFlag and tls.AddTLSModeFlag on it, in that order.
// It is invoked by projectionsCmd right after the command is constructed.
func migrateProjectionsFlags ( cmd * cobra . Command ) {
key . AddMasterKeyFlag ( cmd )
tls . AddTLSModeFlag ( cmd )
}
func projections (
ctx context . Context ,
config * ProjectionsConfig ,
masterKey string ,
) {
start := time . Now ( )
client , err := database . Connect ( config . Destination , false , dialect . DBPurposeQuery )
logging . OnError ( err ) . Fatal ( "unable to connect to database" )
keyStorage , err := crypto_db . NewKeyStorage ( client , masterKey )
logging . OnError ( err ) . Fatal ( "cannot start key storage" )
keys , err := encryption . EnsureEncryptionKeys ( ctx , config . EncryptionKeys , keyStorage )
logging . OnError ( err ) . Fatal ( "unable to read encryption keys" )
staticStorage , err := config . AssetStorage . NewStorage ( client . DB )
logging . OnError ( err ) . Fatal ( "unable create static storage" )
config . Eventstore . Querier = old_es . NewCRDB ( client )
esPusherDBClient , err := database . Connect ( config . Destination , false , dialect . DBPurposeEventPusher )
logging . OnError ( err ) . Fatal ( "unable to connect eventstore push client" )
config . Eventstore . Pusher = new_es . NewEventstore ( esPusherDBClient )
es := eventstore . NewEventstore ( config . Eventstore )
esV4 := es_v4 . NewEventstoreFromOne ( es_v4_pg . New ( client , & es_v4_pg . Config {
MaxRetries : config . Eventstore . MaxRetries ,
} ) )
sessionTokenVerifier := internal_authz . SessionTokenVerifier ( keys . OIDC )
2024-11-04 10:44:51 +00:00
cacheConnectors , err := connector . StartConnectors ( config . Caches , client )
logging . OnError ( err ) . Fatal ( "unable to start caches" )
2024-05-30 09:35:30 +00:00
queries , err := query . StartQueries (
ctx ,
es ,
esV4 . Querier ,
client ,
client ,
2024-11-04 10:44:51 +00:00
cacheConnectors ,
2024-05-30 09:35:30 +00:00
config . Projections ,
config . SystemDefaults ,
keys . IDPConfig ,
keys . OTP ,
keys . OIDC ,
keys . SAML ,
2024-11-28 10:06:52 +00:00
keys . Target ,
2024-05-30 09:35:30 +00:00
config . InternalAuthZ . RolePermissionMappings ,
sessionTokenVerifier ,
func ( q * query . Queries ) domain . PermissionCheck {
return func ( ctx context . Context , permission , orgID , resourceID string ) ( err error ) {
return internal_authz . CheckPermission ( ctx , & authz_es . UserMembershipRepo { Queries : q } , config . InternalAuthZ . RolePermissionMappings , permission , orgID , resourceID )
}
} ,
0 ,
config . SystemAPIUsers ,
false ,
)
logging . OnError ( err ) . Fatal ( "unable to start queries" )
authZRepo , err := authz . Start ( queries , es , client , keys . OIDC , config . ExternalSecure )
logging . OnError ( err ) . Fatal ( "unable to start authz repo" )
webAuthNConfig := & webauthn . Config {
DisplayName : config . WebAuthNName ,
ExternalSecure : config . ExternalSecure ,
}
2024-11-04 10:44:51 +00:00
commands , err := command . StartCommands ( ctx ,
2024-05-30 09:35:30 +00:00
es ,
2024-11-04 10:44:51 +00:00
cacheConnectors ,
2024-05-30 09:35:30 +00:00
config . SystemDefaults ,
config . InternalAuthZ . RolePermissionMappings ,
staticStorage ,
webAuthNConfig ,
config . ExternalDomain ,
config . ExternalSecure ,
config . ExternalPort ,
keys . IDPConfig ,
keys . OTP ,
keys . SMTP ,
keys . SMS ,
keys . User ,
keys . DomainVerification ,
keys . OIDC ,
keys . SAML ,
2024-11-28 10:06:52 +00:00
keys . Target ,
2024-05-30 09:35:30 +00:00
& http . Client { } ,
func ( ctx context . Context , permission , orgID , resourceID string ) ( err error ) {
return internal_authz . CheckPermission ( ctx , authZRepo , config . InternalAuthZ . RolePermissionMappings , permission , orgID , resourceID )
} ,
sessionTokenVerifier ,
config . OIDC . DefaultAccessTokenLifetime ,
config . OIDC . DefaultRefreshTokenExpiration ,
config . OIDC . DefaultRefreshTokenIdleExpiration ,
config . DefaultInstance . SecretGenerators ,
)
logging . OnError ( err ) . Fatal ( "unable to start commands" )
err = projection . Create ( ctx , client , es , config . Projections , keys . OIDC , keys . SAML , config . SystemAPIUsers )
logging . OnError ( err ) . Fatal ( "unable to start projections" )
i18n . MustLoadSupportedLanguagesFromDir ( )
notification . Register (
ctx ,
config . Projections . Customizations [ "notifications" ] ,
config . Projections . Customizations [ "notificationsquotas" ] ,
2024-10-31 14:57:17 +00:00
config . Projections . Customizations [ "backchannel" ] ,
2024-05-30 09:35:30 +00:00
config . Projections . Customizations [ "telemetry" ] ,
feat(notification): use event worker pool (#8962)
# Which Problems Are Solved
The current handling of notification follows the same pattern as all
other projections:
Created events are handled sequentially (based on "position") by a
handler. During the process, a lot of information is aggregated (user,
texts, templates, ...).
This leads to back pressure on the projection since the handling of
events might take longer than the time before a new event (to be
handled) is created.
# How the Problems Are Solved
- The current user notification handler creates separate notification
events based on the user / session events.
- These events contain all the present and required information
including the userID.
- These notification events get processed by notification workers, which
gather the necessary information (recipient address, texts, templates)
to send out these notifications.
- If a notification fails, a retry event is created based on the current
notification request including the current state of the user (this
prevents race conditions, where a user is changed in the meantime and
the notification already gets the new state).
- The retry event will be handled after a backoff delay. This delay
increases with every attempt.
- If the configured amount of attempts is reached or the message expired
(based on config), a cancel event is created, letting the workers know,
the notification must no longer be handled.
- In case of successful send, a sent event is created for the
notification aggregate and the existing "sent" events for the user /
session object is stored.
- The following is added to the defaults.yaml to allow configuration of
the notification workers:
```yaml
Notifications:
# The amount of workers processing the notification request events.
# If set to 0, no notification request events will be handled. This can be useful when running in
# multi binary / pod setup and allowing only certain executables to process the events.
Workers: 1 # ZITADEL_NOTIFIACATIONS_WORKERS
# The amount of events a single worker will process in a run.
BulkLimit: 10 # ZITADEL_NOTIFIACATIONS_BULKLIMIT
# Time interval between scheduled notifications for request events
RequeueEvery: 2s # ZITADEL_NOTIFIACATIONS_REQUEUEEVERY
# The amount of workers processing the notification retry events.
# If set to 0, no notification retry events will be handled. This can be useful when running in
# multi binary / pod setup and allowing only certain executables to process the events.
RetryWorkers: 1 # ZITADEL_NOTIFIACATIONS_RETRYWORKERS
# Time interval between scheduled notifications for retry events
RetryRequeueEvery: 2s # ZITADEL_NOTIFIACATIONS_RETRYREQUEUEEVERY
# Only instances are projected, for which at least a projection-relevant event exists within the timeframe
# from HandleActiveInstances duration in the past until the projection's current time
# If set to 0 (default), every instance is always considered active
HandleActiveInstances: 0s # ZITADEL_NOTIFIACATIONS_HANDLEACTIVEINSTANCES
# The maximum duration a transaction remains open
# before it spots left folding additional events
# and updates the table.
TransactionDuration: 1m # ZITADEL_NOTIFIACATIONS_TRANSACTIONDURATION
# Automatically cancel the notification after the amount of failed attempts
MaxAttempts: 3 # ZITADEL_NOTIFIACATIONS_MAXATTEMPTS
# Automatically cancel the notification if it cannot be handled within a specific time
MaxTtl: 5m # ZITADEL_NOTIFIACATIONS_MAXTTL
# Failed attempts are retried after a confogired delay (with exponential backoff).
# Set a minimum and maximum delay and a factor for the backoff
MinRetryDelay: 1s # ZITADEL_NOTIFIACATIONS_MINRETRYDELAY
MaxRetryDelay: 20s # ZITADEL_NOTIFIACATIONS_MAXRETRYDELAY
# Any factor below 1 will be set to 1
RetryDelayFactor: 1.5 # ZITADEL_NOTIFIACATIONS_RETRYDELAYFACTOR
```
# Additional Changes
None
# Additional Context
- closes #8931
2024-11-27 15:01:17 +00:00
config . Notifications ,
2024-05-30 09:35:30 +00:00
* config . Telemetry ,
config . ExternalDomain ,
config . ExternalPort ,
config . ExternalSecure ,
commands ,
queries ,
es ,
config . Login . DefaultOTPEmailURLV2 ,
config . SystemDefaults . Notifications . FileSystemPath ,
keys . User ,
keys . SMTP ,
keys . SMS ,
2024-10-31 14:57:17 +00:00
keys . OIDC ,
config . OIDC . DefaultBackChannelLogoutLifetime ,
feat(notification): use event worker pool (#8962)
# Which Problems Are Solved
The current handling of notification follows the same pattern as all
other projections:
Created events are handled sequentially (based on "position") by a
handler. During the process, a lot of information is aggregated (user,
texts, templates, ...).
This leads to back pressure on the projection since the handling of
events might take longer than the time before a new event (to be
handled) is created.
# How the Problems Are Solved
- The current user notification handler creates separate notification
events based on the user / session events.
- These events contain all the present and required information
including the userID.
- These notification events get processed by notification workers, which
gather the necessary information (recipient address, texts, templates)
to send out these notifications.
- If a notification fails, a retry event is created based on the current
notification request including the current state of the user (this
prevents race conditions, where a user is changed in the meantime and
the notification already gets the new state).
- The retry event will be handled after a backoff delay. This delay
increases with every attempt.
- If the configured amount of attempts is reached or the message expired
(based on config), a cancel event is created, letting the workers know,
the notification must no longer be handled.
- In case of successful send, a sent event is created for the
notification aggregate and the existing "sent" events for the user /
session object is stored.
- The following is added to the defaults.yaml to allow configuration of
the notification workers:
```yaml
Notifications:
# The amount of workers processing the notification request events.
# If set to 0, no notification request events will be handled. This can be useful when running in
# multi binary / pod setup and allowing only certain executables to process the events.
Workers: 1 # ZITADEL_NOTIFIACATIONS_WORKERS
# The amount of events a single worker will process in a run.
BulkLimit: 10 # ZITADEL_NOTIFIACATIONS_BULKLIMIT
# Time interval between scheduled notifications for request events
RequeueEvery: 2s # ZITADEL_NOTIFIACATIONS_REQUEUEEVERY
# The amount of workers processing the notification retry events.
# If set to 0, no notification retry events will be handled. This can be useful when running in
# multi binary / pod setup and allowing only certain executables to process the events.
RetryWorkers: 1 # ZITADEL_NOTIFIACATIONS_RETRYWORKERS
# Time interval between scheduled notifications for retry events
RetryRequeueEvery: 2s # ZITADEL_NOTIFIACATIONS_RETRYREQUEUEEVERY
# Only instances are projected, for which at least a projection-relevant event exists within the timeframe
# from HandleActiveInstances duration in the past until the projection's current time
# If set to 0 (default), every instance is always considered active
HandleActiveInstances: 0s # ZITADEL_NOTIFIACATIONS_HANDLEACTIVEINSTANCES
# The maximum duration a transaction remains open
# before it spots left folding additional events
# and updates the table.
TransactionDuration: 1m # ZITADEL_NOTIFIACATIONS_TRANSACTIONDURATION
# Automatically cancel the notification after the amount of failed attempts
MaxAttempts: 3 # ZITADEL_NOTIFIACATIONS_MAXATTEMPTS
# Automatically cancel the notification if it cannot be handled within a specific time
MaxTtl: 5m # ZITADEL_NOTIFIACATIONS_MAXTTL
# Failed attempts are retried after a confogired delay (with exponential backoff).
# Set a minimum and maximum delay and a factor for the backoff
MinRetryDelay: 1s # ZITADEL_NOTIFIACATIONS_MINRETRYDELAY
MaxRetryDelay: 20s # ZITADEL_NOTIFIACATIONS_MAXRETRYDELAY
# Any factor below 1 will be set to 1
RetryDelayFactor: 1.5 # ZITADEL_NOTIFIACATIONS_RETRYDELAYFACTOR
```
# Additional Changes
None
# Additional Context
- closes #8931
2024-11-27 15:01:17 +00:00
client ,
2024-05-30 09:35:30 +00:00
)
config . Auth . Spooler . Client = client
config . Auth . Spooler . Eventstore = es
authView , err := auth_view . StartView ( config . Auth . Spooler . Client , keys . OIDC , queries , config . Auth . Spooler . Eventstore )
logging . OnError ( err ) . Fatal ( "unable to start auth view" )
auth_handler . Register ( ctx , config . Auth . Spooler , authView , queries )
config . Admin . Spooler . Client = client
config . Admin . Spooler . Eventstore = es
adminView , err := admin_view . StartView ( config . Admin . Spooler . Client )
logging . OnError ( err ) . Fatal ( "unable to start admin view" )
admin_handler . Register ( ctx , config . Admin . Spooler , adminView , staticStorage )
instances := make ( chan string , config . Projections . ConcurrentInstances )
failedInstances := make ( chan string )
wg := sync . WaitGroup { }
wg . Add ( int ( config . Projections . ConcurrentInstances ) )
go func ( ) {
for instance := range failedInstances {
logging . WithFields ( "instance" , instance ) . Error ( "projection failed" )
}
} ( )
for i := 0 ; i < int ( config . Projections . ConcurrentInstances ) ; i ++ {
go execProjections ( ctx , instances , failedInstances , & wg )
}
for _ , instance := range queryInstanceIDs ( ctx , client ) {
instances <- instance
}
close ( instances )
wg . Wait ( )
close ( failedInstances )
logging . WithFields ( "took" , time . Since ( start ) ) . Info ( "projections executed" )
}
// execProjections is a worker that projects instances until the instances
// channel is closed.
//
// For every received instance ID it derives a context carrying that ID and
// runs, in order, projection.ProjectInstance, admin_handler.ProjectInstance,
// auth_handler.ProjectInstance and notification.ProjectInstance. The first
// step that fails is logged, the instance ID is sent to failedInstances and
// the worker moves on to the next instance.
//
// wg.Done is called exactly once when the worker returns.
func execProjections(ctx context.Context, instances <-chan string, failedInstances chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()

	for instance := range instances {
		logging.WithFields("instance", instance).Info("start projections")

		// Derive the per-instance context from the unchanged parent. Reusing
		// the previous iteration's context would nest one context per
		// processed instance for the lifetime of the worker.
		instanceCtx := internal_authz.WithInstanceID(ctx, instance)

		err := projection.ProjectInstance(instanceCtx)
		if err != nil {
			logging.WithFields("instance", instance).OnError(err).Info("trigger failed")
			failedInstances <- instance
			continue
		}

		err = admin_handler.ProjectInstance(instanceCtx)
		if err != nil {
			logging.WithFields("instance", instance).OnError(err).Info("trigger admin handler failed")
			failedInstances <- instance
			continue
		}

		err = auth_handler.ProjectInstance(instanceCtx)
		if err != nil {
			logging.WithFields("instance", instance).OnError(err).Info("trigger auth handler failed")
			failedInstances <- instance
			continue
		}

		err = notification.ProjectInstance(instanceCtx)
		if err != nil {
			logging.WithFields("instance", instance).OnError(err).Info("trigger notification failed")
			failedInstances <- instance
			continue
		}

		logging.WithFields("instance", instance).Info("projections done")
	}
}
// queryInstanceIDs returns the IDs of the instances to project.
//
// If the package-level instanceIDs is non-empty it is returned unchanged.
// Otherwise the IDs are read from eventstore.events2 through source: every
// instance with an "instance.added" event and no "instance.removed" event.
// A query error is reported through logging's Fatal.
func queryInstanceIDs(ctx context.Context, source *database.DB) []string {
	if len(instanceIDs) != 0 {
		return instanceIDs
	}

	const activeInstancesStmt = "SELECT DISTINCT instance_id FROM eventstore.events2 WHERE instance_id <> '' AND aggregate_type = 'instance' AND event_type = 'instance.added' AND instance_id NOT IN (SELECT instance_id FROM eventstore.events2 WHERE instance_id <> '' AND aggregate_type = 'instance' AND event_type = 'instance.removed')"

	ids := []string{}

	// collect appends one ID per result row and surfaces scan or iteration
	// errors to QueryContext.
	collect := func(rows *sql.Rows) error {
		for rows.Next() {
			var id string
			scanErr := rows.Scan(&id)
			if scanErr != nil {
				return scanErr
			}
			ids = append(ids, id)
		}
		return rows.Err()
	}

	err := source.QueryContext(ctx, collect, activeInstancesStmt)
	logging.OnError(err).Fatal("unable to query instances")

	return ids
}