diff --git a/cmd/zitadel/startup.yaml b/cmd/zitadel/startup.yaml index 9f0f011d60..8e92800397 100644 --- a/cmd/zitadel/startup.yaml +++ b/cmd/zitadel/startup.yaml @@ -49,7 +49,7 @@ AuthZ: Key: $CR_AUTHZ_KEY Spooler: ConcurrentWorkers: 1 - BulkLimit: 100 + BulkLimit: 10000 FailureCountUntilSkip: 5 Auth: @@ -98,7 +98,7 @@ Auth: Key: $CR_AUTH_KEY Spooler: ConcurrentWorkers: 1 - BulkLimit: 100 + BulkLimit: 10000 FailureCountUntilSkip: 5 KeyConfig: Size: 2048 @@ -142,7 +142,7 @@ Admin: Key: $CR_ADMINAPI_KEY Spooler: ConcurrentWorkers: 1 - BulkLimit: 100 + BulkLimit: 10000 FailureCountUntilSkip: 5 Mgmt: @@ -179,7 +179,7 @@ Mgmt: Key: $CR_MANAGEMENT_KEY Spooler: ConcurrentWorkers: 1 - BulkLimit: 100 + BulkLimit: 10000 FailureCountUntilSkip: 5 API: @@ -292,6 +292,6 @@ Notification: Key: $CR_NOTIFICATION_KEY Spooler: ConcurrentWorkers: 1 - BulkLimit: 100 + BulkLimit: 10000 FailureCountUntilSkip: 5 Handlers: \ No newline at end of file diff --git a/internal/admin/repository/eventsourcing/handler/iam_member.go b/internal/admin/repository/eventsourcing/handler/iam_member.go index 19db852e42..79aafcf055 100644 --- a/internal/admin/repository/eventsourcing/handler/iam_member.go +++ b/internal/admin/repository/eventsourcing/handler/iam_member.go @@ -74,14 +74,14 @@ func (m *IAMMember) EventQuery() (*es_models.SearchQuery, error) { func (m *IAMMember) Reduce(event *es_models.Event) (err error) { switch event.AggregateType { case model.IAMAggregate: - err = m.processIamMember(event) + err = m.processIAMMember(event) case usr_es_model.UserAggregate: err = m.processUser(event) } return err } -func (m *IAMMember) processIamMember(event *es_models.Event) (err error) { +func (m *IAMMember) processIAMMember(event *es_models.Event) (err error) { member := new(iam_model.IAMMemberView) switch event.Type { case model.IAMMemberAdded: diff --git a/internal/admin/repository/eventsourcing/handler/user_external_idps.go b/internal/admin/repository/eventsourcing/handler/user_external_idps.go index cf4e4d05ce..f7719e601b 100644 --- a/internal/admin/repository/eventsourcing/handler/user_external_idps.go +++ b/internal/admin/repository/eventsourcing/handler/user_external_idps.go @@ -152,7 +152,6 @@ func (i *ExternalIDP) processIdpConfig(event *models.Event) (err error) { default: return i.view.ProcessedExternalIDPSequence(event) } - return nil } func (i *ExternalIDP) fillData(externalIDP *usr_view_model.ExternalIDPView) error { diff --git a/internal/admin/repository/eventsourcing/view/user.go b/internal/admin/repository/eventsourcing/view/user.go index 6e34155c96..9694b41f78 100644 --- a/internal/admin/repository/eventsourcing/view/user.go +++ b/internal/admin/repository/eventsourcing/view/user.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" usr_model "github.com/caos/zitadel/internal/user/model" "github.com/caos/zitadel/internal/user/repository/view" @@ -53,16 +54,13 @@ func (v *View) PutUser(user *model.UserView, event *models.Event) error { if err != nil { return err } - if event.Sequence != 0 { - return v.ProcessedUserSequence(event) - } - return nil + return v.ProcessedUserSequence(event) } func (v *View) DeleteUser(userID string, event *models.Event) error { err := view.DeleteUser(v.Db, userTable, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserSequence(event) } diff --git a/internal/auth/repository/eventsourcing/handler/application.go b/internal/auth/repository/eventsourcing/handler/application.go index 2ce66a2c22..5a8d5d41ff 100644 --- a/internal/auth/repository/eventsourcing/handler/application.go +++ b/internal/auth/repository/eventsourcing/handler/application.go @@ -116,7 +116,10 @@ func (a *Application) Reduce(event *models.Event) (err error) { } return a.view.PutApplications(apps, event) case es_model.ProjectRemoved: - return a.view.DeleteApplicationsByProjectID(event.AggregateID) + err = a.view.DeleteApplicationsByProjectID(event.AggregateID) + if err == nil { + return a.view.ProcessedApplicationSequence(event) + } default: return a.view.ProcessedApplicationSequence(event) } diff --git a/internal/auth/repository/eventsourcing/handler/project_role.go b/internal/auth/repository/eventsourcing/handler/project_role.go index 9d7e684c7c..5ee8722bdc 100644 --- a/internal/auth/repository/eventsourcing/handler/project_role.go +++ b/internal/auth/repository/eventsourcing/handler/project_role.go @@ -2,7 +2,6 @@ package handler import ( "github.com/caos/logging" - "github.com/caos/zitadel/internal/eventstore" "github.com/caos/zitadel/internal/eventstore/models" es_models "github.com/caos/zitadel/internal/eventstore/models" @@ -93,7 +92,10 @@ func (p *ProjectRole) Reduce(event *es_models.Event) (err error) { } return p.view.DeleteProjectRole(event.AggregateID, event.ResourceOwner, role.Key, event) case model.ProjectRemoved: - return p.view.DeleteProjectRolesByProjectID(event.AggregateID) + err := p.view.DeleteProjectRolesByProjectID(event.AggregateID) + if err == nil { + return p.view.ProcessedProjectRoleSequence(event) + } default: return p.view.ProcessedProjectRoleSequence(event) } diff --git a/internal/auth/repository/eventsourcing/handler/user_membership.go b/internal/auth/repository/eventsourcing/handler/user_membership.go index ddc6fa6055..e1a41eccf5 100644 --- a/internal/auth/repository/eventsourcing/handler/user_membership.go +++ b/internal/auth/repository/eventsourcing/handler/user_membership.go @@ -3,23 +3,20 @@ package handler import ( "context" - "github.com/caos/zitadel/internal/eventstore" - "github.com/caos/zitadel/internal/user/repository/eventsourcing/model" - - iam_es_model "github.com/caos/zitadel/internal/iam/repository/eventsourcing/model" - org_model "github.com/caos/zitadel/internal/org/model" - org_event "github.com/caos/zitadel/internal/org/repository/eventsourcing" - proj_event "github.com/caos/zitadel/internal/project/repository/eventsourcing" - proj_es_model "github.com/caos/zitadel/internal/project/repository/eventsourcing/model" - "github.com/caos/logging" - + "github.com/caos/zitadel/internal/eventstore" "github.com/caos/zitadel/internal/eventstore/models" es_models "github.com/caos/zitadel/internal/eventstore/models" "github.com/caos/zitadel/internal/eventstore/query" "github.com/caos/zitadel/internal/eventstore/spooler" + iam_es_model "github.com/caos/zitadel/internal/iam/repository/eventsourcing/model" + org_model "github.com/caos/zitadel/internal/org/model" + org_event "github.com/caos/zitadel/internal/org/repository/eventsourcing" org_es_model "github.com/caos/zitadel/internal/org/repository/eventsourcing/model" + proj_event "github.com/caos/zitadel/internal/project/repository/eventsourcing" + proj_es_model "github.com/caos/zitadel/internal/project/repository/eventsourcing/model" usr_model "github.com/caos/zitadel/internal/user/model" + "github.com/caos/zitadel/internal/user/repository/eventsourcing/model" usr_es_model "github.com/caos/zitadel/internal/user/repository/view/model" ) @@ -88,7 +85,7 @@ func (m *UserMembership) EventQuery() (*models.SearchQuery, error) { func (m *UserMembership) Reduce(event *models.Event) (err error) { switch event.AggregateType { case iam_es_model.IAMAggregate: - err = m.processIam(event) + err = m.processIAM(event) case org_es_model.OrgAggregate: err = m.processOrg(event) case proj_es_model.ProjectAggregate: @@ -99,7 +96,7 @@ func (m *UserMembership) Reduce(event *models.Event) (err error) { return err } -func (m *UserMembership) processIam(event *models.Event) (err error) { +func (m *UserMembership) processIAM(event *models.Event) (err error) { member := new(usr_es_model.UserMembershipView) err = member.AppendEvent(event) if err != nil { diff --git a/internal/auth/repository/eventsourcing/view/application.go b/internal/auth/repository/eventsourcing/view/application.go index dabe7727cb..5a7f2c6f1b 100644 --- a/internal/auth/repository/eventsourcing/view/application.go +++ b/internal/auth/repository/eventsourcing/view/application.go @@ -46,8 +46,8 @@ func (v *View) PutApplications(apps []*model.ApplicationView, event *models.Even func (v *View) DeleteApplication(appID string, event *models.Event) error { err := view.DeleteApplication(v.Db, applicationTable, appID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedApplicationSequence(event) } diff --git a/internal/auth/repository/eventsourcing/view/key.go b/internal/auth/repository/eventsourcing/view/key.go index 49b65cf38d..f8fca89445 100644 --- a/internal/auth/repository/eventsourcing/view/key.go +++ b/internal/auth/repository/eventsourcing/view/key.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" key_model "github.com/caos/zitadel/internal/key/model" "github.com/caos/zitadel/internal/key/repository/view" @@ -42,16 +43,16 @@ func (v *View) PutKeys(privateKey, publicKey *model.KeyView, event *models.Event func (v *View) DeleteKey(keyID string, private bool, event *models.Event) error { err := view.DeleteKey(v.Db, keyTable, keyID, private) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedKeySequence(event) } func (v *View) DeleteKeyPair(keyID string, event *models.Event) error { err := view.DeleteKeyPair(v.Db, keyTable, keyID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedKeySequence(event) } diff --git a/internal/auth/repository/eventsourcing/view/machine_keys.go b/internal/auth/repository/eventsourcing/view/machine_keys.go index 4aab211292..b5412d7240 100644 --- a/internal/auth/repository/eventsourcing/view/machine_keys.go +++ b/internal/auth/repository/eventsourcing/view/machine_keys.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" usr_model "github.com/caos/zitadel/internal/user/model" "github.com/caos/zitadel/internal/user/repository/view" @@ -33,24 +34,21 @@ func (v *View) PutMachineKey(key *model.MachineKeyView, event *models.Event) err if err != nil { return err } - if event.Sequence != 0 { - return v.ProcessedMachineKeySequence(event) - } - return nil + return v.ProcessedMachineKeySequence(event) } func (v *View) DeleteMachineKey(keyID string, event *models.Event) error { err := view.DeleteMachineKey(v.Db, machineKeyTable, keyID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedMachineKeySequence(event) } func (v *View) DeleteMachineKeysByUserID(userID string, event *models.Event) error { err := view.DeleteMachineKey(v.Db, machineKeyTable, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedMachineKeySequence(event) } diff --git a/internal/auth/repository/eventsourcing/view/project_role.go b/internal/auth/repository/eventsourcing/view/project_role.go index 5c2751e154..c5a99bfe4a 100644 --- a/internal/auth/repository/eventsourcing/view/project_role.go +++ b/internal/auth/repository/eventsourcing/view/project_role.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" proj_model "github.com/caos/zitadel/internal/project/model" "github.com/caos/zitadel/internal/project/repository/view" @@ -42,8 +43,8 @@ func (v *View) PutProjectRole(role *model.ProjectRoleView, event *models.Event) func (v *View) DeleteProjectRole(projectID, orgID, key string, event *models.Event) error { err := view.DeleteProjectRole(v.Db, projectRoleTable, projectID, orgID, key) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedProjectRoleSequence(event) } diff --git a/internal/auth/repository/eventsourcing/view/token.go b/internal/auth/repository/eventsourcing/view/token.go index 67644c0865..105034a364 100644 --- a/internal/auth/repository/eventsourcing/view/token.go +++ b/internal/auth/repository/eventsourcing/view/token.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" usr_view "github.com/caos/zitadel/internal/user/repository/view" "github.com/caos/zitadel/internal/user/repository/view/model" @@ -37,32 +38,32 @@ func (v *View) PutTokens(token []*model.TokenView, event *models.Event) error { func (v *View) DeleteToken(tokenID string, event *models.Event) error { err := usr_view.DeleteToken(v.Db, tokenTable, tokenID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedTokenSequence(event) } func (v *View) DeleteSessionTokens(agentID, userID string, event *models.Event) error { err := usr_view.DeleteSessionTokens(v.Db, tokenTable, agentID, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedTokenSequence(event) } func (v *View) DeleteUserTokens(userID string, event *models.Event) error { err := usr_view.DeleteUserTokens(v.Db, tokenTable, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedTokenSequence(event) } func (v *View) DeleteApplicationTokens(event *models.Event, ids ...string) error { err := usr_view.DeleteApplicationTokens(v.Db, tokenTable, ids) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedTokenSequence(event) } diff --git a/internal/auth/repository/eventsourcing/view/user.go b/internal/auth/repository/eventsourcing/view/user.go index f7eaea8505..21de8cef8d 100644 --- a/internal/auth/repository/eventsourcing/view/user.go +++ b/internal/auth/repository/eventsourcing/view/user.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" usr_model "github.com/caos/zitadel/internal/user/model" "github.com/caos/zitadel/internal/user/repository/view" @@ -70,8 +71,8 @@ func (v *View) PutUsers(users []*model.UserView, event *models.Event) error { func (v *View) DeleteUser(userID string, event *models.Event) error { err := view.DeleteUser(v.Db, userTable, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserSequence(event) } diff --git a/internal/auth/repository/eventsourcing/view/user_grant.go b/internal/auth/repository/eventsourcing/view/user_grant.go index 4121a210ff..d9a52a586e 100644 --- a/internal/auth/repository/eventsourcing/view/user_grant.go +++ b/internal/auth/repository/eventsourcing/view/user_grant.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" grant_model "github.com/caos/zitadel/internal/usergrant/model" "github.com/caos/zitadel/internal/usergrant/repository/view" @@ -54,8 +55,8 @@ func (v *View) PutUserGrants(grants []*model.UserGrantView, event *models.Event) func (v *View) DeleteUserGrant(grantID string, event *models.Event) error { err := view.DeleteUserGrant(v.Db, userGrantTable, grantID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserGrantSequence(event) } diff --git a/internal/auth/repository/eventsourcing/view/user_membership.go b/internal/auth/repository/eventsourcing/view/user_membership.go index d13ec0ec9e..f3d9fae70e 100644 --- a/internal/auth/repository/eventsourcing/view/user_membership.go +++ b/internal/auth/repository/eventsourcing/view/user_membership.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" usr_model "github.com/caos/zitadel/internal/user/model" "github.com/caos/zitadel/internal/user/repository/view" @@ -46,32 +47,32 @@ func (v *View) BulkPutUserMemberships(memberships []*model.UserMembershipView, e func (v *View) DeleteUserMembership(userID, aggregateID, objectID string, memberType usr_model.MemberType, event *models.Event) error { err := view.DeleteUserMembership(v.Db, userMembershipTable, userID, aggregateID, objectID, memberType) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserMembershipSequence(event) } func (v *View) DeleteUserMembershipsByUserID(userID string, event *models.Event) error { err := view.DeleteUserMembershipsByUserID(v.Db, userMembershipTable, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserMembershipSequence(event) } func (v *View) DeleteUserMembershipsByAggregateID(aggregateID string, event *models.Event) error { err := view.DeleteUserMembershipsByAggregateID(v.Db, userMembershipTable, aggregateID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserMembershipSequence(event) } func (v *View) DeleteUserMembershipsByAggregateIDAndObjectID(aggregateID, objectID string, event *models.Event) error { err := view.DeleteUserMembershipsByAggregateIDAndObjectID(v.Db, userMembershipTable, aggregateID, objectID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserMembershipSequence(event) } diff --git a/internal/auth/repository/eventsourcing/view/user_session.go b/internal/auth/repository/eventsourcing/view/user_session.go index f0ac1ae3cd..16ce4c7986 100644 --- a/internal/auth/repository/eventsourcing/view/user_session.go +++ b/internal/auth/repository/eventsourcing/view/user_session.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" "github.com/caos/zitadel/internal/user/repository/view" "github.com/caos/zitadel/internal/user/repository/view/model" @@ -45,8 +46,8 @@ func (v *View) PutUserSessions(userSession []*model.UserSessionView, event *mode func (v *View) DeleteUserSessions(userID string, event *models.Event) error { err := view.DeleteUserSessions(v.Db, userSessionTable, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserSessionSequence(event) } diff --git a/internal/authz/repository/eventsourcing/view/application.go b/internal/authz/repository/eventsourcing/view/application.go index 07448bf64f..84ae564981 100644 --- a/internal/authz/repository/eventsourcing/view/application.go +++ b/internal/authz/repository/eventsourcing/view/application.go @@ -3,6 +3,7 @@ package view import ( "context" + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" proj_model "github.com/caos/zitadel/internal/project/model" "github.com/caos/zitadel/internal/project/repository/view" @@ -44,8 +45,8 @@ func (v *View) PutApplication(project *model.ApplicationView, event *models.Even func (v *View) DeleteApplication(appID string, event *models.Event) error { err := view.DeleteApplication(v.Db, applicationTable, appID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedApplicationSequence(event) } diff --git a/internal/authz/repository/eventsourcing/view/token.go b/internal/authz/repository/eventsourcing/view/token.go index da411da937..d2987a8194 100644 --- a/internal/authz/repository/eventsourcing/view/token.go +++ b/internal/authz/repository/eventsourcing/view/token.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" usr_view "github.com/caos/zitadel/internal/user/repository/view" usr_view_model "github.com/caos/zitadel/internal/user/repository/view/model" @@ -25,16 +26,16 @@ func (v *View) PutToken(token *usr_view_model.TokenView, event *models.Event) er func (v *View) DeleteToken(tokenID string, event *models.Event) error { err := usr_view.DeleteToken(v.Db, tokenTable, tokenID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedTokenSequence(event) } func (v *View) DeleteSessionTokens(agentID, userID string, event *models.Event) error { err := usr_view.DeleteSessionTokens(v.Db, tokenTable, agentID, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedTokenSequence(event) } diff --git a/internal/authz/repository/eventsourcing/view/user_grant.go b/internal/authz/repository/eventsourcing/view/user_grant.go index 119ab9ae23..84de9319ea 100644 --- a/internal/authz/repository/eventsourcing/view/user_grant.go +++ b/internal/authz/repository/eventsourcing/view/user_grant.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" grant_model "github.com/caos/zitadel/internal/usergrant/model" "github.com/caos/zitadel/internal/usergrant/repository/view" @@ -42,8 +43,8 @@ func (v *View) PutUserGrant(grant *model.UserGrantView, event *models.Event) err func (v *View) DeleteUserGrant(grantID string, event *models.Event) error { err := view.DeleteUserGrant(v.Db, userGrantTable, grantID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserGrantSequence(event) } diff --git a/internal/eventstore/models/object.go b/internal/eventstore/models/object.go index 62a6df14de..4444fc02d6 100644 --- a/internal/eventstore/models/object.go +++ b/internal/eventstore/models/object.go @@ -15,6 +15,8 @@ type ObjectRoot struct { func (o *ObjectRoot) AppendEvent(event *Event) { if o.AggregateID == "" { o.AggregateID = event.AggregateID + } else if o.AggregateID != event.AggregateID { + return } if o.ResourceOwner == "" { o.ResourceOwner = event.ResourceOwner diff --git a/internal/eventstore/models/search_query_old.go b/internal/eventstore/models/search_query_old.go index 0d6e1ebc6c..1748b9d4d7 100644 --- a/internal/eventstore/models/search_query_old.go +++ b/internal/eventstore/models/search_query_old.go @@ -70,7 +70,7 @@ func (q *SearchQuery) ResourceOwnerFilter(resourceOwner string) *SearchQuery { func (q *SearchQuery) setFilter(filter *Filter) *SearchQuery { for i, f := range q.Filters { - if f.field == filter.field { + if f.field == filter.field && f.field != Field_LatestSequence { q.Filters[i] = filter return q } diff --git a/internal/eventstore/query/handler.go b/internal/eventstore/query/handler.go index 07f4f6ce3d..84a79d3079 100755 --- a/internal/eventstore/query/handler.go +++ b/internal/eventstore/query/handler.go @@ -9,6 +9,10 @@ import ( "github.com/caos/zitadel/internal/eventstore/models" ) +const ( + eventLimit = 10000 +) + type Handler interface { ViewModel() string EventQuery() (*models.SearchQuery, error) @@ -29,28 +33,47 @@ func ReduceEvent(handler Handler, event *models.Event) { logging.Log("HANDL-BmpkC").WithError(err).Warn("unable to get current sequence") return } - if event.PreviousSequence > currentSequence { - searchQuery := models.NewSearchQuery(). - AggregateTypeFilter(handler.AggregateTypes()...). - SequenceBetween(currentSequence, event.PreviousSequence) - events, err := handler.Eventstore().FilterEvents(context.Background(), searchQuery) + searchQuery := models.NewSearchQuery(). + AggregateTypeFilter(handler.AggregateTypes()...). + SequenceBetween(currentSequence, event.Sequence). + SetLimit(eventLimit) + + unprocessedEvents, err := handler.Eventstore().FilterEvents(context.Background(), searchQuery) + if err != nil { + logging.LogWithFields("HANDL-L6YH1", "seq", event.Sequence).Warn("filter failed") + return + } + + processedSequences := map[models.AggregateType]uint64{} + + for _, unprocessedEvent := range unprocessedEvents { + currentSequence, err := handler.CurrentSequence(unprocessedEvent) if err != nil { - logging.LogWithFields("HANDL-L6YH1", "seq", event.Sequence).Warn("filter failed") + logging.Log("HANDL-BmpkC").WithError(err).Warn("unable to get current sequence") return } - for _, previousEvent := range events { - //if other process already updated view - //TODO: correct? - if event.PreviousSequence > previousEvent.Sequence { - continue + _, ok := processedSequences[unprocessedEvent.AggregateType] + if !ok { + processedSequences[unprocessedEvent.AggregateType] = currentSequence + } + if processedSequences[unprocessedEvent.AggregateType] != currentSequence { + if currentSequence < processedSequences[unprocessedEvent.AggregateType] { + logging.LogWithFields("QUERY-DOYVN", + "processed", processedSequences[unprocessedEvent.AggregateType], + "current", currentSequence, + "view", handler.ViewModel()). + Warn("sequence not matching") } - err = handler.Reduce(previousEvent) - logging.LogWithFields("HANDL-V42TI", "seq", previousEvent.Sequence).OnError(err).Warn("reduce failed") return } - } else if event.PreviousSequence > 0 && event.PreviousSequence < currentSequence { - logging.LogWithFields("HANDL-w9Bdy", "previousSeq", event.PreviousSequence, "currentSeq", currentSequence).Debug("already processed") + + err = handler.Reduce(unprocessedEvent) + logging.LogWithFields("HANDL-V42TI", "seq", unprocessedEvent.Sequence).OnError(err).Warn("reduce failed") + processedSequences[unprocessedEvent.AggregateType] = unprocessedEvent.Sequence + } + if len(unprocessedEvents) == eventLimit { + logging.LogWithFields("QUERY-BSqe9", "seq", event.Sequence).Warn("didnt process event") return } err = handler.Reduce(event) diff --git a/internal/eventstore/spooler/spooler.go b/internal/eventstore/spooler/spooler.go index 723fbe9229..b5c844e482 100644 --- a/internal/eventstore/spooler/spooler.go +++ b/internal/eventstore/spooler/spooler.go @@ -4,6 +4,7 @@ import ( "context" "strconv" "sync" + "time" "github.com/caos/logging" "github.com/caos/zitadel/internal/eventstore" @@ -11,8 +12,6 @@ import ( "github.com/caos/zitadel/internal/eventstore/query" "github.com/caos/zitadel/internal/telemetry/tracing" "github.com/caos/zitadel/internal/view/repository" - - "time" ) type Spooler struct { @@ -71,14 +70,26 @@ func (s *spooledHandler) load(workerID string) { hasLocked := s.lock(ctx, errs, workerID) if <-hasLocked { - events, err := s.query(ctx) - if err != nil { - errs <- err - } else { - errs <- s.process(ctx, events, workerID) - logging.Log("SPOOL-0pV8o").WithField("view", s.ViewModel()).WithField("worker", workerID).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("process done") + for { + events, err := s.query(ctx) + if err != nil { + errs <- err + break + } + err = s.process(ctx, events, workerID) + if err != nil { + errs <- err + break + } + if uint64(len(events)) < s.QueryLimit() { + // no more events to process + // stop chan + if ctx.Err() == nil { + errs <- nil + } + break + } } - } <-ctx.Done() } @@ -92,14 +103,19 @@ func (s *spooledHandler) awaitError(cancel func(), errs chan error, workerID str } func (s *spooledHandler) process(ctx context.Context, events []*models.Event, workerID string) error { - for _, event := range events { + for i, event := range events { select { case <-ctx.Done(): logging.LogWithFields("SPOOL-FTKwH", "view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).Debug("context canceled") return nil default: if err := s.Reduce(event); err != nil { - return s.OnError(event, err) + err = s.OnError(event, err) + if err == nil { + continue + } + time.Sleep(100 * time.Millisecond) + return s.process(ctx, events[i:], workerID) } } } @@ -167,7 +183,8 @@ func (s *spooledHandler) lock(ctx context.Context, errs chan<- error, workerID s func HandleError(event *models.Event, failedErr error, latestFailedEvent func(sequence uint64) (*repository.FailedEvent, error), processFailedEvent func(*repository.FailedEvent) error, - processSequence func(*models.Event) error, errorCountUntilSkip uint64) error { + processSequence func(*models.Event) error, + errorCountUntilSkip uint64) error { failedEvent, err := latestFailedEvent(event.Sequence) if err != nil { return err @@ -181,7 +198,7 @@ func HandleError(event *models.Event, failedErr error, if errorCountUntilSkip <= failedEvent.FailureCount { return processSequence(event) } - return nil + return failedErr } func HandleSuccess(updateSpoolerRunTimestamp func() error) error { diff --git a/internal/eventstore/spooler/spooler_test.go b/internal/eventstore/spooler/spooler_test.go index 13c6737aa0..8192e4b908 100644 --- a/internal/eventstore/spooler/spooler_test.go +++ b/internal/eventstore/spooler/spooler_test.go @@ -22,6 +22,7 @@ type testHandler struct { queryError error viewModel string bulkLimit uint64 + maxErrCount int } func (h *testHandler) AggregateTypes() []models.AggregateType { @@ -50,6 +51,10 @@ func (h *testHandler) Reduce(*models.Event) error { return h.processError } func (h *testHandler) OnError(event *models.Event, err error) error { + if h.maxErrCount == 2 { + return nil + } + h.maxErrCount++ return err } func (h *testHandler) OnSuccess() error { @@ -93,17 +98,18 @@ func (es *eventstoreStub) LatestSequence(ctx context.Context, in *models.SearchQ func TestSpooler_process(t *testing.T) { type fields struct { - currentHandler query.Handler + currentHandler *testHandler } type args struct { timeout time.Duration events []*models.Event } tests := []struct { - name string - fields fields - args args - wantErr bool + name string + fields fields + args args + wantErr bool + wantRetries int }{ { name: "process all events", @@ -135,7 +141,8 @@ func TestSpooler_process(t *testing.T) { args: args{ events: []*models.Event{{}, {}}, }, - wantErr: true, + wantErr: false, + wantRetries: 2, }, } for _, tt := range tests { @@ -154,6 +161,9 @@ func TestSpooler_process(t *testing.T) { if err := s.process(ctx, tt.args.events, "test"); (err != nil) != tt.wantErr { t.Errorf("Spooler.process() error = %v, wantErr %v", err, tt.wantErr) } + if tt.fields.currentHandler.maxErrCount != tt.wantRetries { + t.Errorf("Spooler.process() wrong retry count got: %d want %d", tt.fields.currentHandler.maxErrCount, tt.wantRetries) + } elapsed := time.Since(start).Round(1 * time.Second) if tt.args.timeout != 0 && elapsed != tt.args.timeout { @@ -222,14 +232,14 @@ func TestSpooler_load(t *testing.T) { { "lock exists", fields{ - currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second}, + currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second, bulkLimit: 10}, locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("lock already exists"), 2000*time.Millisecond), }, }, { "lock fails", fields{ - currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second}, + currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second, bulkLimit: 10}, locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("fail"), 2000*time.Millisecond), eventstore: &eventstoreStub{events: []*models.Event{{}}}, }, @@ -237,7 +247,7 @@ func TestSpooler_load(t *testing.T) { { "query fails", fields{ - currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second}, + currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second, bulkLimit: 10}, locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond), eventstore: &eventstoreStub{err: fmt.Errorf("fail")}, }, @@ -245,8 +255,8 @@ func TestSpooler_load(t *testing.T) { { "process event fails", fields{ - currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 500 * time.Millisecond}, - locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 1000*time.Millisecond), + currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 500 * time.Millisecond, bulkLimit: 10}, + locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 1000*time.Millisecond).expectRenew(t, nil, 1000*time.Millisecond), eventstore: &eventstoreStub{events: []*models.Event{{}}}, }, }, @@ -433,6 +443,7 @@ func TestHandleError(t *testing.T) { }, res: res{ shouldProcessSequence: false, + wantErr: true, }, }, } diff --git a/internal/management/repository/eventsourcing/eventstore/user.go b/internal/management/repository/eventsourcing/eventstore/user.go index f1b6e62ef7..1c16407b4f 100644 --- a/internal/management/repository/eventsourcing/eventstore/user.go +++ b/internal/management/repository/eventsourcing/eventstore/user.go @@ -3,24 +3,22 @@ package eventstore import ( "context" - es_int "github.com/caos/zitadel/internal/eventstore" - es_models "github.com/caos/zitadel/internal/eventstore/models" - es_sdk "github.com/caos/zitadel/internal/eventstore/sdk" - iam_es_model "github.com/caos/zitadel/internal/iam/repository/view/model" - usr_grant_event "github.com/caos/zitadel/internal/usergrant/repository/eventsourcing" - "github.com/caos/logging" - "github.com/caos/zitadel/internal/api/authz" "github.com/caos/zitadel/internal/config/systemdefaults" "github.com/caos/zitadel/internal/errors" caos_errs "github.com/caos/zitadel/internal/errors" + es_int "github.com/caos/zitadel/internal/eventstore" + es_models "github.com/caos/zitadel/internal/eventstore/models" + es_sdk "github.com/caos/zitadel/internal/eventstore/sdk" + iam_es_model "github.com/caos/zitadel/internal/iam/repository/view/model" "github.com/caos/zitadel/internal/management/repository/eventsourcing/view" global_model "github.com/caos/zitadel/internal/model" org_event "github.com/caos/zitadel/internal/org/repository/eventsourcing" usr_model "github.com/caos/zitadel/internal/user/model" usr_event "github.com/caos/zitadel/internal/user/repository/eventsourcing" "github.com/caos/zitadel/internal/user/repository/view/model" + usr_grant_event "github.com/caos/zitadel/internal/usergrant/repository/eventsourcing" "github.com/caos/zitadel/internal/view/repository" ) diff --git a/internal/management/repository/eventsourcing/handler/project_grant.go b/internal/management/repository/eventsourcing/handler/project_grant.go index b01898bb29..918eb54f56 100644 --- a/internal/management/repository/eventsourcing/handler/project_grant.go +++ b/internal/management/repository/eventsourcing/handler/project_grant.go @@ -125,7 +125,11 @@ func (p *ProjectGrant) Reduce(event *models.Event) (err error) { } return p.view.DeleteProjectGrant(grant.GrantID, event) case es_model.ProjectRemoved: - return p.view.DeleteProjectGrantsByProjectID(event.AggregateID) + err = p.view.DeleteProjectGrantsByProjectID(event.AggregateID) + if err != nil { + return err + } + return p.view.ProcessedProjectGrantSequence(event) default: return p.view.ProcessedProjectGrantSequence(event) } diff --git a/internal/management/repository/eventsourcing/handler/project_grant_member.go b/internal/management/repository/eventsourcing/handler/project_grant_member.go index c00eb64763..159be6d870 100644 --- a/internal/management/repository/eventsourcing/handler/project_grant_member.go +++ b/internal/management/repository/eventsourcing/handler/project_grant_member.go @@ -112,7 +112,11 @@ func (p *ProjectGrantMember) processProjectGrantMember(event *models.Event) (err } return p.view.DeleteProjectGrantMember(member.GrantID, member.UserID, event) case proj_es_model.ProjectRemoved: - return p.view.DeleteProjectGrantMembersByProjectID(event.AggregateID) + err = p.view.DeleteProjectGrantMembersByProjectID(event.AggregateID) + if err != nil { + return err + } + return p.view.ProcessedProjectGrantMemberSequence(event) default: return p.view.ProcessedProjectGrantMemberSequence(event) } diff --git a/internal/management/repository/eventsourcing/handler/user.go b/internal/management/repository/eventsourcing/handler/user.go index 05497a0260..bf415d1602 100644 --- a/internal/management/repository/eventsourcing/handler/user.go +++ b/internal/management/repository/eventsourcing/handler/user.go @@ -214,7 +214,7 @@ func (u *User) fillPreferredLoginNamesOnOrgUsers(event *models.Event) error { } } if !policy.UserLoginMustBeDomain { - return nil + return u.view.ProcessedUserSequence(event) } users, err := u.view.UsersByOrgID(event.AggregateID) if err != nil { diff --git a/internal/management/repository/eventsourcing/handler/user_grant.go b/internal/management/repository/eventsourcing/handler/user_grant.go index 3c9f0a0f8f..a4f0869783 100644 --- a/internal/management/repository/eventsourcing/handler/user_grant.go +++ b/internal/management/repository/eventsourcing/handler/user_grant.go @@ -152,7 +152,6 @@ func (u *UserGrant) processUser(event *es_models.Event) (err error) { default: return u.view.ProcessedUserGrantSequence(event) } - return nil } func (u *UserGrant) processProject(event *es_models.Event) (err error) { @@ -176,7 +175,6 @@ func (u *UserGrant) processProject(event *es_models.Event) (err error) { default: return u.view.ProcessedUserGrantSequence(event) } - return nil } func (u *UserGrant) fillData(grant *view_model.UserGrantView, resourceOwner string) (err error) { diff --git a/internal/management/repository/eventsourcing/handler/user_membership.go b/internal/management/repository/eventsourcing/handler/user_membership.go index e6a56fab90..9358acc4e2 100644 --- a/internal/management/repository/eventsourcing/handler/user_membership.go +++ b/internal/management/repository/eventsourcing/handler/user_membership.go @@ -85,7 +85,7 @@ func (m *UserMembership) EventQuery() (*es_models.SearchQuery, error) { func (m *UserMembership) Reduce(event *es_models.Event) (err error) { switch event.AggregateType { case iam_es_model.IAMAggregate: - err = m.processIam(event) + err = m.processIAM(event) case org_es_model.OrgAggregate: err = m.processOrg(event) case proj_es_model.ProjectAggregate: @@ -96,7 +96,7 @@ func (m *UserMembership) Reduce(event *es_models.Event) (err error) { return err } -func (m *UserMembership) processIam(event *es_models.Event) (err error) { +func (m *UserMembership) processIAM(event *es_models.Event) (err error) { member := new(usr_es_model.UserMembershipView) err = member.AppendEvent(event) if err != nil { diff --git a/internal/management/repository/eventsourcing/view/application.go b/internal/management/repository/eventsourcing/view/application.go index eeb5c344e6..c9b4a6fbd0 100644 --- a/internal/management/repository/eventsourcing/view/application.go +++ b/internal/management/repository/eventsourcing/view/application.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" proj_model "github.com/caos/zitadel/internal/project/model" "github.com/caos/zitadel/internal/project/repository/view" @@ -26,7 +27,7 @@ func (v *View) SearchApplications(request *proj_model.ApplicationSearchRequest) func (v *View) PutApplication(app *model.ApplicationView, event *models.Event) error { err := view.PutApplication(v.Db, applicationTable, app) - if err != nil { + if err != nil && !errors.IsNotFound(err) { return err } return v.ProcessedApplicationSequence(event) @@ -43,7 +44,7 @@ func (v *View) PutApplications(apps []*model.ApplicationView, event *models.Even func (v *View) DeleteApplication(appID string, event *models.Event) error { err := view.DeleteApplication(v.Db, applicationTable, appID) if err != nil { - return nil + return err } return v.ProcessedApplicationSequence(event) } diff --git a/internal/management/repository/eventsourcing/view/machine_keys.go b/internal/management/repository/eventsourcing/view/machine_keys.go index da50ed6760..c861cfd58c 100644 --- a/internal/management/repository/eventsourcing/view/machine_keys.go +++ b/internal/management/repository/eventsourcing/view/machine_keys.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" usr_model "github.com/caos/zitadel/internal/user/model" "github.com/caos/zitadel/internal/user/repository/view" @@ -29,24 +30,21 @@ func (v *View) PutMachineKey(org *model.MachineKeyView, event *models.Event) err if err != nil { return err } - if event.Sequence != 0 { - return v.ProcessedMachineKeySequence(event) - } - return nil + return v.ProcessedMachineKeySequence(event) } func (v *View) DeleteMachineKey(keyID string, event *models.Event) error { err := view.DeleteMachineKey(v.Db, machineKeyTable, keyID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedMachineKeySequence(event) } func (v *View) DeleteMachineKeysByUserID(userID string, event *models.Event) error { err := view.DeleteMachineKey(v.Db, machineKeyTable, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedMachineKeySequence(event) } diff --git a/internal/management/repository/eventsourcing/view/org_domain.go b/internal/management/repository/eventsourcing/view/org_domain.go index f8103674aa..45ee4ad912 100644 --- a/internal/management/repository/eventsourcing/view/org_domain.go +++ b/internal/management/repository/eventsourcing/view/org_domain.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" org_model "github.com/caos/zitadel/internal/org/model" "github.com/caos/zitadel/internal/org/repository/view" @@ -33,10 +34,7 @@ func (v *View) PutOrgDomain(org *model.OrgDomainView, event *models.Event) error if err != nil { return err } - if event.Sequence != 0 { - return v.ProcessedOrgDomainSequence(event) - } - return nil + return v.ProcessedOrgDomainSequence(event) } func (v *View) PutOrgDomains(domains []*model.OrgDomainView, event *models.Event) error { @@ -49,8 +47,8 @@ func (v *View) PutOrgDomains(domains []*model.OrgDomainView, event *models.Event func (v *View) DeleteOrgDomain(orgID, domain string, event *models.Event) error { err := view.DeleteOrgDomain(v.Db, orgDomainTable, orgID, domain) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedOrgDomainSequence(event) } diff --git a/internal/management/repository/eventsourcing/view/org_member.go b/internal/management/repository/eventsourcing/view/org_member.go index 2ea8f79c90..ee6deac739 100644 --- a/internal/management/repository/eventsourcing/view/org_member.go +++ b/internal/management/repository/eventsourcing/view/org_member.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" org_model "github.com/caos/zitadel/internal/org/model" "github.com/caos/zitadel/internal/org/repository/view" @@ -42,16 +43,16 @@ func (v *View) PutOrgMembers(members []*model.OrgMemberView, event *models.Event func (v *View) DeleteOrgMember(orgID, userID string, event *models.Event) error { err := view.DeleteOrgMember(v.Db, orgMemberTable, orgID, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedOrgMemberSequence(event) } func (v *View) DeleteOrgMembersByUserID(userID string, event *models.Event) error { err := view.DeleteOrgMembersByUserID(v.Db, orgMemberTable, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedOrgMemberSequence(event) } diff --git a/internal/management/repository/eventsourcing/view/project.go b/internal/management/repository/eventsourcing/view/project.go index 8676b0ed2c..f5ac609ddc 100644 --- a/internal/management/repository/eventsourcing/view/project.go +++ b/internal/management/repository/eventsourcing/view/project.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" proj_model "github.com/caos/zitadel/internal/project/model" "github.com/caos/zitadel/internal/project/repository/view" @@ -30,8 +31,8 @@ func (v *View) PutProject(project *model.ProjectView, event *models.Event) error func (v *View) DeleteProject(projectID string, event *models.Event) error { err := view.DeleteProject(v.Db, projectTable, projectID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedProjectSequence(event) } diff --git a/internal/management/repository/eventsourcing/view/project_grant_member.go b/internal/management/repository/eventsourcing/view/project_grant_member.go index 83ae463190..7634cefbf5 100644 --- a/internal/management/repository/eventsourcing/view/project_grant_member.go +++ b/internal/management/repository/eventsourcing/view/project_grant_member.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" proj_model "github.com/caos/zitadel/internal/project/model" "github.com/caos/zitadel/internal/project/repository/view" @@ -46,8 +47,8 @@ func (v *View) PutProjectGrantMembers(members []*model.ProjectGrantMemberView, e func (v *View) DeleteProjectGrantMember(grantID, userID string, event *models.Event) error { err := view.DeleteProjectGrantMember(v.Db, projectGrantMemberTable, grantID, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedProjectGrantMemberSequence(event) } diff --git a/internal/management/repository/eventsourcing/view/project_member.go b/internal/management/repository/eventsourcing/view/project_member.go index a867916939..09f3b31518 100644 --- a/internal/management/repository/eventsourcing/view/project_member.go +++ b/internal/management/repository/eventsourcing/view/project_member.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" proj_model "github.com/caos/zitadel/internal/project/model" "github.com/caos/zitadel/internal/project/repository/view" @@ -46,8 +47,8 @@ func (v *View) PutProjectMembers(project []*model.ProjectMemberView, event *mode func (v *View) DeleteProjectMember(projectID, userID string, event *models.Event) error { err := view.DeleteProjectMember(v.Db, projectMemberTable, projectID, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedProjectMemberSequence(event) } diff --git a/internal/management/repository/eventsourcing/view/project_role.go b/internal/management/repository/eventsourcing/view/project_role.go index df56939309..5b251fa24b 100644 --- a/internal/management/repository/eventsourcing/view/project_role.go +++ b/internal/management/repository/eventsourcing/view/project_role.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" proj_model "github.com/caos/zitadel/internal/project/model" "github.com/caos/zitadel/internal/project/repository/view" @@ -42,8 +43,8 @@ func (v *View) PutProjectRole(project *model.ProjectRoleView, event *models.Even func (v *View) DeleteProjectRole(projectID, orgID, key string, event *models.Event) error { err := view.DeleteProjectRole(v.Db, projectRoleTable, projectID, orgID, key) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedProjectRoleSequence(event) } diff --git a/internal/management/repository/eventsourcing/view/user.go b/internal/management/repository/eventsourcing/view/user.go index cd7b2a4744..d589b546f7 100644 --- a/internal/management/repository/eventsourcing/view/user.go +++ b/internal/management/repository/eventsourcing/view/user.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" usr_model "github.com/caos/zitadel/internal/user/model" "github.com/caos/zitadel/internal/user/repository/view" @@ -53,16 +54,13 @@ func (v *View) PutUser(user *model.UserView, event *models.Event) error { if err != nil { return err } - if event.Sequence != 0 { - return v.ProcessedUserSequence(event) - } - return nil + return v.ProcessedUserSequence(event) } func (v *View) DeleteUser(userID string, event *models.Event) error { err := view.DeleteUser(v.Db, userTable, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserSequence(event) } diff --git a/internal/management/repository/eventsourcing/view/user_grant.go b/internal/management/repository/eventsourcing/view/user_grant.go index 5f20eaa0f2..15dd3220d2 100644 --- a/internal/management/repository/eventsourcing/view/user_grant.go +++ b/internal/management/repository/eventsourcing/view/user_grant.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" grant_model "github.com/caos/zitadel/internal/usergrant/model" "github.com/caos/zitadel/internal/usergrant/repository/view" @@ -58,8 +59,8 @@ func (v *View) PutUserGrants(grants []*model.UserGrantView, event *models.Event) func (v *View) DeleteUserGrant(grantID string, event *models.Event) error { err := view.DeleteUserGrant(v.Db, userGrantTable, grantID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserGrantSequence(event) } diff --git a/internal/management/repository/eventsourcing/view/user_membership.go b/internal/management/repository/eventsourcing/view/user_membership.go index fe80081194..90f64906a7 100644 --- a/internal/management/repository/eventsourcing/view/user_membership.go +++ b/internal/management/repository/eventsourcing/view/user_membership.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" usr_model "github.com/caos/zitadel/internal/user/model" "github.com/caos/zitadel/internal/user/repository/view" @@ -42,32 +43,32 @@ func (v *View) BulkPutUserMemberships(memberships []*model.UserMembershipView, e func (v *View) DeleteUserMembership(userID, aggregateID, objectID string, memberType usr_model.MemberType, event *models.Event) error { err := view.DeleteUserMembership(v.Db, userMembershipTable, userID, aggregateID, objectID, memberType) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserMembershipSequence(event) } func (v *View) DeleteUserMembershipsByUserID(userID string, event *models.Event) error { err := view.DeleteUserMembershipsByUserID(v.Db, userMembershipTable, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserMembershipSequence(event) } func (v *View) DeleteUserMembershipsByAggregateID(aggregateID string, event *models.Event) error { err := view.DeleteUserMembershipsByAggregateID(v.Db, userMembershipTable, aggregateID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserMembershipSequence(event) } func (v *View) DeleteUserMembershipsByAggregateIDAndObjectID(aggregateID, objectID string, event *models.Event) error { err := view.DeleteUserMembershipsByAggregateIDAndObjectID(v.Db, userMembershipTable, aggregateID, objectID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedUserMembershipSequence(event) } diff --git a/internal/notification/repository/eventsourcing/handler/notification.go b/internal/notification/repository/eventsourcing/handler/notification.go index cebde591bf..0d37914e3a 100644 --- a/internal/notification/repository/eventsourcing/handler/notification.go +++ b/internal/notification/repository/eventsourcing/handler/notification.go @@ -111,8 +111,6 @@ func (n *Notification) Reduce(event *models.Event) (err error) { err = n.handlePasswordCode(event) case es_model.DomainClaimed: err = n.handleDomainClaimed(event) - default: - return n.view.ProcessedNotificationSequence(event) } if err != nil { return err diff --git a/internal/notification/repository/eventsourcing/view/notify_user.go b/internal/notification/repository/eventsourcing/view/notify_user.go index b7a24899f7..f81c986d0a 100644 --- a/internal/notification/repository/eventsourcing/view/notify_user.go +++ b/internal/notification/repository/eventsourcing/view/notify_user.go @@ -1,6 +1,7 @@ package view import ( + "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/models" "github.com/caos/zitadel/internal/user/repository/view" "github.com/caos/zitadel/internal/user/repository/view/model" @@ -20,10 +21,7 @@ func (v *View) PutNotifyUser(user *model.NotifyUser, event *models.Event) error if err != nil { return err } - if event.Sequence != 0 { - return v.ProcessedNotifyUserSequence(event) - } - return nil + return v.ProcessedNotifyUserSequence(event) } func (v *View) NotifyUsersByOrgID(orgID string) ([]*model.NotifyUser, error) { @@ -32,8 +30,8 @@ func (v *View) NotifyUsersByOrgID(orgID string) ([]*model.NotifyUser, error) { func (v *View) DeleteNotifyUser(userID string, event *models.Event) error { err := view.DeleteNotifyUser(v.Db, notifyUserTable, userID) - if err != nil { - return nil + if err != nil && !errors.IsNotFound(err) { + return err } return v.ProcessedNotifyUserSequence(event) } diff --git a/internal/notification/templates/templateData.go b/internal/notification/templates/templateData.go index ee7b910709..676af69d4b 100644 --- a/internal/notification/templates/templateData.go +++ b/internal/notification/templates/templateData.go @@ -24,6 +24,8 @@ func (data *TemplateData) Translate(i18n *i18n.Translator, args map[string]inter data.Subject = i18n.Localize(data.Subject, nil, langs...) data.Greeting = i18n.Localize(data.Greeting, args, langs...) data.Text = html.UnescapeString(i18n.Localize(data.Text, args, langs...)) - data.Href = i18n.Localize(data.Href, nil, langs...) + if data.Href != "" { + data.Href = i18n.Localize(data.Href, nil, langs...) + } data.ButtonText = i18n.Localize(data.ButtonText, nil, langs...) } diff --git a/internal/org/repository/eventsourcing/eventstore_mock_test.go b/internal/org/repository/eventsourcing/eventstore_mock_test.go index 6d7d175317..f36f48d7f9 100644 --- a/internal/org/repository/eventsourcing/eventstore_mock_test.go +++ b/internal/org/repository/eventsourcing/eventstore_mock_test.go @@ -42,7 +42,7 @@ func GetMockChangesOrgOK(ctrl *gomock.Controller) *OrgEventstore { } events := []*es_models.Event{ - {AggregateID: "AggregateIDApp", Sequence: 1, AggregateType: repo_model.OrgAggregate, Data: data}, + {AggregateID: "AggregateID", Sequence: 1, AggregateType: repo_model.OrgAggregate, Data: data}, } mockEs := mock.NewMockEventstore(ctrl) mockEs.EXPECT().FilterEvents(gomock.Any(), gomock.Any()).Return(events, nil) diff --git a/internal/org/repository/eventsourcing/eventstore_test.go b/internal/org/repository/eventsourcing/eventstore_test.go index 237a55f63e..e4b5f76111 100644 --- a/internal/org/repository/eventsourcing/eventstore_test.go +++ b/internal/org/repository/eventsourcing/eventstore_test.go @@ -179,7 +179,7 @@ func TestOrgEventstore_OrgByID(t *testing.T) { { name: "new events found and added success", fields: fields{Eventstore: newTestEventstore(t).expectFilterEvents([]*es_models.Event{ - {Sequence: 6}, + {Sequence: 6, AggregateID: "hodor-org"}, }, nil)}, args: args{ ctx: authz.NewMockContext("user", "org"), diff --git a/internal/project/repository/view/application_view.go b/internal/project/repository/view/application_view.go index 414d7dfcf7..2abfd9f47d 100644 --- a/internal/project/repository/view/application_view.go +++ b/internal/project/repository/view/application_view.go @@ -24,7 +24,7 @@ func ApplicationByID(db *gorm.DB, table, projectID, appID string) (*model.Applic func ApplicationsByProjectID(db *gorm.DB, table, projectID string) ([]*model.ApplicationView, error) { applications := make([]*model.ApplicationView, 0) queries := []*proj_model.ApplicationSearchQuery{ - &proj_model.ApplicationSearchQuery{Key: proj_model.AppSearchKeyProjectID, Value: projectID, Method: global_model.SearchMethodEquals}, + {Key: proj_model.AppSearchKeyProjectID, Value: projectID, Method: global_model.SearchMethodEquals}, } query := repository.PrepareSearchQuery(table, model.ApplicationSearchRequest{Queries: queries}) _, err := query(db, &applications) diff --git a/internal/project/repository/view/project_role_view.go b/internal/project/repository/view/project_role_view.go index 12248b6640..4065439d46 100644 --- a/internal/project/repository/view/project_role_view.go +++ b/internal/project/repository/view/project_role_view.go @@ -26,7 +26,7 @@ func ProjectRoleByIDs(db *gorm.DB, table, projectID, orgID, key string) (*model. func ProjectRolesByProjectID(db *gorm.DB, table, projectID string) ([]*model.ProjectRoleView, error) { roles := make([]*model.ProjectRoleView, 0) queries := []*proj_model.ProjectRoleSearchQuery{ - &proj_model.ProjectRoleSearchQuery{Key: proj_model.ProjectRoleSearchKeyProjectID, Value: projectID, Method: global_model.SearchMethodEquals}, + {Key: proj_model.ProjectRoleSearchKeyProjectID, Value: projectID, Method: global_model.SearchMethodEquals}, } query := repository.PrepareSearchQuery(table, model.ProjectRoleSearchRequest{Queries: queries}) _, err := query(db, &roles) @@ -39,9 +39,9 @@ func ProjectRolesByProjectID(db *gorm.DB, table, projectID string) ([]*model.Pro func ResourceOwnerProjectRolesByKey(db *gorm.DB, table, projectID, resourceOwner, key string) ([]*model.ProjectRoleView, error) { roles := make([]*model.ProjectRoleView, 0) queries := []*proj_model.ProjectRoleSearchQuery{ - &proj_model.ProjectRoleSearchQuery{Key: proj_model.ProjectRoleSearchKeyProjectID, Value: projectID, Method: global_model.SearchMethodEquals}, - &proj_model.ProjectRoleSearchQuery{Key: proj_model.ProjectRoleSearchKeyResourceOwner, Value: resourceOwner, Method: global_model.SearchMethodEquals}, - &proj_model.ProjectRoleSearchQuery{Key: proj_model.ProjectRoleSearchKeyKey, Value: key, Method: global_model.SearchMethodEquals}, + {Key: proj_model.ProjectRoleSearchKeyProjectID, Value: projectID, Method: global_model.SearchMethodEquals}, + {Key: proj_model.ProjectRoleSearchKeyResourceOwner, Value: resourceOwner, Method: global_model.SearchMethodEquals}, + {Key: proj_model.ProjectRoleSearchKeyKey, Value: key, Method: global_model.SearchMethodEquals}, } query := repository.PrepareSearchQuery(table, model.ProjectRoleSearchRequest{Queries: queries}) _, err := query(db, &roles) @@ -54,8 +54,8 @@ func ResourceOwnerProjectRolesByKey(db *gorm.DB, table, projectID, resourceOwner func ResourceOwnerProjectRoles(db *gorm.DB, table, projectID, resourceOwner string) ([]*model.ProjectRoleView, error) { roles := make([]*model.ProjectRoleView, 0) queries := []*proj_model.ProjectRoleSearchQuery{ - &proj_model.ProjectRoleSearchQuery{Key: proj_model.ProjectRoleSearchKeyProjectID, Value: projectID, Method: global_model.SearchMethodEquals}, - &proj_model.ProjectRoleSearchQuery{Key: proj_model.ProjectRoleSearchKeyResourceOwner, Value: resourceOwner, Method: global_model.SearchMethodEquals}, + {Key: proj_model.ProjectRoleSearchKeyProjectID, Value: projectID, Method: global_model.SearchMethodEquals}, + {Key: proj_model.ProjectRoleSearchKeyResourceOwner, Value: resourceOwner, Method: global_model.SearchMethodEquals}, } query := repository.PrepareSearchQuery(table, model.ProjectRoleSearchRequest{Queries: queries}) _, err := query(db, &roles) diff --git a/internal/user/repository/eventsourcing/eventstore.go b/internal/user/repository/eventsourcing/eventstore.go index acbf78048b..12688f855d 100644 --- a/internal/user/repository/eventsourcing/eventstore.go +++ b/internal/user/repository/eventsourcing/eventstore.go @@ -1624,11 +1624,10 @@ func (es *UserEventstore) AddMachineKey(ctx context.Context, key *usr_model.Mach return nil, errors.ThrowPreconditionFailed(nil, "EVENT-5ROh4", "Errors.User.NotMachine") } - id, err := es.idGenerator.Next() + key.KeyID, err = es.idGenerator.Next() if err != nil { return nil, err } - key.KeyID = id if key.ExpirationDate.IsZero() { key.ExpirationDate, err = time.Parse(yearLayout, defaultExpirationDate)