feat: enable handling projections for inactive instances (#5523)

* fix: don't ignore failed events in handlers

* question

* fix retries

* don't instance ids query

* statements can be nil

* make unit tests pass

* add comments

* spool only active instances

* feat(config): handle inactive instances

* customizable HandleInactiveInstances

* test: handling with and w/o inactive instances

* docs: describe projection options

* enable global handling of inactive instances

* accept NowFunc, not Clock interface

* add comment about stringer usage

* remove enum stringer implementations

* fix enum format types

* Update internal/eventstore/repository/mock/repository.mock.impl.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

---------

Co-authored-by: Silvan <silvan.reusser@gmail.com>
Elio Bischof 2023-03-27 14:34:01 +02:00 committed by GitHub
parent 41ff0bbc63
commit 62bd606593
9 changed files with 332 additions and 78 deletions

View File

@ -147,16 +147,30 @@ AssetStorage:
MaxAge: 5s
SharedMaxAge: 168h #7d
# The Projections section defines the behaviour for the scheduled and synchronous events projections.
Projections:
# Time interval between scheduled projections
RequeueEvery: 60s
# Time between retried database statements resulting from projected events
RetryFailedAfter: 1s
# Maximum number of times a failed database statement resulting from projected events is retried
MaxFailureCount: 5
# Number of concurrent projection routines. Values of 0 and below are overwritten to 1
ConcurrentInstances: 1
# Limit of returned events per query
BulkLimit: 200
MaxIterators: 1
# If HandleInactiveInstances is false, only instances are projected
# for which at least one projection-relevant event exists within the timeframe
# from twice the RequeueEvery time in the past until the projection's current time
HandleInactiveInstances: false
# In the Customizations section, all settings from above can be overwritten for each specific projection
Customizations:
projects:
Projects:
BulkLimit: 2000
# The Notifications projection is used for sending emails and SMS to users
Notifications:
# As notification projections don't result in database statements, retries don't have an effect
MaxFailureCount: 0
Auth:
SearchLimit: 1000

View File

@ -76,16 +76,36 @@ Database:
You might also want to configure how [projections](/concepts/eventstore/implementation#projections) are computed. These are the default values:
```yaml
# The Projections section defines the behaviour for the scheduled and synchronous events projections.
Projections:
# Time interval between scheduled projections
RequeueEvery: 60s
# Time between retried database statements resulting from projected events
RetryFailedAfter: 1s
# Maximum number of times a failed database statement resulting from projected events is retried
MaxFailureCount: 5
# Number of concurrent projection routines
ConcurrentInstances: 1
# Limit of returned events per query
BulkLimit: 200
MaxIterators: 1
# If HandleInactiveInstances is false, only instances are projected
# for which at least one projection-relevant event exists within the timeframe
# from twice the RequeueEvery time in the past until the projection's current time
HandleInactiveInstances: false
# In the Customizations section, all settings from above can be overwritten for each specific projection
Customizations:
projects:
Projects:
BulkLimit: 2000
# The Notifications projection is used for sending emails and SMS to users
Notifications:
# As notification projections don't result in database statements, retries don't have an effect
MaxFailureCount: 0
# The NotificationsQuotas projection is used for calling quota webhooks
NotificationsQuotas:
# Delivery guarantee requirements are probably higher for quota webhooks
HandleInactiveInstances: true
# As quota notification projections don't result in database statements, retries don't have an effect
MaxFailureCount: 0
```
### Manage your Data
@ -143,4 +163,4 @@ DefaultInstance:
## Quotas
If you host ZITADEL as a service,
you might want to [limit usage and/or execute tasks on certain usage units and levels](/self-hosting/manage/quotas).

View File

@ -20,11 +20,12 @@ const (
type ProjectionHandlerConfig struct {
HandlerConfig
ProjectionName string
RequeueEvery time.Duration
RetryFailedAfter time.Duration
Retries uint
ConcurrentInstances uint
ProjectionName string
RequeueEvery time.Duration
RetryFailedAfter time.Duration
Retries uint
ConcurrentInstances uint
HandleInactiveInstances bool
}
// Update updates the projection with the given statements
@ -43,19 +44,24 @@ type Lock func(context.Context, time.Duration, ...string) <-chan error
// Unlock releases the mutex of the projection
type Unlock func(...string) error
// NowFunc makes time.Now() mockable
type NowFunc func() time.Time
type ProjectionHandler struct {
Handler
ProjectionName string
reduce Reduce
update Update
searchQuery SearchQuery
triggerProjection *time.Timer
lock Lock
unlock Unlock
requeueAfter time.Duration
retryFailedAfter time.Duration
retries int
concurrentInstances int
ProjectionName string
reduce Reduce
update Update
searchQuery SearchQuery
triggerProjection *time.Timer
lock Lock
unlock Unlock
requeueAfter time.Duration
retryFailedAfter time.Duration
retries int
concurrentInstances int
handleInactiveInstances bool
nowFunc NowFunc
}
func NewProjectionHandler(
@ -73,18 +79,20 @@ func NewProjectionHandler(
concurrentInstances = 1
}
h := &ProjectionHandler{
Handler: NewHandler(config.HandlerConfig),
ProjectionName: config.ProjectionName,
reduce: reduce,
update: update,
searchQuery: query,
lock: lock,
unlock: unlock,
requeueAfter: config.RequeueEvery,
triggerProjection: time.NewTimer(0), // first trigger is instant on startup
retryFailedAfter: config.RetryFailedAfter,
retries: int(config.Retries),
concurrentInstances: concurrentInstances,
Handler: NewHandler(config.HandlerConfig),
ProjectionName: config.ProjectionName,
reduce: reduce,
update: update,
searchQuery: query,
lock: lock,
unlock: unlock,
requeueAfter: config.RequeueEvery,
triggerProjection: time.NewTimer(0), // first trigger is instant on startup
retryFailedAfter: config.RetryFailedAfter,
retries: int(config.Retries),
concurrentInstances: concurrentInstances,
handleInactiveInstances: config.HandleInactiveInstances,
nowFunc: time.Now,
}
go func() {
@ -221,10 +229,11 @@ func (h *ProjectionHandler) schedule(ctx context.Context) {
}
go h.cancelOnErr(lockCtx, errs, cancelLock)
}
if succeededOnce {
if succeededOnce && !h.handleInactiveInstances {
// since we have at least one successful run, we can restrict it to events not older than
// twice the requeue time (just to be sure not to miss an event)
query = query.CreationDateAfter(time.Now().Add(-2 * h.requeueAfter))
// This ensures that only instances with recent events on the handler are projected
query = query.CreationDateAfter(h.nowFunc().Add(-2 * h.requeueAfter))
}
ids, err := h.Eventstore.InstanceIDs(ctx, query.Builder())
if err != nil {
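The effect of the new `handleInactiveInstances` flag together with the injected `nowFunc` can be shown in isolation. Below is a minimal, self-contained sketch (not code from this PR; the function name `activeInstanceCutoff` is made up) of how the creation-date cutoff for the InstanceIDs query is derived, using the same fixed clock as the test further down:

```go
package main

import (
	"fmt"
	"time"
)

// activeInstanceCutoff mirrors the scheduling behaviour above: once the
// scheduler has succeeded at least once and inactive instances are not
// handled, the InstanceIDs query is restricted to instances with events
// newer than twice the requeue interval. A zero time means no restriction.
func activeInstanceCutoff(now func() time.Time, requeueAfter time.Duration, succeededOnce, handleInactiveInstances bool) time.Time {
	if succeededOnce && !handleInactiveInstances {
		return now().Add(-2 * requeueAfter)
	}
	return time.Time{}
}

func main() {
	// A fixed clock (the idea behind nowFunc) keeps the cutoff deterministic in tests.
	now := func() time.Time { return time.Date(2023, 1, 31, 12, 0, 0, 0, time.UTC) }

	fmt.Println(activeInstanceCutoff(now, time.Hour, true, false)) // 2023-01-31 10:00:00 +0000 UTC
	fmt.Println(activeInstanceCutoff(now, time.Hour, true, true))  // zero time: all instances are queried
}
```

With the default RequeueEvery of 60s this amounts to roughly a two-minute lookback window, matching the configuration comment above.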

View File

@ -11,6 +11,7 @@ import (
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/api/service"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository"
es_repo_mock "github.com/zitadel/zitadel/internal/eventstore/repository/mock"
@ -658,17 +659,22 @@ func TestProjection_subscribe(t *testing.T) {
}
func TestProjection_schedule(t *testing.T) {
now := func() time.Time {
return time.Date(2023, 1, 31, 12, 0, 0, 0, time.UTC)
}
type args struct {
ctx context.Context
}
type fields struct {
reduce Reduce
update Update
eventstore func(t *testing.T) *eventstore.Eventstore
triggerProjection *time.Timer
lock *lockMock
unlock *unlockMock
query SearchQuery
reduce Reduce
update Update
eventstore func(t *testing.T) *eventstore.Eventstore
lock *lockMock
unlock *unlockMock
query SearchQuery
handleInactiveInstances bool
}
type want struct {
locksCount int
@ -705,7 +711,7 @@ func TestProjection_schedule(t *testing.T) {
),
)
},
triggerProjection: time.NewTimer(0),
handleInactiveInstances: false,
},
want{
locksCount: 0,
@ -733,7 +739,7 @@ func TestProjection_schedule(t *testing.T) {
),
)
},
triggerProjection: time.NewTimer(0),
handleInactiveInstances: false,
},
want{
locksCount: 0,
@ -756,16 +762,16 @@ func TestProjection_schedule(t *testing.T) {
PreviousAggregateSequence: 5,
InstanceID: "",
Type: "system.projections.scheduler.succeeded",
}).ExpectInstanceIDs("instanceID1"),
}).ExpectInstanceIDs(nil, "instanceID1"),
),
)
},
triggerProjection: time.NewTimer(0),
lock: &lockMock{
errWait: 100 * time.Millisecond,
firstErr: ErrLock,
canceled: make(chan bool, 1),
},
handleInactiveInstances: false,
},
want{
locksCount: 1,
@ -788,18 +794,18 @@ func TestProjection_schedule(t *testing.T) {
PreviousAggregateSequence: 5,
InstanceID: "",
Type: "system.projections.scheduler.succeeded",
}).ExpectInstanceIDs("instanceID1"),
}).ExpectInstanceIDs(nil, "instanceID1"),
),
)
},
triggerProjection: time.NewTimer(0),
lock: &lockMock{
canceled: make(chan bool, 1),
firstErr: nil,
errWait: 100 * time.Millisecond,
},
unlock: &unlockMock{},
query: testQuery(nil, 0, ErrQuery),
unlock: &unlockMock{},
query: testQuery(nil, 0, ErrQuery),
handleInactiveInstances: false,
},
want{
locksCount: 1,
@ -807,6 +813,121 @@ func TestProjection_schedule(t *testing.T) {
unlockCount: 1,
},
},
{
"only active instances are handled",
args{
ctx: context.Background(),
},
fields{
eventstore: func(t *testing.T) *eventstore.Eventstore {
return eventstore.NewEventstore(eventstore.TestConfig(
es_repo_mock.NewRepo(t).
ExpectFilterEvents(&repository.Event{
AggregateType: "system",
Sequence: 6,
PreviousAggregateSequence: 5,
InstanceID: "",
Type: "system.projections.scheduler.succeeded",
}).
ExpectInstanceIDs(
[]*repository.Filter{{
Field: repository.FieldInstanceID,
Operation: repository.OperationNotIn,
Value: database.StringArray{""},
}, {
Field: repository.FieldCreationDate,
Operation: repository.OperationGreater,
Value: now().Add(-2 * time.Hour),
}},
"206626268110651755",
).
ExpectFilterEvents(&repository.Event{
AggregateType: "quota",
Sequence: 6,
PreviousAggregateSequence: 5,
InstanceID: "206626268110651755",
Type: "quota.notificationdue",
}),
))
},
lock: &lockMock{
canceled: make(chan bool, 1),
firstErr: nil,
errWait: 100 * time.Millisecond,
},
unlock: &unlockMock{},
handleInactiveInstances: false,
reduce: testReduce(newTestStatement("aggregate1", 1, 0)),
update: testUpdate(t, 1, 1, nil),
query: testQuery(
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AddQuery().
AggregateTypes("test").
Builder(),
2,
nil,
),
},
want{
locksCount: 1,
lockCanceled: false,
unlockCount: 1,
},
},
{
"all instances are handled",
args{
ctx: context.Background(),
},
fields{
eventstore: func(t *testing.T) *eventstore.Eventstore {
return eventstore.NewEventstore(eventstore.TestConfig(
es_repo_mock.NewRepo(t).
ExpectFilterEvents(&repository.Event{
AggregateType: "system",
Sequence: 6,
PreviousAggregateSequence: 5,
InstanceID: "",
Type: "system.projections.scheduler.succeeded",
}).
ExpectInstanceIDs([]*repository.Filter{{
Field: repository.FieldInstanceID,
Operation: repository.OperationNotIn,
Value: database.StringArray{""},
}}, "206626268110651755").
ExpectFilterEvents(&repository.Event{
AggregateType: "quota",
Sequence: 6,
PreviousAggregateSequence: 5,
InstanceID: "206626268110651755",
Type: "quota.notificationdue",
}),
))
},
lock: &lockMock{
canceled: make(chan bool, 1),
firstErr: nil,
errWait: 100 * time.Millisecond,
},
unlock: &unlockMock{},
handleInactiveInstances: true,
reduce: testReduce(newTestStatement("aggregate1", 1, 0)),
update: testUpdate(t, 1, 1, nil),
query: testQuery(
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AddQuery().
AggregateTypes("test").
Builder(),
2,
nil,
),
},
want{
locksCount: 1,
lockCanceled: false,
unlockCount: 1,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -815,14 +936,17 @@ func TestProjection_schedule(t *testing.T) {
EventQueue: make(chan eventstore.Event, 10),
Eventstore: tt.fields.eventstore(t),
},
reduce: tt.fields.reduce,
update: tt.fields.update,
searchQuery: tt.fields.query,
lock: tt.fields.lock.lock(),
unlock: tt.fields.unlock.unlock(),
triggerProjection: tt.fields.triggerProjection,
requeueAfter: 10 * time.Second,
concurrentInstances: 1,
reduce: tt.fields.reduce,
update: tt.fields.update,
searchQuery: tt.fields.query,
lock: tt.fields.lock.lock(),
unlock: tt.fields.unlock.unlock(),
triggerProjection: time.NewTimer(0), // immediately run an iteration
requeueAfter: time.Hour, // run only one iteration
concurrentInstances: 1,
handleInactiveInstances: tt.fields.handleInactiveInstances,
retries: 0,
nowFunc: now,
}
ctx, cancel := context.WithCancel(tt.args.ctx)
go func() {
@ -831,14 +955,14 @@ func TestProjection_schedule(t *testing.T) {
h.schedule(ctx)
}()
time.Sleep(1 * time.Second)
time.Sleep(time.Second)
cancel()
if tt.fields.lock != nil {
tt.fields.lock.check(t, tt.want.locksCount, tt.want.lockCanceled)
}
if tt.fields.unlock != nil {
tt.fields.unlock.check(t, tt.want.unlockCount)
}
cancel()
})
}
}

View File

@ -29,8 +29,12 @@ func (m *MockRepository) ExpectFilterEventsError(err error) *MockRepository {
return m
}
func (m *MockRepository) ExpectInstanceIDs(instanceIDs ...string) *MockRepository {
m.EXPECT().InstanceIDs(gomock.Any(), gomock.Any()).Return(instanceIDs, nil)
func (m *MockRepository) ExpectInstanceIDs(hasFilters []*repository.Filter, instanceIDs ...string) *MockRepository {
matcher := gomock.Any()
if len(hasFilters) > 0 {
matcher = &filterQueryMatcher{Filters: [][]*repository.Filter{hasFilters}}
}
m.EXPECT().InstanceIDs(gomock.Any(), matcher).Return(instanceIDs, nil)
return m
}
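Both call styles of the extended mock appear in the handler test above. As a quick reference, a hedged fragment (mock setup only, assuming `repo := es_repo_mock.NewRepo(t)`; in a real test the expectations are consumed by the code under test):

```go
// nil filters keep the old behaviour: any InstanceIDs query is accepted.
repo.ExpectInstanceIDs(nil, "instanceID1")

// Passing filters asserts the exact query shape via filterQueryMatcher,
// e.g. that system events (empty instance ID) are excluded from the query.
repo.ExpectInstanceIDs([]*repository.Filter{{
	Field:     repository.FieldInstanceID,
	Operation: repository.OperationNotIn,
	Value:     database.StringArray{""},
}}, "206626268110651755")
```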

View File

@ -0,0 +1,33 @@
package mock
import (
"encoding/json"
"fmt"
"reflect"
"github.com/golang/mock/gomock"
"github.com/zitadel/zitadel/internal/eventstore/repository"
)
var _ gomock.Matcher = (*filterMatcher)(nil)
var _ gomock.GotFormatter = (*filterMatcher)(nil)
type filterMatcher repository.Filter
func (f *filterMatcher) String() string {
jsonValue, err := json.Marshal(f.Value)
if err != nil {
panic(err)
}
return fmt.Sprintf("%d %d (content=%+v,type=%T,json=%s)", f.Field, f.Operation, f.Value, f.Value, string(jsonValue))
}
func (f *filterMatcher) Matches(x interface{}) bool {
other := x.(*repository.Filter)
return f.Field == other.Field && f.Operation == other.Operation && reflect.DeepEqual(f.Value, other.Value)
}
func (f *filterMatcher) Got(got interface{}) string {
return (*filterMatcher)(got.(*repository.Filter)).String()
}

View File

@ -0,0 +1,45 @@
package mock
import (
"fmt"
"strings"
"github.com/zitadel/zitadel/internal/eventstore/repository"
)
type filterQueryMatcher repository.SearchQuery
func (f *filterQueryMatcher) String() string {
var filterLists []string
for _, filterSlice := range f.Filters {
var str string
for _, filter := range filterSlice {
str += "," + (*filterMatcher)(filter).String()
}
filterLists = append(filterLists, fmt.Sprintf("[%s]", strings.TrimPrefix(str, ",")))
}
return fmt.Sprintf("Filters: %s", strings.Join(filterLists, " "))
}
func (f *filterQueryMatcher) Matches(x interface{}) bool {
other := x.(*repository.SearchQuery)
if len(f.Filters) != len(other.Filters) {
return false
}
for filterSliceIdx, filterSlice := range f.Filters {
if len(filterSlice) != len(other.Filters[filterSliceIdx]) {
return false
}
for filterIdx, filter := range f.Filters[filterSliceIdx] {
if !(*filterMatcher)(filter).Matches(other.Filters[filterSliceIdx][filterIdx]) {
return false
}
}
}
return true
}
func (f *filterQueryMatcher) Got(got interface{}) string {
return (*filterQueryMatcher)(got.(*repository.SearchQuery)).String()
}
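A minimal sketch of what the matcher actually compares, written as an in-package test because `filterQueryMatcher` is unexported; the test name is hypothetical and the filter mirrors the instance-ID filter asserted in the handler tests above:

```go
package mock

import (
	"testing"

	"github.com/zitadel/zitadel/internal/database"
	"github.com/zitadel/zitadel/internal/eventstore/repository"
)

// TestFilterQueryMatcher_sketch verifies that a query with identical Field,
// Operation and Value matches, while a differing Value does not.
func TestFilterQueryMatcher_sketch(t *testing.T) {
	expected := &filterQueryMatcher{Filters: [][]*repository.Filter{{{
		Field:     repository.FieldInstanceID,
		Operation: repository.OperationNotIn,
		Value:     database.StringArray{""},
	}}}}

	matching := &repository.SearchQuery{Filters: [][]*repository.Filter{{{
		Field:     repository.FieldInstanceID,
		Operation: repository.OperationNotIn,
		Value:     database.StringArray{""},
	}}}}
	if !expected.Matches(matching) {
		t.Errorf("expected query to match, got %s", expected.Got(matching))
	}

	differing := &repository.SearchQuery{Filters: [][]*repository.Filter{{{
		Field:     repository.FieldInstanceID,
		Operation: repository.OperationNotIn,
		Value:     database.StringArray{"other-instance"},
	}}}}
	if expected.Matches(differing) {
		t.Error("expected query with a different Value not to match")
	}
}
```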

View File

@ -5,19 +5,20 @@ import (
)
type Config struct {
RequeueEvery time.Duration
RetryFailedAfter time.Duration
MaxFailureCount uint
ConcurrentInstances uint
BulkLimit uint64
Customizations map[string]CustomConfig
MaxIterators int
RequeueEvery time.Duration
RetryFailedAfter time.Duration
MaxFailureCount uint
ConcurrentInstances uint
BulkLimit uint64
Customizations map[string]CustomConfig
HandleInactiveInstances bool
}
type CustomConfig struct {
RequeueEvery *time.Duration
RetryFailedAfter *time.Duration
MaxFailureCount *uint
ConcurrentInstances *uint
BulkLimit *uint64
RequeueEvery *time.Duration
RetryFailedAfter *time.Duration
MaxFailureCount *uint
ConcurrentInstances *uint
BulkLimit *uint64
HandleInactiveInstances *bool
}

View File

@ -80,10 +80,11 @@ func Create(ctx context.Context, sqlClient *database.DB, es *eventstore.Eventsto
HandlerConfig: handler.HandlerConfig{
Eventstore: es,
},
RequeueEvery: config.RequeueEvery,
RetryFailedAfter: config.RetryFailedAfter,
Retries: config.MaxFailureCount,
ConcurrentInstances: config.ConcurrentInstances,
RequeueEvery: config.RequeueEvery,
RetryFailedAfter: config.RetryFailedAfter,
Retries: config.MaxFailureCount,
ConcurrentInstances: config.ConcurrentInstances,
HandleInactiveInstances: config.HandleInactiveInstances,
},
Client: sqlClient,
SequenceTable: CurrentSeqTable,
@ -173,6 +174,9 @@ func applyCustomConfig(config crdb.StatementHandlerConfig, customConfig CustomCo
if customConfig.RetryFailedAfter != nil {
config.RetryFailedAfter = *customConfig.RetryFailedAfter
}
if customConfig.HandleInactiveInstances != nil {
config.HandleInactiveInstances = *customConfig.HandleInactiveInstances
}
return config
}
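The override semantics of the new pointer field follow the existing applyCustomConfig pattern: a per-projection value only replaces the global default when it is explicitly set. A standalone sketch with illustrative names (not the repository's types):

```go
package main

import "fmt"

// customConfig stands in for projection.CustomConfig: a nil pointer means
// "keep the global default", a non-nil pointer overrides it.
type customConfig struct {
	HandleInactiveInstances *bool
}

func applyHandleInactive(globalDefault bool, custom customConfig) bool {
	if custom.HandleInactiveInstances != nil {
		return *custom.HandleInactiveInstances
	}
	return globalDefault
}

func main() {
	enable := true
	fmt.Println(applyHandleInactive(false, customConfig{}))                                 // false: global default kept
	fmt.Println(applyHandleInactive(false, customConfig{HandleInactiveInstances: &enable})) // true: e.g. the NotificationsQuotas override
}
```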