mirror of
https://github.com/zitadel/zitadel.git
synced 2025-01-08 14:57:49 +00:00
4eaa3163b6
# Which Problems Are Solved We identified the need of caching. Currently we have a number of places where we use different ways of caching, like go maps or LRU. We might also want shared chaches in the future, like Redis-based or in special SQL tables. # How the Problems Are Solved Define a generic Cache interface which allows different implementations. - A noop implementation is provided and enabled as. - An implementation using go maps is provided - disabled in defaults.yaml - enabled in integration tests - Authz middleware instance objects are cached using the interface. # Additional Changes - Enabled integration test command raceflag - Fix a race condition in the limits integration test client - Fix a number of flaky integration tests. (Because zitadel is super fast now!) 🎸 🚀 # Additional Context Related to https://github.com/zitadel/zitadel/issues/8648
96 lines
2.3 KiB
Go
96 lines
2.3 KiB
Go
package handler
|
|
|
|
import (
|
|
"database/sql"
|
|
_ "embed"
|
|
"time"
|
|
|
|
"github.com/zitadel/zitadel/internal/eventstore"
|
|
"github.com/zitadel/zitadel/internal/zerrors"
|
|
)
|
|
|
|
var (
|
|
//go:embed failed_event_set.sql
|
|
setFailedEventStmt string
|
|
//go:embed failed_event_get_count.sql
|
|
failureCountStmt string
|
|
)
|
|
|
|
type failure struct {
|
|
sequence uint64
|
|
instance string
|
|
aggregateID string
|
|
aggregateType eventstore.AggregateType
|
|
eventDate time.Time
|
|
err error
|
|
}
|
|
|
|
func failureFromEvent(event eventstore.Event, err error) *failure {
|
|
return &failure{
|
|
sequence: event.Sequence(),
|
|
instance: event.Aggregate().InstanceID,
|
|
aggregateID: event.Aggregate().ID,
|
|
aggregateType: event.Aggregate().Type,
|
|
eventDate: event.CreatedAt(),
|
|
err: err,
|
|
}
|
|
}
|
|
|
|
func failureFromStatement(statement *Statement, err error) *failure {
|
|
return &failure{
|
|
sequence: statement.Sequence,
|
|
instance: statement.Aggregate.InstanceID,
|
|
aggregateID: statement.Aggregate.ID,
|
|
aggregateType: statement.Aggregate.Type,
|
|
eventDate: statement.CreationDate,
|
|
err: err,
|
|
}
|
|
}
|
|
|
|
func (h *Handler) handleFailedStmt(tx *sql.Tx, f *failure) (shouldContinue bool) {
|
|
failureCount, err := h.failureCount(tx, f)
|
|
if err != nil {
|
|
h.logFailure(f).WithError(err).Warn("unable to get failure count")
|
|
return false
|
|
}
|
|
failureCount += 1
|
|
err = h.setFailureCount(tx, failureCount, f)
|
|
h.logFailure(f).OnError(err).Warn("unable to update failure count")
|
|
|
|
return failureCount >= h.maxFailureCount
|
|
}
|
|
|
|
func (h *Handler) failureCount(tx *sql.Tx, f *failure) (count uint8, err error) {
|
|
row := tx.QueryRow(failureCountStmt,
|
|
h.projection.Name(),
|
|
f.instance,
|
|
f.aggregateType,
|
|
f.aggregateID,
|
|
f.sequence,
|
|
)
|
|
if err = row.Err(); err != nil {
|
|
return 0, zerrors.ThrowInternal(err, "CRDB-Unnex", "unable to update failure count")
|
|
}
|
|
if err = row.Scan(&count); err != nil {
|
|
return 0, zerrors.ThrowInternal(err, "CRDB-RwSMV", "unable to scan count")
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
func (h *Handler) setFailureCount(tx *sql.Tx, count uint8, f *failure) error {
|
|
_, err := tx.Exec(setFailedEventStmt,
|
|
h.projection.Name(),
|
|
f.instance,
|
|
f.aggregateType,
|
|
f.aggregateID,
|
|
f.eventDate,
|
|
f.sequence,
|
|
count,
|
|
f.err.Error(),
|
|
)
|
|
if err != nil {
|
|
return zerrors.ThrowInternal(err, "CRDB-4Ht4x", "set failure count failed")
|
|
}
|
|
return nil
|
|
}
|