mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 19:17:32 +00:00
fix: improve startup times by initializing projection tables during setup (#4642)
* fix: improve startup times by initializing projections table during setup * add missing file
This commit is contained in:
@@ -42,8 +42,10 @@ type StatementHandler struct {
|
||||
failureCountStmt string
|
||||
setFailureCountStmt string
|
||||
|
||||
aggregates []eventstore.AggregateType
|
||||
reduces map[eventstore.EventType]handler.Reduce
|
||||
aggregates []eventstore.AggregateType
|
||||
reduces map[eventstore.EventType]handler.Reduce
|
||||
initCheck *handler.Check
|
||||
initialized chan bool
|
||||
|
||||
bulkLimit uint64
|
||||
}
|
||||
@@ -73,19 +75,21 @@ func NewStatementHandler(
|
||||
reduces: reduces,
|
||||
bulkLimit: config.BulkLimit,
|
||||
Locker: NewLocker(config.Client, config.LockTable, config.ProjectionName),
|
||||
initCheck: config.InitCheck,
|
||||
initialized: make(chan bool),
|
||||
}
|
||||
|
||||
initialized := make(chan bool)
|
||||
h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, initialized)
|
||||
|
||||
err := h.Init(ctx, initialized, config.InitCheck)
|
||||
logging.OnError(err).WithField("projection", config.ProjectionName).Fatal("unable to initialize projections")
|
||||
|
||||
h.Subscribe(h.aggregates...)
|
||||
h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, h.initialized)
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *StatementHandler) Start() {
|
||||
h.initialized <- true
|
||||
close(h.initialized)
|
||||
h.Subscribe(h.aggregates...)
|
||||
}
|
||||
|
||||
func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string) (*eventstore.SearchQueryBuilder, uint64, error) {
|
||||
sequences, err := h.currentSequences(ctx, h.client.QueryContext, instanceIDs)
|
||||
if err != nil {
|
||||
|
@@ -186,36 +186,28 @@ type ForeignKey struct {
|
||||
}
|
||||
|
||||
// Init implements handler.Init
|
||||
func (h *StatementHandler) Init(ctx context.Context, initialized chan<- bool, checks ...*handler.Check) error {
|
||||
for _, check := range checks {
|
||||
if check == nil || check.IsNoop() {
|
||||
initialized <- true
|
||||
close(initialized)
|
||||
return nil
|
||||
}
|
||||
tx, err := h.client.BeginTx(ctx, nil)
|
||||
func (h *StatementHandler) Init(ctx context.Context) error {
|
||||
check := h.initCheck
|
||||
if check == nil || check.IsNoop() {
|
||||
return nil
|
||||
}
|
||||
tx, err := h.client.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return caos_errs.ThrowInternal(err, "CRDB-SAdf2", "begin failed")
|
||||
}
|
||||
for i, execute := range check.Executes {
|
||||
logging.WithFields("projection", h.ProjectionName, "execute", i).Debug("executing check")
|
||||
next, err := execute(h.client, h.ProjectionName)
|
||||
if err != nil {
|
||||
return caos_errs.ThrowInternal(err, "CRDB-SAdf2", "begin failed")
|
||||
}
|
||||
for i, execute := range check.Executes {
|
||||
logging.WithFields("projection", h.ProjectionName, "execute", i).Debug("executing check")
|
||||
next, err := execute(h.client, h.ProjectionName)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
if !next {
|
||||
logging.WithFields("projection", h.ProjectionName, "execute", i).Debug("skipping next check")
|
||||
break
|
||||
}
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
if !next {
|
||||
logging.WithFields("projection", h.ProjectionName, "execute", i).Debug("skipping next check")
|
||||
break
|
||||
}
|
||||
}
|
||||
initialized <- true
|
||||
close(initialized)
|
||||
return nil
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func NewTableCheck(table *Table, opts ...execOption) *handler.Check {
|
||||
|
@@ -12,7 +12,11 @@ import (
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
)
|
||||
|
||||
const systemID = "system"
|
||||
const (
|
||||
schedulerSucceeded = eventstore.EventType("system.projections.scheduler.succeeded")
|
||||
aggregateType = eventstore.AggregateType("system")
|
||||
aggregateID = "SYSTEM"
|
||||
)
|
||||
|
||||
type ProjectionHandlerConfig struct {
|
||||
HandlerConfig
|
||||
@@ -188,9 +192,35 @@ func (h *ProjectionHandler) schedule(ctx context.Context) {
|
||||
}()
|
||||
// flag if projection has been successfully executed at least once since start
|
||||
var succeededOnce bool
|
||||
var err error
|
||||
// get every instance id except empty (system)
|
||||
query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).AddQuery().ExcludedInstanceID("")
|
||||
for range h.triggerProjection.C {
|
||||
if !succeededOnce {
|
||||
// (re)check if it has succeeded in the meantime
|
||||
succeededOnce, err = h.hasSucceededOnce(ctx)
|
||||
if err != nil {
|
||||
logging.WithFields("projection", h.ProjectionName, "err", err).
|
||||
Error("schedule could not check if projection has already succeeded once")
|
||||
h.triggerProjection.Reset(h.requeueAfter)
|
||||
continue
|
||||
}
|
||||
}
|
||||
lockCtx := ctx
|
||||
var cancelLock context.CancelFunc
|
||||
// if it still has not succeeded, lock the projection for the system
|
||||
// so that only a single scheduler does a first schedule (of every instance)
|
||||
if !succeededOnce {
|
||||
lockCtx, cancelLock = context.WithCancel(ctx)
|
||||
errs := h.lock(lockCtx, h.requeueAfter, "system")
|
||||
if err, ok := <-errs; err != nil || !ok {
|
||||
cancelLock()
|
||||
logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("initial lock failed for first schedule")
|
||||
h.triggerProjection.Reset(h.requeueAfter)
|
||||
continue
|
||||
}
|
||||
go h.cancelOnErr(lockCtx, errs, cancelLock)
|
||||
}
|
||||
if succeededOnce {
|
||||
// since we have at least one successful run, we can restrict it to events not older than
|
||||
// twice the requeue time (just to be sure not to miss an event)
|
||||
@@ -209,32 +239,80 @@ func (h *ProjectionHandler) schedule(ctx context.Context) {
|
||||
max = len(ids)
|
||||
}
|
||||
instances := ids[i:max]
|
||||
lockCtx, cancelLock := context.WithCancel(ctx)
|
||||
errs := h.lock(lockCtx, h.requeueAfter, instances...)
|
||||
lockInstanceCtx, cancelInstanceLock := context.WithCancel(lockCtx)
|
||||
errs := h.lock(lockInstanceCtx, h.requeueAfter, instances...)
|
||||
//wait until projection is locked
|
||||
if err, ok := <-errs; err != nil || !ok {
|
||||
cancelLock()
|
||||
cancelInstanceLock()
|
||||
logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("initial lock failed")
|
||||
failed = true
|
||||
continue
|
||||
}
|
||||
go h.cancelOnErr(lockCtx, errs, cancelLock)
|
||||
err = h.Trigger(lockCtx, instances...)
|
||||
go h.cancelOnErr(lockInstanceCtx, errs, cancelInstanceLock)
|
||||
err = h.Trigger(lockInstanceCtx, instances...)
|
||||
if err != nil {
|
||||
logging.WithFields("projection", h.ProjectionName, "instanceIDs", instances).WithError(err).Error("trigger failed")
|
||||
failed = true
|
||||
}
|
||||
|
||||
cancelLock()
|
||||
cancelInstanceLock()
|
||||
unlockErr := h.unlock(instances...)
|
||||
logging.WithFields("projection", h.ProjectionName).OnError(unlockErr).Warn("unable to unlock")
|
||||
}
|
||||
// if the first schedule did not fail, store that in the eventstore, so we can check on later starts
|
||||
if !succeededOnce {
|
||||
if !failed {
|
||||
err = h.setSucceededOnce(ctx)
|
||||
logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("unable to push first schedule succeeded")
|
||||
}
|
||||
cancelLock()
|
||||
unlockErr := h.unlock("system")
|
||||
logging.WithFields("projection", h.ProjectionName).OnError(unlockErr).Warn("unable to unlock first schedule")
|
||||
}
|
||||
// it succeeded at least once if it has succeeded before or if it has succeeded now - not failed ;-)
|
||||
succeededOnce = succeededOnce || !failed
|
||||
h.triggerProjection.Reset(h.requeueAfter)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *ProjectionHandler) hasSucceededOnce(ctx context.Context) (bool, error) {
|
||||
events, err := h.Eventstore.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||
AddQuery().
|
||||
AggregateTypes(aggregateType).
|
||||
AggregateIDs(aggregateID).
|
||||
EventTypes(schedulerSucceeded).
|
||||
EventData(map[string]interface{}{
|
||||
"name": h.ProjectionName,
|
||||
}).
|
||||
Builder(),
|
||||
)
|
||||
return len(events) > 0 && err == nil, err
|
||||
}
|
||||
|
||||
func (h *ProjectionHandler) setSucceededOnce(ctx context.Context) error {
|
||||
_, err := h.Eventstore.Push(ctx, &ProjectionSucceededEvent{
|
||||
BaseEvent: *eventstore.NewBaseEventForPush(ctx,
|
||||
eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"),
|
||||
schedulerSucceeded,
|
||||
),
|
||||
Name: h.ProjectionName,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
type ProjectionSucceededEvent struct {
|
||||
eventstore.BaseEvent `json:"-"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
func (p *ProjectionSucceededEvent) Data() interface{} {
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *ProjectionSucceededEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *ProjectionHandler) cancelOnErr(ctx context.Context, errs <-chan error, cancel func()) {
|
||||
for {
|
||||
select {
|
||||
@@ -248,7 +326,6 @@ func (h *ProjectionHandler) cancelOnErr(ctx context.Context, errs <-chan error,
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -687,6 +687,25 @@ func TestProjection_schedule(t *testing.T) {
|
||||
},
|
||||
want{},
|
||||
},
|
||||
{
|
||||
"filter succeeded once error",
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
},
|
||||
fields{
|
||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||
return eventstore.NewEventstore(
|
||||
es_repo_mock.NewRepo(t).ExpectFilterEventsError(ErrFilter),
|
||||
)
|
||||
},
|
||||
triggerProjection: time.NewTimer(0),
|
||||
},
|
||||
want{
|
||||
locksCount: 0,
|
||||
lockCanceled: false,
|
||||
unlockCount: 0,
|
||||
},
|
||||
},
|
||||
{
|
||||
"filter instance ids error",
|
||||
args{
|
||||
@@ -695,7 +714,15 @@ func TestProjection_schedule(t *testing.T) {
|
||||
fields{
|
||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||
return eventstore.NewEventstore(
|
||||
es_repo_mock.NewRepo(t).ExpectInstanceIDsError(ErrFilter),
|
||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||
&repository.Event{
|
||||
AggregateType: "system",
|
||||
Sequence: 6,
|
||||
PreviousAggregateSequence: 5,
|
||||
InstanceID: "",
|
||||
Type: "system.projections.scheduler.succeeded",
|
||||
}).
|
||||
ExpectInstanceIDsError(ErrFilter),
|
||||
)
|
||||
},
|
||||
triggerProjection: time.NewTimer(0),
|
||||
@@ -714,7 +741,14 @@ func TestProjection_schedule(t *testing.T) {
|
||||
fields{
|
||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||
return eventstore.NewEventstore(
|
||||
es_repo_mock.NewRepo(t).ExpectInstanceIDs("instanceID1"),
|
||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||
&repository.Event{
|
||||
AggregateType: "system",
|
||||
Sequence: 6,
|
||||
PreviousAggregateSequence: 5,
|
||||
InstanceID: "",
|
||||
Type: "system.projections.scheduler.succeeded",
|
||||
}).ExpectInstanceIDs("instanceID1"),
|
||||
)
|
||||
},
|
||||
triggerProjection: time.NewTimer(0),
|
||||
@@ -738,7 +772,14 @@ func TestProjection_schedule(t *testing.T) {
|
||||
fields{
|
||||
eventstore: func(t *testing.T) *eventstore.Eventstore {
|
||||
return eventstore.NewEventstore(
|
||||
es_repo_mock.NewRepo(t).ExpectInstanceIDs("instanceID1"),
|
||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||
&repository.Event{
|
||||
AggregateType: "system",
|
||||
Sequence: 6,
|
||||
PreviousAggregateSequence: 5,
|
||||
InstanceID: "",
|
||||
Type: "system.projections.scheduler.succeeded",
|
||||
}).ExpectInstanceIDs("instanceID1"),
|
||||
)
|
||||
},
|
||||
triggerProjection: time.NewTimer(0),
|
||||
|
Reference in New Issue
Block a user