fix: use current sequence for refetching of events (#5772)

* fix: use current sequence for refetching of events

* fix: use client ids
This commit is contained in:
Livio Spring 2023-04-28 16:28:13 +02:00 committed by GitHub
parent c8c5cf3c5f
commit 458a383de2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 273 additions and 107 deletions

View File

@ -65,16 +65,16 @@ func (_ *Styling) AggregateTypes() []models.AggregateType {
return []models.AggregateType{org.AggregateType, instance.AggregateType} return []models.AggregateType{org.AggregateType, instance.AggregateType}
} }
func (m *Styling) CurrentSequence(instanceID string) (uint64, error) { func (m *Styling) CurrentSequence(ctx context.Context, instanceID string) (uint64, error) {
sequence, err := m.view.GetLatestStylingSequence(instanceID) sequence, err := m.view.GetLatestStylingSequence(ctx, instanceID)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (m *Styling) EventQuery(instanceIDs []string) (*models.SearchQuery, error) { func (m *Styling) EventQuery(ctx context.Context, instanceIDs []string) (*models.SearchQuery, error) {
sequences, err := m.view.GetLatestStylingSequences(instanceIDs) sequences, err := m.view.GetLatestStylingSequences(ctx, instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,6 +1,7 @@
package view package view
import ( import (
"context"
"time" "time"
"github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/models"
@ -15,12 +16,12 @@ func (v *View) saveCurrentSequence(viewName string, event *models.Event) error {
return repository.SaveCurrentSequence(v.Db, sequencesTable, viewName, event.InstanceID, event.Sequence, event.CreationDate) return repository.SaveCurrentSequence(v.Db, sequencesTable, viewName, event.InstanceID, event.Sequence, event.CreationDate)
} }
func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentSequence, error) { func (v *View) latestSequence(ctx context.Context, viewName, instanceID string) (*repository.CurrentSequence, error) {
return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) return repository.LatestSequence(v.Db, v.TimeTravel(ctx, sequencesTable), viewName, instanceID)
} }
func (v *View) latestSequences(viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) { func (v *View) latestSequences(ctx context.Context, viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) {
return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs) return repository.LatestSequences(v.Db, v.TimeTravel(ctx, sequencesTable), viewName, instanceIDs)
} }
func (v *View) AllCurrentSequences(db, instanceID string) ([]*repository.CurrentSequence, error) { func (v *View) AllCurrentSequences(db, instanceID string) ([]*repository.CurrentSequence, error) {

View File

@ -1,6 +1,8 @@
package view package view
import ( import (
"context"
"github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/iam/repository/view" "github.com/zitadel/zitadel/internal/iam/repository/view"
"github.com/zitadel/zitadel/internal/iam/repository/view/model" "github.com/zitadel/zitadel/internal/iam/repository/view/model"
@ -39,12 +41,12 @@ func (v *View) UpdateOrgOwnerRemovedStyling(event *models.Event) error {
return v.ProcessedStylingSequence(event) return v.ProcessedStylingSequence(event)
} }
func (v *View) GetLatestStylingSequence(instanceID string) (*global_view.CurrentSequence, error) { func (v *View) GetLatestStylingSequence(ctx context.Context, instanceID string) (*global_view.CurrentSequence, error) {
return v.latestSequence(stylingTyble, instanceID) return v.latestSequence(ctx, stylingTyble, instanceID)
} }
func (v *View) GetLatestStylingSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) { func (v *View) GetLatestStylingSequences(ctx context.Context, instanceIDs []string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(stylingTyble, instanceIDs) return v.latestSequences(ctx, stylingTyble, instanceIDs)
} }
func (v *View) ProcessedStylingSequence(event *models.Event) error { func (v *View) ProcessedStylingSequence(event *models.Event) error {

View File

@ -1,12 +1,17 @@
package view package view
import ( import (
"context"
"github.com/jinzhu/gorm" "github.com/jinzhu/gorm"
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
) )
type View struct { type View struct {
Db *gorm.DB Db *gorm.DB
client *database.DB
} }
func StartView(sqlClient *database.DB) (*View, error) { func StartView(sqlClient *database.DB) (*View, error) {
@ -15,10 +20,15 @@ func StartView(sqlClient *database.DB) (*View, error) {
return nil, err return nil, err
} }
return &View{ return &View{
Db: gorm, Db: gorm,
client: sqlClient,
}, nil }, nil
} }
func (v *View) Health() (err error) { func (v *View) Health() (err error) {
return v.Db.DB().Ping() return v.Db.DB().Ping()
} }
func (v *View) TimeTravel(ctx context.Context, tableName string) string {
return tableName + v.client.Timetravel(call.Took(ctx))
}

View File

@ -23,6 +23,7 @@ import (
"github.com/zitadel/zitadel/internal/telemetry/tracing" "github.com/zitadel/zitadel/internal/telemetry/tracing"
user_model "github.com/zitadel/zitadel/internal/user/model" user_model "github.com/zitadel/zitadel/internal/user/model"
user_view_model "github.com/zitadel/zitadel/internal/user/repository/view/model" user_view_model "github.com/zitadel/zitadel/internal/user/repository/view/model"
"github.com/zitadel/zitadel/internal/view/repository"
) )
const unknownUserID = "UNKNOWN" const unknownUserID = "UNKNOWN"
@ -64,7 +65,9 @@ type privacyPolicyProvider interface {
type userSessionViewProvider interface { type userSessionViewProvider interface {
UserSessionByIDs(string, string, string) (*user_view_model.UserSessionView, error) UserSessionByIDs(string, string, string) (*user_view_model.UserSessionView, error)
UserSessionsByAgentID(string, string) ([]*user_view_model.UserSessionView, error) UserSessionsByAgentID(string, string) ([]*user_view_model.UserSessionView, error)
GetLatestUserSessionSequence(ctx context.Context, instanceID string) (*repository.CurrentSequence, error)
} }
type userViewProvider interface { type userViewProvider interface {
UserByID(string, string) (*user_view_model.UserView, error) UserByID(string, string) (*user_view_model.UserView, error)
} }
@ -654,7 +657,7 @@ func (repo *AuthRequestRepo) checkLoginName(ctx context.Context, request *domain
preferredLoginName += "@" + request.RequestedPrimaryDomain preferredLoginName += "@" + request.RequestedPrimaryDomain
} }
} }
user, err = repo.checkLoginNameInputForResourceOwner(request, preferredLoginName) user, err = repo.checkLoginNameInputForResourceOwner(ctx, request, preferredLoginName)
} else { } else {
user, err = repo.checkLoginNameInput(ctx, request, preferredLoginName) user, err = repo.checkLoginNameInput(ctx, request, preferredLoginName)
} }
@ -729,12 +732,12 @@ func (repo *AuthRequestRepo) checkDomainDiscovery(ctx context.Context, request *
func (repo *AuthRequestRepo) checkLoginNameInput(ctx context.Context, request *domain.AuthRequest, loginNameInput string) (*user_view_model.UserView, error) { func (repo *AuthRequestRepo) checkLoginNameInput(ctx context.Context, request *domain.AuthRequest, loginNameInput string) (*user_view_model.UserView, error) {
// always check the loginname first // always check the loginname first
user, err := repo.View.UserByLoginName(loginNameInput, request.InstanceID) user, err := repo.View.UserByLoginName(ctx, loginNameInput, request.InstanceID)
if err == nil { if err == nil {
// and take the user regardless if there would be a user with that email or phone // and take the user regardless if there would be a user with that email or phone
return user, repo.checkLoginPolicyWithResourceOwner(ctx, request, user.ResourceOwner) return user, repo.checkLoginPolicyWithResourceOwner(ctx, request, user.ResourceOwner)
} }
user, emailErr := repo.View.UserByEmail(loginNameInput, request.InstanceID) user, emailErr := repo.View.UserByEmail(ctx, loginNameInput, request.InstanceID)
if emailErr == nil { if emailErr == nil {
// if there was a single user with the specified email // if there was a single user with the specified email
// load and check the login policy // load and check the login policy
@ -747,7 +750,7 @@ func (repo *AuthRequestRepo) checkLoginNameInput(ctx context.Context, request *d
return user, nil return user, nil
} }
} }
user, phoneErr := repo.View.UserByPhone(loginNameInput, request.InstanceID) user, phoneErr := repo.View.UserByPhone(ctx, loginNameInput, request.InstanceID)
if phoneErr == nil { if phoneErr == nil {
// if there was a single user with the specified phone // if there was a single user with the specified phone
// load and check the login policy // load and check the login policy
@ -765,9 +768,9 @@ func (repo *AuthRequestRepo) checkLoginNameInput(ctx context.Context, request *d
return nil, err return nil, err
} }
func (repo *AuthRequestRepo) checkLoginNameInputForResourceOwner(request *domain.AuthRequest, loginNameInput string) (*user_view_model.UserView, error) { func (repo *AuthRequestRepo) checkLoginNameInputForResourceOwner(ctx context.Context, request *domain.AuthRequest, loginNameInput string) (*user_view_model.UserView, error) {
// always check the loginname first // always check the loginname first
user, err := repo.View.UserByLoginNameAndResourceOwner(loginNameInput, request.RequestedOrgID, request.InstanceID) user, err := repo.View.UserByLoginNameAndResourceOwner(ctx, loginNameInput, request.RequestedOrgID, request.InstanceID)
if err == nil { if err == nil {
// and take the user regardless if there would be a user with that email or phone // and take the user regardless if there would be a user with that email or phone
return user, nil return user, nil
@ -775,7 +778,7 @@ func (repo *AuthRequestRepo) checkLoginNameInputForResourceOwner(request *domain
if request.LoginPolicy != nil && !request.LoginPolicy.DisableLoginWithEmail { if request.LoginPolicy != nil && !request.LoginPolicy.DisableLoginWithEmail {
// if login by email is allowed and there was a single user with the specified email // if login by email is allowed and there was a single user with the specified email
// take that user (and ignore possible phone number matches) // take that user (and ignore possible phone number matches)
user, emailErr := repo.View.UserByEmailAndResourceOwner(loginNameInput, request.RequestedOrgID, request.InstanceID) user, emailErr := repo.View.UserByEmailAndResourceOwner(ctx, loginNameInput, request.RequestedOrgID, request.InstanceID)
if emailErr == nil { if emailErr == nil {
return user, nil return user, nil
} }
@ -783,7 +786,7 @@ func (repo *AuthRequestRepo) checkLoginNameInputForResourceOwner(request *domain
if request.LoginPolicy != nil && !request.LoginPolicy.DisableLoginWithPhone { if request.LoginPolicy != nil && !request.LoginPolicy.DisableLoginWithPhone {
// if login by phone is allowed and there was a single user with the specified phone // if login by phone is allowed and there was a single user with the specified phone
// take that user // take that user
user, phoneErr := repo.View.UserByPhoneAndResourceOwner(loginNameInput, request.RequestedOrgID, request.InstanceID) user, phoneErr := repo.View.UserByPhoneAndResourceOwner(ctx, loginNameInput, request.RequestedOrgID, request.InstanceID)
if phoneErr == nil { if phoneErr == nil {
return user, nil return user, nil
} }
@ -1298,12 +1301,20 @@ func userSessionsByUserAgentID(provider userSessionViewProvider, agentID, instan
} }
func userSessionByIDs(ctx context.Context, provider userSessionViewProvider, eventProvider userEventProvider, agentID string, user *user_model.UserView) (*user_model.UserSessionView, error) { func userSessionByIDs(ctx context.Context, provider userSessionViewProvider, eventProvider userEventProvider, agentID string, user *user_model.UserView) (*user_model.UserSessionView, error) {
session, err := provider.UserSessionByIDs(agentID, user.ID, authz.GetInstance(ctx).InstanceID()) instanceID := authz.GetInstance(ctx).InstanceID()
session, err := provider.UserSessionByIDs(agentID, user.ID, instanceID)
if err != nil { if err != nil {
if !errors.IsNotFound(err) { if !errors.IsNotFound(err) {
return nil, err return nil, err
} }
sequence, err := provider.GetLatestUserSessionSequence(ctx, instanceID)
logging.WithFields("instanceID", instanceID, "userID", user.ID).
OnError(err).
Errorf("could not get current sequence for userSessionByIDs")
session = &user_view_model.UserSessionView{UserAgentID: agentID, UserID: user.ID} session = &user_view_model.UserSessionView{UserAgentID: agentID, UserID: user.ID}
if sequence != nil {
session.Sequence = sequence.CurrentSequence
}
} }
events, err := eventProvider.UserEventsByID(ctx, user.ID, session.Sequence) events, err := eventProvider.UserEventsByID(ctx, user.ID, session.Sequence)
if err != nil { if err != nil {

View File

@ -19,6 +19,7 @@ import (
user_model "github.com/zitadel/zitadel/internal/user/model" user_model "github.com/zitadel/zitadel/internal/user/model"
user_es_model "github.com/zitadel/zitadel/internal/user/repository/eventsourcing/model" user_es_model "github.com/zitadel/zitadel/internal/user/repository/eventsourcing/model"
user_view_model "github.com/zitadel/zitadel/internal/user/repository/view/model" user_view_model "github.com/zitadel/zitadel/internal/user/repository/view/model"
"github.com/zitadel/zitadel/internal/view/repository"
) )
var ( var (
@ -35,6 +36,10 @@ func (m *mockViewNoUserSession) UserSessionsByAgentID(string, string) ([]*user_v
return nil, nil return nil, nil
} }
func (m *mockViewNoUserSession) GetLatestUserSessionSequence(ctx context.Context, instanceID string) (*repository.CurrentSequence, error) {
return &repository.CurrentSequence{}, nil
}
type mockViewErrUserSession struct{} type mockViewErrUserSession struct{}
func (m *mockViewErrUserSession) UserSessionByIDs(string, string, string) (*user_view_model.UserSessionView, error) { func (m *mockViewErrUserSession) UserSessionByIDs(string, string, string) (*user_view_model.UserSessionView, error) {
@ -45,6 +50,10 @@ func (m *mockViewErrUserSession) UserSessionsByAgentID(string, string) ([]*user_
return nil, errors.ThrowInternal(nil, "id", "internal error") return nil, errors.ThrowInternal(nil, "id", "internal error")
} }
func (m *mockViewErrUserSession) GetLatestUserSessionSequence(ctx context.Context, instanceID string) (*repository.CurrentSequence, error) {
return &repository.CurrentSequence{}, nil
}
type mockViewUserSession struct { type mockViewUserSession struct {
ExternalLoginVerification time.Time ExternalLoginVerification time.Time
PasswordlessVerification time.Time PasswordlessVerification time.Time
@ -82,6 +91,10 @@ func (m *mockViewUserSession) UserSessionsByAgentID(string, string) ([]*user_vie
return sessions, nil return sessions, nil
} }
func (m *mockViewUserSession) GetLatestUserSessionSequence(ctx context.Context, instanceID string) (*repository.CurrentSequence, error) {
return &repository.CurrentSequence{}, nil
}
type mockViewNoUser struct{} type mockViewNoUser struct{}
func (m *mockViewNoUser) UserByID(string, string) (*user_view_model.UserView, error) { func (m *mockViewNoUser) UserByID(string, string) (*user_view_model.UserView, error) {

View File

@ -42,15 +42,24 @@ func (r *RefreshTokenRepo) RefreshTokenByToken(ctx context.Context, refreshToken
} }
func (r *RefreshTokenRepo) RefreshTokenByID(ctx context.Context, tokenID, userID string) (*usr_model.RefreshTokenView, error) { func (r *RefreshTokenRepo) RefreshTokenByID(ctx context.Context, tokenID, userID string) (*usr_model.RefreshTokenView, error) {
tokenView, viewErr := r.View.RefreshTokenByID(tokenID, authz.GetInstance(ctx).InstanceID()) instanceID := authz.GetInstance(ctx).InstanceID()
tokenView, viewErr := r.View.RefreshTokenByID(tokenID, instanceID)
if viewErr != nil && !errors.IsNotFound(viewErr) { if viewErr != nil && !errors.IsNotFound(viewErr) {
return nil, viewErr return nil, viewErr
} }
if errors.IsNotFound(viewErr) { if errors.IsNotFound(viewErr) {
sequence, err := r.View.GetLatestRefreshTokenSequence(ctx, instanceID)
logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID", tokenID).
OnError(err).
Errorf("could not get current sequence for RefreshTokenByID")
tokenView = new(model.RefreshTokenView) tokenView = new(model.RefreshTokenView)
tokenView.ID = tokenID tokenView.ID = tokenID
tokenView.UserID = userID tokenView.UserID = userID
tokenView.InstanceID = authz.GetInstance(ctx).InstanceID() tokenView.InstanceID = instanceID
if sequence != nil {
tokenView.Sequence = sequence.CurrentSequence
}
} }
events, esErr := r.getUserEvents(ctx, userID, tokenView.InstanceID, tokenView.Sequence) events, esErr := r.getUserEvents(ctx, userID, tokenView.InstanceID, tokenView.Sequence)
@ -80,7 +89,7 @@ func (r *RefreshTokenRepo) SearchMyRefreshTokens(ctx context.Context, userID str
if err != nil { if err != nil {
return nil, err return nil, err
} }
sequence, err := r.View.GetLatestRefreshTokenSequence(authz.GetInstance(ctx).InstanceID()) sequence, err := r.View.GetLatestRefreshTokenSequence(ctx, authz.GetInstance(ctx).InstanceID())
logging.Log("EVENT-GBdn4").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("could not read latest refresh token sequence") logging.Log("EVENT-GBdn4").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("could not read latest refresh token sequence")
request.Queries = append(request.Queries, &usr_model.RefreshTokenSearchQuery{Key: usr_model.RefreshTokenSearchKeyUserID, Method: domain.SearchMethodEquals, Value: userID}) request.Queries = append(request.Queries, &usr_model.RefreshTokenSearchQuery{Key: usr_model.RefreshTokenSearchKeyUserID, Method: domain.SearchMethodEquals, Value: userID})
tokens, count, err := r.View.SearchRefreshTokens(request) tokens, count, err := r.View.SearchRefreshTokens(request)

View File

@ -34,15 +34,25 @@ func (repo *TokenRepo) IsTokenValid(ctx context.Context, userID, tokenID string)
} }
func (repo *TokenRepo) TokenByIDs(ctx context.Context, userID, tokenID string) (*usr_model.TokenView, error) { func (repo *TokenRepo) TokenByIDs(ctx context.Context, userID, tokenID string) (*usr_model.TokenView, error) {
token, viewErr := repo.View.TokenByIDs(tokenID, userID, authz.GetInstance(ctx).InstanceID()) instanceID := authz.GetInstance(ctx).InstanceID()
token, viewErr := repo.View.TokenByIDs(tokenID, userID, instanceID)
if viewErr != nil && !errors.IsNotFound(viewErr) { if viewErr != nil && !errors.IsNotFound(viewErr) {
return nil, viewErr return nil, viewErr
} }
if errors.IsNotFound(viewErr) { if errors.IsNotFound(viewErr) {
sequence, err := repo.View.GetLatestTokenSequence(ctx, instanceID)
logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID", tokenID).
OnError(err).
Errorf("could not get current sequence for TokenByIDs")
token = new(model.TokenView) token = new(model.TokenView)
token.ID = tokenID token.ID = tokenID
token.UserID = userID token.UserID = userID
token.InstanceID = authz.GetInstance(ctx).InstanceID() token.InstanceID = instanceID
if sequence != nil {
token.Sequence = sequence.CurrentSequence
}
} }
events, esErr := repo.getUserEvents(ctx, userID, token.InstanceID, token.Sequence) events, esErr := repo.getUserEvents(ctx, userID, token.InstanceID, token.Sequence)

View File

@ -62,16 +62,16 @@ func (t *RefreshToken) AggregateTypes() []es_models.AggregateType {
return []es_models.AggregateType{user.AggregateType, project.AggregateType, instance.AggregateType} return []es_models.AggregateType{user.AggregateType, project.AggregateType, instance.AggregateType}
} }
func (t *RefreshToken) CurrentSequence(instanceID string) (uint64, error) { func (t *RefreshToken) CurrentSequence(ctx context.Context, instanceID string) (uint64, error) {
sequence, err := t.view.GetLatestRefreshTokenSequence(instanceID) sequence, err := t.view.GetLatestRefreshTokenSequence(ctx, instanceID)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (t *RefreshToken) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { func (t *RefreshToken) EventQuery(ctx context.Context, instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := t.view.GetLatestRefreshTokenSequences(instanceIDs) sequences, err := t.view.GetLatestRefreshTokenSequences(ctx, instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -67,16 +67,16 @@ func (_ *Token) AggregateTypes() []es_models.AggregateType {
return []es_models.AggregateType{user.AggregateType, project.AggregateType, instance.AggregateType} return []es_models.AggregateType{user.AggregateType, project.AggregateType, instance.AggregateType}
} }
func (t *Token) CurrentSequence(instanceID string) (uint64, error) { func (t *Token) CurrentSequence(ctx context.Context, instanceID string) (uint64, error) {
sequence, err := t.view.GetLatestTokenSequence(instanceID) sequence, err := t.view.GetLatestTokenSequence(ctx, instanceID)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (t *Token) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { func (t *Token) EventQuery(ctx context.Context, instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := t.view.GetLatestTokenSequences(instanceIDs) sequences, err := t.view.GetLatestTokenSequences(ctx, instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -145,11 +145,13 @@ func (t *Token) Reduce(event *es_models.Event) (err error) {
if err != nil { if err != nil {
return err return err
} }
applicationsIDs := make([]string, 0, len(project.Applications)) clientIDs := make([]string, 0, len(project.Applications))
for _, app := range project.Applications { for _, app := range project.Applications {
applicationsIDs = append(applicationsIDs, app.AppID) if app.OIDCConfig != nil {
clientIDs = append(clientIDs, app.OIDCConfig.ClientID)
}
} }
return t.view.DeleteApplicationTokens(event, applicationsIDs...) return t.view.DeleteApplicationTokens(event, clientIDs...)
case instance.InstanceRemovedEventType: case instance.InstanceRemovedEventType:
return t.view.DeleteInstanceTokens(event) return t.view.DeleteInstanceTokens(event)
case org.OrgRemovedEventType: case org.OrgRemovedEventType:
@ -208,7 +210,7 @@ func (t *Token) OnSuccess(instanceIDs []string) error {
} }
func (t *Token) getProjectByID(ctx context.Context, projID, instanceID string) (*proj_model.Project, error) { func (t *Token) getProjectByID(ctx context.Context, projID, instanceID string) (*proj_model.Project, error) {
query, err := proj_view.ProjectByIDQuery(projID, instanceID, 0) projectQuery, err := proj_view.ProjectByIDQuery(projID, instanceID, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -217,7 +219,7 @@ func (t *Token) getProjectByID(ctx context.Context, projID, instanceID string) (
AggregateID: projID, AggregateID: projID,
}, },
} }
err = es_sdk.Filter(ctx, t.Eventstore().FilterEvents, esProject.AppendEvents, query) err = es_sdk.Filter(ctx, t.Eventstore().FilterEvents, esProject.AppendEvents, projectQuery)
if err != nil && !caos_errs.IsNotFound(err) { if err != nil && !caos_errs.IsNotFound(err) {
return nil, err return nil, err
} }

View File

@ -68,16 +68,16 @@ func (_ *User) AggregateTypes() []es_models.AggregateType {
return []es_models.AggregateType{user_repo.AggregateType, org.AggregateType, instance.AggregateType} return []es_models.AggregateType{user_repo.AggregateType, org.AggregateType, instance.AggregateType}
} }
func (u *User) CurrentSequence(instanceID string) (uint64, error) { func (u *User) CurrentSequence(ctx context.Context, instanceID string) (uint64, error) {
sequence, err := u.view.GetLatestUserSequence(instanceID) sequence, err := u.view.GetLatestUserSequence(ctx, instanceID)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (u *User) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { func (u *User) EventQuery(ctx context.Context, instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := u.view.GetLatestUserSequences(instanceIDs) sequences, err := u.view.GetLatestUserSequences(ctx, instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -158,6 +158,11 @@ func (u *User) ProcessUser(event *es_models.Event) (err error) {
if !errors.IsNotFound(err) { if !errors.IsNotFound(err) {
return err return err
} }
logging.WithFields(
"instance", event.InstanceID,
"userID", event.AggregateID,
"eventType", event.Type,
).Info("user not found in view")
query, err := usr_view.UserByIDQuery(event.AggregateID, event.InstanceID, 0) query, err := usr_view.UserByIDQuery(event.AggregateID, event.InstanceID, 0)
if err != nil { if err != nil {
return err return err
@ -181,6 +186,11 @@ func (u *User) ProcessUser(event *es_models.Event) (err error) {
if !errors.IsNotFound(err) { if !errors.IsNotFound(err) {
return err return err
} }
logging.WithFields(
"instance", event.InstanceID,
"userID", event.AggregateID,
"eventType", event.Type,
).Info("user not found in view")
query, err := usr_view.UserByIDQuery(event.AggregateID, event.InstanceID, 0) query, err := usr_view.UserByIDQuery(event.AggregateID, event.InstanceID, 0)
if err != nil { if err != nil {
return err return err
@ -291,7 +301,7 @@ func (u *User) OnSuccess(instanceIDs []string) error {
} }
func (u *User) getOrgByID(ctx context.Context, orgID, instanceID string) (*org_model.Org, error) { func (u *User) getOrgByID(ctx context.Context, orgID, instanceID string) (*org_model.Org, error) {
query, err := view.OrgByIDQuery(orgID, instanceID, 0) orgQuery, err := view.OrgByIDQuery(orgID, instanceID, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -301,7 +311,7 @@ func (u *User) getOrgByID(ctx context.Context, orgID, instanceID string) (*org_m
AggregateID: orgID, AggregateID: orgID,
}, },
} }
err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, esOrg.AppendEvents, query) err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, esOrg.AppendEvents, orgQuery)
if err != nil && !errors.IsNotFound(err) { if err != nil && !errors.IsNotFound(err) {
return nil, err return nil, err
} }

View File

@ -65,16 +65,16 @@ func (_ *UserSession) AggregateTypes() []models.AggregateType {
return []models.AggregateType{user.AggregateType, org.AggregateType, instance.AggregateType} return []models.AggregateType{user.AggregateType, org.AggregateType, instance.AggregateType}
} }
func (u *UserSession) CurrentSequence(instanceID string) (uint64, error) { func (u *UserSession) CurrentSequence(ctx context.Context, instanceID string) (uint64, error) {
sequence, err := u.view.GetLatestUserSessionSequence(instanceID) sequence, err := u.view.GetLatestUserSessionSequence(ctx, instanceID)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (u *UserSession) EventQuery(instanceIDs []string) (*models.SearchQuery, error) { func (u *UserSession) EventQuery(ctx context.Context, instanceIDs []string) (*models.SearchQuery, error) {
sequences, err := u.view.GetLatestUserSessionSequences(instanceIDs) sequences, err := u.view.GetLatestUserSessionSequences(ctx, instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -231,7 +231,7 @@ func (u *UserSession) loginNameInformation(ctx context.Context, orgID string, in
} }
func (u *UserSession) getOrgByID(ctx context.Context, orgID, instanceID string) (*org_model.Org, error) { func (u *UserSession) getOrgByID(ctx context.Context, orgID, instanceID string) (*org_model.Org, error) {
query, err := view.OrgByIDQuery(orgID, instanceID, 0) orgQuery, err := view.OrgByIDQuery(orgID, instanceID, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -241,7 +241,7 @@ func (u *UserSession) getOrgByID(ctx context.Context, orgID, instanceID string)
AggregateID: orgID, AggregateID: orgID,
}, },
} }
err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, esOrg.AppendEvents, query) err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, esOrg.AppendEvents, orgQuery)
if err != nil && !errors.IsNotFound(err) { if err != nil && !errors.IsNotFound(err) {
return nil, err return nil, err
} }

View File

@ -1,6 +1,8 @@
package view package view
import ( import (
"context"
"github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/models"
user_model "github.com/zitadel/zitadel/internal/user/model" user_model "github.com/zitadel/zitadel/internal/user/model"
@ -81,12 +83,12 @@ func (v *View) DeleteOrgRefreshTokens(event *models.Event) error {
return v.ProcessedRefreshTokenSequence(event) return v.ProcessedRefreshTokenSequence(event)
} }
func (v *View) GetLatestRefreshTokenSequence(instanceID string) (*repository.CurrentSequence, error) { func (v *View) GetLatestRefreshTokenSequence(ctx context.Context, instanceID string) (*repository.CurrentSequence, error) {
return v.latestSequence(refreshTokenTable, instanceID) return v.latestSequence(ctx, refreshTokenTable, instanceID)
} }
func (v *View) GetLatestRefreshTokenSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { func (v *View) GetLatestRefreshTokenSequences(ctx context.Context, instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(refreshTokenTable, instanceIDs) return v.latestSequences(ctx, refreshTokenTable, instanceIDs)
} }
func (v *View) ProcessedRefreshTokenSequence(event *models.Event) error { func (v *View) ProcessedRefreshTokenSequence(event *models.Event) error {

View File

@ -1,6 +1,7 @@
package view package view
import ( import (
"context"
"time" "time"
"github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/models"
@ -15,12 +16,12 @@ func (v *View) saveCurrentSequence(viewName string, event *models.Event) error {
return repository.SaveCurrentSequence(v.Db, sequencesTable, viewName, event.InstanceID, event.Sequence, event.CreationDate) return repository.SaveCurrentSequence(v.Db, sequencesTable, viewName, event.InstanceID, event.Sequence, event.CreationDate)
} }
func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentSequence, error) { func (v *View) latestSequence(ctx context.Context, viewName, instanceID string) (*repository.CurrentSequence, error) {
return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) return repository.LatestSequence(v.Db, v.TimeTravel(ctx, sequencesTable), viewName, instanceID)
} }
func (v *View) latestSequences(viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) { func (v *View) latestSequences(ctx context.Context, viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) {
return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs) return repository.LatestSequences(v.Db, v.TimeTravel(ctx, sequencesTable), viewName, instanceIDs)
} }
func (v *View) updateSpoolerRunSequence(viewName string, instanceIDs []string) error { func (v *View) updateSpoolerRunSequence(viewName string, instanceIDs []string) error {

View File

@ -1,6 +1,8 @@
package view package view
import ( import (
"context"
"github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/models"
usr_view "github.com/zitadel/zitadel/internal/user/repository/view" usr_view "github.com/zitadel/zitadel/internal/user/repository/view"
@ -92,12 +94,12 @@ func (v *View) DeleteOrgTokens(event *models.Event) error {
return v.ProcessedTokenSequence(event) return v.ProcessedTokenSequence(event)
} }
func (v *View) GetLatestTokenSequence(instanceID string) (*repository.CurrentSequence, error) { func (v *View) GetLatestTokenSequence(ctx context.Context, instanceID string) (*repository.CurrentSequence, error) {
return v.latestSequence(tokenTable, instanceID) return v.latestSequence(ctx, tokenTable, instanceID)
} }
func (v *View) GetLatestTokenSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { func (v *View) GetLatestTokenSequences(ctx context.Context, instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(tokenTable, instanceIDs) return v.latestSequences(ctx, tokenTable, instanceIDs)
} }
func (v *View) ProcessedTokenSequence(event *models.Event) error { func (v *View) ProcessedTokenSequence(event *models.Event) error {

View File

@ -3,7 +3,8 @@ package view
import ( import (
"context" "context"
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/query" "github.com/zitadel/zitadel/internal/query"
@ -21,16 +22,16 @@ func (v *View) UserByID(userID, instanceID string) (*model.UserView, error) {
return view.UserByID(v.Db, userTable, userID, instanceID) return view.UserByID(v.Db, userTable, userID, instanceID)
} }
func (v *View) UserByLoginName(loginName, instanceID string) (*model.UserView, error) { func (v *View) UserByLoginName(ctx context.Context, loginName, instanceID string) (*model.UserView, error) {
loginNameQuery, err := query.NewUserLoginNamesSearchQuery(loginName) loginNameQuery, err := query.NewUserLoginNamesSearchQuery(loginName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return v.userByID(instanceID, loginNameQuery) return v.userByID(ctx, instanceID, loginNameQuery)
} }
func (v *View) UserByLoginNameAndResourceOwner(loginName, resourceOwner, instanceID string) (*model.UserView, error) { func (v *View) UserByLoginNameAndResourceOwner(ctx context.Context, loginName, resourceOwner, instanceID string) (*model.UserView, error) {
loginNameQuery, err := query.NewUserLoginNamesSearchQuery(loginName) loginNameQuery, err := query.NewUserLoginNamesSearchQuery(loginName)
if err != nil { if err != nil {
return nil, err return nil, err
@ -40,18 +41,18 @@ func (v *View) UserByLoginNameAndResourceOwner(loginName, resourceOwner, instanc
return nil, err return nil, err
} }
return v.userByID(instanceID, loginNameQuery, resourceOwnerQuery) return v.userByID(ctx, instanceID, loginNameQuery, resourceOwnerQuery)
} }
func (v *View) UserByEmail(email, instanceID string) (*model.UserView, error) { func (v *View) UserByEmail(ctx context.Context, email, instanceID string) (*model.UserView, error) {
emailQuery, err := query.NewUserVerifiedEmailSearchQuery(email, query.TextEqualsIgnoreCase) emailQuery, err := query.NewUserVerifiedEmailSearchQuery(email, query.TextEqualsIgnoreCase)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return v.userByID(instanceID, emailQuery) return v.userByID(ctx, instanceID, emailQuery)
} }
func (v *View) UserByEmailAndResourceOwner(email, resourceOwner, instanceID string) (*model.UserView, error) { func (v *View) UserByEmailAndResourceOwner(ctx context.Context, email, resourceOwner, instanceID string) (*model.UserView, error) {
emailQuery, err := query.NewUserVerifiedEmailSearchQuery(email, query.TextEquals) emailQuery, err := query.NewUserVerifiedEmailSearchQuery(email, query.TextEquals)
if err != nil { if err != nil {
return nil, err return nil, err
@ -61,18 +62,18 @@ func (v *View) UserByEmailAndResourceOwner(email, resourceOwner, instanceID stri
return nil, err return nil, err
} }
return v.userByID(instanceID, emailQuery, resourceOwnerQuery) return v.userByID(ctx, instanceID, emailQuery, resourceOwnerQuery)
} }
func (v *View) UserByPhone(phone, instanceID string) (*model.UserView, error) { func (v *View) UserByPhone(ctx context.Context, phone, instanceID string) (*model.UserView, error) {
phoneQuery, err := query.NewUserVerifiedPhoneSearchQuery(phone, query.TextEquals) phoneQuery, err := query.NewUserVerifiedPhoneSearchQuery(phone, query.TextEquals)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return v.userByID(instanceID, phoneQuery) return v.userByID(ctx, instanceID, phoneQuery)
} }
func (v *View) UserByPhoneAndResourceOwner(phone, resourceOwner, instanceID string) (*model.UserView, error) { func (v *View) UserByPhoneAndResourceOwner(ctx context.Context, phone, resourceOwner, instanceID string) (*model.UserView, error) {
phoneQuery, err := query.NewUserVerifiedPhoneSearchQuery(phone, query.TextEquals) phoneQuery, err := query.NewUserVerifiedPhoneSearchQuery(phone, query.TextEquals)
if err != nil { if err != nil {
return nil, err return nil, err
@ -82,12 +83,10 @@ func (v *View) UserByPhoneAndResourceOwner(phone, resourceOwner, instanceID stri
return nil, err return nil, err
} }
return v.userByID(instanceID, phoneQuery, resourceOwnerQuery) return v.userByID(ctx, instanceID, phoneQuery, resourceOwnerQuery)
} }
func (v *View) userByID(instanceID string, queries ...query.SearchQuery) (*model.UserView, error) { func (v *View) userByID(ctx context.Context, instanceID string, queries ...query.SearchQuery) (*model.UserView, error) {
ctx := authz.WithInstanceID(context.Background(), instanceID)
queriedUser, err := v.query.GetNotifyUser(ctx, true, false, queries...) queriedUser, err := v.query.GetNotifyUser(ctx, true, false, queries...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -99,7 +98,14 @@ func (v *View) userByID(instanceID string, queries ...query.SearchQuery) (*model
} }
if err != nil { if err != nil {
sequence, err := v.GetLatestUserSequence(ctx, instanceID)
logging.WithFields("instanceID", instanceID).
OnError(err).
Errorf("could not get current sequence for userByID")
user = new(model.UserView) user = new(model.UserView)
if sequence != nil {
user.Sequence = sequence.CurrentSequence
}
} }
query, err := view.UserByIDQuery(queriedUser.ID, instanceID, user.Sequence) query, err := view.UserByIDQuery(queriedUser.ID, instanceID, user.Sequence)
@ -188,12 +194,12 @@ func (v *View) UpdateOrgOwnerRemovedUsers(event *models.Event) error {
return v.ProcessedUserSequence(event) return v.ProcessedUserSequence(event)
} }
func (v *View) GetLatestUserSequence(instanceID string) (*repository.CurrentSequence, error) { func (v *View) GetLatestUserSequence(ctx context.Context, instanceID string) (*repository.CurrentSequence, error) {
return v.latestSequence(userTable, instanceID) return v.latestSequence(ctx, userTable, instanceID)
} }
func (v *View) GetLatestUserSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { func (v *View) GetLatestUserSequences(ctx context.Context, instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(userTable, instanceIDs) return v.latestSequences(ctx, userTable, instanceIDs)
} }
func (v *View) ProcessedUserSequence(event *models.Event) error { func (v *View) ProcessedUserSequence(event *models.Event) error {

View File

@ -1,6 +1,8 @@
package view package view
import ( import (
"context"
"github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/user/repository/view" "github.com/zitadel/zitadel/internal/user/repository/view"
@ -72,12 +74,12 @@ func (v *View) DeleteOrgUserSessions(event *models.Event) error {
return v.ProcessedUserSessionSequence(event) return v.ProcessedUserSessionSequence(event)
} }
func (v *View) GetLatestUserSessionSequence(instanceID string) (*repository.CurrentSequence, error) { func (v *View) GetLatestUserSessionSequence(ctx context.Context, instanceID string) (*repository.CurrentSequence, error) {
return v.latestSequence(userSessionTable, instanceID) return v.latestSequence(ctx, userSessionTable, instanceID)
} }
func (v *View) GetLatestUserSessionSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { func (v *View) GetLatestUserSessionSequences(ctx context.Context, instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(userSessionTable, instanceIDs) return v.latestSequences(ctx, userSessionTable, instanceIDs)
} }
func (v *View) ProcessedUserSessionSequence(event *models.Event) error { func (v *View) ProcessedUserSessionSequence(event *models.Event) error {

View File

@ -1,8 +1,11 @@
package view package view
import ( import (
"context"
"github.com/jinzhu/gorm" "github.com/jinzhu/gorm"
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/crypto"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
eventstore "github.com/zitadel/zitadel/internal/eventstore/v1" eventstore "github.com/zitadel/zitadel/internal/eventstore/v1"
@ -16,6 +19,7 @@ type View struct {
idGenerator id.Generator idGenerator id.Generator
query *query.Queries query *query.Queries
es eventstore.Eventstore es eventstore.Eventstore
client *database.DB
} }
func StartView(sqlClient *database.DB, keyAlgorithm crypto.EncryptionAlgorithm, queries *query.Queries, idGenerator id.Generator, es eventstore.Eventstore) (*View, error) { func StartView(sqlClient *database.DB, keyAlgorithm crypto.EncryptionAlgorithm, queries *query.Queries, idGenerator id.Generator, es eventstore.Eventstore) (*View, error) {
@ -29,9 +33,14 @@ func StartView(sqlClient *database.DB, keyAlgorithm crypto.EncryptionAlgorithm,
idGenerator: idGenerator, idGenerator: idGenerator,
query: queries, query: queries,
es: es, es: es,
client: sqlClient,
}, nil }, nil
} }
func (v *View) Health() (err error) { func (v *View) Health() (err error) {
return v.Db.DB().Ping() return v.Db.DB().Ping()
} }
func (v *View) TimeTravel(ctx context.Context, tableName string) string {
return tableName + v.client.Timetravel(call.Took(ctx))
}

View File

@ -44,16 +44,16 @@ func (repo *TokenVerifierRepo) tokenByID(ctx context.Context, tokenID, userID st
defer func() { span.EndWithError(err) }() defer func() { span.EndWithError(err) }()
instanceID := authz.GetInstance(ctx).InstanceID() instanceID := authz.GetInstance(ctx).InstanceID()
sequence, err := repo.View.GetLatestTokenSequence(instanceID)
logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID").
OnError(err).
Errorf("could not get current sequence for token check")
token, viewErr := repo.View.TokenByIDs(tokenID, userID, instanceID) token, viewErr := repo.View.TokenByIDs(tokenID, userID, instanceID)
if viewErr != nil && !caos_errs.IsNotFound(viewErr) { if viewErr != nil && !caos_errs.IsNotFound(viewErr) {
return nil, viewErr return nil, viewErr
} }
if caos_errs.IsNotFound(viewErr) { if caos_errs.IsNotFound(viewErr) {
sequence, err := repo.View.GetLatestTokenSequence(ctx, instanceID)
logging.WithFields("instanceID", instanceID, "userID", userID, "tokenID", tokenID).
OnError(err).
Errorf("could not get current sequence for token check")
token = new(model.TokenView) token = new(model.TokenView)
token.ID = tokenID token.ID = tokenID
token.UserID = userID token.UserID = userID

View File

@ -1,6 +1,8 @@
package view package view
import ( import (
"context"
"github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/view/repository" "github.com/zitadel/zitadel/internal/view/repository"
) )
@ -13,6 +15,6 @@ func (v *View) saveCurrentSequence(viewName string, event *models.Event) error {
return repository.SaveCurrentSequence(v.Db, sequencesTable, viewName, event.InstanceID, event.Sequence, event.CreationDate) return repository.SaveCurrentSequence(v.Db, sequencesTable, viewName, event.InstanceID, event.Sequence, event.CreationDate)
} }
func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentSequence, error) { func (v *View) latestSequence(ctx context.Context, viewName, instanceID string) (*repository.CurrentSequence, error) {
return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) return repository.LatestSequence(v.Db, v.TimeTravel(ctx, sequencesTable), viewName, instanceID)
} }

View File

@ -1,6 +1,8 @@
package view package view
import ( import (
"context"
"github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/models"
usr_view "github.com/zitadel/zitadel/internal/user/repository/view" usr_view "github.com/zitadel/zitadel/internal/user/repository/view"
@ -40,8 +42,8 @@ func (v *View) DeleteSessionTokens(agentID, userID, instanceID string, event *mo
return v.ProcessedTokenSequence(event) return v.ProcessedTokenSequence(event)
} }
func (v *View) GetLatestTokenSequence(instanceID string) (*repository.CurrentSequence, error) { func (v *View) GetLatestTokenSequence(ctx context.Context, instanceID string) (*repository.CurrentSequence, error) {
return v.latestSequence(tokenTable, instanceID) return v.latestSequence(ctx, tokenTable, instanceID)
} }
func (v *View) ProcessedTokenSequence(event *models.Event) error { func (v *View) ProcessedTokenSequence(event *models.Event) error {

View File

@ -1,17 +1,21 @@
package view package view
import ( import (
"context"
"github.com/jinzhu/gorm"
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/id"
"github.com/zitadel/zitadel/internal/query" "github.com/zitadel/zitadel/internal/query"
"github.com/jinzhu/gorm"
) )
type View struct { type View struct {
Db *gorm.DB Db *gorm.DB
Query *query.Queries Query *query.Queries
idGenerator id.Generator idGenerator id.Generator
client *database.DB
} }
func StartView(sqlClient *database.DB, idGenerator id.Generator, queries *query.Queries) (*View, error) { func StartView(sqlClient *database.DB, idGenerator id.Generator, queries *query.Queries) (*View, error) {
@ -23,9 +27,14 @@ func StartView(sqlClient *database.DB, idGenerator id.Generator, queries *query.
Db: gorm, Db: gorm,
idGenerator: idGenerator, idGenerator: idGenerator,
Query: queries, Query: queries,
client: sqlClient,
}, nil }, nil
} }
func (v *View) Health() (err error) { func (v *View) Health() (err error) {
return v.Db.DB().Ping() return v.Db.DB().Ping()
} }
func (v *View) TimeTravel(ctx context.Context, tableName string) string {
return tableName + v.client.Timetravel(call.Took(ctx))
}

View File

@ -17,7 +17,7 @@ const (
type Handler interface { type Handler interface {
ViewModel() string ViewModel() string
EventQuery(instanceIDs []string) (*models.SearchQuery, error) EventQuery(ctx context.Context, instanceIDs []string) (*models.SearchQuery, error)
Reduce(*models.Event) error Reduce(*models.Event) error
OnError(event *models.Event, err error) error OnError(event *models.Event, err error) error
OnSuccess(instanceIDs []string) error OnSuccess(instanceIDs []string) error
@ -26,7 +26,7 @@ type Handler interface {
QueryLimit() uint64 QueryLimit() uint64
AggregateTypes() []models.AggregateType AggregateTypes() []models.AggregateType
CurrentSequence(instanceID string) (uint64, error) CurrentSequence(ctx context.Context, instanceID string) (uint64, error)
Eventstore() v1.Eventstore Eventstore() v1.Eventstore
Subscription() *v1.Subscription Subscription() *v1.Subscription
@ -46,7 +46,7 @@ func ReduceEvent(ctx context.Context, handler Handler, event *models.Event) {
).Error("reduce panicked") ).Error("reduce panicked")
} }
}() }()
currentSequence, err := handler.CurrentSequence(event.InstanceID) currentSequence, err := handler.CurrentSequence(ctx, event.InstanceID)
if err != nil { if err != nil {
logging.WithError(err).Warn("unable to get current sequence") logging.WithError(err).Warn("unable to get current sequence")
return return
@ -67,7 +67,7 @@ func ReduceEvent(ctx context.Context, handler Handler, event *models.Event) {
} }
for _, unprocessedEvent := range unprocessedEvents { for _, unprocessedEvent := range unprocessedEvents {
currentSequence, err := handler.CurrentSequence(unprocessedEvent.InstanceID) currentSequence, err := handler.CurrentSequence(ctx, unprocessedEvent.InstanceID)
if err != nil { if err != nil {
logging.WithError(err).Warn("unable to get current sequence") logging.WithError(err).Warn("unable to get current sequence")
return return

View File

@ -222,7 +222,7 @@ func (s *spooledHandler) process(ctx context.Context, events []*models.Event, wo
} }
func (s *spooledHandler) query(ctx context.Context, instanceIDs []string) ([]*models.Event, error) { func (s *spooledHandler) query(ctx context.Context, instanceIDs []string) ([]*models.Event, error) {
query, err := s.EventQuery(instanceIDs) query, err := s.EventQuery(ctx, instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -35,7 +35,7 @@ func (h *testHandler) AggregateTypes() []models.AggregateType {
return nil return nil
} }
func (h *testHandler) CurrentSequence(instanceID string) (uint64, error) { func (h *testHandler) CurrentSequence(ctx context.Context, instanceID string) (uint64, error) {
return 0, nil return 0, nil
} }
@ -51,7 +51,7 @@ func (h *testHandler) Subscription() *v1.Subscription {
return nil return nil
} }
func (h *testHandler) EventQuery(instanceIDs []string) (*models.SearchQuery, error) { func (h *testHandler) EventQuery(ctx context.Context, instanceIDs []string) (*models.SearchQuery, error) {
if h.queryError != nil { if h.queryError != nil {
return nil, h.queryError return nil, h.queryError
} }

View File

@ -16,5 +16,19 @@ func OrgByIDQuery(id, instanceID string, latestSequence uint64) (*es_models.Sear
LatestSequenceFilter(latestSequence). LatestSequenceFilter(latestSequence).
InstanceIDFilter(instanceID). InstanceIDFilter(instanceID).
AggregateIDFilter(id). AggregateIDFilter(id).
EventTypesFilter(
es_models.EventType(org.OrgAddedEventType),
es_models.EventType(org.OrgChangedEventType),
es_models.EventType(org.OrgDeactivatedEventType),
es_models.EventType(org.OrgReactivatedEventType),
es_models.EventType(org.OrgDomainAddedEventType),
es_models.EventType(org.OrgDomainVerificationAddedEventType),
es_models.EventType(org.OrgDomainVerifiedEventType),
es_models.EventType(org.OrgDomainPrimarySetEventType),
es_models.EventType(org.OrgDomainRemovedEventType),
es_models.EventType(org.DomainPolicyAddedEventType),
es_models.EventType(org.DomainPolicyChangedEventType),
es_models.EventType(org.DomainPolicyRemovedEventType),
).
SearchQuery(), nil SearchQuery(), nil
} }

View File

@ -18,15 +18,26 @@ type Project struct {
ProjectRoleCheck bool `json:"projectRoleCheck,omitempty"` ProjectRoleCheck bool `json:"projectRoleCheck,omitempty"`
HasProjectCheck bool `json:"hasProjectCheck,omitempty"` HasProjectCheck bool `json:"hasProjectCheck,omitempty"`
State int32 `json:"-"` State int32 `json:"-"`
OIDCApplications []*oidcApp
}
type oidcApp struct {
AppID string `json:"appId"`
ClientID string `json:"clientId,omitempty"`
} }
func ProjectToModel(project *Project) *model.Project { func ProjectToModel(project *Project) *model.Project {
apps := make([]*model.Application, len(project.OIDCApplications))
for i, application := range project.OIDCApplications {
apps[i] = &model.Application{OIDCConfig: &model.OIDCConfig{ClientID: application.ClientID}}
}
return &model.Project{ return &model.Project{
ObjectRoot: project.ObjectRoot, ObjectRoot: project.ObjectRoot,
Name: project.Name, Name: project.Name,
ProjectRoleAssertion: project.ProjectRoleAssertion, ProjectRoleAssertion: project.ProjectRoleAssertion,
ProjectRoleCheck: project.ProjectRoleCheck, ProjectRoleCheck: project.ProjectRoleCheck,
State: model.ProjectState(project.State), State: model.ProjectState(project.State),
Applications: apps,
} }
} }
@ -59,6 +70,10 @@ func (p *Project) AppendEvent(event *es_models.Event) error {
return p.appendReactivatedEvent() return p.appendReactivatedEvent()
case project.ProjectRemovedType: case project.ProjectRemovedType:
return p.appendRemovedEvent() return p.appendRemovedEvent()
case project.OIDCConfigAddedType:
return p.appendOIDCConfig(event)
case project.ApplicationRemovedType:
return p.appendApplicationRemoved(event)
} }
return nil return nil
} }
@ -84,6 +99,31 @@ func (p *Project) appendRemovedEvent() error {
return nil return nil
} }
func (p *Project) appendOIDCConfig(event *es_models.Event) error {
appEvent := new(oidcApp)
if err := json.Unmarshal(event.Data, appEvent); err != nil {
return err
}
p.OIDCApplications = append(p.OIDCApplications, appEvent)
return nil
}
func (p *Project) appendApplicationRemoved(event *es_models.Event) error {
appEvent := new(oidcApp)
if err := json.Unmarshal(event.Data, appEvent); err != nil {
return err
}
for i := len(p.OIDCApplications) - 1; i >= 0; i-- {
if p.OIDCApplications[i].AppID == appEvent.AppID {
p.OIDCApplications[i] = p.OIDCApplications[len(p.OIDCApplications)-1]
p.OIDCApplications[len(p.OIDCApplications)-1] = nil
p.OIDCApplications = p.OIDCApplications[:len(p.OIDCApplications)-1]
return nil
}
}
return nil
}
func (p *Project) SetData(event *es_models.Event) error { func (p *Project) SetData(event *es_models.Event) error {
if err := json.Unmarshal(event.Data, p); err != nil { if err := json.Unmarshal(event.Data, p); err != nil {
logging.Log("EVEN-lo9sr").WithError(err).Error("could not unmarshal event data") logging.Log("EVEN-lo9sr").WithError(err).Error("could not unmarshal event data")

View File

@ -16,5 +16,14 @@ func ProjectByIDQuery(id, instanceID string, latestSequence uint64) (*es_models.
AggregateTypeFilter(project.AggregateType). AggregateTypeFilter(project.AggregateType).
LatestSequenceFilter(latestSequence). LatestSequenceFilter(latestSequence).
InstanceIDFilter(instanceID). InstanceIDFilter(instanceID).
EventTypesFilter(
es_models.EventType(project.ProjectAddedType),
es_models.EventType(project.ProjectChangedType),
es_models.EventType(project.ProjectDeactivatedType),
es_models.EventType(project.ProjectReactivatedType),
es_models.EventType(project.ProjectRemovedType),
es_models.EventType(project.OIDCConfigAddedType),
es_models.EventType(project.ApplicationRemovedType),
).
SearchQuery(), nil SearchQuery(), nil
} }