From 850bd8a81379cd1b26076e0cc6f591baa41feec4 Mon Sep 17 00:00:00 2001 From: Zach Hirschtritt Date: Mon, 4 Aug 2025 09:33:01 -0400 Subject: [PATCH] fix: don't trigger session projection on notification handling (#10298) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Which Problems Are Solved There is an outstanding bug wherein a session projection can fail to complete and an session OTP challenge is blocked because the projection doesn't exist. Not sure why the session projection can fail to persist - I can't find any error logs or failed events to crosscheck. However, I can clearly see the session events persisted with user/password checks and the OTP challenged added on the session - but no session projection on sessions8 table. This only seems to come up under somewhat higher loads - about 5 logins/s and only for about 1% of cases. (where a "login" is: authRequest, createSession, getAuthCodeWithSession, tokenExchange, and finally, otpSmsChallenge...💥). # How the Problems Are Solved This is only half a fix, but an important one as it can block login for affected users. Instead of triggering and checking the session projection on notification enqueuing, build a write model directly from the ES. # Additional Changes # Additional Context This doesn't touch the "legacy" notification handler as to limit the blast radius of this change. But might be worth adding there too. The test is difficult to update correctly so is somewhat incomplete. Any suggestions for refactoring or test helpers I'm missing would be welcome. Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com> --- .../notification/handlers/user_notifier.go | 11 ++-- .../handlers/user_notifier_test.go | 56 ++++++++----------- 2 files changed, 30 insertions(+), 37 deletions(-) diff --git a/internal/notification/handlers/user_notifier.go b/internal/notification/handlers/user_notifier.go index f36f5d828c..7e281c43bc 100644 --- a/internal/notification/handlers/user_notifier.go +++ b/internal/notification/handlers/user_notifier.go @@ -7,6 +7,7 @@ import ( http_util "github.com/zitadel/zitadel/internal/api/http" "github.com/zitadel/zitadel/internal/api/ui/console" "github.com/zitadel/zitadel/internal/api/ui/login" + "github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler/v2" @@ -417,12 +418,14 @@ func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*h if alreadyHandled { return nil } - s, err := u.queries.SessionByID(ctx, true, e.Aggregate().ID, "", nil) + + ctx, err = u.queries.Origin(ctx, e) if err != nil { return err } - ctx, err = u.queries.Origin(ctx, e) + sessionWriteModel := command.NewSessionWriteModel(e.Aggregate().ID, e.Aggregate().InstanceID) + err = u.queries.es.FilterToQueryReducer(ctx, sessionWriteModel) if err != nil { return err } @@ -432,8 +435,8 @@ func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*h return u.queue.Insert(ctx, ¬ification.Request{ Aggregate: e.Aggregate(), - UserID: s.UserFactor.UserID, - UserResourceOwner: s.UserFactor.ResourceOwner, + UserID: sessionWriteModel.UserID, + UserResourceOwner: sessionWriteModel.UserResourceOwner, TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(), EventType: e.EventType, NotificationType: domain.NotificationTypeSms, diff --git a/internal/notification/handlers/user_notifier_test.go b/internal/notification/handlers/user_notifier_test.go index f7090f0146..d4979eb229 100644 --- a/internal/notification/handlers/user_notifier_test.go +++ b/internal/notification/handlers/user_notifier_test.go @@ -1349,19 +1349,12 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) { test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) { testCode := "testcode" _, code := cryptoValue(t, ctrl, testCode) - queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{ - ID: sessionID, - ResourceOwner: instanceID, - UserFactor: query.SessionUserFactor{ - UserID: userID, - ResourceOwner: orgID, - }, - }, nil) + queue.EXPECT().Insert( gomock.Any(), ¬ification.Request{ - UserID: userID, - UserResourceOwner: orgID, + UserID: "", // Empty since no session events are provided + UserResourceOwner: "", // Empty since no session events are provided TriggeredAtOrigin: eventOrigin, URLTemplate: "", Code: code, @@ -1387,11 +1380,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) { gomock.Any(), gomock.Any(), ).Return(nil) + + mockQuerier := es_repo_mock.NewMockQuerier(ctrl) + mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + return fields{ queries: queries, queue: queue, es: eventstore.NewEventstore(&eventstore.Config{ - Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier, + Querier: mockQuerier, }), }, args{ event: &session.OTPSMSChallengedEvent{ @@ -1421,19 +1418,12 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) { IsPrimary: true, }}, }, nil) - queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{ - ID: sessionID, - ResourceOwner: instanceID, - UserFactor: query.SessionUserFactor{ - UserID: userID, - ResourceOwner: orgID, - }, - }, nil) + queue.EXPECT().Insert( gomock.Any(), ¬ification.Request{ - UserID: userID, - UserResourceOwner: orgID, + UserID: "", // Empty since no session events are provided + UserResourceOwner: "", // Empty since no session events are provided TriggeredAtOrigin: fmt.Sprintf("%s://%s:%d", externalProtocol, instancePrimaryDomain, externalPort), URLTemplate: "", Code: code, @@ -1459,11 +1449,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) { gomock.Any(), gomock.Any(), ).Return(nil) + + mockQuerier := es_repo_mock.NewMockQuerier(ctrl) + mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + return fields{ queries: queries, queue: queue, es: eventstore.NewEventstore(&eventstore.Config{ - Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier, + Querier: mockQuerier, }), }, args{ event: &session.OTPSMSChallengedEvent{ @@ -1484,19 +1478,11 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) { { name: "external code", test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) { - queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{ - ID: sessionID, - ResourceOwner: instanceID, - UserFactor: query.SessionUserFactor{ - UserID: userID, - ResourceOwner: orgID, - }, - }, nil) queue.EXPECT().Insert( gomock.Any(), ¬ification.Request{ - UserID: userID, - UserResourceOwner: orgID, + UserID: "", // Empty since no session events are provided + UserResourceOwner: "", // Empty since no session events are provided TriggeredAtOrigin: eventOrigin, URLTemplate: "", Code: nil, @@ -1522,11 +1508,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) { gomock.Any(), gomock.Any(), ).Return(nil) + + mockQuerier := es_repo_mock.NewMockQuerier(ctrl) + mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + return fields{ queries: queries, queue: queue, es: eventstore.NewEventstore(&eventstore.Config{ - Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier, + Querier: mockQuerier, }), }, args{ event: &session.OTPSMSChallengedEvent{