Files
zitadel/internal/queue/queue.go
Silvan 5d1185ba4e fix: enable OpenTelemetry metrics for river queue and improve session projection handling (#10391)
Integrate OpenTelemetry metrics for better visibility into river queue
job processing and queue sizes. Additionally, modify session projection
handling to prevent failures during high load scenarios, ensuring
smoother login processes for users. This addresses issues related to
session projections and enhances overall system observability.

---------

Co-authored-by: Abhinav Sethi <abhinav.sethi03@gmail.com>
Co-authored-by: Zach Hirschtritt <zachary.hirschtritt@klaviyo.com>
2025-08-05 11:24:56 +02:00

108 lines
2.4 KiB
Go

package queue
import (
"context"
"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertype"
"github.com/riverqueue/rivercontrib/otelriver"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/telemetry/metrics"
)
// Queue abstracts the underlying queuing library
// For more information see github.com/riverqueue/river
type Queue struct {
driver riverdriver.Driver[pgx.Tx]
client *river.Client[pgx.Tx]
config *river.Config
shouldStart bool
}
type Config struct {
Client *database.DB `mapstructure:"-"` // mapstructure is needed if we would like to use viper to configure the queue
}
func NewQueue(config *Config) (_ *Queue, err error) {
if config.Client.Type() == "cockroach" {
return nil, nil
}
middleware := []rivertype.Middleware{otelriver.NewMiddleware(&otelriver.MiddlewareConfig{
MeterProvider: metrics.GetMetricsProvider(),
})}
return &Queue{
driver: riverpgxv5.New(config.Client.Pool),
config: &river.Config{
Workers: river.NewWorkers(),
Queues: make(map[string]river.QueueConfig),
JobTimeout: -1,
Middleware: middleware,
},
}, nil
}
func (q *Queue) ShouldStart() {
if q == nil {
return
}
q.shouldStart = true
}
func (q *Queue) Start(ctx context.Context) (err error) {
if q == nil || !q.shouldStart {
return nil
}
ctx = WithQueue(ctx)
q.client, err = river.NewClient(q.driver, q.config)
if err != nil {
return err
}
return q.client.Start(ctx)
}
func (q *Queue) AddWorkers(w ...Worker) {
if q == nil {
logging.Info("skip adding workers because queue is not set")
return
}
for _, worker := range w {
worker.Register(q.config.Workers, q.config.Queues)
}
}
type InsertOpt func(*river.InsertOpts)
func WithMaxAttempts(maxAttempts uint8) InsertOpt {
return func(opts *river.InsertOpts) {
opts.MaxAttempts = int(maxAttempts)
}
}
func WithQueueName(name string) InsertOpt {
return func(opts *river.InsertOpts) {
opts.Queue = name
}
}
func (q *Queue) Insert(ctx context.Context, args river.JobArgs, opts ...InsertOpt) error {
options := new(river.InsertOpts)
ctx = WithQueue(ctx)
for _, opt := range opts {
opt(options)
}
_, err := q.client.Insert(ctx, args, options)
return err
}
type Worker interface {
Register(workers *river.Workers, queues map[string]river.QueueConfig)
}