From 19d9b8ad411d1c6132f921a2294e75bf9488233d Mon Sep 17 00:00:00 2001 From: Livio Spring Date: Thu, 14 Dec 2023 12:07:47 +0200 Subject: [PATCH] fix: reduce eventual consistency (#7075) * fix: reduce eventual consistency * fix tests * fix linting --- .../eventsourcing/eventstore/auth_request.go | 20 +++++++++++------- .../eventstore/auth_request_test.go | 4 ++-- .../eventsourcing/eventstore/refresh_token.go | 21 +++++++++++-------- .../eventsourcing/eventstore/token.go | 19 ++++++++++------- .../eventsourcing/eventstore/user.go | 11 ++++------ .../repository/eventsourcing/handler/user.go | 3 ++- .../repository/eventsourcing/view/user.go | 15 +++++++------ .../eventstore/token_verifier.go | 8 +++---- internal/user/repository/view/query.go | 6 ++++-- 9 files changed, 60 insertions(+), 47 deletions(-) diff --git a/internal/auth/repository/eventsourcing/eventstore/auth_request.go b/internal/auth/repository/eventsourcing/eventstore/auth_request.go index 5f5dbd083d..2d2c60e3cc 100644 --- a/internal/auth/repository/eventsourcing/eventstore/auth_request.go +++ b/internal/auth/repository/eventsourcing/eventstore/auth_request.go @@ -90,7 +90,7 @@ type idpUserLinksProvider interface { } type userEventProvider interface { - UserEventsByID(ctx context.Context, id string, sequence uint64, eventTypes []eventstore.EventType) ([]eventstore.Event, error) + UserEventsByID(ctx context.Context, id string, changeDate time.Time, eventTypes []eventstore.EventType) ([]eventstore.Event, error) } type userCommandProvider interface { @@ -1458,21 +1458,25 @@ var ( func userSessionByIDs(ctx context.Context, provider userSessionViewProvider, eventProvider userEventProvider, agentID string, user *user_model.UserView) (*user_model.UserSessionView, error) { instanceID := authz.GetInstance(ctx).InstanceID() + + // always load the latest sequence first, so in case the session was not found by id, + // the sequence will be equal or lower than the actual projection and no events are lost + sequence, err := provider.GetLatestUserSessionSequence(ctx, instanceID) + logging.WithFields("instanceID", instanceID, "userID", user.ID). + OnError(err). + Errorf("could not get current sequence for userSessionByIDs") + session, err := provider.UserSessionByIDs(agentID, user.ID, instanceID) if err != nil { if !zerrors.IsNotFound(err) { return nil, err } - sequence, err := provider.GetLatestUserSessionSequence(ctx, instanceID) - logging.WithFields("instanceID", instanceID, "userID", user.ID). - OnError(err). - Errorf("could not get current sequence for userSessionByIDs") session = &user_view_model.UserSessionView{UserAgentID: agentID, UserID: user.ID} if sequence != nil { - session.Sequence = sequence.Sequence + session.ChangeDate = sequence.EventCreatedAt } } - events, err := eventProvider.UserEventsByID(ctx, user.ID, session.Sequence, append(session.EventTypes(), userSessionEventTypes...)) + events, err := eventProvider.UserEventsByID(ctx, user.ID, session.ChangeDate, append(session.EventTypes(), userSessionEventTypes...)) if err != nil { logging.WithFields("traceID", tracing.TraceIDFromCtx(ctx)).WithError(err).Debug("error retrieving new events") return user_view_model.UserSessionToModel(session), nil @@ -1556,7 +1560,7 @@ func userByID(ctx context.Context, viewProvider userViewProvider, eventProvider } else if user == nil { user = new(user_view_model.UserView) } - events, err := eventProvider.UserEventsByID(ctx, userID, user.Sequence, user.EventTypes()) + events, err := eventProvider.UserEventsByID(ctx, userID, user.ChangeDate, user.EventTypes()) if err != nil { logging.WithFields("traceID", tracing.TraceIDFromCtx(ctx)).WithError(err).Debug("error retrieving new events") return user_view_model.UserToModel(user), nil diff --git a/internal/auth/repository/eventsourcing/eventstore/auth_request_test.go b/internal/auth/repository/eventsourcing/eventstore/auth_request_test.go index e57fb9c7d1..de0af84247 100644 --- a/internal/auth/repository/eventsourcing/eventstore/auth_request_test.go +++ b/internal/auth/repository/eventsourcing/eventstore/auth_request_test.go @@ -109,7 +109,7 @@ type mockEventUser struct { Event eventstore.Event } -func (m *mockEventUser) UserEventsByID(ctx context.Context, id string, sequence uint64, types []eventstore.EventType) ([]eventstore.Event, error) { +func (m *mockEventUser) UserEventsByID(ctx context.Context, id string, changeDate time.Time, types []eventstore.EventType) ([]eventstore.Event, error) { if m.Event != nil { return []eventstore.Event{m.Event}, nil } @@ -126,7 +126,7 @@ func (m *mockEventUser) BulkAddExternalIDPs(ctx context.Context, userID string, type mockEventErrUser struct{} -func (m *mockEventErrUser) UserEventsByID(ctx context.Context, id string, sequence uint64, types []eventstore.EventType) ([]eventstore.Event, error) { +func (m *mockEventErrUser) UserEventsByID(ctx context.Context, id string, changeDate time.Time, types []eventstore.EventType) ([]eventstore.Event, error) { return nil, zerrors.ThrowInternal(nil, "id", "internal error") } diff --git a/internal/auth/repository/eventsourcing/eventstore/refresh_token.go b/internal/auth/repository/eventsourcing/eventstore/refresh_token.go index 36808d7c6c..90b98aaf8c 100644 --- a/internal/auth/repository/eventsourcing/eventstore/refresh_token.go +++ b/internal/auth/repository/eventsourcing/eventstore/refresh_token.go @@ -42,26 +42,29 @@ func (r *RefreshTokenRepo) RefreshTokenByToken(ctx context.Context, refreshToken func (r *RefreshTokenRepo) RefreshTokenByID(ctx context.Context, tokenID, userID string) (*usr_model.RefreshTokenView, error) { instanceID := authz.GetInstance(ctx).InstanceID() + + // always load the latest sequence first, so in case the token was not found by id, + // the sequence will be equal or lower than the actual projection and no events are lost + sequence, err := r.View.GetLatestRefreshTokenSequence(ctx) + logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID", tokenID). + OnError(err). + Errorf("could not get current sequence for RefreshTokenByID") + tokenView, viewErr := r.View.RefreshTokenByID(tokenID, instanceID) if viewErr != nil && !zerrors.IsNotFound(viewErr) { return nil, viewErr } if zerrors.IsNotFound(viewErr) { - sequence, err := r.View.GetLatestRefreshTokenSequence(ctx) - logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID", tokenID). - OnError(err). - Errorf("could not get current sequence for RefreshTokenByID") - tokenView = new(model.RefreshTokenView) tokenView.ID = tokenID tokenView.UserID = userID tokenView.InstanceID = instanceID if sequence != nil { - tokenView.Sequence = sequence.Sequence + tokenView.ChangeDate = sequence.EventCreatedAt } } - events, esErr := r.getUserEvents(ctx, userID, tokenView.InstanceID, tokenView.Sequence, tokenView.GetRelevantEventTypes()) + events, esErr := r.getUserEvents(ctx, userID, tokenView.InstanceID, tokenView.ChangeDate, tokenView.GetRelevantEventTypes()) if zerrors.IsNotFound(viewErr) && len(events) == 0 { return nil, zerrors.ThrowNotFound(nil, "EVENT-BHB52", "Errors.User.RefreshToken.Invalid") } @@ -105,8 +108,8 @@ func (r *RefreshTokenRepo) SearchMyRefreshTokens(ctx context.Context, userID str }, nil } -func (r *RefreshTokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64, eventTypes []eventstore.EventType) ([]eventstore.Event, error) { - query, err := usr_view.UserByIDQuery(userID, instanceID, sequence, eventTypes) +func (r *RefreshTokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, changeDate time.Time, eventTypes []eventstore.EventType) ([]eventstore.Event, error) { + query, err := usr_view.UserByIDQuery(userID, instanceID, changeDate, eventTypes) if err != nil { return nil, err } diff --git a/internal/auth/repository/eventsourcing/eventstore/token.go b/internal/auth/repository/eventsourcing/eventstore/token.go index 820ecdba2b..ba58dfd4d7 100644 --- a/internal/auth/repository/eventsourcing/eventstore/token.go +++ b/internal/auth/repository/eventsourcing/eventstore/token.go @@ -24,26 +24,29 @@ type TokenRepo struct { func (repo *TokenRepo) TokenByIDs(ctx context.Context, userID, tokenID string) (*usr_model.TokenView, error) { instanceID := authz.GetInstance(ctx).InstanceID() + // always load the latest sequence first, so in case the token was not found by id, + // the sequence will be equal or lower than the actual projection and no events are lost + sequence, err := repo.View.GetLatestTokenSequence(ctx, instanceID) + logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID", tokenID). + OnError(err). + Errorf("could not get current sequence for TokenByIDs") + token, viewErr := repo.View.TokenByIDs(tokenID, userID, instanceID) if viewErr != nil && !zerrors.IsNotFound(viewErr) { return nil, viewErr } if zerrors.IsNotFound(viewErr) { - sequence, err := repo.View.GetLatestTokenSequence(ctx, instanceID) - logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID", tokenID). - OnError(err). - Errorf("could not get current sequence for TokenByIDs") token = new(model.TokenView) token.ID = tokenID token.UserID = userID token.InstanceID = instanceID if sequence != nil { - token.Sequence = sequence.Sequence + token.ChangeDate = sequence.EventCreatedAt } } - events, esErr := repo.getUserEvents(ctx, userID, token.InstanceID, token.Sequence, token.GetRelevantEventTypes()) + events, esErr := repo.getUserEvents(ctx, userID, token.InstanceID, token.ChangeDate, token.GetRelevantEventTypes()) if zerrors.IsNotFound(viewErr) && len(events) == 0 { return nil, zerrors.ThrowNotFound(nil, "EVENT-4T90g", "Errors.Token.NotFound") } @@ -65,8 +68,8 @@ func (repo *TokenRepo) TokenByIDs(ctx context.Context, userID, tokenID string) ( return model.TokenViewToModel(token), nil } -func (r *TokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64, eventTypes []eventstore.EventType) ([]eventstore.Event, error) { - query, err := usr_view.UserByIDQuery(userID, instanceID, sequence, eventTypes) +func (r *TokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, changeDate time.Time, eventTypes []eventstore.EventType) ([]eventstore.Event, error) { + query, err := usr_view.UserByIDQuery(userID, instanceID, changeDate, eventTypes) if err != nil { return nil, err } diff --git a/internal/auth/repository/eventsourcing/eventstore/user.go b/internal/auth/repository/eventsourcing/eventstore/user.go index 288527dcef..83f09be6ae 100644 --- a/internal/auth/repository/eventsourcing/eventstore/user.go +++ b/internal/auth/repository/eventsourcing/eventstore/user.go @@ -2,6 +2,7 @@ package eventstore import ( "context" + "time" "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view" @@ -38,14 +39,10 @@ func (repo *UserRepo) UserSessionUserIDsByAgentID(ctx context.Context, agentID s return userIDs, nil } -func (repo *UserRepo) UserEventsByID(ctx context.Context, id string, sequence uint64, eventTypes []eventstore.EventType) ([]eventstore.Event, error) { - return repo.getUserEvents(ctx, id, sequence, eventTypes) -} - -func (r *UserRepo) getUserEvents(ctx context.Context, userID string, sequence uint64, eventTypes []eventstore.EventType) ([]eventstore.Event, error) { - query, err := usr_view.UserByIDQuery(userID, authz.GetInstance(ctx).InstanceID(), sequence, eventTypes) +func (repo *UserRepo) UserEventsByID(ctx context.Context, id string, changeDate time.Time, eventTypes []eventstore.EventType) ([]eventstore.Event, error) { + query, err := usr_view.UserByIDQuery(id, authz.GetInstance(ctx).InstanceID(), changeDate, eventTypes) if err != nil { return nil, err } - return r.Eventstore.Filter(ctx, query) + return repo.Eventstore.Filter(ctx, query) //nolint:staticcheck } diff --git a/internal/auth/repository/eventsourcing/handler/user.go b/internal/auth/repository/eventsourcing/handler/user.go index 90fd37d80f..c93dc06477 100644 --- a/internal/auth/repository/eventsourcing/handler/user.go +++ b/internal/auth/repository/eventsourcing/handler/user.go @@ -2,6 +2,7 @@ package handler import ( "context" + "time" "github.com/zitadel/zitadel/internal/api/authz" auth_view "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view" @@ -546,7 +547,7 @@ func (u *User) loginNameInformation(ctx context.Context, orgID string, instanceI } func (u *User) userFromEventstore(agg *eventstore.Aggregate, eventTypes []eventstore.EventType) (*view_model.UserView, error) { - query, err := usr_view.UserByIDQuery(agg.ID, agg.InstanceID, 0, eventTypes) + query, err := usr_view.UserByIDQuery(agg.ID, agg.InstanceID, time.Time{}, eventTypes) if err != nil { return nil, err } diff --git a/internal/auth/repository/eventsourcing/view/user.go b/internal/auth/repository/eventsourcing/view/user.go index d2101dcfd1..8479924f9a 100644 --- a/internal/auth/repository/eventsourcing/view/user.go +++ b/internal/auth/repository/eventsourcing/view/user.go @@ -97,23 +97,26 @@ func (v *View) userByID(ctx context.Context, instanceID string, queries ...query return nil, err } + // always load the latest sequence first, so in case the user was not found by id, + // the sequence will be equal or lower than the actual projection and no events are lost + sequence, err := v.GetLatestUserSequence(ctx, instanceID) + logging.WithFields("instanceID", instanceID). + OnError(err). + Errorf("could not get current sequence for userByID") + user, err := view.UserByID(v.Db, userTable, queriedUser.ID, instanceID) if err != nil && !zerrors.IsNotFound(err) { return nil, err } if err != nil { - sequence, err := v.GetLatestUserSequence(ctx, instanceID) - logging.WithFields("instanceID", instanceID). - OnError(err). - Errorf("could not get current sequence for userByID") user = new(model.UserView) if sequence != nil { - user.Sequence = sequence.Sequence + user.ChangeDate = sequence.EventCreatedAt } } - query, err := view.UserByIDQuery(queriedUser.ID, instanceID, user.Sequence, user.EventTypes()) + query, err := view.UserByIDQuery(queriedUser.ID, instanceID, user.ChangeDate, user.EventTypes()) if err != nil { return nil, err } diff --git a/internal/authz/repository/eventsourcing/eventstore/token_verifier.go b/internal/authz/repository/eventsourcing/eventstore/token_verifier.go index 80dda82724..2ddcc3cf93 100644 --- a/internal/authz/repository/eventsourcing/eventstore/token_verifier.go +++ b/internal/authz/repository/eventsourcing/eventstore/token_verifier.go @@ -61,11 +61,11 @@ func (repo *TokenVerifierRepo) tokenByID(ctx context.Context, tokenID, userID st token.ID = tokenID token.UserID = userID if sequence != nil { - token.Sequence = sequence.Sequence + token.ChangeDate = sequence.EventCreatedAt } } - events, esErr := repo.getUserEvents(ctx, userID, instanceID, token.Sequence, token.GetRelevantEventTypes()) + events, esErr := repo.getUserEvents(ctx, userID, instanceID, token.ChangeDate, token.GetRelevantEventTypes()) if zerrors.IsNotFound(viewErr) && len(events) == 0 { return nil, zerrors.ThrowNotFound(nil, "EVENT-4T90g", "Errors.Token.NotFound") } @@ -247,10 +247,10 @@ func (repo *TokenVerifierRepo) VerifierClientID(ctx context.Context, appName str return clientID, app.ProjectID, nil } -func (repo *TokenVerifierRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64, eventTypes []eventstore.EventType) (_ []eventstore.Event, err error) { +func (repo *TokenVerifierRepo) getUserEvents(ctx context.Context, userID, instanceID string, changeDate time.Time, eventTypes []eventstore.EventType) (_ []eventstore.Event, err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() - query, err := usr_view.UserByIDQuery(userID, instanceID, sequence, eventTypes) + query, err := usr_view.UserByIDQuery(userID, instanceID, changeDate, eventTypes) if err != nil { return nil, err } diff --git a/internal/user/repository/view/query.go b/internal/user/repository/view/query.go index d88c31da44..3285c11234 100644 --- a/internal/user/repository/view/query.go +++ b/internal/user/repository/view/query.go @@ -1,19 +1,21 @@ package view import ( + "time" + "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/repository/user" "github.com/zitadel/zitadel/internal/zerrors" ) -func UserByIDQuery(id, instanceID string, sequence uint64, eventTypes []eventstore.EventType) (*eventstore.SearchQueryBuilder, error) { +func UserByIDQuery(id, instanceID string, changeDate time.Time, eventTypes []eventstore.EventType) (*eventstore.SearchQueryBuilder, error) { if id == "" { return nil, zerrors.ThrowPreconditionFailed(nil, "EVENT-d8isw", "Errors.User.UserIDMissing") } return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). AwaitOpenTransactions(). InstanceID(instanceID). - SequenceGreater(sequence). + CreationDateAfter(changeDate.Add(-1 * time.Microsecond)). // to simulate CreationDate >= AddQuery(). AggregateTypes(user.AggregateType). AggregateIDs(id).