fix: reduce eventual consistency (#7075)

* fix: reduce eventual consistency

* fix tests

* fix linting
This commit is contained in:
Livio Spring 2023-12-14 12:07:47 +02:00 committed by GitHub
parent 51ebf7da8d
commit 19d9b8ad41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 60 additions and 47 deletions

View File

@ -90,7 +90,7 @@ type idpUserLinksProvider interface {
}
type userEventProvider interface {
UserEventsByID(ctx context.Context, id string, sequence uint64, eventTypes []eventstore.EventType) ([]eventstore.Event, error)
UserEventsByID(ctx context.Context, id string, changeDate time.Time, eventTypes []eventstore.EventType) ([]eventstore.Event, error)
}
type userCommandProvider interface {
@ -1458,21 +1458,25 @@ var (
func userSessionByIDs(ctx context.Context, provider userSessionViewProvider, eventProvider userEventProvider, agentID string, user *user_model.UserView) (*user_model.UserSessionView, error) {
instanceID := authz.GetInstance(ctx).InstanceID()
// always load the latest sequence first, so in case the session was not found by id,
// the sequence will be equal or lower than the actual projection and no events are lost
sequence, err := provider.GetLatestUserSessionSequence(ctx, instanceID)
logging.WithFields("instanceID", instanceID, "userID", user.ID).
OnError(err).
Errorf("could not get current sequence for userSessionByIDs")
session, err := provider.UserSessionByIDs(agentID, user.ID, instanceID)
if err != nil {
if !zerrors.IsNotFound(err) {
return nil, err
}
sequence, err := provider.GetLatestUserSessionSequence(ctx, instanceID)
logging.WithFields("instanceID", instanceID, "userID", user.ID).
OnError(err).
Errorf("could not get current sequence for userSessionByIDs")
session = &user_view_model.UserSessionView{UserAgentID: agentID, UserID: user.ID}
if sequence != nil {
session.Sequence = sequence.Sequence
session.ChangeDate = sequence.EventCreatedAt
}
}
events, err := eventProvider.UserEventsByID(ctx, user.ID, session.Sequence, append(session.EventTypes(), userSessionEventTypes...))
events, err := eventProvider.UserEventsByID(ctx, user.ID, session.ChangeDate, append(session.EventTypes(), userSessionEventTypes...))
if err != nil {
logging.WithFields("traceID", tracing.TraceIDFromCtx(ctx)).WithError(err).Debug("error retrieving new events")
return user_view_model.UserSessionToModel(session), nil
@ -1556,7 +1560,7 @@ func userByID(ctx context.Context, viewProvider userViewProvider, eventProvider
} else if user == nil {
user = new(user_view_model.UserView)
}
events, err := eventProvider.UserEventsByID(ctx, userID, user.Sequence, user.EventTypes())
events, err := eventProvider.UserEventsByID(ctx, userID, user.ChangeDate, user.EventTypes())
if err != nil {
logging.WithFields("traceID", tracing.TraceIDFromCtx(ctx)).WithError(err).Debug("error retrieving new events")
return user_view_model.UserToModel(user), nil

View File

@ -109,7 +109,7 @@ type mockEventUser struct {
Event eventstore.Event
}
func (m *mockEventUser) UserEventsByID(ctx context.Context, id string, sequence uint64, types []eventstore.EventType) ([]eventstore.Event, error) {
func (m *mockEventUser) UserEventsByID(ctx context.Context, id string, changeDate time.Time, types []eventstore.EventType) ([]eventstore.Event, error) {
if m.Event != nil {
return []eventstore.Event{m.Event}, nil
}
@ -126,7 +126,7 @@ func (m *mockEventUser) BulkAddExternalIDPs(ctx context.Context, userID string,
type mockEventErrUser struct{}
func (m *mockEventErrUser) UserEventsByID(ctx context.Context, id string, sequence uint64, types []eventstore.EventType) ([]eventstore.Event, error) {
func (m *mockEventErrUser) UserEventsByID(ctx context.Context, id string, changeDate time.Time, types []eventstore.EventType) ([]eventstore.Event, error) {
return nil, zerrors.ThrowInternal(nil, "id", "internal error")
}

View File

@ -42,26 +42,29 @@ func (r *RefreshTokenRepo) RefreshTokenByToken(ctx context.Context, refreshToken
func (r *RefreshTokenRepo) RefreshTokenByID(ctx context.Context, tokenID, userID string) (*usr_model.RefreshTokenView, error) {
instanceID := authz.GetInstance(ctx).InstanceID()
// always load the latest sequence first, so in case the token was not found by id,
// the sequence will be equal or lower than the actual projection and no events are lost
sequence, err := r.View.GetLatestRefreshTokenSequence(ctx)
logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID", tokenID).
OnError(err).
Errorf("could not get current sequence for RefreshTokenByID")
tokenView, viewErr := r.View.RefreshTokenByID(tokenID, instanceID)
if viewErr != nil && !zerrors.IsNotFound(viewErr) {
return nil, viewErr
}
if zerrors.IsNotFound(viewErr) {
sequence, err := r.View.GetLatestRefreshTokenSequence(ctx)
logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID", tokenID).
OnError(err).
Errorf("could not get current sequence for RefreshTokenByID")
tokenView = new(model.RefreshTokenView)
tokenView.ID = tokenID
tokenView.UserID = userID
tokenView.InstanceID = instanceID
if sequence != nil {
tokenView.Sequence = sequence.Sequence
tokenView.ChangeDate = sequence.EventCreatedAt
}
}
events, esErr := r.getUserEvents(ctx, userID, tokenView.InstanceID, tokenView.Sequence, tokenView.GetRelevantEventTypes())
events, esErr := r.getUserEvents(ctx, userID, tokenView.InstanceID, tokenView.ChangeDate, tokenView.GetRelevantEventTypes())
if zerrors.IsNotFound(viewErr) && len(events) == 0 {
return nil, zerrors.ThrowNotFound(nil, "EVENT-BHB52", "Errors.User.RefreshToken.Invalid")
}
@ -105,8 +108,8 @@ func (r *RefreshTokenRepo) SearchMyRefreshTokens(ctx context.Context, userID str
}, nil
}
func (r *RefreshTokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64, eventTypes []eventstore.EventType) ([]eventstore.Event, error) {
query, err := usr_view.UserByIDQuery(userID, instanceID, sequence, eventTypes)
func (r *RefreshTokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, changeDate time.Time, eventTypes []eventstore.EventType) ([]eventstore.Event, error) {
query, err := usr_view.UserByIDQuery(userID, instanceID, changeDate, eventTypes)
if err != nil {
return nil, err
}

View File

@ -24,26 +24,29 @@ type TokenRepo struct {
func (repo *TokenRepo) TokenByIDs(ctx context.Context, userID, tokenID string) (*usr_model.TokenView, error) {
instanceID := authz.GetInstance(ctx).InstanceID()
// always load the latest sequence first, so in case the token was not found by id,
// the sequence will be equal or lower than the actual projection and no events are lost
sequence, err := repo.View.GetLatestTokenSequence(ctx, instanceID)
logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID", tokenID).
OnError(err).
Errorf("could not get current sequence for TokenByIDs")
token, viewErr := repo.View.TokenByIDs(tokenID, userID, instanceID)
if viewErr != nil && !zerrors.IsNotFound(viewErr) {
return nil, viewErr
}
if zerrors.IsNotFound(viewErr) {
sequence, err := repo.View.GetLatestTokenSequence(ctx, instanceID)
logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID", tokenID).
OnError(err).
Errorf("could not get current sequence for TokenByIDs")
token = new(model.TokenView)
token.ID = tokenID
token.UserID = userID
token.InstanceID = instanceID
if sequence != nil {
token.Sequence = sequence.Sequence
token.ChangeDate = sequence.EventCreatedAt
}
}
events, esErr := repo.getUserEvents(ctx, userID, token.InstanceID, token.Sequence, token.GetRelevantEventTypes())
events, esErr := repo.getUserEvents(ctx, userID, token.InstanceID, token.ChangeDate, token.GetRelevantEventTypes())
if zerrors.IsNotFound(viewErr) && len(events) == 0 {
return nil, zerrors.ThrowNotFound(nil, "EVENT-4T90g", "Errors.Token.NotFound")
}
@ -65,8 +68,8 @@ func (repo *TokenRepo) TokenByIDs(ctx context.Context, userID, tokenID string) (
return model.TokenViewToModel(token), nil
}
func (r *TokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64, eventTypes []eventstore.EventType) ([]eventstore.Event, error) {
query, err := usr_view.UserByIDQuery(userID, instanceID, sequence, eventTypes)
func (r *TokenRepo) getUserEvents(ctx context.Context, userID, instanceID string, changeDate time.Time, eventTypes []eventstore.EventType) ([]eventstore.Event, error) {
query, err := usr_view.UserByIDQuery(userID, instanceID, changeDate, eventTypes)
if err != nil {
return nil, err
}

View File

@ -2,6 +2,7 @@ package eventstore
import (
"context"
"time"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view"
@ -38,14 +39,10 @@ func (repo *UserRepo) UserSessionUserIDsByAgentID(ctx context.Context, agentID s
return userIDs, nil
}
func (repo *UserRepo) UserEventsByID(ctx context.Context, id string, sequence uint64, eventTypes []eventstore.EventType) ([]eventstore.Event, error) {
return repo.getUserEvents(ctx, id, sequence, eventTypes)
}
func (r *UserRepo) getUserEvents(ctx context.Context, userID string, sequence uint64, eventTypes []eventstore.EventType) ([]eventstore.Event, error) {
query, err := usr_view.UserByIDQuery(userID, authz.GetInstance(ctx).InstanceID(), sequence, eventTypes)
func (repo *UserRepo) UserEventsByID(ctx context.Context, id string, changeDate time.Time, eventTypes []eventstore.EventType) ([]eventstore.Event, error) {
query, err := usr_view.UserByIDQuery(id, authz.GetInstance(ctx).InstanceID(), changeDate, eventTypes)
if err != nil {
return nil, err
}
return r.Eventstore.Filter(ctx, query)
return repo.Eventstore.Filter(ctx, query) //nolint:staticcheck
}

View File

@ -2,6 +2,7 @@ package handler
import (
"context"
"time"
"github.com/zitadel/zitadel/internal/api/authz"
auth_view "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view"
@ -546,7 +547,7 @@ func (u *User) loginNameInformation(ctx context.Context, orgID string, instanceI
}
func (u *User) userFromEventstore(agg *eventstore.Aggregate, eventTypes []eventstore.EventType) (*view_model.UserView, error) {
query, err := usr_view.UserByIDQuery(agg.ID, agg.InstanceID, 0, eventTypes)
query, err := usr_view.UserByIDQuery(agg.ID, agg.InstanceID, time.Time{}, eventTypes)
if err != nil {
return nil, err
}

View File

@ -97,23 +97,26 @@ func (v *View) userByID(ctx context.Context, instanceID string, queries ...query
return nil, err
}
// always load the latest sequence first, so in case the user was not found by id,
// the sequence will be equal or lower than the actual projection and no events are lost
sequence, err := v.GetLatestUserSequence(ctx, instanceID)
logging.WithFields("instanceID", instanceID).
OnError(err).
Errorf("could not get current sequence for userByID")
user, err := view.UserByID(v.Db, userTable, queriedUser.ID, instanceID)
if err != nil && !zerrors.IsNotFound(err) {
return nil, err
}
if err != nil {
sequence, err := v.GetLatestUserSequence(ctx, instanceID)
logging.WithFields("instanceID", instanceID).
OnError(err).
Errorf("could not get current sequence for userByID")
user = new(model.UserView)
if sequence != nil {
user.Sequence = sequence.Sequence
user.ChangeDate = sequence.EventCreatedAt
}
}
query, err := view.UserByIDQuery(queriedUser.ID, instanceID, user.Sequence, user.EventTypes())
query, err := view.UserByIDQuery(queriedUser.ID, instanceID, user.ChangeDate, user.EventTypes())
if err != nil {
return nil, err
}

View File

@ -61,11 +61,11 @@ func (repo *TokenVerifierRepo) tokenByID(ctx context.Context, tokenID, userID st
token.ID = tokenID
token.UserID = userID
if sequence != nil {
token.Sequence = sequence.Sequence
token.ChangeDate = sequence.EventCreatedAt
}
}
events, esErr := repo.getUserEvents(ctx, userID, instanceID, token.Sequence, token.GetRelevantEventTypes())
events, esErr := repo.getUserEvents(ctx, userID, instanceID, token.ChangeDate, token.GetRelevantEventTypes())
if zerrors.IsNotFound(viewErr) && len(events) == 0 {
return nil, zerrors.ThrowNotFound(nil, "EVENT-4T90g", "Errors.Token.NotFound")
}
@ -247,10 +247,10 @@ func (repo *TokenVerifierRepo) VerifierClientID(ctx context.Context, appName str
return clientID, app.ProjectID, nil
}
func (repo *TokenVerifierRepo) getUserEvents(ctx context.Context, userID, instanceID string, sequence uint64, eventTypes []eventstore.EventType) (_ []eventstore.Event, err error) {
func (repo *TokenVerifierRepo) getUserEvents(ctx context.Context, userID, instanceID string, changeDate time.Time, eventTypes []eventstore.EventType) (_ []eventstore.Event, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
query, err := usr_view.UserByIDQuery(userID, instanceID, sequence, eventTypes)
query, err := usr_view.UserByIDQuery(userID, instanceID, changeDate, eventTypes)
if err != nil {
return nil, err
}

View File

@ -1,19 +1,21 @@
package view
import (
"time"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/user"
"github.com/zitadel/zitadel/internal/zerrors"
)
func UserByIDQuery(id, instanceID string, sequence uint64, eventTypes []eventstore.EventType) (*eventstore.SearchQueryBuilder, error) {
func UserByIDQuery(id, instanceID string, changeDate time.Time, eventTypes []eventstore.EventType) (*eventstore.SearchQueryBuilder, error) {
if id == "" {
return nil, zerrors.ThrowPreconditionFailed(nil, "EVENT-d8isw", "Errors.User.UserIDMissing")
}
return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AwaitOpenTransactions().
InstanceID(instanceID).
SequenceGreater(sequence).
CreationDateAfter(changeDate.Add(-1 * time.Microsecond)). // to simulate CreationDate >=
AddQuery().
AggregateTypes(user.AggregateType).
AggregateIDs(id).