mirror of
https://github.com/zitadel/zitadel.git
synced 2024-12-12 02:54:20 +00:00
perf: reduce events read from eventstore (#6280)
* fix: events query user * fix: events query user * user events query * fix tests * fix query * cleanup --------- Co-authored-by: Fabienne <fabienne.gerschwiler@gmail.com>
This commit is contained in:
parent
b0dc02509b
commit
d3e403f645
@ -89,7 +89,7 @@ type idpUserLinksProvider interface {
|
||||
}
|
||||
|
||||
type userEventProvider interface {
|
||||
UserEventsByID(ctx context.Context, id string, sequence uint64) ([]*es_models.Event, error)
|
||||
UserEventsByID(ctx context.Context, id string, sequence uint64, eventTypes []es_models.EventType) ([]*es_models.Event, error)
|
||||
}
|
||||
|
||||
type userCommandProvider interface {
|
||||
@ -1314,6 +1314,29 @@ func userSessionsByUserAgentID(provider userSessionViewProvider, agentID, instan
|
||||
return user_view_model.UserSessionsToModel(session), nil
|
||||
}
|
||||
|
||||
var (
|
||||
userSessionEventTypes = []es_models.EventType{
|
||||
es_models.EventType(user_repo.UserV1PasswordCheckSucceededType),
|
||||
es_models.EventType(user_repo.UserV1PasswordCheckFailedType),
|
||||
es_models.EventType(user_repo.UserV1MFAOTPCheckSucceededType),
|
||||
es_models.EventType(user_repo.UserV1MFAOTPCheckFailedType),
|
||||
es_models.EventType(user_repo.UserV1SignedOutType),
|
||||
es_models.EventType(user_repo.UserLockedType),
|
||||
es_models.EventType(user_repo.UserDeactivatedType),
|
||||
es_models.EventType(user_repo.HumanPasswordCheckSucceededType),
|
||||
es_models.EventType(user_repo.HumanPasswordCheckFailedType),
|
||||
es_models.EventType(user_repo.UserIDPLoginCheckSucceededType),
|
||||
es_models.EventType(user_repo.HumanMFAOTPCheckSucceededType),
|
||||
es_models.EventType(user_repo.HumanMFAOTPCheckFailedType),
|
||||
es_models.EventType(user_repo.HumanSignedOutType),
|
||||
es_models.EventType(user_repo.HumanPasswordlessTokenCheckSucceededType),
|
||||
es_models.EventType(user_repo.HumanPasswordlessTokenCheckFailedType),
|
||||
es_models.EventType(user_repo.HumanU2FTokenCheckSucceededType),
|
||||
es_models.EventType(user_repo.HumanU2FTokenCheckFailedType),
|
||||
es_models.EventType(user_repo.UserRemovedType),
|
||||
}
|
||||
)
|
||||
|
||||
func userSessionByIDs(ctx context.Context, provider userSessionViewProvider, eventProvider userEventProvider, agentID string, user *user_model.UserView) (*user_model.UserSessionView, error) {
|
||||
instanceID := authz.GetInstance(ctx).InstanceID()
|
||||
session, err := provider.UserSessionByIDs(agentID, user.ID, instanceID)
|
||||
@ -1330,7 +1353,7 @@ func userSessionByIDs(ctx context.Context, provider userSessionViewProvider, eve
|
||||
session.Sequence = sequence.CurrentSequence
|
||||
}
|
||||
}
|
||||
events, err := eventProvider.UserEventsByID(ctx, user.ID, session.Sequence)
|
||||
events, err := eventProvider.UserEventsByID(ctx, user.ID, session.Sequence, append(session.EventTypes(), userSessionEventTypes...))
|
||||
if err != nil {
|
||||
logging.WithFields("traceID", tracing.TraceIDFromCtx(ctx)).WithError(err).Debug("error retrieving new events")
|
||||
return user_view_model.UserSessionToModel(session), nil
|
||||
@ -1411,7 +1434,7 @@ func userByID(ctx context.Context, viewProvider userViewProvider, eventProvider
|
||||
} else if user == nil {
|
||||
user = new(user_view_model.UserView)
|
||||
}
|
||||
events, err := eventProvider.UserEventsByID(ctx, userID, user.Sequence)
|
||||
events, err := eventProvider.UserEventsByID(ctx, userID, user.Sequence, user.EventTypes())
|
||||
if err != nil {
|
||||
logging.WithFields("traceID", tracing.TraceIDFromCtx(ctx)).WithError(err).Debug("error retrieving new events")
|
||||
return user_view_model.UserToModel(user), nil
|
||||
|
@ -105,7 +105,7 @@ type mockEventUser struct {
|
||||
Event *es_models.Event
|
||||
}
|
||||
|
||||
func (m *mockEventUser) UserEventsByID(ctx context.Context, id string, sequence uint64) ([]*es_models.Event, error) {
|
||||
func (m *mockEventUser) UserEventsByID(ctx context.Context, id string, sequence uint64, types []es_models.EventType) ([]*es_models.Event, error) {
|
||||
events := make([]*es_models.Event, 0)
|
||||
if m.Event != nil {
|
||||
events = append(events, m.Event)
|
||||
@ -119,7 +119,7 @@ func (m *mockEventUser) BulkAddExternalIDPs(ctx context.Context, userID string,
|
||||
|
||||
type mockEventErrUser struct{}
|
||||
|
||||
func (m *mockEventErrUser) UserEventsByID(ctx context.Context, id string, sequence uint64) ([]*es_models.Event, error) {
|
||||
func (m *mockEventErrUser) UserEventsByID(ctx context.Context, id string, sequence uint64, types []es_models.EventType) ([]*es_models.Event, error) {
|
||||
return nil, errors.ThrowInternal(nil, "id", "internal error")
|
||||
}
|
||||
|
||||
|
@ -62,7 +62,7 @@ func (r *RefreshTokenRepo) RefreshTokenByID(ctx context.Context, tokenID, userID
|
||||
}
|
||||
}
|
||||
|
||||
events, esErr := r.getUserEvents(ctx, userID, tokenView.InstanceID, tokenView.Sequence)
|
||||
events, esErr := r.getUserEvents(ctx, userID, tokenView.InstanceID, tokenView.Sequence, tokenView.GetRelevantEventTypes())
|
||||
if errors.IsNotFound(viewErr) && len(events) == 0 {
|
||||
return nil, errors.ThrowNotFound(nil, "EVENT-BHB52", "Errors.User.RefreshToken.Invalid")
|
||||
}
|
||||
@ -106,8 +106,8 @@ func (r *RefreshTokenRepo) SearchMyRefreshTokens(ctx context.Context, userID str
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *RefreshTokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64) ([]*models.Event, error) {
|
||||
query, err := usr_view.UserByIDQuery(userID, instanceID, sequence)
|
||||
func (r *RefreshTokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64, eventTypes []models.EventType) ([]*models.Event, error) {
|
||||
query, err := usr_view.UserByIDQuery(userID, instanceID, sequence, eventTypes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ func (repo *TokenRepo) TokenByIDs(ctx context.Context, userID, tokenID string) (
|
||||
}
|
||||
}
|
||||
|
||||
events, esErr := repo.getUserEvents(ctx, userID, token.InstanceID, token.Sequence)
|
||||
events, esErr := repo.getUserEvents(ctx, userID, token.InstanceID, token.Sequence, token.GetRelevantEventTypes())
|
||||
if errors.IsNotFound(viewErr) && len(events) == 0 {
|
||||
return nil, errors.ThrowNotFound(nil, "EVENT-4T90g", "Errors.Token.NotFound")
|
||||
}
|
||||
@ -77,8 +77,8 @@ func (repo *TokenRepo) TokenByIDs(ctx context.Context, userID, tokenID string) (
|
||||
return model.TokenViewToModel(token), nil
|
||||
}
|
||||
|
||||
func (r *TokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64) ([]*models.Event, error) {
|
||||
query, err := usr_view.UserByIDQuery(userID, instanceID, sequence)
|
||||
func (r *TokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64, eventTypes []models.EventType) ([]*models.Event, error) {
|
||||
query, err := usr_view.UserByIDQuery(userID, instanceID, sequence, eventTypes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -39,12 +39,12 @@ func (repo *UserRepo) UserSessionUserIDsByAgentID(ctx context.Context, agentID s
|
||||
return userIDs, nil
|
||||
}
|
||||
|
||||
func (repo *UserRepo) UserEventsByID(ctx context.Context, id string, sequence uint64) ([]*models.Event, error) {
|
||||
return repo.getUserEvents(ctx, id, sequence)
|
||||
func (repo *UserRepo) UserEventsByID(ctx context.Context, id string, sequence uint64, eventTypes []models.EventType) ([]*models.Event, error) {
|
||||
return repo.getUserEvents(ctx, id, sequence, eventTypes)
|
||||
}
|
||||
|
||||
func (r *UserRepo) getUserEvents(ctx context.Context, userID string, sequence uint64) ([]*models.Event, error) {
|
||||
query, err := usr_view.UserByIDQuery(userID, authz.GetInstance(ctx).InstanceID(), sequence)
|
||||
func (r *UserRepo) getUserEvents(ctx context.Context, userID string, sequence uint64, eventTypes []models.EventType) ([]*models.Event, error) {
|
||||
query, err := usr_view.UserByIDQuery(userID, authz.GetInstance(ctx).InstanceID(), sequence, eventTypes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -163,7 +163,7 @@ func (u *User) ProcessUser(event *es_models.Event) (err error) {
|
||||
"userID", event.AggregateID,
|
||||
"eventType", event.Type,
|
||||
).Info("user not found in view")
|
||||
query, err := usr_view.UserByIDQuery(event.AggregateID, event.InstanceID, 0)
|
||||
query, err := usr_view.UserByIDQuery(event.AggregateID, event.InstanceID, 0, user.EventTypes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -191,7 +191,7 @@ func (u *User) ProcessUser(event *es_models.Event) (err error) {
|
||||
"userID", event.AggregateID,
|
||||
"eventType", event.Type,
|
||||
).Info("user not found in view")
|
||||
query, err := usr_view.UserByIDQuery(event.AggregateID, event.InstanceID, 0)
|
||||
query, err := usr_view.UserByIDQuery(event.AggregateID, event.InstanceID, 0, user.EventTypes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -108,7 +108,7 @@ func (v *View) userByID(ctx context.Context, instanceID string, queries ...query
|
||||
}
|
||||
}
|
||||
|
||||
query, err := view.UserByIDQuery(queriedUser.ID, instanceID, user.Sequence)
|
||||
query, err := view.UserByIDQuery(queriedUser.ID, instanceID, user.Sequence, user.EventTypes())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -63,7 +63,7 @@ func (repo *TokenVerifierRepo) tokenByID(ctx context.Context, tokenID, userID st
|
||||
}
|
||||
}
|
||||
|
||||
events, esErr := repo.getUserEvents(ctx, userID, instanceID, token.Sequence)
|
||||
events, esErr := repo.getUserEvents(ctx, userID, instanceID, token.Sequence, token.GetRelevantEventTypes())
|
||||
if caos_errs.IsNotFound(viewErr) && len(events) == 0 {
|
||||
return nil, caos_errs.ThrowNotFound(nil, "EVENT-4T90g", "Errors.Token.NotFound")
|
||||
}
|
||||
@ -238,10 +238,10 @@ func (repo *TokenVerifierRepo) VerifierClientID(ctx context.Context, appName str
|
||||
return clientID, app.ProjectID, nil
|
||||
}
|
||||
|
||||
func (repo *TokenVerifierRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64) (_ []*models.Event, err error) {
|
||||
func (repo *TokenVerifierRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64, eventTypes []models.EventType) (_ []*models.Event, err error) {
|
||||
ctx, span := tracing.NewSpan(ctx)
|
||||
defer func() { span.EndWithError(err) }()
|
||||
query, err := usr_view.UserByIDQuery(userID, instanceID, sequence)
|
||||
query, err := usr_view.UserByIDQuery(userID, instanceID, sequence, eventTypes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -155,3 +155,14 @@ func (t *RefreshTokenView) appendRenewedEvent(event *es_models.Event) error {
|
||||
func (t *RefreshTokenView) appendRemovedEvent(event *es_models.Event) {
|
||||
t.Expiration = event.CreationDate
|
||||
}
|
||||
|
||||
func (t *RefreshTokenView) GetRelevantEventTypes() []es_models.EventType {
|
||||
return []es_models.EventType{
|
||||
es_models.EventType(user_repo.HumanRefreshTokenAddedType),
|
||||
es_models.EventType(user_repo.HumanRefreshTokenRenewedType),
|
||||
es_models.EventType(user_repo.HumanRefreshTokenRemovedType),
|
||||
es_models.EventType(user_repo.UserRemovedType),
|
||||
es_models.EventType(user_repo.UserDeactivatedType),
|
||||
es_models.EventType(user_repo.UserLockedType),
|
||||
}
|
||||
}
|
||||
|
@ -182,6 +182,23 @@ func (t *TokenView) appendPATRemoved(event *es_models.Event) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TokenView) GetRelevantEventTypes() []es_models.EventType {
|
||||
return []es_models.EventType{
|
||||
es_models.EventType(user_repo.UserTokenAddedType),
|
||||
es_models.EventType(user_repo.PersonalAccessTokenAddedType),
|
||||
es_models.EventType(user_repo.UserTokenRemovedType),
|
||||
es_models.EventType(user_repo.HumanRefreshTokenRemovedType),
|
||||
es_models.EventType(user_repo.UserV1SignedOutType),
|
||||
es_models.EventType(user_repo.HumanSignedOutType),
|
||||
es_models.EventType(user_repo.UserRemovedType),
|
||||
es_models.EventType(user_repo.UserDeactivatedType),
|
||||
es_models.EventType(user_repo.UserLockedType),
|
||||
es_models.EventType(user_repo.UserLockedType),
|
||||
es_models.EventType(user_repo.UserReactivatedType),
|
||||
es_models.EventType(user_repo.PersonalAccessTokenRemovedType),
|
||||
}
|
||||
}
|
||||
|
||||
func eventToMap(event *es_models.Event) (map[string]interface{}, error) {
|
||||
m := make(map[string]interface{})
|
||||
if err := json.Unmarshal(event.Data, &m); err != nil {
|
||||
|
@ -534,3 +534,59 @@ func (u *UserView) SetEmptyUserType() {
|
||||
u.HumanView = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (u *UserView) EventTypes() []models.EventType {
|
||||
return []models.EventType{
|
||||
models.EventType(user.MachineAddedEventType),
|
||||
models.EventType(user.UserV1AddedType),
|
||||
models.EventType(user.UserV1RegisteredType),
|
||||
models.EventType(user.HumanRegisteredType),
|
||||
models.EventType(user.HumanAddedType),
|
||||
models.EventType(user.UserRemovedType),
|
||||
models.EventType(user.UserV1PasswordChangedType),
|
||||
models.EventType(user.HumanPasswordChangedType),
|
||||
models.EventType(user.HumanPasswordlessTokenAddedType),
|
||||
models.EventType(user.HumanPasswordlessTokenVerifiedType),
|
||||
models.EventType(user.HumanPasswordlessTokenRemovedType),
|
||||
models.EventType(user.UserV1ProfileChangedType),
|
||||
models.EventType(user.HumanProfileChangedType),
|
||||
models.EventType(user.UserV1AddressChangedType),
|
||||
models.EventType(user.HumanAddressChangedType),
|
||||
models.EventType(user.MachineChangedEventType),
|
||||
models.EventType(user.UserDomainClaimedType),
|
||||
models.EventType(user.UserUserNameChangedType),
|
||||
models.EventType(user.UserV1EmailChangedType),
|
||||
models.EventType(user.HumanEmailChangedType),
|
||||
models.EventType(user.UserV1EmailVerifiedType),
|
||||
models.EventType(user.HumanEmailVerifiedType),
|
||||
models.EventType(user.UserV1PhoneChangedType),
|
||||
models.EventType(user.HumanPhoneChangedType),
|
||||
models.EventType(user.UserV1PhoneVerifiedType),
|
||||
models.EventType(user.HumanPhoneVerifiedType),
|
||||
models.EventType(user.UserV1PhoneRemovedType),
|
||||
models.EventType(user.HumanPhoneRemovedType),
|
||||
models.EventType(user.UserDeactivatedType),
|
||||
models.EventType(user.UserReactivatedType),
|
||||
models.EventType(user.UserUnlockedType),
|
||||
models.EventType(user.UserLockedType),
|
||||
models.EventType(user.UserV1MFAOTPAddedType),
|
||||
models.EventType(user.HumanMFAOTPAddedType),
|
||||
models.EventType(user.UserV1MFAOTPVerifiedType),
|
||||
models.EventType(user.HumanMFAOTPVerifiedType),
|
||||
models.EventType(user.UserV1MFAOTPRemovedType),
|
||||
models.EventType(user.HumanMFAOTPRemovedType),
|
||||
models.EventType(user.HumanU2FTokenAddedType),
|
||||
models.EventType(user.HumanU2FTokenVerifiedType),
|
||||
models.EventType(user.HumanU2FTokenRemovedType),
|
||||
models.EventType(user.UserV1MFAInitSkippedType),
|
||||
models.EventType(user.HumanMFAInitSkippedType),
|
||||
models.EventType(user.UserV1InitialCodeAddedType),
|
||||
models.EventType(user.HumanInitialCodeAddedType),
|
||||
models.EventType(user.UserV1InitializedCheckSucceededType),
|
||||
models.EventType(user.HumanInitializedCheckSucceededType),
|
||||
models.EventType(user.HumanAvatarAddedType),
|
||||
models.EventType(user.HumanAvatarRemovedType),
|
||||
models.EventType(user.HumanPasswordlessInitCodeAddedType),
|
||||
models.EventType(user.HumanPasswordlessInitCodeRequestedType),
|
||||
}
|
||||
}
|
||||
|
@ -198,3 +198,37 @@ func avatarKeyFromEvent(event *models.Event) (string, error) {
|
||||
}
|
||||
return data["storeKey"], nil
|
||||
}
|
||||
|
||||
func (v *UserSessionView) EventTypes() []models.EventType {
|
||||
return []models.EventType{
|
||||
models.EventType(user.UserV1PasswordCheckSucceededType),
|
||||
models.EventType(user.HumanPasswordCheckSucceededType),
|
||||
models.EventType(user.UserIDPLoginCheckSucceededType),
|
||||
models.EventType(user.HumanPasswordlessTokenCheckSucceededType),
|
||||
models.EventType(user.HumanPasswordlessTokenCheckFailedType),
|
||||
models.EventType(user.HumanPasswordlessTokenRemovedType),
|
||||
models.EventType(user.UserV1PasswordCheckFailedType),
|
||||
models.EventType(user.HumanPasswordCheckFailedType),
|
||||
models.EventType(user.UserV1PasswordChangedType),
|
||||
models.EventType(user.HumanPasswordChangedType),
|
||||
models.EventType(user.HumanMFAOTPVerifiedType),
|
||||
models.EventType(user.UserV1MFAOTPCheckSucceededType),
|
||||
models.EventType(user.HumanMFAOTPCheckSucceededType),
|
||||
models.EventType(user.UserV1MFAOTPCheckFailedType),
|
||||
models.EventType(user.UserV1MFAOTPRemovedType),
|
||||
models.EventType(user.HumanMFAOTPCheckFailedType),
|
||||
models.EventType(user.HumanMFAOTPRemovedType),
|
||||
models.EventType(user.HumanU2FTokenCheckFailedType),
|
||||
models.EventType(user.HumanU2FTokenRemovedType),
|
||||
models.EventType(user.HumanU2FTokenVerifiedType),
|
||||
models.EventType(user.HumanU2FTokenCheckSucceededType),
|
||||
models.EventType(user.UserV1SignedOutType),
|
||||
models.EventType(user.HumanSignedOutType),
|
||||
models.EventType(user.UserLockedType),
|
||||
models.EventType(user.UserDeactivatedType),
|
||||
models.EventType(user.UserIDPLinkRemovedType),
|
||||
models.EventType(user.UserIDPLinkCascadeRemovedType),
|
||||
models.EventType(user.HumanAvatarAddedType),
|
||||
models.EventType(user.HumanAvatarRemovedType),
|
||||
}
|
||||
}
|
||||
|
@ -6,7 +6,7 @@ import (
|
||||
"github.com/zitadel/zitadel/internal/repository/user"
|
||||
)
|
||||
|
||||
func UserByIDQuery(id, instanceID string, latestSequence uint64) (*es_models.SearchQuery, error) {
|
||||
func UserByIDQuery(id, instanceID string, latestSequence uint64, eventTypes []es_models.EventType) (*es_models.SearchQuery, error) {
|
||||
if id == "" {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "EVENT-d8isw", "Errors.User.UserIDMissing")
|
||||
}
|
||||
@ -14,6 +14,7 @@ func UserByIDQuery(id, instanceID string, latestSequence uint64) (*es_models.Sea
|
||||
AddQuery().
|
||||
AggregateTypeFilter(user.AggregateType).
|
||||
AggregateIDFilter(id).
|
||||
EventTypesFilter(eventTypes...).
|
||||
LatestSequenceFilter(latestSequence).
|
||||
InstanceIDFilter(instanceID).
|
||||
SearchQuery(), nil
|
||||
|
Loading…
Reference in New Issue
Block a user