fix: enable OpenTelemetry metrics for river queue and improve session projection handling (#10391)

Integrate OpenTelemetry metrics for better visibility into river queue
job processing and queue sizes. Additionally, modify session projection
handling to prevent failures during high load scenarios, ensuring
smoother login processes for users. This addresses issues related to
session projections and enhances overall system observability.

---------

Co-authored-by: Abhinav Sethi <abhinav.sethi03@gmail.com>
Co-authored-by: Zach Hirschtritt <zachary.hirschtritt@klaviyo.com>
This commit is contained in:
Silvan
2025-08-05 11:24:56 +02:00
committed by GitHub
parent 17f033f0b4
commit 5d1185ba4e
9 changed files with 4390 additions and 482 deletions

View File

@@ -9,4 +9,3 @@ docker compose run e2e
```bash ```bash
docker compose down docker compose down
``` ```

3554
e2e/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -29,6 +29,6 @@
}, },
"devDependencies": { "devDependencies": {
"@types/node": "^22.3.0", "@types/node": "^22.3.0",
"cypress": "^13.13.3" "cypress": "^14.5.3"
} }
} }

File diff suppressed because it is too large Load Diff

15
go.mod
View File

@@ -60,9 +60,10 @@ require (
github.com/pquerna/otp v1.4.0 github.com/pquerna/otp v1.4.0
github.com/rakyll/statik v0.1.7 github.com/rakyll/statik v0.1.7
github.com/redis/go-redis/v9 v9.7.3 github.com/redis/go-redis/v9 v9.7.3
github.com/riverqueue/river v0.19.0 github.com/riverqueue/river v0.22.0
github.com/riverqueue/river/riverdriver v0.19.0 github.com/riverqueue/river/riverdriver v0.22.0
github.com/riverqueue/river/rivertype v0.19.0 github.com/riverqueue/river/rivertype v0.22.0
github.com/riverqueue/rivercontrib/otelriver v0.5.0
github.com/rs/cors v1.11.1 github.com/rs/cors v1.11.1
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/shopspring/decimal v1.4.0 github.com/shopspring/decimal v1.4.0
@@ -94,8 +95,8 @@ require (
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 golang.org/x/exp v0.0.0-20250305212735-054e65f0b394
golang.org/x/net v0.37.0 golang.org/x/net v0.37.0
golang.org/x/oauth2 v0.28.0 golang.org/x/oauth2 v0.28.0
golang.org/x/sync v0.12.0 golang.org/x/sync v0.13.0
golang.org/x/text v0.23.0 golang.org/x/text v0.24.0
google.golang.org/api v0.227.0 google.golang.org/api v0.227.0
google.golang.org/genproto/googleapis/api v0.0.0-20250313205543-e70fdf4c4cb4 google.golang.org/genproto/googleapis/api v0.0.0-20250313205543-e70fdf4c4cb4
google.golang.org/grpc v1.71.0 google.golang.org/grpc v1.71.0
@@ -143,7 +144,7 @@ require (
github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/riverqueue/river/rivershared v0.19.0 // indirect github.com/riverqueue/river/rivershared v0.22.0 // indirect
github.com/sagikazarmark/locafero v0.7.0 // indirect github.com/sagikazarmark/locafero v0.7.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect
github.com/tidwall/gjson v1.18.0 // indirect github.com/tidwall/gjson v1.18.0 // indirect
@@ -216,7 +217,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.19.0 github.com/riverqueue/river/riverdriver/riverpgxv5 v0.22.0
github.com/rs/xid v1.6.0 // indirect github.com/rs/xid v1.6.0 // indirect
github.com/russellhaering/goxmldsig v1.4.0 // indirect github.com/russellhaering/goxmldsig v1.4.0 // indirect
github.com/sirupsen/logrus v1.9.3 github.com/sirupsen/logrus v1.9.3

38
go.sum
View File

@@ -439,8 +439,8 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6hFJUX53drDT4UsSW3DEhKn0ifuHw= github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0=
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -673,18 +673,20 @@ github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo= github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo=
github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo= github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo=
github.com/riverqueue/river v0.19.0 h1:WRh/NXhp+WEEY0HpCYgr4wSRllugYBt30HtyQ3jlz08= github.com/riverqueue/river v0.22.0 h1:PO4Ula2RqViQqNs6xjze7yFV6Zq4T3Ffv092+f4S8xQ=
github.com/riverqueue/river v0.19.0/go.mod h1:YJ7LA2uBdqFHQJzKyYc+X6S04KJeiwsS1yU5a1rynlk= github.com/riverqueue/river v0.22.0/go.mod h1:IRoWoK4RGCiPuVJUV4EWcCl9d/TMQYkk0EEYV/Wgq+U=
github.com/riverqueue/river/riverdriver v0.19.0 h1:NyHz5DfB13paT2lvaO0CKmwy4SFLbA7n6MFRGRtwii4= github.com/riverqueue/river/riverdriver v0.22.0 h1:i7OSFkUi6x4UKvttdFOIg7NYLYaBOFLJZvkZ0+JWS/8=
github.com/riverqueue/river/riverdriver v0.19.0/go.mod h1:Soxi08hHkEvopExAp6ADG2437r4coSiB4QpuIL5E28k= github.com/riverqueue/river/riverdriver v0.22.0/go.mod h1:oNdjJCeAJhN/UiZGLNL+guNqWaxMFuSD4lr5x/v/was=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.19.0 h1:ytdPnueiv7ANxJcntBtYenrYZZLY5P0mXoDV0l4WsLk= github.com/riverqueue/river/riverdriver/riverdatabasesql v0.22.0 h1:+no3gToOK9SmWg0pDPKfOGSCsrxqqaFdD8K1NQndRbY=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.19.0/go.mod h1:5Fahb3n+m1V0RAb0JlOIpzimoTlkOgudMfxSSCTcmFk= github.com/riverqueue/river/riverdriver/riverdatabasesql v0.22.0/go.mod h1:mygiHa1dnlKRjxT1//wIvfT2fMTbfXKm37NcsxoyBoQ=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.19.0 h1:QWg7VTDDXbtTF6srr7Y1C888PiNzqv379yQuNSnH2hg= github.com/riverqueue/river/riverdriver/riverpgxv5 v0.22.0 h1:2TWbVL73gipJ2/4JNCQbifaNj+BCC/Zxpp30o1D8RTg=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.19.0/go.mod h1:uvF1YS+iSQavCIHtaB/Y6O8A6Dnn38ctVQCpCpmHDZE= github.com/riverqueue/river/riverdriver/riverpgxv5 v0.22.0/go.mod h1:TZY/BG8w/nDxkraAEvvgyVupIz0b4+PQVUW0kIiy1fc=
github.com/riverqueue/river/rivershared v0.19.0 h1:TZvFM6CC+QgwQQUMQ5Ueuhx25ptgqcKqZQGsdLJnFeE= github.com/riverqueue/river/rivershared v0.22.0 h1:hLPHr98d6OEfmUJ4KpIXgoy2tbQ14htWILcRBHJF11U=
github.com/riverqueue/river/rivershared v0.19.0/go.mod h1:JAvmohuC5lounVk8e3zXZIs07Da3klzEeJo1qDQIbjw= github.com/riverqueue/river/rivershared v0.22.0/go.mod h1:BK+hvhECfdDLWNDH3xiGI95m2YoPfVtECZLT+my8XM8=
github.com/riverqueue/river/rivertype v0.19.0 h1:5rwgdh21pVcU9WjrHIIO9qC2dOMdRrrZ/HZZOE0JRyY= github.com/riverqueue/river/rivertype v0.22.0 h1:rSRhbd5uV/BaFTPxReCxuYTAzx+/riBZJlZdREADvO4=
github.com/riverqueue/river/rivertype v0.19.0/go.mod h1:DETcejveWlq6bAb8tHkbgJqmXWVLiFhTiEm8j7co1bE= github.com/riverqueue/river/rivertype v0.22.0/go.mod h1:lmdl3vLNDfchDWbYdW2uAocIuwIN+ZaXqAukdSCFqWs=
github.com/riverqueue/rivercontrib/otelriver v0.5.0 h1:dZF4Fy7/3RaIRsyCPdpIJtzEip0pCvoJ44YpSDum8e4=
github.com/riverqueue/rivercontrib/otelriver v0.5.0/go.mod h1:rXANcBrlgRvg+auD3/O6Xfs59AWeWNpa/kim62mkxGo=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
@@ -970,8 +972,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -1038,8 +1040,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=

View File

@@ -7,6 +7,7 @@ import (
http_util "github.com/zitadel/zitadel/internal/api/http" http_util "github.com/zitadel/zitadel/internal/api/http"
"github.com/zitadel/zitadel/internal/api/ui/console" "github.com/zitadel/zitadel/internal/api/ui/console"
"github.com/zitadel/zitadel/internal/api/ui/login" "github.com/zitadel/zitadel/internal/api/ui/login"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2" "github.com/zitadel/zitadel/internal/eventstore/handler/v2"
@@ -417,12 +418,14 @@ func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*h
if alreadyHandled { if alreadyHandled {
return nil return nil
} }
s, err := u.queries.SessionByID(ctx, true, e.Aggregate().ID, "", nil)
ctx, err = u.queries.Origin(ctx, e)
if err != nil { if err != nil {
return err return err
} }
ctx, err = u.queries.Origin(ctx, e) sessionWriteModel := command.NewSessionWriteModel(e.Aggregate().ID, e.Aggregate().InstanceID)
err = u.queries.es.FilterToQueryReducer(ctx, sessionWriteModel)
if err != nil { if err != nil {
return err return err
} }
@@ -432,8 +435,8 @@ func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*h
return u.queue.Insert(ctx, return u.queue.Insert(ctx,
&notification.Request{ &notification.Request{
Aggregate: e.Aggregate(), Aggregate: e.Aggregate(),
UserID: s.UserFactor.UserID, UserID: sessionWriteModel.UserID,
UserResourceOwner: s.UserFactor.ResourceOwner, UserResourceOwner: sessionWriteModel.UserResourceOwner,
TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(), TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(),
EventType: e.EventType, EventType: e.EventType,
NotificationType: domain.NotificationTypeSms, NotificationType: domain.NotificationTypeSms,

View File

@@ -1349,19 +1349,12 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) { test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) {
testCode := "testcode" testCode := "testcode"
_, code := cryptoValue(t, ctrl, testCode) _, code := cryptoValue(t, ctrl, testCode)
queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{
ID: sessionID,
ResourceOwner: instanceID,
UserFactor: query.SessionUserFactor{
UserID: userID,
ResourceOwner: orgID,
},
}, nil)
queue.EXPECT().Insert( queue.EXPECT().Insert(
gomock.Any(), gomock.Any(),
&notification.Request{ &notification.Request{
UserID: userID, UserID: "", // Empty since no session events are provided
UserResourceOwner: orgID, UserResourceOwner: "", // Empty since no session events are provided
TriggeredAtOrigin: eventOrigin, TriggeredAtOrigin: eventOrigin,
URLTemplate: "", URLTemplate: "",
Code: code, Code: code,
@@ -1387,11 +1380,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(),
).Return(nil) ).Return(nil)
mockQuerier := es_repo_mock.NewMockQuerier(ctrl)
mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return fields{ return fields{
queries: queries, queries: queries,
queue: queue, queue: queue,
es: eventstore.NewEventstore(&eventstore.Config{ es: eventstore.NewEventstore(&eventstore.Config{
Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier, Querier: mockQuerier,
}), }),
}, args{ }, args{
event: &session.OTPSMSChallengedEvent{ event: &session.OTPSMSChallengedEvent{
@@ -1421,19 +1418,12 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
IsPrimary: true, IsPrimary: true,
}}, }},
}, nil) }, nil)
queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{
ID: sessionID,
ResourceOwner: instanceID,
UserFactor: query.SessionUserFactor{
UserID: userID,
ResourceOwner: orgID,
},
}, nil)
queue.EXPECT().Insert( queue.EXPECT().Insert(
gomock.Any(), gomock.Any(),
&notification.Request{ &notification.Request{
UserID: userID, UserID: "", // Empty since no session events are provided
UserResourceOwner: orgID, UserResourceOwner: "", // Empty since no session events are provided
TriggeredAtOrigin: fmt.Sprintf("%s://%s:%d", externalProtocol, instancePrimaryDomain, externalPort), TriggeredAtOrigin: fmt.Sprintf("%s://%s:%d", externalProtocol, instancePrimaryDomain, externalPort),
URLTemplate: "", URLTemplate: "",
Code: code, Code: code,
@@ -1459,11 +1449,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(),
).Return(nil) ).Return(nil)
mockQuerier := es_repo_mock.NewMockQuerier(ctrl)
mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return fields{ return fields{
queries: queries, queries: queries,
queue: queue, queue: queue,
es: eventstore.NewEventstore(&eventstore.Config{ es: eventstore.NewEventstore(&eventstore.Config{
Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier, Querier: mockQuerier,
}), }),
}, args{ }, args{
event: &session.OTPSMSChallengedEvent{ event: &session.OTPSMSChallengedEvent{
@@ -1484,19 +1478,11 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
{ {
name: "external code", name: "external code",
test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) { test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) {
queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{
ID: sessionID,
ResourceOwner: instanceID,
UserFactor: query.SessionUserFactor{
UserID: userID,
ResourceOwner: orgID,
},
}, nil)
queue.EXPECT().Insert( queue.EXPECT().Insert(
gomock.Any(), gomock.Any(),
&notification.Request{ &notification.Request{
UserID: userID, UserID: "", // Empty since no session events are provided
UserResourceOwner: orgID, UserResourceOwner: "", // Empty since no session events are provided
TriggeredAtOrigin: eventOrigin, TriggeredAtOrigin: eventOrigin,
URLTemplate: "", URLTemplate: "",
Code: nil, Code: nil,
@@ -1522,11 +1508,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(),
).Return(nil) ).Return(nil)
mockQuerier := es_repo_mock.NewMockQuerier(ctrl)
mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return fields{ return fields{
queries: queries, queries: queries,
queue: queue, queue: queue,
es: eventstore.NewEventstore(&eventstore.Config{ es: eventstore.NewEventstore(&eventstore.Config{
Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier, Querier: mockQuerier,
}), }),
}, args{ }, args{
event: &session.OTPSMSChallengedEvent{ event: &session.OTPSMSChallengedEvent{

View File

@@ -7,9 +7,12 @@ import (
"github.com/riverqueue/river" "github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertype"
"github.com/riverqueue/rivercontrib/otelriver"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/telemetry/metrics"
) )
// Queue abstracts the underlying queuing library // Queue abstracts the underlying queuing library
@@ -30,12 +33,16 @@ func NewQueue(config *Config) (_ *Queue, err error) {
if config.Client.Type() == "cockroach" { if config.Client.Type() == "cockroach" {
return nil, nil return nil, nil
} }
middleware := []rivertype.Middleware{otelriver.NewMiddleware(&otelriver.MiddlewareConfig{
MeterProvider: metrics.GetMetricsProvider(),
})}
return &Queue{ return &Queue{
driver: riverpgxv5.New(config.Client.Pool), driver: riverpgxv5.New(config.Client.Pool),
config: &river.Config{ config: &river.Config{
Workers: river.NewWorkers(), Workers: river.NewWorkers(),
Queues: make(map[string]river.QueueConfig), Queues: make(map[string]river.QueueConfig),
JobTimeout: -1, JobTimeout: -1,
Middleware: middleware,
}, },
}, nil }, nil
} }