diff --git a/cmd/admin/setup/01.go b/cmd/admin/setup/01.go index a34b478b35..372d0e263c 100644 --- a/cmd/admin/setup/01.go +++ b/cmd/admin/setup/01.go @@ -11,8 +11,6 @@ var ( createAdminViews string //go:embed 01_sql/auth.sql createAuthViews string - //go:embed 01_sql/authz.sql - createAuthzViews string //go:embed 01_sql/notification.sql createNotificationViews string //go:embed 01_sql/projections.sql @@ -24,7 +22,7 @@ type ProjectionTable struct { } func (mig *ProjectionTable) Execute(ctx context.Context) error { - stmt := createAdminViews + createAuthViews + createAuthzViews + createNotificationViews + createProjections + stmt := createAdminViews + createAuthViews + createNotificationViews + createProjections _, err := mig.dbClient.ExecContext(ctx, stmt) return err } diff --git a/cmd/admin/setup/01_sql/authz.sql b/cmd/admin/setup/01_sql/authz.sql deleted file mode 100644 index ae7007cb47..0000000000 --- a/cmd/admin/setup/01_sql/authz.sql +++ /dev/null @@ -1,47 +0,0 @@ -CREATE SCHEMA authz; - -CREATE TABLE authz.locks ( - locker_id TEXT, - locked_until TIMESTAMPTZ(3), - view_name TEXT, - instance_id TEXT NOT NULL, - - PRIMARY KEY (view_name, instance_id) -); - -CREATE TABLE authz.current_sequences ( - view_name TEXT, - current_sequence BIGINT, - event_timestamp TIMESTAMPTZ, - last_successful_spooler_run TIMESTAMPTZ, - instance_id TEXT NOT NULL, - - PRIMARY KEY (view_name, instance_id) -); - -CREATE TABLE authz.failed_events ( - view_name TEXT, - failed_sequence BIGINT, - failure_count SMALLINT, - err_msg TEXT, - instance_id TEXT NOT NULL, - - PRIMARY KEY (view_name, failed_sequence, instance_id) -); - -CREATE TABLE authz.user_memberships ( - user_id STRING NOT NULL, - member_type INT2 NOT NULL, - aggregate_id STRING NOT NULL, - object_id STRING NOT NULL, - roles STRING[] NULL, - display_name STRING NULL, - resource_owner STRING NULL, - resource_owner_name STRING NULL, - creation_date TIMESTAMPTZ NULL, - change_date TIMESTAMPTZ NULL, - sequence INT8 NULL, - instance_id STRING NULL, - - PRIMARY KEY (user_id, member_type, aggregate_id, object_id) -); diff --git a/cmd/admin/start/config.go b/cmd/admin/start/config.go index 2550ae8947..f3cd144374 100644 --- a/cmd/admin/start/config.go +++ b/cmd/admin/start/config.go @@ -14,7 +14,6 @@ import ( "github.com/zitadel/zitadel/internal/api/ui/console" "github.com/zitadel/zitadel/internal/api/ui/login" auth_es "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing" - "github.com/zitadel/zitadel/internal/authz" "github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/config/hook" "github.com/zitadel/zitadel/internal/config/systemdefaults" @@ -38,7 +37,6 @@ type Config struct { Database database.Config Tracing tracing.Config Projections projection.Config - AuthZ authz.Config Auth auth_es.Config Admin admin_es.Config UserAgentCookie *middleware.UserAgentCookieConfig diff --git a/cmd/admin/start/start.go b/cmd/admin/start/start.go index 99d5cd7838..ee14552706 100644 --- a/cmd/admin/start/start.go +++ b/cmd/admin/start/start.go @@ -98,7 +98,7 @@ func startZitadel(config *Config, masterKey string) error { return fmt.Errorf("cannot start queries: %w", err) } - authZRepo, err := authz.Start(config.AuthZ, config.SystemDefaults, queries, dbClient, keys.OIDC) + authZRepo, err := authz.Start(queries, dbClient, keys.OIDC) if err != nil { return fmt.Errorf("error starting authz repo: %w", err) } diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 2e57c810cd..5a7722eca4 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -81,13 +81,6 @@ Projections: projects: BulkLimit: 2000 -AuthZ: - Repository: - Spooler: - ConcurrentWorkers: 1 - BulkLimit: 10000 - FailureCountUntilSkip: 5 - Auth: SearchLimit: 1000 Spooler: diff --git a/internal/authz/authz.go b/internal/authz/authz.go index 3d16f0dccb..d5c0b8300a 100644 --- a/internal/authz/authz.go +++ b/internal/authz/authz.go @@ -5,15 +5,10 @@ import ( "github.com/zitadel/zitadel/internal/authz/repository" "github.com/zitadel/zitadel/internal/authz/repository/eventsourcing" - sd "github.com/zitadel/zitadel/internal/config/systemdefaults" "github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/query" ) -type Config struct { - Repository eventsourcing.Config -} - -func Start(config Config, systemDefaults sd.SystemDefaults, queries *query.Queries, dbClient *sql.DB, keyEncryptionAlgorithm crypto.EncryptionAlgorithm) (repository.Repository, error) { - return eventsourcing.Start(config.Repository, systemDefaults, queries, dbClient, keyEncryptionAlgorithm) +func Start(queries *query.Queries, dbClient *sql.DB, keyEncryptionAlgorithm crypto.EncryptionAlgorithm) (repository.Repository, error) { + return eventsourcing.Start(queries, dbClient, keyEncryptionAlgorithm) } diff --git a/internal/authz/repository/eventsourcing/eventstore/token_verifier.go b/internal/authz/repository/eventsourcing/eventstore/token_verifier.go index 7260310fa9..650be34bae 100644 --- a/internal/authz/repository/eventsourcing/eventstore/token_verifier.go +++ b/internal/authz/repository/eventsourcing/eventstore/token_verifier.go @@ -29,6 +29,10 @@ type TokenVerifierRepo struct { Query *query.Queries } +func (repo *TokenVerifierRepo) Health() error { + return repo.View.Health() +} + func (repo *TokenVerifierRepo) tokenByID(ctx context.Context, tokenID, userID string) (_ *usr_model.TokenView, err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() diff --git a/internal/authz/repository/eventsourcing/eventstore/user_membership.go b/internal/authz/repository/eventsourcing/eventstore/user_membership.go index e34e37018e..9dc5bb57c9 100644 --- a/internal/authz/repository/eventsourcing/eventstore/user_membership.go +++ b/internal/authz/repository/eventsourcing/eventstore/user_membership.go @@ -4,19 +4,12 @@ import ( "context" "github.com/zitadel/zitadel/internal/api/authz" - "github.com/zitadel/zitadel/internal/authz/repository/eventsourcing/view" - "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/query" "github.com/zitadel/zitadel/internal/telemetry/tracing" - user_model "github.com/zitadel/zitadel/internal/user/model" - user_view_model "github.com/zitadel/zitadel/internal/user/repository/view/model" ) type UserMembershipRepo struct { - View *view.View -} - -func (repo *UserMembershipRepo) Health() error { - return repo.View.Health() + Queries *query.Queries } func (repo *UserMembershipRepo) SearchMyMemberships(ctx context.Context) (_ []*authz.Membership, err error) { @@ -29,75 +22,61 @@ func (repo *UserMembershipRepo) SearchMyMemberships(ctx context.Context) (_ []*a return userMembershipsToMemberships(memberships), nil } -func (repo *UserMembershipRepo) searchUserMemberships(ctx context.Context) (_ []*user_view_model.UserMembershipView, err error) { +func (repo *UserMembershipRepo) searchUserMemberships(ctx context.Context) (_ []*query.Membership, err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() ctxData := authz.GetCtxData(ctx) - instance := authz.GetInstance(ctx) - ctx, orgSpan := tracing.NewSpan(ctx) - orgMemberships, orgCount, err := repo.View.SearchUserMemberships(&user_model.UserMembershipSearchRequest{ - Queries: []*user_model.UserMembershipSearchQuery{ - { - Key: user_model.UserMembershipSearchKeyUserID, - Method: domain.SearchMethodEquals, - Value: ctxData.UserID, - }, - { - Key: user_model.UserMembershipSearchKeyResourceOwner, - Method: domain.SearchMethodEquals, - Value: ctxData.OrgID, - }, - { - Key: user_model.UserMembershipSearchKeyInstanceID, - Method: domain.SearchMethodEquals, - Value: instance.InstanceID(), - }, - }, - }) - orgSpan.EndWithError(err) + userIDQuery, err := query.NewMembershipUserIDQuery(ctxData.UserID) if err != nil { return nil, err } - ctx, iamSpan := tracing.NewSpan(ctx) - iamMemberships, iamCount, err := repo.View.SearchUserMemberships(&user_model.UserMembershipSearchRequest{ - Queries: []*user_model.UserMembershipSearchQuery{ - { - Key: user_model.UserMembershipSearchKeyUserID, - Method: domain.SearchMethodEquals, - Value: ctxData.UserID, - }, - { - Key: user_model.UserMembershipSearchKeyAggregateID, - Method: domain.SearchMethodEquals, - Value: instance.InstanceID(), - }, - { - Key: user_model.UserMembershipSearchKeyInstanceID, - Method: domain.SearchMethodEquals, - Value: instance.InstanceID(), - }, - }, - }) - iamSpan.EndWithError(err) + orgIDsQuery, err := query.NewMembershipResourceOwnersSearchQuery(ctxData.OrgID, authz.GetInstance(ctx).InstanceID()) if err != nil { return nil, err } - if orgCount == 0 && iamCount == 0 { - return []*user_view_model.UserMembershipView{}, nil + memberships, err := repo.Queries.Memberships(ctx, &query.MembershipSearchQuery{ + Queries: []query.SearchQuery{userIDQuery, orgIDsQuery}, + }) + if err != nil { + return nil, err } - return append(orgMemberships, iamMemberships...), nil + return memberships.Memberships, nil } -func userMembershipToMembership(membership *user_view_model.UserMembershipView) *authz.Membership { +func userMembershipToMembership(membership *query.Membership) *authz.Membership { + if membership.IAM != nil { + return &authz.Membership{ + MemberType: authz.MemberTypeIam, + AggregateID: membership.IAM.IAMID, + ObjectID: membership.IAM.IAMID, + Roles: membership.Roles, + } + } + if membership.Org != nil { + return &authz.Membership{ + MemberType: authz.MemberTypeOrganisation, + AggregateID: membership.Org.OrgID, + ObjectID: membership.Org.OrgID, + Roles: membership.Roles, + } + } + if membership.Project != nil { + return &authz.Membership{ + MemberType: authz.MemberTypeProject, + AggregateID: membership.Project.ProjectID, + ObjectID: membership.Project.ProjectID, + Roles: membership.Roles, + } + } return &authz.Membership{ - MemberType: authz.MemberType(membership.MemberType), - AggregateID: membership.AggregateID, - ObjectID: membership.ObjectID, + MemberType: authz.MemberTypeProjectGrant, + AggregateID: membership.ProjectGrant.ProjectID, + ObjectID: membership.ProjectGrant.GrantID, Roles: membership.Roles, } } -func userMembershipsToMemberships(memberships []*user_view_model.UserMembershipView) []*authz.Membership { +func userMembershipsToMemberships(memberships []*query.Membership) []*authz.Membership { result := make([]*authz.Membership, len(memberships)) for i, m := range memberships { result[i] = userMembershipToMembership(m) diff --git a/internal/authz/repository/eventsourcing/handler/handler.go b/internal/authz/repository/eventsourcing/handler/handler.go deleted file mode 100644 index a680c34f6d..0000000000 --- a/internal/authz/repository/eventsourcing/handler/handler.go +++ /dev/null @@ -1,56 +0,0 @@ -package handler - -import ( - "time" - - "github.com/zitadel/zitadel/internal/authz/repository/eventsourcing/view" - sd "github.com/zitadel/zitadel/internal/config/systemdefaults" - v1 "github.com/zitadel/zitadel/internal/eventstore/v1" - "github.com/zitadel/zitadel/internal/eventstore/v1/query" -) - -type Configs map[string]*Config - -type Config struct { - MinimumCycleDuration time.Duration -} - -type handler struct { - view *view.View - bulkLimit uint64 - cycleDuration time.Duration - errorCountUntilSkip uint64 - - es v1.Eventstore -} - -func (h *handler) Eventstore() v1.Eventstore { - return h.es -} - -func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, systemDefaults sd.SystemDefaults) []query.Handler { - return []query.Handler{ - newUserMembership( - handler{view, bulkLimit, configs.cycleDuration("UserMemberships"), errorCount, es}), - } -} - -func (configs Configs) cycleDuration(viewModel string) time.Duration { - c, ok := configs[viewModel] - if !ok { - return 3 * time.Minute - } - return c.MinimumCycleDuration -} - -func (h *handler) MinimumCycleDuration() time.Duration { - return h.cycleDuration -} - -func (h *handler) LockDuration() time.Duration { - return h.cycleDuration / 3 -} - -func (h *handler) QueryLimit() uint64 { - return h.bulkLimit -} diff --git a/internal/authz/repository/eventsourcing/handler/user_membership.go b/internal/authz/repository/eventsourcing/handler/user_membership.go deleted file mode 100644 index 8a816e0578..0000000000 --- a/internal/authz/repository/eventsourcing/handler/user_membership.go +++ /dev/null @@ -1,344 +0,0 @@ -package handler - -import ( - "context" - - "github.com/zitadel/logging" - - "github.com/zitadel/zitadel/internal/errors" - "github.com/zitadel/zitadel/internal/eventstore" - v1 "github.com/zitadel/zitadel/internal/eventstore/v1" - es_models "github.com/zitadel/zitadel/internal/eventstore/v1/models" - "github.com/zitadel/zitadel/internal/eventstore/v1/query" - es_sdk "github.com/zitadel/zitadel/internal/eventstore/v1/sdk" - "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" - org_model "github.com/zitadel/zitadel/internal/org/model" - org_es_model "github.com/zitadel/zitadel/internal/org/repository/eventsourcing/model" - org_view "github.com/zitadel/zitadel/internal/org/repository/view" - proj_model "github.com/zitadel/zitadel/internal/project/model" - proj_es_model "github.com/zitadel/zitadel/internal/project/repository/eventsourcing/model" - proj_view "github.com/zitadel/zitadel/internal/project/repository/view" - "github.com/zitadel/zitadel/internal/repository/instance" - "github.com/zitadel/zitadel/internal/repository/org" - "github.com/zitadel/zitadel/internal/repository/project" - "github.com/zitadel/zitadel/internal/repository/user" - usr_model "github.com/zitadel/zitadel/internal/user/model" - usr_es_model "github.com/zitadel/zitadel/internal/user/repository/view/model" -) - -const ( - userMembershipTable = "authz.user_memberships" -) - -type UserMembership struct { - handler - subscription *v1.Subscription -} - -func newUserMembership( - handler handler, -) *UserMembership { - h := &UserMembership{ - handler: handler, - } - - h.subscribe() - - return h -} - -func (m *UserMembership) subscribe() { - m.subscription = m.es.Subscribe(m.AggregateTypes()...) - go func() { - for event := range m.subscription.Events { - query.ReduceEvent(m, event) - } - }() -} - -func (m *UserMembership) ViewModel() string { - return userMembershipTable -} - -func (m *UserMembership) Subscription() *v1.Subscription { - return m.subscription -} - -func (_ *UserMembership) AggregateTypes() []es_models.AggregateType { - return []es_models.AggregateType{instance.AggregateType, org.AggregateType, project.AggregateType, user.AggregateType} -} - -func (m *UserMembership) CurrentSequence(instanceID string) (uint64, error) { - sequence, err := m.view.GetLatestUserMembershipSequence(instanceID) - if err != nil { - return 0, err - } - return sequence.CurrentSequence, nil -} - -func (m *UserMembership) EventQuery() (*es_models.SearchQuery, error) { - sequences, err := m.view.GetLatestUserMembershipSequences() - if err != nil { - return nil, err - } - query := es_models.NewSearchQuery() - instances := make([]string, 0) - for _, sequence := range sequences { - for _, instance := range instances { - if sequence.InstanceID == instance { - break - } - } - instances = append(instances, sequence.InstanceID) - query.AddQuery(). - AggregateTypeFilter(m.AggregateTypes()...). - LatestSequenceFilter(sequence.CurrentSequence). - InstanceIDFilter(sequence.InstanceID) - } - return query.AddQuery(). - AggregateTypeFilter(m.AggregateTypes()...). - LatestSequenceFilter(0). - ExcludedInstanceIDsFilter(instances...). - SearchQuery(), nil -} - -func (m *UserMembership) Reduce(event *es_models.Event) (err error) { - switch event.AggregateType { - case instance.AggregateType: - err = m.processIAM(event) - case org.AggregateType: - err = m.processOrg(event) - case project.AggregateType: - err = m.processProject(event) - case user.AggregateType: - err = m.processUser(event) - } - return err -} - -func (m *UserMembership) processIAM(event *es_models.Event) (err error) { - member := new(usr_es_model.UserMembershipView) - err = member.AppendEvent(event) - if err != nil { - return err - } - switch eventstore.EventType(event.Type) { - case instance.MemberAddedEventType: - m.fillIamDisplayName(member) - case instance.MemberChangedEventType: - member, err = m.view.UserMembershipByIDs(member.UserID, event.AggregateID, event.AggregateID, event.InstanceID, usr_model.MemberTypeIam) - if err != nil { - return err - } - err = member.AppendEvent(event) - case instance.MemberRemovedEventType, - instance.MemberCascadeRemovedEventType: - return m.view.DeleteUserMembership(member.UserID, event.AggregateID, event.AggregateID, event.InstanceID, usr_model.MemberTypeIam, event) - default: - return m.view.ProcessedUserMembershipSequence(event) - } - if err != nil { - return err - } - return m.view.PutUserMembership(member, event) -} - -func (m *UserMembership) fillIamDisplayName(member *usr_es_model.UserMembershipView) { - member.DisplayName = member.AggregateID - member.ResourceOwnerName = member.ResourceOwner -} - -func (m *UserMembership) processOrg(event *es_models.Event) (err error) { - member := new(usr_es_model.UserMembershipView) - err = member.AppendEvent(event) - if err != nil { - return err - } - switch eventstore.EventType(event.Type) { - case org.MemberAddedEventType: - err = m.fillOrgName(member) - case org.MemberChangedEventType: - member, err = m.view.UserMembershipByIDs(member.UserID, event.AggregateID, event.AggregateID, event.InstanceID, usr_model.MemberTypeOrganisation) - if err != nil { - return err - } - err = member.AppendEvent(event) - case org.MemberRemovedEventType, - org.MemberCascadeRemovedEventType: - return m.view.DeleteUserMembership(member.UserID, event.AggregateID, event.AggregateID, event.InstanceID, usr_model.MemberTypeOrganisation, event) - case org.OrgChangedEventType: - return m.updateOrgName(event) - default: - return m.view.ProcessedUserMembershipSequence(event) - } - if err != nil { - return err - } - return m.view.PutUserMembership(member, event) -} - -func (m *UserMembership) fillOrgName(member *usr_es_model.UserMembershipView) (err error) { - org, err := m.getOrgByID(context.Background(), member.ResourceOwner, member.InstanceID) - if err != nil { - return err - } - member.ResourceOwnerName = org.Name - if member.AggregateID == org.AggregateID { - member.DisplayName = org.Name - } - return nil -} - -func (m *UserMembership) updateOrgName(event *es_models.Event) error { - org, err := m.getOrgByID(context.Background(), event.AggregateID, event.InstanceID) - if err != nil { - return err - } - - memberships, err := m.view.UserMembershipsByResourceOwner(event.ResourceOwner, event.InstanceID) - if err != nil { - return err - } - for _, membership := range memberships { - membership.ResourceOwnerName = org.Name - if membership.AggregateID == event.AggregateID { - membership.DisplayName = org.Name - } - } - return m.view.BulkPutUserMemberships(memberships, event) -} - -func (m *UserMembership) processProject(event *es_models.Event) (err error) { - member := new(usr_es_model.UserMembershipView) - err = member.AppendEvent(event) - if err != nil { - return err - } - switch eventstore.EventType(event.Type) { - case project.MemberAddedType, project.GrantMemberAddedType: - err = m.fillProjectDisplayName(member) - if err != nil { - return err - } - err = m.fillOrgName(member) - case project.MemberChangedType: - member, err = m.view.UserMembershipByIDs(member.UserID, event.AggregateID, event.AggregateID, event.InstanceID, usr_model.MemberTypeProject) - if err != nil { - return err - } - err = member.AppendEvent(event) - case project.MemberRemovedType, project.MemberCascadeRemovedType: - return m.view.DeleteUserMembership(member.UserID, event.AggregateID, event.AggregateID, event.InstanceID, usr_model.MemberTypeProject, event) - case project.GrantMemberChangedType: - member, err = m.view.UserMembershipByIDs(member.UserID, event.AggregateID, member.ObjectID, event.InstanceID, usr_model.MemberTypeProjectGrant) - if err != nil { - return err - } - err = member.AppendEvent(event) - case project.GrantMemberRemovedType, - project.GrantMemberCascadeRemovedType: - return m.view.DeleteUserMembership(member.UserID, event.AggregateID, member.ObjectID, member.InstanceID, usr_model.MemberTypeProjectGrant, event) - case project.ProjectChangedType: - return m.updateProjectDisplayName(event) - case project.ProjectRemovedType: - return m.view.DeleteUserMembershipsByAggregateID(event.AggregateID, event.InstanceID, event) - case project.GrantRemovedType: - return m.view.DeleteUserMembershipsByAggregateIDAndObjectID(event.AggregateID, member.ObjectID, member.InstanceID, event) - default: - return m.view.ProcessedUserMembershipSequence(event) - } - if err != nil { - return err - } - return m.view.PutUserMembership(member, event) -} - -func (m *UserMembership) fillProjectDisplayName(member *usr_es_model.UserMembershipView) (err error) { - project, err := m.getProjectByID(context.Background(), member.AggregateID, member.InstanceID) - if err != nil { - return err - } - member.DisplayName = project.Name - return nil -} - -func (m *UserMembership) updateProjectDisplayName(event *es_models.Event) error { - proj := new(proj_es_model.Project) - err := proj.SetData(event) - if err != nil { - return err - } - if proj.Name == "" { - return m.view.ProcessedUserMembershipSequence(event) - } - - memberships, err := m.view.UserMembershipsByAggregateID(event.AggregateID, event.InstanceID) - if err != nil { - return err - } - for _, membership := range memberships { - membership.DisplayName = proj.Name - } - return m.view.BulkPutUserMemberships(memberships, event) -} - -func (m *UserMembership) processUser(event *es_models.Event) (err error) { - switch eventstore.EventType(event.Type) { - case user.UserRemovedType: - return m.view.DeleteUserMembershipsByUserID(event.AggregateID, event.InstanceID, event) - default: - return m.view.ProcessedUserMembershipSequence(event) - } -} - -func (m *UserMembership) OnError(event *es_models.Event, err error) error { - logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in user membership handler") - return spooler.HandleError(event, err, m.view.GetLatestUserMembershipFailedEvent, m.view.ProcessedUserMembershipFailedEvent, m.view.ProcessedUserMembershipSequence, m.errorCountUntilSkip) -} - -func (m *UserMembership) OnSuccess() error { - return spooler.HandleSuccess(m.view.UpdateUserMembershipSpoolerRunTimestamp) -} - -func (u *UserMembership) getOrgByID(ctx context.Context, orgID, instanceID string) (*org_model.Org, error) { - query, err := org_view.OrgByIDQuery(orgID, instanceID, 0) - if err != nil { - return nil, err - } - - esOrg := &org_es_model.Org{ - ObjectRoot: es_models.ObjectRoot{ - AggregateID: orgID, - }, - } - err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, esOrg.AppendEvents, query) - if err != nil && !errors.IsNotFound(err) { - return nil, err - } - if esOrg.Sequence == 0 { - return nil, errors.ThrowNotFound(nil, "EVENT-3m9vs", "Errors.Org.NotFound") - } - - return org_es_model.OrgToModel(esOrg), nil -} - -func (u *UserMembership) getProjectByID(ctx context.Context, projID, instanceID string) (*proj_model.Project, error) { - query, err := proj_view.ProjectByIDQuery(projID, instanceID, 0) - if err != nil { - return nil, err - } - esProject := &proj_es_model.Project{ - ObjectRoot: es_models.ObjectRoot{ - AggregateID: projID, - }, - } - err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, esProject.AppendEvents, query) - if err != nil && !errors.IsNotFound(err) { - return nil, err - } - if esProject.Sequence == 0 { - return nil, errors.ThrowNotFound(nil, "EVENT-Dfrt2", "Errors.Project.NotFound") - } - - return proj_es_model.ProjectToModel(esProject), nil -} diff --git a/internal/authz/repository/eventsourcing/repository.go b/internal/authz/repository/eventsourcing/repository.go index 6488184a63..71af744bf9 100644 --- a/internal/authz/repository/eventsourcing/repository.go +++ b/internal/authz/repository/eventsourcing/repository.go @@ -6,27 +6,19 @@ import ( "github.com/zitadel/zitadel/internal/authz/repository" "github.com/zitadel/zitadel/internal/authz/repository/eventsourcing/eventstore" - "github.com/zitadel/zitadel/internal/authz/repository/eventsourcing/spooler" authz_view "github.com/zitadel/zitadel/internal/authz/repository/eventsourcing/view" - sd "github.com/zitadel/zitadel/internal/config/systemdefaults" "github.com/zitadel/zitadel/internal/crypto" v1 "github.com/zitadel/zitadel/internal/eventstore/v1" - es_spol "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" "github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/query" ) -type Config struct { - Spooler spooler.SpoolerConfig -} - type EsRepository struct { - spooler *es_spol.Spooler eventstore.UserMembershipRepo eventstore.TokenVerifierRepo } -func Start(conf Config, systemDefaults sd.SystemDefaults, queries *query.Queries, dbClient *sql.DB, keyEncryptionAlgorithm crypto.EncryptionAlgorithm) (repository.Repository, error) { +func Start(queries *query.Queries, dbClient *sql.DB, keyEncryptionAlgorithm crypto.EncryptionAlgorithm) (repository.Repository, error) { es, err := v1.Start(dbClient) if err != nil { return nil, err @@ -38,12 +30,9 @@ func Start(conf Config, systemDefaults sd.SystemDefaults, queries *query.Queries return nil, err } - spool := spooler.StartSpooler(conf.Spooler, es, view, dbClient, systemDefaults) - return &EsRepository{ - spool, eventstore.UserMembershipRepo{ - View: view, + Queries: queries, }, eventstore.TokenVerifierRepo{ TokenVerificationKey: keyEncryptionAlgorithm, @@ -55,7 +44,7 @@ func Start(conf Config, systemDefaults sd.SystemDefaults, queries *query.Queries } func (repo *EsRepository) Health(ctx context.Context) error { - if err := repo.UserMembershipRepo.Health(); err != nil { + if err := repo.TokenVerifierRepo.Health(); err != nil { return err } return nil diff --git a/internal/authz/repository/eventsourcing/spooler/lock.go b/internal/authz/repository/eventsourcing/spooler/lock.go deleted file mode 100644 index e0f0f8e6de..0000000000 --- a/internal/authz/repository/eventsourcing/spooler/lock.go +++ /dev/null @@ -1,20 +0,0 @@ -package spooler - -import ( - "database/sql" - "time" - - es_locker "github.com/zitadel/zitadel/internal/eventstore/v1/locker" -) - -const ( - lockTable = "authz.locks" -) - -type locker struct { - dbClient *sql.DB -} - -func (l *locker) Renew(lockerID, viewModel, instanceID string, waitTime time.Duration) error { - return es_locker.Renew(l.dbClient, lockTable, lockerID, viewModel, instanceID, waitTime) -} diff --git a/internal/authz/repository/eventsourcing/spooler/spooler.go b/internal/authz/repository/eventsourcing/spooler/spooler.go deleted file mode 100644 index 592bdd1db0..0000000000 --- a/internal/authz/repository/eventsourcing/spooler/spooler.go +++ /dev/null @@ -1,33 +0,0 @@ -package spooler - -import ( - "database/sql" - - v1 "github.com/zitadel/zitadel/internal/eventstore/v1" - - sd "github.com/zitadel/zitadel/internal/config/systemdefaults" - - "github.com/zitadel/zitadel/internal/authz/repository/eventsourcing/handler" - "github.com/zitadel/zitadel/internal/authz/repository/eventsourcing/view" - - "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" -) - -type SpoolerConfig struct { - BulkLimit uint64 - FailureCountUntilSkip uint64 - ConcurrentWorkers int - Handlers handler.Configs -} - -func StartSpooler(c SpoolerConfig, es v1.Eventstore, view *view.View, sql *sql.DB, systemDefaults sd.SystemDefaults) *spooler.Spooler { - spoolerConfig := spooler.Config{ - Eventstore: es, - Locker: &locker{dbClient: sql}, - ConcurrentWorkers: c.ConcurrentWorkers, - ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, systemDefaults), - } - spool := spoolerConfig.New() - spool.Start() - return spool -} diff --git a/internal/authz/repository/eventsourcing/view/user_membership.go b/internal/authz/repository/eventsourcing/view/user_membership.go deleted file mode 100644 index 645285623d..0000000000 --- a/internal/authz/repository/eventsourcing/view/user_membership.go +++ /dev/null @@ -1,102 +0,0 @@ -package view - -import ( - "github.com/zitadel/zitadel/internal/errors" - "github.com/zitadel/zitadel/internal/eventstore/v1/models" - usr_model "github.com/zitadel/zitadel/internal/user/model" - "github.com/zitadel/zitadel/internal/user/repository/view" - "github.com/zitadel/zitadel/internal/user/repository/view/model" - "github.com/zitadel/zitadel/internal/view/repository" -) - -const ( - userMembershipTable = "authz.user_memberships" -) - -func (v *View) UserMembershipByIDs(userID, aggregateID, objectID, instanceID string, memberType usr_model.MemberType) (*model.UserMembershipView, error) { - return view.UserMembershipByIDs(v.Db, userMembershipTable, userID, aggregateID, objectID, instanceID, memberType) -} - -func (v *View) UserMembershipsByAggregateID(aggregateID, instanceID string) ([]*model.UserMembershipView, error) { - return view.UserMembershipsByAggregateID(v.Db, userMembershipTable, aggregateID, instanceID) -} - -func (v *View) UserMembershipsByResourceOwner(resourceOwner, instanceID string) ([]*model.UserMembershipView, error) { - return view.UserMembershipsByResourceOwner(v.Db, userMembershipTable, resourceOwner, instanceID) -} - -func (v *View) SearchUserMemberships(request *usr_model.UserMembershipSearchRequest) ([]*model.UserMembershipView, uint64, error) { - return view.SearchUserMemberships(v.Db, userMembershipTable, request) -} - -func (v *View) PutUserMembership(membership *model.UserMembershipView, event *models.Event) error { - err := view.PutUserMembership(v.Db, userMembershipTable, membership) - if err != nil { - return err - } - return v.ProcessedUserMembershipSequence(event) -} - -func (v *View) BulkPutUserMemberships(memberships []*model.UserMembershipView, event *models.Event) error { - err := view.PutUserMemberships(v.Db, userMembershipTable, memberships...) - if err != nil { - return err - } - return v.ProcessedUserMembershipSequence(event) -} - -func (v *View) DeleteUserMembership(userID, aggregateID, objectID, instanceID string, memberType usr_model.MemberType, event *models.Event) error { - err := view.DeleteUserMembership(v.Db, userMembershipTable, userID, aggregateID, objectID, instanceID, memberType) - if err != nil && !errors.IsNotFound(err) { - return err - } - return v.ProcessedUserMembershipSequence(event) -} - -func (v *View) DeleteUserMembershipsByUserID(userID, instanceID string, event *models.Event) error { - err := view.DeleteUserMembershipsByUserID(v.Db, userMembershipTable, userID, instanceID) - if err != nil && !errors.IsNotFound(err) { - return err - } - return v.ProcessedUserMembershipSequence(event) -} - -func (v *View) DeleteUserMembershipsByAggregateID(aggregateID, instanceID string, event *models.Event) error { - err := view.DeleteUserMembershipsByAggregateID(v.Db, userMembershipTable, aggregateID, instanceID) - if err != nil && !errors.IsNotFound(err) { - return err - } - return v.ProcessedUserMembershipSequence(event) -} - -func (v *View) DeleteUserMembershipsByAggregateIDAndObjectID(aggregateID, objectID, instanceID string, event *models.Event) error { - err := view.DeleteUserMembershipsByAggregateIDAndObjectID(v.Db, userMembershipTable, aggregateID, objectID, instanceID) - if err != nil && !errors.IsNotFound(err) { - return err - } - return v.ProcessedUserMembershipSequence(event) -} - -func (v *View) GetLatestUserMembershipSequence(instanceID string) (*repository.CurrentSequence, error) { - return v.latestSequence(userMembershipTable, instanceID) -} - -func (v *View) GetLatestUserMembershipSequences() ([]*repository.CurrentSequence, error) { - return v.latestSequences(userMembershipTable) -} - -func (v *View) ProcessedUserMembershipSequence(event *models.Event) error { - return v.saveCurrentSequence(userMembershipTable, event) -} - -func (v *View) UpdateUserMembershipSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(userMembershipTable) -} - -func (v *View) GetLatestUserMembershipFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { - return v.latestFailedEvent(userMembershipTable, instanceID, sequence) -} - -func (v *View) ProcessedUserMembershipFailedEvent(failedEvent *repository.FailedEvent) error { - return v.saveFailedEvent(failedEvent) -} diff --git a/internal/user/repository/view/usermembership_view.go b/internal/user/repository/view/usermembership_view.go deleted file mode 100644 index c388877a12..0000000000 --- a/internal/user/repository/view/usermembership_view.go +++ /dev/null @@ -1,109 +0,0 @@ -package view - -import ( - "github.com/jinzhu/gorm" - - "github.com/zitadel/zitadel/internal/domain" - caos_errs "github.com/zitadel/zitadel/internal/errors" - usr_model "github.com/zitadel/zitadel/internal/user/model" - "github.com/zitadel/zitadel/internal/user/repository/view/model" - "github.com/zitadel/zitadel/internal/view/repository" -) - -func UserMembershipByIDs(db *gorm.DB, table, userID, aggregateID, objectID, instanceID string, membertype usr_model.MemberType) (*model.UserMembershipView, error) { - memberships := new(model.UserMembershipView) - userIDQuery := &model.UserMembershipSearchQuery{Key: usr_model.UserMembershipSearchKeyUserID, Value: userID, Method: domain.SearchMethodEquals} - aggregateIDQuery := &model.UserMembershipSearchQuery{Key: usr_model.UserMembershipSearchKeyAggregateID, Value: aggregateID, Method: domain.SearchMethodEquals} - objectIDQuery := &model.UserMembershipSearchQuery{Key: usr_model.UserMembershipSearchKeyObjectID, Value: objectID, Method: domain.SearchMethodEquals} - memberTypeQuery := &model.UserMembershipSearchQuery{Key: usr_model.UserMembershipSearchKeyMemberType, Value: int32(membertype), Method: domain.SearchMethodEquals} - instanceIDQuery := &model.UserMembershipSearchQuery{Key: usr_model.UserMembershipSearchKeyInstanceID, Value: instanceID, Method: domain.SearchMethodEquals} - - query := repository.PrepareGetByQuery(table, userIDQuery, aggregateIDQuery, objectIDQuery, memberTypeQuery, instanceIDQuery) - err := query(db, memberships) - if caos_errs.IsNotFound(err) { - return nil, caos_errs.ThrowNotFound(nil, "VIEW-5Tsji", "Errors.UserMembership.NotFound") - } - return memberships, err -} - -func UserMembershipsByAggregateID(db *gorm.DB, table, aggregateID, instanceID string) ([]*model.UserMembershipView, error) { - memberships := make([]*model.UserMembershipView, 0) - aggregateIDQuery := &usr_model.UserMembershipSearchQuery{Key: usr_model.UserMembershipSearchKeyAggregateID, Value: aggregateID, Method: domain.SearchMethodEquals} - instanceIDQuery := &usr_model.UserMembershipSearchQuery{Key: usr_model.UserMembershipSearchKeyInstanceID, Value: instanceID, Method: domain.SearchMethodEquals} - query := repository.PrepareSearchQuery(table, model.UserMembershipSearchRequest{ - Queries: []*usr_model.UserMembershipSearchQuery{aggregateIDQuery, instanceIDQuery}, - }) - _, err := query(db, &memberships) - return memberships, err -} - -func UserMembershipsByResourceOwner(db *gorm.DB, table, resourceOwner, instanceID string) ([]*model.UserMembershipView, error) { - memberships := make([]*model.UserMembershipView, 0) - aggregateIDQuery := &usr_model.UserMembershipSearchQuery{Key: usr_model.UserMembershipSearchKeyResourceOwner, Value: resourceOwner, Method: domain.SearchMethodEquals} - instanceIDQuery := &usr_model.UserMembershipSearchQuery{Key: usr_model.UserMembershipSearchKeyInstanceID, Value: instanceID, Method: domain.SearchMethodEquals} - query := repository.PrepareSearchQuery(table, model.UserMembershipSearchRequest{ - Queries: []*usr_model.UserMembershipSearchQuery{aggregateIDQuery, instanceIDQuery}, - }) - _, err := query(db, &memberships) - return memberships, err -} - -func SearchUserMemberships(db *gorm.DB, table string, req *usr_model.UserMembershipSearchRequest) ([]*model.UserMembershipView, uint64, error) { - users := make([]*model.UserMembershipView, 0) - query := repository.PrepareSearchQuery(table, model.UserMembershipSearchRequest{Limit: req.Limit, Offset: req.Offset, Queries: req.Queries}) - count, err := query(db, &users) - if err != nil { - return nil, 0, err - } - return users, count, nil -} - -func PutUserMemberships(db *gorm.DB, table string, users ...*model.UserMembershipView) error { - save := repository.PrepareBulkSave(table) - u := make([]interface{}, len(users)) - for i, user := range users { - u[i] = user - } - return save(db, u...) -} - -func PutUserMembership(db *gorm.DB, table string, user *model.UserMembershipView) error { - save := repository.PrepareSave(table) - return save(db, user) -} - -func DeleteUserMembership(db *gorm.DB, table, userID, aggregateID, objectID, instanceID string, membertype usr_model.MemberType) error { - delete := repository.PrepareDeleteByKeys(table, - repository.Key{Key: model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyUserID), Value: userID}, - repository.Key{Key: model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyAggregateID), Value: aggregateID}, - repository.Key{Key: model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyObjectID), Value: objectID}, - repository.Key{Key: model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyMemberType), Value: membertype}, - repository.Key{Key: model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyInstanceID), Value: instanceID}, - ) - return delete(db) -} - -func DeleteUserMembershipsByUserID(db *gorm.DB, table, userID, instanceID string) error { - delete := repository.PrepareDeleteByKeys(table, - repository.Key{model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyUserID), userID}, - repository.Key{model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyInstanceID), instanceID}, - ) - return delete(db) -} - -func DeleteUserMembershipsByAggregateID(db *gorm.DB, table, aggregateID, instanceID string) error { - delete := repository.PrepareDeleteByKeys(table, - repository.Key{model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyAggregateID), aggregateID}, - repository.Key{model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyInstanceID), instanceID}, - ) - return delete(db) -} - -func DeleteUserMembershipsByAggregateIDAndObjectID(db *gorm.DB, table, aggregateID, objectID, instanceID string) error { - delete := repository.PrepareDeleteByKeys(table, - repository.Key{Key: model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyAggregateID), Value: aggregateID}, - repository.Key{Key: model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyObjectID), Value: objectID}, - repository.Key{Key: model.UserMembershipSearchKey(usr_model.UserMembershipSearchKeyInstanceID), Value: instanceID}, - ) - return delete(db) -}