2020-04-29 10:11:13 +00:00
package spooler
import (
"context"
2022-07-22 10:08:39 +00:00
"runtime/debug"
2020-07-28 07:42:21 +00:00
"strconv"
2020-09-18 11:39:28 +00:00
"sync"
2020-12-21 17:42:34 +00:00
"time"
2020-07-02 06:08:55 +00:00
2022-04-26 23:01:45 +00:00
"github.com/zitadel/logging"
2022-03-15 06:19:02 +00:00
2022-11-22 06:36:48 +00:00
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
2022-04-26 23:01:45 +00:00
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/eventstore/v1/query"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/view/repository"
2020-04-29 10:11:13 +00:00
)
2022-11-22 06:36:48 +00:00
const (
systemID = "system"
schedulerSucceeded = eventstore . EventType ( "system.projections.scheduler.succeeded" )
aggregateType = eventstore . AggregateType ( "system" )
aggregateID = "SYSTEM"
)
2022-04-19 06:26:12 +00:00
2020-04-29 10:11:13 +00:00
type Spooler struct {
2022-07-22 10:08:39 +00:00
handlers [ ] query . Handler
locker Locker
lockID string
eventstore v1 . Eventstore
2022-11-22 06:36:48 +00:00
esV2 * eventstore . Eventstore
2022-07-22 10:08:39 +00:00
workers int
queue chan * spooledHandler
concurrentInstances int
2020-04-29 10:11:13 +00:00
}
type Locker interface {
2022-04-19 06:26:12 +00:00
Renew ( lockerID , viewModel , instanceID string , waitTime time . Duration ) error
2020-04-29 10:11:13 +00:00
}
type spooledHandler struct {
2020-07-28 07:42:21 +00:00
query . Handler
2022-07-22 10:08:39 +00:00
locker Locker
queuedAt time . Time
eventstore v1 . Eventstore
2022-11-22 06:36:48 +00:00
esV2 * eventstore . Eventstore
2022-07-22 10:08:39 +00:00
concurrentInstances int
2022-11-22 06:36:48 +00:00
succeededOnce bool
2020-04-29 10:11:13 +00:00
}
func ( s * Spooler ) Start ( ) {
2022-03-15 06:19:02 +00:00
defer logging . WithFields ( "lockerID" , s . lockID , "workers" , s . workers ) . Info ( "spooler started" )
2020-07-28 07:42:21 +00:00
if s . workers < 1 {
2020-04-29 10:11:13 +00:00
return
}
2020-07-28 07:42:21 +00:00
for i := 0 ; i < s . workers ; i ++ {
go func ( workerIdx int ) {
workerID := s . lockID + "--" + strconv . Itoa ( workerIdx )
for task := range s . queue {
2020-09-18 11:39:28 +00:00
go requeueTask ( task , s . queue )
2020-07-28 07:42:21 +00:00
task . load ( workerID )
2020-04-29 10:11:13 +00:00
}
2020-07-28 07:42:21 +00:00
} ( i )
2020-04-29 10:11:13 +00:00
}
2020-11-27 14:32:26 +00:00
go func ( ) {
for _ , handler := range s . handlers {
2022-11-22 06:36:48 +00:00
s . queue <- & spooledHandler { Handler : handler , locker : s . locker , queuedAt : time . Now ( ) , eventstore : s . eventstore , esV2 : s . esV2 , concurrentInstances : s . concurrentInstances }
2020-11-27 14:32:26 +00:00
}
} ( )
2020-04-29 10:11:13 +00:00
}
2020-09-18 11:39:28 +00:00
func requeueTask ( task * spooledHandler , queue chan <- * spooledHandler ) {
time . Sleep ( task . MinimumCycleDuration ( ) - time . Since ( task . queuedAt ) )
task . queuedAt = time . Now ( )
queue <- task
}
2022-11-22 06:36:48 +00:00
func ( s * spooledHandler ) hasSucceededOnce ( ctx context . Context ) ( bool , error ) {
events , err := s . esV2 . Filter ( ctx , eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
AddQuery ( ) .
AggregateTypes ( aggregateType ) .
AggregateIDs ( aggregateID ) .
EventTypes ( schedulerSucceeded ) .
EventData ( map [ string ] interface { } {
"name" : s . ViewModel ( ) ,
} ) .
Builder ( ) ,
)
return len ( events ) > 0 && err == nil , err
}
func ( s * spooledHandler ) setSucceededOnce ( ctx context . Context ) error {
_ , err := s . esV2 . Push ( ctx , & handler . ProjectionSucceededEvent {
BaseEvent : * eventstore . NewBaseEventForPush ( ctx ,
eventstore . NewAggregate ( ctx , aggregateID , aggregateType , "v1" ) ,
schedulerSucceeded ,
) ,
Name : s . ViewModel ( ) ,
} )
s . succeededOnce = err == nil
return err
}
2020-07-28 07:42:21 +00:00
func ( s * spooledHandler ) load ( workerID string ) {
2020-04-29 10:11:13 +00:00
errs := make ( chan error )
2021-07-06 11:36:35 +00:00
defer func ( ) {
close ( errs )
err := recover ( )
if err != nil {
2022-08-31 07:52:43 +00:00
logging . WithFields (
"cause" , err ,
"stack" , string ( debug . Stack ( ) ) ,
) . Error ( "reduce panicked" )
2021-07-06 11:36:35 +00:00
}
} ( )
2020-07-28 07:42:21 +00:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
go s . awaitError ( cancel , errs , workerID )
hasLocked := s . lock ( ctx , errs , workerID )
2020-04-29 10:11:13 +00:00
if <- hasLocked {
2022-11-22 06:36:48 +00:00
if ! s . succeededOnce {
var err error
s . succeededOnce , err = s . hasSucceededOnce ( ctx )
if err != nil {
2023-04-25 17:20:59 +00:00
logging . WithFields ( "view" , s . ViewModel ( ) ) . OnError ( err ) . Debug ( "initial lock failed for first schedule" )
2022-11-22 06:36:48 +00:00
errs <- err
return
}
}
instanceIDQuery := models . NewSearchQuery ( ) . SetColumn ( models . Columns_InstanceIDs ) . AddQuery ( ) . ExcludedInstanceIDsFilter ( "" )
2020-12-21 17:42:34 +00:00
for {
2022-11-22 06:36:48 +00:00
if s . succeededOnce {
// since we have at least one successful run, we can restrict it to events not older than
// twice the requeue time (just to be sure not to miss an event)
instanceIDQuery = instanceIDQuery . CreationDateNewerFilter ( time . Now ( ) . Add ( - 2 * s . MinimumCycleDuration ( ) ) )
}
ids , err := s . eventstore . InstanceIDs ( ctx , instanceIDQuery . SearchQuery ( ) )
2020-12-21 17:42:34 +00:00
if err != nil {
errs <- err
break
}
2022-07-22 10:08:39 +00:00
for i := 0 ; i < len ( ids ) ; i = i + s . concurrentInstances {
max := i + s . concurrentInstances
if max > len ( ids ) {
max = len ( ids )
2020-12-21 17:42:34 +00:00
}
2022-11-22 06:36:48 +00:00
err = s . processInstances ( ctx , workerID , ids [ i : max ] )
2022-07-22 10:08:39 +00:00
if err != nil {
errs <- err
}
}
if ctx . Err ( ) == nil {
2022-11-22 06:36:48 +00:00
if ! s . succeededOnce {
err = s . setSucceededOnce ( ctx )
logging . WithFields ( "view" , s . ViewModel ( ) ) . OnError ( err ) . Warn ( "unable to push first schedule succeeded" )
}
2022-07-22 10:08:39 +00:00
errs <- nil
2020-12-21 17:42:34 +00:00
}
2022-07-22 10:08:39 +00:00
break
2020-04-29 10:11:13 +00:00
}
}
<- ctx . Done ( )
}
2022-11-22 06:36:48 +00:00
func ( s * spooledHandler ) processInstances ( ctx context . Context , workerID string , ids [ ] string ) error {
2022-07-22 10:08:39 +00:00
for {
2022-11-22 06:36:48 +00:00
processCtx , cancel := context . WithTimeout ( ctx , 30 * time . Second )
events , err := s . query ( processCtx , ids )
2022-07-22 10:08:39 +00:00
if err != nil {
2022-11-22 06:36:48 +00:00
cancel ( )
2022-07-22 10:08:39 +00:00
return err
}
if len ( events ) == 0 {
2022-11-22 06:36:48 +00:00
cancel ( )
2022-07-22 10:08:39 +00:00
return nil
}
2022-11-22 06:36:48 +00:00
err = s . process ( processCtx , events , workerID , ids )
cancel ( )
2022-07-22 10:08:39 +00:00
if err != nil {
return err
}
if uint64 ( len ( events ) ) < s . QueryLimit ( ) {
// no more events to process
return nil
}
}
}
2020-07-28 07:42:21 +00:00
func ( s * spooledHandler ) awaitError ( cancel func ( ) , errs chan error , workerID string ) {
2020-04-29 10:11:13 +00:00
select {
case err := <- errs :
cancel ( )
2022-11-18 12:49:38 +00:00
logging . OnError ( err ) . WithField ( "view" , s . ViewModel ( ) ) . WithField ( "worker" , workerID ) . Debug ( "load canceled" )
2020-04-29 10:11:13 +00:00
}
}
2022-11-22 06:36:48 +00:00
func ( s * spooledHandler ) process ( ctx context . Context , events [ ] * models . Event , workerID string , instanceIDs [ ] string ) error {
2020-12-21 17:42:34 +00:00
for i , event := range events {
2020-04-29 10:11:13 +00:00
select {
case <- ctx . Done ( ) :
2022-03-15 06:19:02 +00:00
logging . WithFields ( "view" , s . ViewModel ( ) , "worker" , workerID , "traceID" , tracing . TraceIDFromCtx ( ctx ) ) . Debug ( "context canceled" )
2020-04-29 10:11:13 +00:00
return nil
default :
2020-06-23 12:47:47 +00:00
if err := s . Reduce ( event ) ; err != nil {
2020-12-21 17:42:34 +00:00
err = s . OnError ( event , err )
if err == nil {
continue
}
time . Sleep ( 100 * time . Millisecond )
2022-11-22 06:36:48 +00:00
return s . process ( ctx , events [ i : ] , workerID , instanceIDs )
2020-04-29 10:11:13 +00:00
}
}
}
2022-11-22 06:36:48 +00:00
err := s . OnSuccess ( instanceIDs )
2022-03-15 06:19:02 +00:00
logging . WithFields ( "view" , s . ViewModel ( ) , "worker" , workerID , "traceID" , tracing . TraceIDFromCtx ( ctx ) ) . OnError ( err ) . Warn ( "could not process on success func" )
2020-12-02 07:50:59 +00:00
return err
2020-04-29 10:11:13 +00:00
}
2022-11-22 06:36:48 +00:00
func ( s * spooledHandler ) query ( ctx context . Context , instanceIDs [ ] string ) ( [ ] * models . Event , error ) {
query , err := s . EventQuery ( instanceIDs )
2020-04-29 10:11:13 +00:00
if err != nil {
return nil , err
}
2020-07-28 07:42:21 +00:00
query . Limit = s . QueryLimit ( )
2020-04-29 10:11:13 +00:00
return s . eventstore . FilterEvents ( ctx , query )
}
2022-08-31 07:52:43 +00:00
// lock ensures the lock on the database.
2020-09-18 11:39:28 +00:00
// the returned channel will be closed if ctx is done or an error occured durring lock
2020-07-28 07:42:21 +00:00
func ( s * spooledHandler ) lock ( ctx context . Context , errs chan <- error , workerID string ) chan bool {
2020-04-29 10:11:13 +00:00
renewTimer := time . After ( 0 )
2020-07-28 07:42:21 +00:00
locked := make ( chan bool )
2020-04-29 10:11:13 +00:00
go func ( locked chan bool ) {
2020-09-18 11:39:28 +00:00
var firstLock sync . Once
defer close ( locked )
2020-04-29 10:11:13 +00:00
for {
select {
case <- ctx . Done ( ) :
return
case <- renewTimer :
2022-04-19 06:26:12 +00:00
err := s . locker . Renew ( workerID , s . ViewModel ( ) , systemID , s . LockDuration ( ) )
2020-09-18 11:39:28 +00:00
firstLock . Do ( func ( ) {
locked <- err == nil
} )
2020-04-29 10:11:13 +00:00
if err == nil {
2020-12-22 11:27:55 +00:00
renewTimer = time . After ( s . LockDuration ( ) )
2020-04-29 10:11:13 +00:00
continue
}
if ctx . Err ( ) == nil {
errs <- err
}
return
}
}
} ( locked )
return locked
}
2020-09-18 11:39:28 +00:00
func HandleError ( event * models . Event , failedErr error ,
2022-04-19 06:26:12 +00:00
latestFailedEvent func ( sequence uint64 , instanceID string ) ( * repository . FailedEvent , error ) ,
2020-09-18 11:39:28 +00:00
processFailedEvent func ( * repository . FailedEvent ) error ,
2020-12-21 17:42:34 +00:00
processSequence func ( * models . Event ) error ,
errorCountUntilSkip uint64 ) error {
2022-04-19 06:26:12 +00:00
failedEvent , err := latestFailedEvent ( event . Sequence , event . InstanceID )
2020-09-18 11:39:28 +00:00
if err != nil {
return err
}
failedEvent . FailureCount ++
failedEvent . ErrMsg = failedErr . Error ( )
2022-04-19 06:26:12 +00:00
failedEvent . InstanceID = event . InstanceID
2022-11-18 12:49:38 +00:00
failedEvent . LastFailed = time . Now ( )
2020-09-18 11:39:28 +00:00
err = processFailedEvent ( failedEvent )
if err != nil {
return err
}
if errorCountUntilSkip <= failedEvent . FailureCount {
2020-12-18 15:47:45 +00:00
return processSequence ( event )
2020-09-18 11:39:28 +00:00
}
2020-12-21 17:42:34 +00:00
return failedErr
2020-09-18 11:39:28 +00:00
}
2020-12-02 07:50:59 +00:00
2022-11-22 06:36:48 +00:00
func HandleSuccess ( updateSpoolerRunTimestamp func ( [ ] string ) error , instanceIDs [ ] string ) error {
return updateSpoolerRunTimestamp ( instanceIDs )
2020-12-02 07:50:59 +00:00
}