mirror of
https://github.com/zitadel/zitadel.git
synced 2024-12-15 20:38:00 +00:00
1d84635836
# Which Problems Are Solved To improve performance a new table and method is implemented on eventstore. The goal of this table is to index searchable fields on command side to use it on command and query side. The table allows to store one primitive value (numeric, text) per row. The eventstore framework is extended by the `Search`-method which allows to search for objects. The `Command`-interface is extended by the `SearchOperations()`-method which does manipulate the the `search`-table. # How the Problems Are Solved This PR adds the capability of improving performance for command and query side by using the `Search`-method of the eventstore instead of using one of the `Filter`-methods. # Open Tasks - [x] Add feature flag - [x] Unit tests - [ ] ~~Benchmarks if needed~~ - [x] Ensure no behavior change - [x] Add setup step to fill table with current data - [x] Add projection which ensures data added between setup and start of the new version are also added to the table # Additional Changes The `Search`-method is currently used by `ProjectGrant`-command side. # Additional Context - Closes https://github.com/zitadel/zitadel/issues/8094
206 lines
5.5 KiB
Go
206 lines
5.5 KiB
Go
package handler
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgconn"
|
|
|
|
"github.com/zitadel/zitadel/internal/eventstore"
|
|
"github.com/zitadel/zitadel/internal/telemetry/tracing"
|
|
)
|
|
|
|
type FieldHandler struct {
|
|
Handler
|
|
}
|
|
|
|
type fieldProjection struct {
|
|
name string
|
|
}
|
|
|
|
// Name implements Projection.
|
|
func (f *fieldProjection) Name() string {
|
|
return f.name
|
|
}
|
|
|
|
// Reducers implements Projection.
|
|
func (f *fieldProjection) Reducers() []AggregateReducer {
|
|
return nil
|
|
}
|
|
|
|
var _ Projection = (*fieldProjection)(nil)
|
|
|
|
func NewFieldHandler(config *Config, name string, eventTypes map[eventstore.AggregateType][]eventstore.EventType) *FieldHandler {
|
|
return &FieldHandler{
|
|
Handler: Handler{
|
|
projection: &fieldProjection{name: name},
|
|
client: config.Client,
|
|
es: config.Eventstore,
|
|
bulkLimit: config.BulkLimit,
|
|
eventTypes: eventTypes,
|
|
requeueEvery: config.RequeueEvery,
|
|
handleActiveInstances: config.HandleActiveInstances,
|
|
now: time.Now,
|
|
maxFailureCount: config.MaxFailureCount,
|
|
retryFailedAfter: config.RetryFailedAfter,
|
|
triggeredInstancesSync: sync.Map{},
|
|
triggerWithoutEvents: config.TriggerWithoutEvents,
|
|
txDuration: config.TransactionDuration,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (h *FieldHandler) Trigger(ctx context.Context, opts ...TriggerOpt) (err error) {
|
|
config := new(triggerConfig)
|
|
for _, opt := range opts {
|
|
opt(config)
|
|
}
|
|
|
|
cancel := h.lockInstance(ctx, config)
|
|
if cancel == nil {
|
|
return nil
|
|
}
|
|
defer cancel()
|
|
|
|
for i := 0; ; i++ {
|
|
additionalIteration, err := h.processEvents(ctx, config)
|
|
h.log().OnError(err).Info("process events failed")
|
|
h.log().WithField("iteration", i).Debug("trigger iteration")
|
|
if !additionalIteration || err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig) (additionalIteration bool, err error) {
|
|
defer func() {
|
|
pgErr := new(pgconn.PgError)
|
|
if errors.As(err, &pgErr) {
|
|
// error returned if the row is currently locked by another connection
|
|
if pgErr.Code == "55P03" {
|
|
h.log().Debug("state already locked")
|
|
err = nil
|
|
additionalIteration = false
|
|
}
|
|
}
|
|
}()
|
|
|
|
txCtx := ctx
|
|
if h.txDuration > 0 {
|
|
var cancel, cancelTx func()
|
|
// add 100ms to store current state if iteration takes too long
|
|
txCtx, cancelTx = context.WithTimeout(ctx, h.txDuration+100*time.Millisecond)
|
|
defer cancelTx()
|
|
ctx, cancel = context.WithTimeout(ctx, h.txDuration)
|
|
defer cancel()
|
|
}
|
|
|
|
ctx, spanBeginTx := tracing.NewNamedSpan(ctx, "db.BeginTx")
|
|
tx, err := h.client.BeginTx(txCtx, nil)
|
|
spanBeginTx.EndWithError(err)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
defer func() {
|
|
if err != nil && !errors.Is(err, &executionError{}) {
|
|
rollbackErr := tx.Rollback()
|
|
h.log().OnError(rollbackErr).Debug("unable to rollback tx")
|
|
return
|
|
}
|
|
commitErr := tx.Commit()
|
|
if err == nil {
|
|
err = commitErr
|
|
}
|
|
}()
|
|
|
|
currentState, err := h.currentState(ctx, tx, config)
|
|
if err != nil {
|
|
if errors.Is(err, errJustUpdated) {
|
|
return false, nil
|
|
}
|
|
return additionalIteration, err
|
|
}
|
|
// stop execution if currentState.eventTimestamp >= config.maxCreatedAt
|
|
if config.maxPosition != 0 && currentState.position >= config.maxPosition {
|
|
return false, nil
|
|
}
|
|
|
|
events, additionalIteration, err := h.fetchEvents(ctx, tx, currentState)
|
|
if err != nil {
|
|
return additionalIteration, err
|
|
}
|
|
if len(events) == 0 {
|
|
err = h.setState(tx, currentState)
|
|
return additionalIteration, err
|
|
}
|
|
|
|
err = h.es.FillFields(ctx, events...)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
err = h.setState(tx, currentState)
|
|
|
|
return additionalIteration, err
|
|
}
|
|
|
|
func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState *state) (_ []eventstore.FillFieldsEvent, additionalIteration bool, err error) {
|
|
events, err := h.es.Filter(ctx, h.eventQuery(currentState).SetTx(tx))
|
|
if err != nil {
|
|
h.log().WithError(err).Debug("filter eventstore failed")
|
|
return nil, false, err
|
|
}
|
|
eventAmount := len(events)
|
|
|
|
idx, offset := skipPreviouslyReducedEvents(events, currentState)
|
|
|
|
if currentState.position == events[len(events)-1].Position() {
|
|
offset += currentState.offset
|
|
}
|
|
currentState.position = events[len(events)-1].Position()
|
|
currentState.offset = offset
|
|
currentState.aggregateID = events[len(events)-1].Aggregate().ID
|
|
currentState.aggregateType = events[len(events)-1].Aggregate().Type
|
|
currentState.sequence = events[len(events)-1].Sequence()
|
|
currentState.eventTimestamp = events[len(events)-1].CreatedAt()
|
|
|
|
if idx+1 == len(events) {
|
|
return nil, false, nil
|
|
}
|
|
events = events[idx+1:]
|
|
|
|
additionalIteration = eventAmount == int(h.bulkLimit)
|
|
|
|
fillFieldsEvents := make([]eventstore.FillFieldsEvent, len(events))
|
|
highestPosition := events[len(events)-1].Position()
|
|
for i, event := range events {
|
|
if event.Position() == highestPosition {
|
|
offset++
|
|
}
|
|
fillFieldsEvents[i] = event.(eventstore.FillFieldsEvent)
|
|
}
|
|
|
|
return fillFieldsEvents, additionalIteration, nil
|
|
}
|
|
|
|
func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state) (index int, offset uint32) {
|
|
var position float64
|
|
for i, event := range events {
|
|
if event.Position() != position {
|
|
offset = 0
|
|
position = event.Position()
|
|
}
|
|
offset++
|
|
if event.Position() == currentState.position &&
|
|
event.Aggregate().ID == currentState.aggregateID &&
|
|
event.Aggregate().Type == currentState.aggregateType &&
|
|
event.Sequence() == currentState.sequence {
|
|
return i, offset
|
|
}
|
|
}
|
|
return -1, 0
|
|
}
|