diff --git a/go.mod b/go.mod index 21a7fe9f16..15b3d2b391 100644 --- a/go.mod +++ b/go.mod @@ -146,6 +146,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/riverqueue/river/rivershared v0.22.0 // indirect + github.com/riverqueue/rivercontrib/otelriver v0.5.0 // indirect github.com/sagikazarmark/locafero v0.7.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect diff --git a/go.sum b/go.sum index cc3bc35841..272ce655a3 100644 --- a/go.sum +++ b/go.sum @@ -682,6 +682,8 @@ github.com/riverqueue/river/rivershared v0.22.0 h1:hLPHr98d6OEfmUJ4KpIXgoy2tbQ14 github.com/riverqueue/river/rivershared v0.22.0/go.mod h1:BK+hvhECfdDLWNDH3xiGI95m2YoPfVtECZLT+my8XM8= github.com/riverqueue/river/rivertype v0.22.0 h1:rSRhbd5uV/BaFTPxReCxuYTAzx+/riBZJlZdREADvO4= github.com/riverqueue/river/rivertype v0.22.0/go.mod h1:lmdl3vLNDfchDWbYdW2uAocIuwIN+ZaXqAukdSCFqWs= +github.com/riverqueue/rivercontrib/otelriver v0.5.0 h1:dZF4Fy7/3RaIRsyCPdpIJtzEip0pCvoJ44YpSDum8e4= +github.com/riverqueue/rivercontrib/otelriver v0.5.0/go.mod h1:rXANcBrlgRvg+auD3/O6Xfs59AWeWNpa/kim62mkxGo= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= diff --git a/internal/queue/queue.go b/internal/queue/queue.go index d680221753..22df8c2b5c 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -7,9 +7,12 @@ import ( "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivertype" + "github.com/riverqueue/rivercontrib/otelriver" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/telemetry/metrics" ) // Queue abstracts the underlying queuing library @@ -27,12 +30,16 @@ type Config struct { } func NewQueue(config *Config) (_ *Queue, err error) { + middleware := []rivertype.Middleware{otelriver.NewMiddleware(&otelriver.MiddlewareConfig{ + MeterProvider: metrics.GetMetricsProvider(), + })} return &Queue{ driver: riverpgxv5.New(config.Client.Pool), config: &river.Config{ Workers: river.NewWorkers(), Queues: make(map[string]river.QueueConfig), JobTimeout: -1, + Middleware: middleware, }, }, nil }