zitadel/internal/cache/connector/redis/circuit_breaker.go
Tim Möhlmann 3b7b0c69e6
feat(cache): redis circuit breaker (#8890)
# Which Problems Are Solved

If a redis cache has connection issues or any other type of permament
error,
it tanks the responsiveness of ZITADEL.
We currently do not support things like Redis cluster or sentinel. So
adding a simple redis cache improves performance but introduces a single
point of failure.

# How the Problems Are Solved

Implement a [circuit
breaker](https://learn.microsoft.com/en-us/previous-versions/msp-n-p/dn589784(v=pandp.10)?redirectedfrom=MSDN)
as
[`redis.Limiter`](https://pkg.go.dev/github.com/redis/go-redis/v9#Limiter)
by wrapping sony's [gobreaker](https://github.com/sony/gobreaker)
package. This package is picked as it seems well maintained and we
already use their `sonyflake` package

# Additional Changes

- The unit tests constructed an unused `redis.Client` and didn't cleanup
the connector. This is now fixed.

# Additional Context

Closes #8864
2024-11-13 19:11:48 +01:00

91 lines
2.6 KiB
Go

package redis
import (
"context"
"errors"
"time"
"github.com/redis/go-redis/v9"
"github.com/sony/gobreaker/v2"
"github.com/zitadel/logging"
)
const defaultInflightSize = 100000
type CBConfig struct {
// Interval when the counters are reset to 0.
// 0 interval never resets the counters until the CB is opened.
Interval time.Duration
// Amount of consecutive failures permitted
MaxConsecutiveFailures uint32
// The ratio of failed requests out of total requests
MaxFailureRatio float64
// Timeout after opening of the CB, until the state is set to half-open.
Timeout time.Duration
// The allowed amount of requests that are allowed to pass when the CB is half-open.
MaxRetryRequests uint32
}
func (config *CBConfig) readyToTrip(counts gobreaker.Counts) bool {
if config.MaxConsecutiveFailures > 0 && counts.ConsecutiveFailures > config.MaxConsecutiveFailures {
return true
}
if config.MaxFailureRatio > 0 && counts.Requests > 0 {
failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
return failureRatio > config.MaxFailureRatio
}
return false
}
// limiter implements [redis.Limiter] as a circuit breaker.
type limiter struct {
inflight chan func(success bool)
cb *gobreaker.TwoStepCircuitBreaker[struct{}]
}
func newLimiter(config *CBConfig, maxActiveConns int) redis.Limiter {
if config == nil {
return nil
}
// The size of the inflight channel needs to be big enough for maxActiveConns to prevent blocking.
// When that is 0 (no limit), we must set a sane default.
if maxActiveConns <= 0 {
maxActiveConns = defaultInflightSize
}
return &limiter{
inflight: make(chan func(success bool), maxActiveConns),
cb: gobreaker.NewTwoStepCircuitBreaker[struct{}](gobreaker.Settings{
Name: "redis cache",
MaxRequests: config.MaxRetryRequests,
Interval: config.Interval,
Timeout: config.Timeout,
ReadyToTrip: config.readyToTrip,
OnStateChange: func(name string, from, to gobreaker.State) {
logging.WithFields("name", name, "from", from, "to", to).Warn("circuit breaker state change")
},
}),
}
}
// Allow implements [redis.Limiter].
func (l *limiter) Allow() error {
done, err := l.cb.Allow()
if err != nil {
return err
}
l.inflight <- done
return nil
}
// ReportResult implements [redis.Limiter].
//
// ReportResult checks the error returned by the Redis client.
// `nil`, [redis.Nil] and [context.Canceled] are not considered failures.
// Any other error, like connection or [context.DeadlineExceeded] is counted as a failure.
func (l *limiter) ReportResult(err error) {
done := <-l.inflight
done(err == nil ||
errors.Is(err, redis.Nil) ||
errors.Is(err, context.Canceled))
}