feat(storage): generic cache interface (#8628)

# Which Problems Are Solved

We identified the need of caching.
Currently we have a number of places where we use different ways of
caching, like go maps or LRU.
We might also want shared chaches in the future, like Redis-based or in
special SQL tables.

# How the Problems Are Solved

Define a generic Cache interface which allows different implementations.

- A noop implementation is provided and enabled as.
- An implementation using go maps is provided
  - disabled in defaults.yaml
  - enabled in integration tests
- Authz middleware instance objects are cached using the interface.

# Additional Changes

- Enabled integration test command raceflag
- Fix a race condition in the limits integration test client
- Fix a number of flaky integration tests. (Because zitadel is super
fast now!) 🎸 🚀

# Additional Context

Related to https://github.com/zitadel/zitadel/issues/8648
This commit is contained in:
Tim Möhlmann
2024-09-25 22:40:21 +03:00
committed by GitHub
parent a6ea83168d
commit 4eaa3163b6
28 changed files with 1290 additions and 78 deletions

View File

@@ -39,9 +39,9 @@ func failureFromEvent(event eventstore.Event, err error) *failure {
func failureFromStatement(statement *Statement, err error) *failure {
return &failure{
sequence: statement.Sequence,
instance: statement.InstanceID,
aggregateID: statement.AggregateID,
aggregateType: statement.AggregateType,
instance: statement.Aggregate.InstanceID,
aggregateID: statement.Aggregate.ID,
aggregateType: statement.Aggregate.Type,
eventDate: statement.CreationDate,
err: err,
}

View File

@@ -62,6 +62,7 @@ type Handler struct {
triggeredInstancesSync sync.Map
triggerWithoutEvents Reduce
cacheInvalidations []func(ctx context.Context, aggregates []*eventstore.Aggregate)
}
var _ migration.Migration = (*Handler)(nil)
@@ -418,6 +419,12 @@ func (h *Handler) Trigger(ctx context.Context, opts ...TriggerOpt) (_ context.Co
}
}
// RegisterCacheInvalidation registers a function to be called when a cache needs to be invalidated.
// In order to avoid race conditions, this method must be called before [Handler.Start] is called.
func (h *Handler) RegisterCacheInvalidation(invalidate func(ctx context.Context, aggregates []*eventstore.Aggregate)) {
h.cacheInvalidations = append(h.cacheInvalidations, invalidate)
}
// lockInstance tries to lock the instance.
// If the instance is already locked from another process no cancel function is returned
// the instance can be skipped then
@@ -486,10 +493,6 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
h.log().OnError(rollbackErr).Debug("unable to rollback tx")
return
}
commitErr := tx.Commit()
if err == nil {
err = commitErr
}
}()
currentState, err := h.currentState(ctx, tx, config)
@@ -509,6 +512,17 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
if err != nil {
return additionalIteration, err
}
defer func() {
commitErr := tx.Commit()
if err == nil {
err = commitErr
}
if err == nil && currentState.aggregateID != "" && len(statements) > 0 {
h.invalidateCaches(ctx, aggregatesFromStatements(statements))
}
}()
if len(statements) == 0 {
err = h.setState(tx, currentState)
return additionalIteration, err
@@ -522,8 +536,8 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
currentState.position = statements[lastProcessedIndex].Position
currentState.offset = statements[lastProcessedIndex].offset
currentState.aggregateID = statements[lastProcessedIndex].AggregateID
currentState.aggregateType = statements[lastProcessedIndex].AggregateType
currentState.aggregateID = statements[lastProcessedIndex].Aggregate.ID
currentState.aggregateType = statements[lastProcessedIndex].Aggregate.Type
currentState.sequence = statements[lastProcessedIndex].Sequence
currentState.eventTimestamp = statements[lastProcessedIndex].CreationDate
err = h.setState(tx, currentState)
@@ -556,8 +570,8 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta
if idx+1 == len(statements) {
currentState.position = statements[len(statements)-1].Position
currentState.offset = statements[len(statements)-1].offset
currentState.aggregateID = statements[len(statements)-1].AggregateID
currentState.aggregateType = statements[len(statements)-1].AggregateType
currentState.aggregateID = statements[len(statements)-1].Aggregate.ID
currentState.aggregateType = statements[len(statements)-1].Aggregate.Type
currentState.sequence = statements[len(statements)-1].Sequence
currentState.eventTimestamp = statements[len(statements)-1].CreationDate
@@ -577,8 +591,8 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta
func skipPreviouslyReducedStatements(statements []*Statement, currentState *state) int {
for i, statement := range statements {
if statement.Position == currentState.position &&
statement.AggregateID == currentState.aggregateID &&
statement.AggregateType == currentState.aggregateType &&
statement.Aggregate.ID == currentState.aggregateID &&
statement.Aggregate.Type == currentState.aggregateType &&
statement.Sequence == currentState.sequence {
return i
}
@@ -667,3 +681,34 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder
func (h *Handler) ProjectionName() string {
return h.projection.Name()
}
func (h *Handler) invalidateCaches(ctx context.Context, aggregates []*eventstore.Aggregate) {
if len(h.cacheInvalidations) == 0 {
return
}
var wg sync.WaitGroup
wg.Add(len(h.cacheInvalidations))
for _, invalidate := range h.cacheInvalidations {
go func(invalidate func(context.Context, []*eventstore.Aggregate)) {
defer wg.Done()
invalidate(ctx, aggregates)
}(invalidate)
}
wg.Wait()
}
// aggregatesFromStatements returns the unique aggregates from statements.
// Duplicate aggregates are omitted.
func aggregatesFromStatements(statements []*Statement) []*eventstore.Aggregate {
aggregates := make([]*eventstore.Aggregate, 0, len(statements))
for _, statement := range statements {
if !slices.ContainsFunc(aggregates, func(aggregate *eventstore.Aggregate) bool {
return *statement.Aggregate == *aggregate
}) {
aggregates = append(aggregates, statement.Aggregate)
}
}
return aggregates
}

View File

@@ -80,12 +80,10 @@ func (h *Handler) reduce(event eventstore.Event) (*Statement, error) {
}
type Statement struct {
AggregateType eventstore.AggregateType
AggregateID string
Sequence uint64
Position float64
CreationDate time.Time
InstanceID string
Aggregate *eventstore.Aggregate
Sequence uint64
Position float64
CreationDate time.Time
offset uint32
@@ -108,13 +106,11 @@ var (
func NewStatement(event eventstore.Event, e Exec) *Statement {
return &Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
Position: event.Position(),
AggregateID: event.Aggregate().ID,
CreationDate: event.CreatedAt(),
InstanceID: event.Aggregate().InstanceID,
Execute: e,
Aggregate: event.Aggregate(),
Sequence: event.Sequence(),
Position: event.Position(),
CreationDate: event.CreatedAt(),
Execute: e,
}
}