mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 15:37:33 +00:00
fix: don't trigger session projection on notification handling (#10298)
# Which Problems Are Solved There is an outstanding bug wherein a session projection can fail to complete and an session OTP challenge is blocked because the projection doesn't exist. Not sure why the session projection can fail to persist - I can't find any error logs or failed events to crosscheck. However, I can clearly see the session events persisted with user/password checks and the OTP challenged added on the session - but no session projection on sessions8 table. This only seems to come up under somewhat higher loads - about 5 logins/s and only for about 1% of cases. (where a "login" is: authRequest, createSession, getAuthCodeWithSession, tokenExchange, and finally, otpSmsChallenge...💥). # How the Problems Are Solved This is only half a fix, but an important one as it can block login for affected users. Instead of triggering and checking the session projection on notification enqueuing, build a write model directly from the ES. # Additional Changes # Additional Context This doesn't touch the "legacy" notification handler as to limit the blast radius of this change. But might be worth adding there too. The test is difficult to update correctly so is somewhat incomplete. Any suggestions for refactoring or test helpers I'm missing would be welcome. Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com>
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
|||||||
http_util "github.com/zitadel/zitadel/internal/api/http"
|
http_util "github.com/zitadel/zitadel/internal/api/http"
|
||||||
"github.com/zitadel/zitadel/internal/api/ui/console"
|
"github.com/zitadel/zitadel/internal/api/ui/console"
|
||||||
"github.com/zitadel/zitadel/internal/api/ui/login"
|
"github.com/zitadel/zitadel/internal/api/ui/login"
|
||||||
|
"github.com/zitadel/zitadel/internal/command"
|
||||||
"github.com/zitadel/zitadel/internal/domain"
|
"github.com/zitadel/zitadel/internal/domain"
|
||||||
"github.com/zitadel/zitadel/internal/eventstore"
|
"github.com/zitadel/zitadel/internal/eventstore"
|
||||||
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
|
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
|
||||||
@@ -417,12 +418,14 @@ func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*h
|
|||||||
if alreadyHandled {
|
if alreadyHandled {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
s, err := u.queries.SessionByID(ctx, true, e.Aggregate().ID, "", nil)
|
|
||||||
|
ctx, err = u.queries.Origin(ctx, e)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, err = u.queries.Origin(ctx, e)
|
sessionWriteModel := command.NewSessionWriteModel(e.Aggregate().ID, e.Aggregate().InstanceID)
|
||||||
|
err = u.queries.es.FilterToQueryReducer(ctx, sessionWriteModel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -432,8 +435,8 @@ func (u *userNotifier) reduceSessionOTPSMSChallenged(event eventstore.Event) (*h
|
|||||||
return u.queue.Insert(ctx,
|
return u.queue.Insert(ctx,
|
||||||
¬ification.Request{
|
¬ification.Request{
|
||||||
Aggregate: e.Aggregate(),
|
Aggregate: e.Aggregate(),
|
||||||
UserID: s.UserFactor.UserID,
|
UserID: sessionWriteModel.UserID,
|
||||||
UserResourceOwner: s.UserFactor.ResourceOwner,
|
UserResourceOwner: sessionWriteModel.UserResourceOwner,
|
||||||
TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(),
|
TriggeredAtOrigin: http_util.DomainContext(ctx).Origin(),
|
||||||
EventType: e.EventType,
|
EventType: e.EventType,
|
||||||
NotificationType: domain.NotificationTypeSms,
|
NotificationType: domain.NotificationTypeSms,
|
||||||
|
@@ -1349,19 +1349,12 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
|
|||||||
test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) {
|
test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) {
|
||||||
testCode := "testcode"
|
testCode := "testcode"
|
||||||
_, code := cryptoValue(t, ctrl, testCode)
|
_, code := cryptoValue(t, ctrl, testCode)
|
||||||
queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{
|
|
||||||
ID: sessionID,
|
|
||||||
ResourceOwner: instanceID,
|
|
||||||
UserFactor: query.SessionUserFactor{
|
|
||||||
UserID: userID,
|
|
||||||
ResourceOwner: orgID,
|
|
||||||
},
|
|
||||||
}, nil)
|
|
||||||
queue.EXPECT().Insert(
|
queue.EXPECT().Insert(
|
||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
¬ification.Request{
|
¬ification.Request{
|
||||||
UserID: userID,
|
UserID: "", // Empty since no session events are provided
|
||||||
UserResourceOwner: orgID,
|
UserResourceOwner: "", // Empty since no session events are provided
|
||||||
TriggeredAtOrigin: eventOrigin,
|
TriggeredAtOrigin: eventOrigin,
|
||||||
URLTemplate: "",
|
URLTemplate: "",
|
||||||
Code: code,
|
Code: code,
|
||||||
@@ -1387,11 +1380,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
|
|||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
).Return(nil)
|
).Return(nil)
|
||||||
|
|
||||||
|
mockQuerier := es_repo_mock.NewMockQuerier(ctrl)
|
||||||
|
mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
|
||||||
|
|
||||||
return fields{
|
return fields{
|
||||||
queries: queries,
|
queries: queries,
|
||||||
queue: queue,
|
queue: queue,
|
||||||
es: eventstore.NewEventstore(&eventstore.Config{
|
es: eventstore.NewEventstore(&eventstore.Config{
|
||||||
Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier,
|
Querier: mockQuerier,
|
||||||
}),
|
}),
|
||||||
}, args{
|
}, args{
|
||||||
event: &session.OTPSMSChallengedEvent{
|
event: &session.OTPSMSChallengedEvent{
|
||||||
@@ -1421,19 +1418,12 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
|
|||||||
IsPrimary: true,
|
IsPrimary: true,
|
||||||
}},
|
}},
|
||||||
}, nil)
|
}, nil)
|
||||||
queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{
|
|
||||||
ID: sessionID,
|
|
||||||
ResourceOwner: instanceID,
|
|
||||||
UserFactor: query.SessionUserFactor{
|
|
||||||
UserID: userID,
|
|
||||||
ResourceOwner: orgID,
|
|
||||||
},
|
|
||||||
}, nil)
|
|
||||||
queue.EXPECT().Insert(
|
queue.EXPECT().Insert(
|
||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
¬ification.Request{
|
¬ification.Request{
|
||||||
UserID: userID,
|
UserID: "", // Empty since no session events are provided
|
||||||
UserResourceOwner: orgID,
|
UserResourceOwner: "", // Empty since no session events are provided
|
||||||
TriggeredAtOrigin: fmt.Sprintf("%s://%s:%d", externalProtocol, instancePrimaryDomain, externalPort),
|
TriggeredAtOrigin: fmt.Sprintf("%s://%s:%d", externalProtocol, instancePrimaryDomain, externalPort),
|
||||||
URLTemplate: "",
|
URLTemplate: "",
|
||||||
Code: code,
|
Code: code,
|
||||||
@@ -1459,11 +1449,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
|
|||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
).Return(nil)
|
).Return(nil)
|
||||||
|
|
||||||
|
mockQuerier := es_repo_mock.NewMockQuerier(ctrl)
|
||||||
|
mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
|
||||||
|
|
||||||
return fields{
|
return fields{
|
||||||
queries: queries,
|
queries: queries,
|
||||||
queue: queue,
|
queue: queue,
|
||||||
es: eventstore.NewEventstore(&eventstore.Config{
|
es: eventstore.NewEventstore(&eventstore.Config{
|
||||||
Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier,
|
Querier: mockQuerier,
|
||||||
}),
|
}),
|
||||||
}, args{
|
}, args{
|
||||||
event: &session.OTPSMSChallengedEvent{
|
event: &session.OTPSMSChallengedEvent{
|
||||||
@@ -1484,19 +1478,11 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "external code",
|
name: "external code",
|
||||||
test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) {
|
test: func(ctrl *gomock.Controller, queries *mock.MockQueries, queue *mock.MockQueue) (f fields, a args, w want) {
|
||||||
queries.EXPECT().SessionByID(gomock.Any(), gomock.Any(), sessionID, gomock.Any(), nil).Return(&query.Session{
|
|
||||||
ID: sessionID,
|
|
||||||
ResourceOwner: instanceID,
|
|
||||||
UserFactor: query.SessionUserFactor{
|
|
||||||
UserID: userID,
|
|
||||||
ResourceOwner: orgID,
|
|
||||||
},
|
|
||||||
}, nil)
|
|
||||||
queue.EXPECT().Insert(
|
queue.EXPECT().Insert(
|
||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
¬ification.Request{
|
¬ification.Request{
|
||||||
UserID: userID,
|
UserID: "", // Empty since no session events are provided
|
||||||
UserResourceOwner: orgID,
|
UserResourceOwner: "", // Empty since no session events are provided
|
||||||
TriggeredAtOrigin: eventOrigin,
|
TriggeredAtOrigin: eventOrigin,
|
||||||
URLTemplate: "",
|
URLTemplate: "",
|
||||||
Code: nil,
|
Code: nil,
|
||||||
@@ -1522,11 +1508,15 @@ func Test_userNotifier_reduceOTPSMSChallenged(t *testing.T) {
|
|||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
).Return(nil)
|
).Return(nil)
|
||||||
|
|
||||||
|
mockQuerier := es_repo_mock.NewMockQuerier(ctrl)
|
||||||
|
mockQuerier.EXPECT().FilterToReducer(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
|
||||||
|
|
||||||
return fields{
|
return fields{
|
||||||
queries: queries,
|
queries: queries,
|
||||||
queue: queue,
|
queue: queue,
|
||||||
es: eventstore.NewEventstore(&eventstore.Config{
|
es: eventstore.NewEventstore(&eventstore.Config{
|
||||||
Querier: es_repo_mock.NewRepo(t).ExpectFilterEvents().MockQuerier,
|
Querier: mockQuerier,
|
||||||
}),
|
}),
|
||||||
}, args{
|
}, args{
|
||||||
event: &session.OTPSMSChallengedEvent{
|
event: &session.OTPSMSChallengedEvent{
|
||||||
|
Reference in New Issue
Block a user