fix: don't trigger session projection on notification handling (#10298)

# Which Problems Are Solved

There is an outstanding bug wherein a session projection can fail to
complete and an session OTP challenge is blocked because the projection
doesn't exist. Not sure why the session projection can fail to persist -
I can't find any error logs or failed events to crosscheck. However, I
can clearly see the session events persisted with user/password checks
and the OTP challenged added on the session - but no session projection
on sessions8 table.

This only seems to come up under somewhat higher loads - about 5
logins/s and only for about 1% of cases. (where a "login" is:
authRequest, createSession, getAuthCodeWithSession, tokenExchange, and
finally, otpSmsChallenge...💥).

# How the Problems Are Solved

This is only half a fix, but an important one as it can block login for
affected users. Instead of triggering and checking the session
projection on notification enqueuing, build a write model directly from
the ES.

# Additional Changes

# Additional Context

This doesn't touch the "legacy" notification handler as to limit the
blast radius of this change. But might be worth adding there too.

The test is difficult to update correctly so is somewhat incomplete. Any
suggestions for refactoring or test helpers I'm missing would be
welcome.

Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com>
This commit is contained in:
Zach Hirschtritt
2025-08-04 09:33:01 -04:00
committed by Stefan Benz
parent 9dde6247d7
commit 850bd8a813
2 changed files with 30 additions and 37 deletions

View File

@@ -7,6 +7,7 @@ import (
http_util "github.com/zitadel/zitadel/internal/api/http" http_util "github.com/zitadel/zitadel/internal/api/http"
"github.com/zitadel/zitadel/internal/api/ui/console" "github.com/zitadel/zitadel/internal/api/ui/console"
"github.com/zitadel/zitadel/internal/api/ui/login" "github.com/zitadel/zitadel/internal/api/ui/login"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2" "github.com/zitadel/zitadel/internal/eventstore/handler/v2"
@@ -417,12 +418,14 @@ func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*h
if alreadyHandled { if alreadyHandled {
return nil return nil
} }
s, err := u.queries.SessionByID(ctx, true, e.Aggregate().ID, "", nil)
ctx, err = u.queries.Origin(ctx, e)
if err != nil { if err != nil {
return err return err
} }
ctx, err = u.queries.Origin(ctx, e) sessionWriteModel := command.NewSessionWriteModel(e.Aggregate().ID, e.Aggregate().InstanceID)
err = u.queries.es.FilterToQueryReducer(ctx, sessionWriteModel)
if err != nil { if err != nil {
return err return err
} }
@@ -432,8 +435,8 @@ func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*h
return u.queue.Insert(ctx, return u.queue.Insert(ctx,
&notification.Request{ &notification.Request{
Aggregate: e.Aggregate(), Aggregate: e.Aggregate(),
UserID: s.UserFactor.UserID, UserID: sessionWriteModel.UserID,
UserResourceOwner: s.UserFactor.ResourceOwner, UserResourceOwner: sessionWriteModel.UserResourceOwner,
TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(), TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(),
EventType: e.EventType, EventType: e.EventType,
NotificationType: domain.NotificationTypeSms, NotificationType: domain.NotificationTypeSms,

View File

@@ -1349,19 +1349,12 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) { test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) {
testCode := "testcode" testCode := "testcode"
_, code := cryptoValue(t, ctrl, testCode) _, code := cryptoValue(t, ctrl, testCode)
queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{
ID: sessionID,
ResourceOwner: instanceID,
UserFactor: query.SessionUserFactor{
UserID: userID,
ResourceOwner: orgID,
},
}, nil)
queue.EXPECT().Insert( queue.EXPECT().Insert(
gomock.Any(), gomock.Any(),
&notification.Request{ &notification.Request{
UserID: userID, UserID: "", // Empty since no session events are provided
UserResourceOwner: orgID, UserResourceOwner: "", // Empty since no session events are provided
TriggeredAtOrigin: eventOrigin, TriggeredAtOrigin: eventOrigin,
URLTemplate: "", URLTemplate: "",
Code: code, Code: code,
@@ -1387,11 +1380,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(),
).Return(nil) ).Return(nil)
mockQuerier := es_repo_mock.NewMockQuerier(ctrl)
mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return fields{ return fields{
queries: queries, queries: queries,
queue: queue, queue: queue,
es: eventstore.NewEventstore(&eventstore.Config{ es: eventstore.NewEventstore(&eventstore.Config{
Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier, Querier: mockQuerier,
}), }),
}, args{ }, args{
event: &session.OTPSMSChallengedEvent{ event: &session.OTPSMSChallengedEvent{
@@ -1421,19 +1418,12 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
IsPrimary: true, IsPrimary: true,
}}, }},
}, nil) }, nil)
queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{
ID: sessionID,
ResourceOwner: instanceID,
UserFactor: query.SessionUserFactor{
UserID: userID,
ResourceOwner: orgID,
},
}, nil)
queue.EXPECT().Insert( queue.EXPECT().Insert(
gomock.Any(), gomock.Any(),
&notification.Request{ &notification.Request{
UserID: userID, UserID: "", // Empty since no session events are provided
UserResourceOwner: orgID, UserResourceOwner: "", // Empty since no session events are provided
TriggeredAtOrigin: fmt.Sprintf("%s://%s:%d", externalProtocol, instancePrimaryDomain, externalPort), TriggeredAtOrigin: fmt.Sprintf("%s://%s:%d", externalProtocol, instancePrimaryDomain, externalPort),
URLTemplate: "", URLTemplate: "",
Code: code, Code: code,
@@ -1459,11 +1449,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(),
).Return(nil) ).Return(nil)
mockQuerier := es_repo_mock.NewMockQuerier(ctrl)
mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return fields{ return fields{
queries: queries, queries: queries,
queue: queue, queue: queue,
es: eventstore.NewEventstore(&eventstore.Config{ es: eventstore.NewEventstore(&eventstore.Config{
Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier, Querier: mockQuerier,
}), }),
}, args{ }, args{
event: &session.OTPSMSChallengedEvent{ event: &session.OTPSMSChallengedEvent{
@@ -1484,19 +1478,11 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
{ {
name: "external code", name: "external code",
test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) { test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) {
queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{
ID: sessionID,
ResourceOwner: instanceID,
UserFactor: query.SessionUserFactor{
UserID: userID,
ResourceOwner: orgID,
},
}, nil)
queue.EXPECT().Insert( queue.EXPECT().Insert(
gomock.Any(), gomock.Any(),
&notification.Request{ &notification.Request{
UserID: userID, UserID: "", // Empty since no session events are provided
UserResourceOwner: orgID, UserResourceOwner: "", // Empty since no session events are provided
TriggeredAtOrigin: eventOrigin, TriggeredAtOrigin: eventOrigin,
URLTemplate: "", URLTemplate: "",
Code: nil, Code: nil,
@@ -1522,11 +1508,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(),
).Return(nil) ).Return(nil)
mockQuerier := es_repo_mock.NewMockQuerier(ctrl)
mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return fields{ return fields{
queries: queries, queries: queries,
queue: queue, queue: queue,
es: eventstore.NewEventstore(&eventstore.Config{ es: eventstore.NewEventstore(&eventstore.Config{
Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier, Querier: mockQuerier,
}), }),
}, args{ }, args{
event: &session.OTPSMSChallengedEvent{ event: &session.OTPSMSChallengedEvent{