diff --git a/cmd/mirror/projections.go b/cmd/mirror/projections.go index 0ff4356d6f..2a54055eaa 100644 --- a/cmd/mirror/projections.go +++ b/cmd/mirror/projections.go @@ -123,7 +123,6 @@ func projections( newEventstore := new_es.NewEventstore(client) config.Eventstore.Querier = old_es.NewPostgres(client) config.Eventstore.Pusher = newEventstore - config.Eventstore.Searcher = newEventstore es := eventstore.NewEventstore(config.Eventstore) esV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(client, &es_v4_pg.Config{ @@ -282,13 +281,6 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc continue } - err = projection.ProjectInstanceFields(ctx) - if err != nil { - logging.WithFields("instance", instance).WithError(err).Info("trigger fields failed") - failedInstances <- instance - continue - } - err = admin_handler.ProjectInstance(ctx) if err != nil { logging.WithFields("instance", instance).WithError(err).Info("trigger admin handler failed") @@ -296,13 +288,6 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc continue } - err = projection.ProjectInstanceFields(ctx) - if err != nil { - logging.WithFields("instance", instance).WithError(err).Info("trigger fields failed") - failedInstances <- instance - continue - } - err = auth_handler.ProjectInstance(ctx) if err != nil { logging.WithFields("instance", instance).WithError(err).Info("trigger auth handler failed") diff --git a/cmd/setup/29.go b/cmd/setup/29.go deleted file mode 100644 index 8df1047ec9..0000000000 --- a/cmd/setup/29.go +++ /dev/null @@ -1,40 +0,0 @@ -package setup - -import ( - "context" - - "github.com/zitadel/zitadel/internal/api/authz" - "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/query/projection" - "github.com/zitadel/zitadel/internal/repository/instance" -) - -type FillFieldsForProjectGrant struct { - eventstore *eventstore.Eventstore -} - -func (mig *FillFieldsForProjectGrant) Execute(ctx context.Context, _ eventstore.Event) error { - instances, err := mig.eventstore.InstanceIDs( - ctx, - eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs). - OrderDesc(). - AddQuery(). - AggregateTypes("instance"). - EventTypes(instance.InstanceAddedEventType). - Builder(), - ) - if err != nil { - return err - } - for _, instance := range instances { - ctx := authz.WithInstanceID(ctx, instance) - if err := projection.ProjectGrantFields.Trigger(ctx); err != nil { - return err - } - } - return nil -} - -func (mig *FillFieldsForProjectGrant) String() string { - return "29_init_fields_for_project_grant" -} diff --git a/cmd/setup/30.go b/cmd/setup/30.go deleted file mode 100644 index c2037a7f23..0000000000 --- a/cmd/setup/30.go +++ /dev/null @@ -1,40 +0,0 @@ -package setup - -import ( - "context" - - "github.com/zitadel/zitadel/internal/api/authz" - "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/query/projection" - "github.com/zitadel/zitadel/internal/repository/instance" -) - -type FillFieldsForOrgDomainVerified struct { - eventstore *eventstore.Eventstore -} - -func (mig *FillFieldsForOrgDomainVerified) Execute(ctx context.Context, _ eventstore.Event) error { - instances, err := mig.eventstore.InstanceIDs( - ctx, - eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs). - OrderDesc(). - AddQuery(). - AggregateTypes("instance"). - EventTypes(instance.InstanceAddedEventType). - Builder(), - ) - if err != nil { - return err - } - for _, instance := range instances { - ctx := authz.WithInstanceID(ctx, instance) - if err := projection.OrgDomainVerifiedFields.Trigger(ctx); err != nil { - return err - } - } - return nil -} - -func (mig *FillFieldsForOrgDomainVerified) String() string { - return "30_fill_fields_for_org_domain_verified" -} diff --git a/cmd/setup/41.go b/cmd/setup/41.go deleted file mode 100644 index fa4a1d5a4b..0000000000 --- a/cmd/setup/41.go +++ /dev/null @@ -1,44 +0,0 @@ -package setup - -import ( - "context" - - "github.com/zitadel/zitadel/internal/api/authz" - "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/query/projection" - "github.com/zitadel/zitadel/internal/repository/instance" -) - -type FillFieldsForInstanceDomains struct { - eventstore *eventstore.Eventstore -} - -func (mig *FillFieldsForInstanceDomains) Execute(ctx context.Context, _ eventstore.Event) error { - instances, err := mig.eventstore.InstanceIDs( - ctx, - eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs). - OrderDesc(). - AddQuery(). - AggregateTypes("instance"). - EventTypes(instance.InstanceAddedEventType). - Builder(), - ) - if err != nil { - return err - } - for _, instance := range instances { - ctx := authz.WithInstanceID(ctx, instance) - if err := projection.InstanceDomainFields.Trigger(ctx); err != nil { - return err - } - } - return nil -} - -func (mig *FillFieldsForInstanceDomains) String() string { - return "repeatable_fill_fields_for_instance_domains" -} - -func (f *FillFieldsForInstanceDomains) Check(lastRun map[string]interface{}) bool { - return true -} diff --git a/cmd/setup/47_fill_membership_fields.go b/cmd/setup/47_fill_membership_fields.go deleted file mode 100644 index 4e97b0403d..0000000000 --- a/cmd/setup/47_fill_membership_fields.go +++ /dev/null @@ -1,43 +0,0 @@ -package setup - -import ( - "context" - - "github.com/zitadel/zitadel/internal/api/authz" - "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/query/projection" - "github.com/zitadel/zitadel/internal/repository/instance" -) - -type FillMembershipFields struct { - eventstore *eventstore.Eventstore -} - -func (mig *FillMembershipFields) Execute(ctx context.Context, _ eventstore.Event) error { - instances, err := mig.eventstore.InstanceIDs( - ctx, - eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs). - OrderDesc(). - AddQuery(). - AggregateTypes("instance"). - EventTypes(instance.InstanceAddedEventType). - Builder().ExcludeAggregateIDs(). - AggregateTypes("instance"). - EventTypes(instance.InstanceRemovedEventType). - Builder(), - ) - if err != nil { - return err - } - for _, instance := range instances { - ctx := authz.WithInstanceID(ctx, instance) - if err := projection.MembershipFields.Trigger(ctx); err != nil { - return err - } - } - return nil -} - -func (mig *FillMembershipFields) String() string { - return "47_fill_membership_fields" -} diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 0c3f726902..faeeb7d6cd 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -127,8 +127,6 @@ type Steps struct { s26AuthUsers3 *AuthUsers3 s27IDPTemplate6SAMLNameIDFormat *IDPTemplate6SAMLNameIDFormat s28AddFieldTable *AddFieldTable - s29FillFieldsForProjectGrant *FillFieldsForProjectGrant - s30FillFieldsForOrgDomainVerified *FillFieldsForOrgDomainVerified s31AddAggregateIndexToFields *AddAggregateIndexToFields s32AddAuthSessionID *AddAuthSessionID s33SMSConfigs3TwilioAddVerifyServiceSid *SMSConfigs3TwilioAddVerifyServiceSid @@ -143,7 +141,6 @@ type Steps struct { s44ReplaceCurrentSequencesIndex *ReplaceCurrentSequencesIndex s45CorrectProjectOwners *CorrectProjectOwners s46InitPermissionFunctions *InitPermissionFunctions - s47FillMembershipFields *FillMembershipFields s48Apps7SAMLConfigsLoginVersion *Apps7SAMLConfigsLoginVersion s49InitPermittedOrgsFunction *InitPermittedOrgsFunction s50IDPTemplate6UsePKCE *IDPTemplate6UsePKCE diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 8ee8d7fc68..3269e1c689 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -142,7 +142,6 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) config.Eventstore.Querier = old_es.NewPostgres(dbClient) esV3 := new_es.NewEventstore(dbClient) config.Eventstore.Pusher = esV3 - config.Eventstore.Searcher = esV3 eventstoreClient := eventstore.NewEventstore(config.Eventstore) logging.OnError(err).Fatal("unable to start eventstore") @@ -189,8 +188,6 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s26AuthUsers3 = &AuthUsers3{dbClient: dbClient} steps.s27IDPTemplate6SAMLNameIDFormat = &IDPTemplate6SAMLNameIDFormat{dbClient: dbClient} steps.s28AddFieldTable = &AddFieldTable{dbClient: dbClient} - steps.s29FillFieldsForProjectGrant = &FillFieldsForProjectGrant{eventstore: eventstoreClient} - steps.s30FillFieldsForOrgDomainVerified = &FillFieldsForOrgDomainVerified{eventstore: eventstoreClient} steps.s31AddAggregateIndexToFields = &AddAggregateIndexToFields{dbClient: dbClient} steps.s32AddAuthSessionID = &AddAuthSessionID{dbClient: dbClient} steps.s33SMSConfigs3TwilioAddVerifyServiceSid = &SMSConfigs3TwilioAddVerifyServiceSid{dbClient: dbClient} @@ -205,7 +202,6 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s44ReplaceCurrentSequencesIndex = &ReplaceCurrentSequencesIndex{dbClient: dbClient} steps.s45CorrectProjectOwners = &CorrectProjectOwners{eventstore: eventstoreClient} steps.s46InitPermissionFunctions = &InitPermissionFunctions{eventstoreClient: dbClient} - steps.s47FillMembershipFields = &FillMembershipFields{eventstore: eventstoreClient} steps.s48Apps7SAMLConfigsLoginVersion = &Apps7SAMLConfigsLoginVersion{dbClient: dbClient} steps.s49InitPermittedOrgsFunction = &InitPermittedOrgsFunction{eventstoreClient: dbClient} steps.s50IDPTemplate6UsePKCE = &IDPTemplate6UsePKCE{dbClient: dbClient} @@ -245,15 +241,12 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s23CorrectGlobalUniqueConstraints, steps.s24AddActorToAuthTokens, steps.s26AuthUsers3, - steps.s29FillFieldsForProjectGrant, - steps.s30FillFieldsForOrgDomainVerified, steps.s34AddCacheSchema, steps.s35AddPositionToIndexEsWm, steps.s36FillV2Milestones, steps.s38BackChannelLogoutNotificationStart, steps.s44ReplaceCurrentSequencesIndex, steps.s45CorrectProjectOwners, - steps.s47FillMembershipFields, steps.s49InitPermittedOrgsFunction, steps.s50IDPTemplate6UsePKCE, steps.s51IDPTemplate6RootCA, @@ -288,9 +281,6 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) &DeleteStaleOrgFields{ eventstore: eventstoreClient, }, - &FillFieldsForInstanceDomains{ - eventstore: eventstoreClient, - }, &SyncRolePermissions{ commands: commands, eventstore: eventstoreClient, diff --git a/cmd/start/start.go b/cmd/start/start.go index 8820480f0c..ba51958f1d 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -166,7 +166,6 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server } config.Eventstore.Pusher = new_es.NewEventstore(dbClient) - config.Eventstore.Searcher = new_es.NewEventstore(dbClient) config.Eventstore.Querier = old_es.NewPostgres(dbClient) eventstoreClient := eventstore.NewEventstore(config.Eventstore) eventstoreV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(dbClient, &es_v4_pg.Config{ diff --git a/internal/api/grpc/feature/v2/converter.go b/internal/api/grpc/feature/v2/converter.go index e146ac2db6..b1dc20350e 100644 --- a/internal/api/grpc/feature/v2/converter.go +++ b/internal/api/grpc/feature/v2/converter.go @@ -176,14 +176,6 @@ func improvedPerformanceTypeToPb(typ feature.ImprovedPerformanceType) feature_pb return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_UNSPECIFIED case feature.ImprovedPerformanceTypeOrgByID: return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_BY_ID - case feature.ImprovedPerformanceTypeProjectGrant: - return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT_GRANT - case feature.ImprovedPerformanceTypeProject: - return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT - case feature.ImprovedPerformanceTypeUserGrant: - return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_USER_GRANT - case feature.ImprovedPerformanceTypeOrgDomainVerified: - return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_DOMAIN_VERIFIED default: return feature_pb.ImprovedPerformance(typ) } @@ -208,14 +200,6 @@ func improvedPerformanceToDomain(typ feature_pb.ImprovedPerformance) feature.Imp return feature.ImprovedPerformanceTypeUnspecified case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_BY_ID: return feature.ImprovedPerformanceTypeOrgByID - case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT_GRANT: - return feature.ImprovedPerformanceTypeProjectGrant - case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT: - return feature.ImprovedPerformanceTypeProject - case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_USER_GRANT: - return feature.ImprovedPerformanceTypeUserGrant - case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_DOMAIN_VERIFIED: - return feature.ImprovedPerformanceTypeOrgDomainVerified default: return feature.ImprovedPerformanceTypeUnspecified } diff --git a/internal/api/grpc/feature/v2beta/converter.go b/internal/api/grpc/feature/v2beta/converter.go index 9739e1c4c8..4b1e71afe9 100644 --- a/internal/api/grpc/feature/v2beta/converter.go +++ b/internal/api/grpc/feature/v2beta/converter.go @@ -113,14 +113,6 @@ func improvedPerformanceTypeToPb(typ feature.ImprovedPerformanceType) feature_pb return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_UNSPECIFIED case feature.ImprovedPerformanceTypeOrgByID: return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_BY_ID - case feature.ImprovedPerformanceTypeProjectGrant: - return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT_GRANT - case feature.ImprovedPerformanceTypeProject: - return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT - case feature.ImprovedPerformanceTypeUserGrant: - return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_USER_GRANT - case feature.ImprovedPerformanceTypeOrgDomainVerified: - return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_DOMAIN_VERIFIED default: return feature_pb.ImprovedPerformance(typ) } @@ -145,14 +137,6 @@ func improvedPerformanceToDomain(typ feature_pb.ImprovedPerformance) feature.Imp return feature.ImprovedPerformanceTypeUnspecified case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_BY_ID: return feature.ImprovedPerformanceTypeOrgByID - case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT_GRANT: - return feature.ImprovedPerformanceTypeProjectGrant - case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT: - return feature.ImprovedPerformanceTypeProject - case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_USER_GRANT: - return feature.ImprovedPerformanceTypeUserGrant - case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_DOMAIN_VERIFIED: - return feature.ImprovedPerformanceTypeOrgDomainVerified default: return feature.ImprovedPerformanceTypeUnspecified } diff --git a/internal/command/org_domain.go b/internal/command/org_domain.go index 2e132f6c47..29cade7fb9 100644 --- a/internal/command/org_domain.go +++ b/internal/command/org_domain.go @@ -7,14 +7,11 @@ import ( "github.com/zitadel/logging" - "github.com/zitadel/zitadel/internal/api/authz" http_utils "github.com/zitadel/zitadel/internal/api/http" "github.com/zitadel/zitadel/internal/command/preparation" "github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/feature" - "github.com/zitadel/zitadel/internal/query/projection" "github.com/zitadel/zitadel/internal/repository/org" "github.com/zitadel/zitadel/internal/telemetry/tracing" "github.com/zitadel/zitadel/internal/zerrors" @@ -353,38 +350,6 @@ func (c *Commands) changeDefaultDomain(ctx context.Context, orgID, newName strin return nil, nil } -func (c *Commands) removeCustomDomains(ctx context.Context, orgID string) ([]eventstore.Command, error) { - orgDomains := NewOrgDomainsWriteModel(orgID) - err := c.eventstore.FilterToQueryReducer(ctx, orgDomains) - if err != nil { - return nil, err - } - hasDefault := false - defaultDomain, _ := domain.NewIAMDomainName(orgDomains.OrgName, http_utils.DomainContext(ctx).RequestedDomain()) - isPrimary := defaultDomain == orgDomains.PrimaryDomain - orgAgg := OrgAggregateFromWriteModel(&orgDomains.WriteModel) - events := make([]eventstore.Command, 0, len(orgDomains.Domains)) - for _, orgDomain := range orgDomains.Domains { - if orgDomain.State == domain.OrgDomainStateActive { - if orgDomain.Domain == defaultDomain { - hasDefault = true - continue - } - events = append(events, org.NewDomainRemovedEvent(ctx, orgAgg, orgDomain.Domain, orgDomain.Verified)) - } - } - if !hasDefault { - return append([]eventstore.Command{ - org.NewDomainAddedEvent(ctx, orgAgg, defaultDomain), - org.NewDomainPrimarySetEvent(ctx, orgAgg, defaultDomain), - }, events...), nil - } - if !isPrimary { - return append([]eventstore.Command{org.NewDomainPrimarySetEvent(ctx, orgAgg, defaultDomain)}, events...), nil - } - return events, nil -} - func (c *Commands) getOrgDomainWriteModel(ctx context.Context, orgID, domain string) (_ *OrgDomainWriteModel, err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() @@ -404,45 +369,6 @@ type OrgDomainVerified struct { } func (c *Commands) searchOrgDomainVerifiedByDomain(ctx context.Context, domain string) (_ *OrgDomainVerified, err error) { - if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeOrgDomainVerified) { - return c.searchOrgDomainVerifiedByDomainOld(ctx, domain) - } - - ctx, span := tracing.NewSpan(ctx) - defer func() { span.EndWithError(err) }() - - condition := map[eventstore.FieldType]any{ - eventstore.FieldTypeAggregateType: org.AggregateType, - eventstore.FieldTypeObjectType: org.OrgDomainSearchType, - eventstore.FieldTypeObjectID: domain, - eventstore.FieldTypeObjectRevision: org.OrgDomainObjectRevision, - eventstore.FieldTypeFieldName: org.OrgDomainVerifiedSearchField, - } - - results, err := c.eventstore.Search(ctx, condition) - if err != nil { - return nil, err - } - if len(results) == 0 { - _ = projection.OrgDomainVerifiedFields.Trigger(ctx) - results, err = c.eventstore.Search(ctx, condition) - if err != nil { - return nil, err - } - } - - orgDomain := new(OrgDomainVerified) - for _, result := range results { - orgDomain.OrgID = result.Aggregate.ID - if err = result.Value.Unmarshal(&orgDomain.Verified); err != nil { - return nil, err - } - } - - return orgDomain, nil -} - -func (c *Commands) searchOrgDomainVerifiedByDomainOld(ctx context.Context, domain string) (_ *OrgDomainVerified, err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() diff --git a/internal/command/project.go b/internal/command/project.go index 40aa79f186..d17c71be60 100644 --- a/internal/command/project.go +++ b/internal/command/project.go @@ -7,13 +7,10 @@ import ( "github.com/zitadel/logging" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/command/preparation" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/v1/models" - "github.com/zitadel/zitadel/internal/feature" - "github.com/zitadel/zitadel/internal/query/projection" "github.com/zitadel/zitadel/internal/repository/project" "github.com/zitadel/zitadel/internal/telemetry/tracing" "github.com/zitadel/zitadel/internal/zerrors" @@ -138,53 +135,18 @@ func projectWriteModel(ctx context.Context, filter preparation.FilterToQueryRedu return project, nil } -func (c *Commands) projectAggregateByID(ctx context.Context, projectID, resourceOwner string) (*eventstore.Aggregate, domain.ProjectState, error) { - result, err := c.projectState(ctx, projectID, resourceOwner) - if err != nil { - return nil, domain.ProjectStateUnspecified, zerrors.ThrowNotFound(err, "COMMA-NDQoF", "Errors.Project.NotFound") - } - if len(result) == 0 { - _ = projection.ProjectGrantFields.Trigger(ctx) - result, err = c.projectState(ctx, projectID, resourceOwner) - if err != nil || len(result) == 0 { - return nil, domain.ProjectStateUnspecified, zerrors.ThrowNotFound(err, "COMMA-U1nza", "Errors.Project.NotFound") - } - } - - var state domain.ProjectState - err = result[0].Value.Unmarshal(&state) - if err != nil { - return nil, state, zerrors.ThrowNotFound(err, "COMMA-o4n6F", "Errors.Project.NotFound") - } - return &result[0].Aggregate, state, nil -} - -func (c *Commands) projectState(ctx context.Context, projectID, resourceOwner string) ([]*eventstore.SearchResult, error) { - return c.eventstore.Search( - ctx, - map[eventstore.FieldType]any{ - eventstore.FieldTypeObjectType: project.ProjectSearchType, - eventstore.FieldTypeObjectID: projectID, - eventstore.FieldTypeObjectRevision: project.ProjectObjectRevision, - eventstore.FieldTypeFieldName: project.ProjectStateSearchField, - eventstore.FieldTypeResourceOwner: resourceOwner, - }, - ) -} - func (c *Commands) checkProjectExists(ctx context.Context, projectID, resourceOwner string) (_ string, err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() - if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeProject) { - return c.checkProjectExistsOld(ctx, projectID, resourceOwner) + projectWriteModel, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner) + if err != nil { + return "", err } - - agg, state, err := c.projectAggregateByID(ctx, projectID, resourceOwner) - if err != nil || !state.Valid() { - return "", zerrors.ThrowPreconditionFailed(err, "COMMA-VCnwD", "Errors.Project.NotFound") + if !isProjectStateExists(projectWriteModel.State) { + return "", zerrors.ThrowPreconditionFailed(nil, "COMMAND-EbFMN", "Errors.Project.NotFound") } - return agg.ResourceOwner, nil + return projectWriteModel.ResourceOwner, nil } type ChangeProject struct { @@ -246,35 +208,31 @@ func (c *Commands) DeactivateProject(ctx context.Context, projectID string, reso return nil, zerrors.ThrowInvalidArgument(nil, "COMMAND-88iF0", "Errors.Project.ProjectIDMissing") } - if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeProject) { - return c.deactivateProjectOld(ctx, projectID, resourceOwner) - } - - projectAgg, state, err := c.projectAggregateByID(ctx, projectID, resourceOwner) + existingProject, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner) if err != nil { return nil, err } - - if !isProjectStateExists(state) { + if !isProjectStateExists(existingProject.State) { return nil, zerrors.ThrowNotFound(nil, "COMMAND-112M9", "Errors.Project.NotFound") } - if state != domain.ProjectStateActive { + if existingProject.State != domain.ProjectStateActive { return nil, zerrors.ThrowPreconditionFailed(nil, "COMMAND-mki55", "Errors.Project.NotActive") } - if err := c.checkPermissionUpdateProject(ctx, projectAgg.ResourceOwner, projectAgg.ID); err != nil { + if err := c.checkPermissionUpdateProject(ctx, existingProject.ResourceOwner, existingProject.AggregateID); err != nil { return nil, err } + //nolint: contextcheck + projectAgg := ProjectAggregateFromWriteModel(&existingProject.WriteModel) pushedEvents, err := c.eventstore.Push(ctx, project.NewProjectDeactivatedEvent(ctx, projectAgg)) if err != nil { return nil, err } - - return &domain.ObjectDetails{ - ResourceOwner: pushedEvents[0].Aggregate().ResourceOwner, - Sequence: pushedEvents[0].Sequence(), - EventDate: pushedEvents[0].CreatedAt(), - }, nil + err = AppendAndReduce(existingProject, pushedEvents...) + if err != nil { + return nil, err + } + return writeModelToObjectDetails(&existingProject.WriteModel), nil } func (c *Commands) ReactivateProject(ctx context.Context, projectID string, resourceOwner string) (*domain.ObjectDetails, error) { @@ -282,35 +240,31 @@ func (c *Commands) ReactivateProject(ctx context.Context, projectID string, reso return nil, zerrors.ThrowInvalidArgument(nil, "COMMAND-3ihsF", "Errors.Project.ProjectIDMissing") } - if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeProject) { - return c.reactivateProjectOld(ctx, projectID, resourceOwner) - } - - projectAgg, state, err := c.projectAggregateByID(ctx, projectID, resourceOwner) + existingProject, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner) if err != nil { return nil, err } - - if !isProjectStateExists(state) { + if !isProjectStateExists(existingProject.State) { return nil, zerrors.ThrowNotFound(nil, "COMMAND-3M9sd", "Errors.Project.NotFound") } - if state != domain.ProjectStateInactive { + if existingProject.State != domain.ProjectStateInactive { return nil, zerrors.ThrowPreconditionFailed(nil, "COMMAND-5M9bs", "Errors.Project.NotInactive") } - if err := c.checkPermissionUpdateProject(ctx, projectAgg.ResourceOwner, projectAgg.ID); err != nil { + if err := c.checkPermissionUpdateProject(ctx, existingProject.ResourceOwner, existingProject.AggregateID); err != nil { return nil, err } + //nolint: contextcheck + projectAgg := ProjectAggregateFromWriteModel(&existingProject.WriteModel) pushedEvents, err := c.eventstore.Push(ctx, project.NewProjectReactivatedEvent(ctx, projectAgg)) if err != nil { return nil, err } - - return &domain.ObjectDetails{ - ResourceOwner: pushedEvents[0].Aggregate().ResourceOwner, - Sequence: pushedEvents[0].Sequence(), - EventDate: pushedEvents[0].CreatedAt(), - }, nil + err = AppendAndReduce(existingProject, pushedEvents...) + if err != nil { + return nil, err + } + return writeModelToObjectDetails(&existingProject.WriteModel), nil } // Deprecated: use commands.DeleteProject diff --git a/internal/command/project_grant.go b/internal/command/project_grant.go index b613974b7e..08f09f73e9 100644 --- a/internal/command/project_grant.go +++ b/internal/command/project_grant.go @@ -6,12 +6,9 @@ import ( "github.com/zitadel/logging" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" es_models "github.com/zitadel/zitadel/internal/eventstore/v1/models" - "github.com/zitadel/zitadel/internal/feature" - "github.com/zitadel/zitadel/internal/repository/org" "github.com/zitadel/zitadel/internal/repository/project" "github.com/zitadel/zitadel/internal/telemetry/tracing" "github.com/zitadel/zitadel/internal/zerrors" @@ -379,94 +376,19 @@ func (c *Commands) projectGrantWriteModelByID(ctx context.Context, grantID, gran } func (c *Commands) checkProjectGrantPreCondition(ctx context.Context, projectID, grantedOrgID, resourceOwner string, roles []string) (string, error) { - if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeProjectGrant) { - return c.checkProjectGrantPreConditionOld(ctx, projectID, grantedOrgID, resourceOwner, roles) - } - projectResourceOwner, existingRoleKeys, err := c.searchProjectGrantState(ctx, projectID, grantedOrgID, resourceOwner) + preConditions := NewProjectGrantPreConditionReadModel(projectID, grantedOrgID, resourceOwner) + err := c.eventstore.FilterToQueryReducer(ctx, preConditions) if err != nil { return "", err } - - if domain.HasInvalidRoles(existingRoleKeys, roles) { + if !preConditions.ProjectExists { + return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-m9gsd", "Errors.Project.NotFound") + } + if !preConditions.GrantedOrgExists { + return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-3m9gg", "Errors.Org.NotFound") + } + if domain.HasInvalidRoles(preConditions.ExistingRoleKeys, roles) { return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-6m9gd", "Errors.Project.Role.NotFound") } - return projectResourceOwner, nil -} - -func (c *Commands) searchProjectGrantState(ctx context.Context, projectID, grantedOrgID, resourceOwner string) (_ string, existingRoleKeys []string, err error) { - projectStateQuery := map[eventstore.FieldType]any{ - eventstore.FieldTypeAggregateType: project.AggregateType, - eventstore.FieldTypeAggregateID: projectID, - eventstore.FieldTypeFieldName: project.ProjectStateSearchField, - eventstore.FieldTypeObjectType: project.ProjectSearchType, - } - grantedOrgQuery := map[eventstore.FieldType]any{ - eventstore.FieldTypeAggregateType: org.AggregateType, - eventstore.FieldTypeAggregateID: grantedOrgID, - eventstore.FieldTypeFieldName: org.OrgStateSearchField, - eventstore.FieldTypeObjectType: org.OrgSearchType, - } - roleQuery := map[eventstore.FieldType]any{ - eventstore.FieldTypeAggregateType: project.AggregateType, - eventstore.FieldTypeAggregateID: projectID, - eventstore.FieldTypeFieldName: project.ProjectRoleKeySearchField, - eventstore.FieldTypeObjectType: project.ProjectRoleSearchType, - } - - // as resourceowner is not always provided, it has to be separately - if resourceOwner != "" { - projectStateQuery[eventstore.FieldTypeResourceOwner] = resourceOwner - roleQuery[eventstore.FieldTypeResourceOwner] = resourceOwner - } - - results, err := c.eventstore.Search( - ctx, - projectStateQuery, - grantedOrgQuery, - roleQuery, - ) - if err != nil { - return "", nil, err - } - - var ( - existsProject bool - existingProjectResourceOwner string - existsGrantedOrg bool - ) - - for _, result := range results { - switch result.Object.Type { - case project.ProjectRoleSearchType: - var role string - err := result.Value.Unmarshal(&role) - if err != nil { - return "", nil, err - } - existingRoleKeys = append(existingRoleKeys, role) - case org.OrgSearchType: - var state domain.OrgState - err := result.Value.Unmarshal(&state) - if err != nil { - return "", nil, err - } - existsGrantedOrg = state.Valid() && state != domain.OrgStateRemoved - case project.ProjectSearchType: - var state domain.ProjectState - err := result.Value.Unmarshal(&state) - if err != nil { - return "", nil, err - } - existsProject = state.Valid() && state != domain.ProjectStateRemoved - existingProjectResourceOwner = result.Aggregate.ResourceOwner - } - } - - if !existsProject { - return "", nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-m9gsd", "Errors.Project.NotFound") - } - if !existsGrantedOrg { - return "", nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-3m9gg", "Errors.Org.NotFound") - } - return existingProjectResourceOwner, existingRoleKeys, nil + return preConditions.ProjectResourceOwner, nil } diff --git a/internal/command/project_old.go b/internal/command/project_old.go deleted file mode 100644 index 99d7dd2e34..0000000000 --- a/internal/command/project_old.go +++ /dev/null @@ -1,98 +0,0 @@ -package command - -import ( - "context" - - "github.com/zitadel/zitadel/internal/domain" - "github.com/zitadel/zitadel/internal/repository/project" - "github.com/zitadel/zitadel/internal/telemetry/tracing" - "github.com/zitadel/zitadel/internal/zerrors" -) - -func (c *Commands) checkProjectExistsOld(ctx context.Context, projectID, resourceOwner string) (_ string, err error) { - ctx, span := tracing.NewSpan(ctx) - defer func() { span.EndWithError(err) }() - - projectWriteModel, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner) - if err != nil { - return "", err - } - if !isProjectStateExists(projectWriteModel.State) { - return "", zerrors.ThrowPreconditionFailed(nil, "COMMAND-EbFMN", "Errors.Project.NotFound") - } - return projectWriteModel.ResourceOwner, nil -} - -func (c *Commands) deactivateProjectOld(ctx context.Context, projectID string, resourceOwner string) (*domain.ObjectDetails, error) { - existingProject, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner) - if err != nil { - return nil, err - } - if !isProjectStateExists(existingProject.State) { - return nil, zerrors.ThrowNotFound(nil, "COMMAND-112M9", "Errors.Project.NotFound") - } - if existingProject.State != domain.ProjectStateActive { - return nil, zerrors.ThrowPreconditionFailed(nil, "COMMAND-mki55", "Errors.Project.NotActive") - } - if err := c.checkPermissionUpdateProject(ctx, existingProject.ResourceOwner, existingProject.AggregateID); err != nil { - return nil, err - } - - //nolint: contextcheck - projectAgg := ProjectAggregateFromWriteModel(&existingProject.WriteModel) - pushedEvents, err := c.eventstore.Push(ctx, project.NewProjectDeactivatedEvent(ctx, projectAgg)) - if err != nil { - return nil, err - } - err = AppendAndReduce(existingProject, pushedEvents...) - if err != nil { - return nil, err - } - return writeModelToObjectDetails(&existingProject.WriteModel), nil -} - -func (c *Commands) reactivateProjectOld(ctx context.Context, projectID string, resourceOwner string) (*domain.ObjectDetails, error) { - existingProject, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner) - if err != nil { - return nil, err - } - if !isProjectStateExists(existingProject.State) { - return nil, zerrors.ThrowNotFound(nil, "COMMAND-3M9sd", "Errors.Project.NotFound") - } - if existingProject.State != domain.ProjectStateInactive { - return nil, zerrors.ThrowPreconditionFailed(nil, "COMMAND-5M9bs", "Errors.Project.NotInactive") - } - if err := c.checkPermissionUpdateProject(ctx, existingProject.ResourceOwner, existingProject.AggregateID); err != nil { - return nil, err - } - - //nolint: contextcheck - projectAgg := ProjectAggregateFromWriteModel(&existingProject.WriteModel) - pushedEvents, err := c.eventstore.Push(ctx, project.NewProjectReactivatedEvent(ctx, projectAgg)) - if err != nil { - return nil, err - } - err = AppendAndReduce(existingProject, pushedEvents...) - if err != nil { - return nil, err - } - return writeModelToObjectDetails(&existingProject.WriteModel), nil -} - -func (c *Commands) checkProjectGrantPreConditionOld(ctx context.Context, projectID, grantedOrgID, resourceOwner string, roles []string) (string, error) { - preConditions := NewProjectGrantPreConditionReadModel(projectID, grantedOrgID, resourceOwner) - err := c.eventstore.FilterToQueryReducer(ctx, preConditions) - if err != nil { - return "", err - } - if !preConditions.ProjectExists { - return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-m9gsd", "Errors.Project.NotFound") - } - if !preConditions.GrantedOrgExists { - return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-3m9gg", "Errors.Org.NotFound") - } - if domain.HasInvalidRoles(preConditions.ExistingRoleKeys, roles) { - return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-6m9gd", "Errors.Project.Role.NotFound") - } - return preConditions.ProjectResourceOwner, nil -} diff --git a/internal/command/user_grant.go b/internal/command/user_grant.go index 6bb4a20b0a..ba4a5d9115 100644 --- a/internal/command/user_grant.go +++ b/internal/command/user_grant.go @@ -4,12 +4,8 @@ import ( "context" "reflect" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/feature" - "github.com/zitadel/zitadel/internal/repository/org" - "github.com/zitadel/zitadel/internal/repository/project" "github.com/zitadel/zitadel/internal/repository/usergrant" "github.com/zitadel/zitadel/internal/telemetry/tracing" "github.com/zitadel/zitadel/internal/zerrors" @@ -292,140 +288,6 @@ func (c *Commands) userGrantWriteModelByID(ctx context.Context, userGrantID, res } func (c *Commands) checkUserGrantPreCondition(ctx context.Context, usergrant *domain.UserGrant, resourceOwner string) (err error) { - if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeUserGrant) { - return c.checkUserGrantPreConditionOld(ctx, usergrant, resourceOwner) - } - - ctx, span := tracing.NewSpan(ctx) - defer func() { span.EndWithError(err) }() - - if err := c.checkUserExists(ctx, usergrant.UserID, ""); err != nil { - return err - } - existingRoleKeys, err := c.searchUserGrantPreConditionState(ctx, usergrant, resourceOwner) - if err != nil { - return err - } - if usergrant.HasInvalidRoles(existingRoleKeys) { - return zerrors.ThrowPreconditionFailed(err, "COMMAND-mm9F4", "Errors.Project.Role.NotFound") - } - return nil -} - -// this code needs to be rewritten anyways as soon as we improved the fields handling -// -//nolint:gocognit -func (c *Commands) searchUserGrantPreConditionState(ctx context.Context, userGrant *domain.UserGrant, resourceOwner string) (existingRoleKeys []string, err error) { - criteria := []map[eventstore.FieldType]any{ - // project state query - { - eventstore.FieldTypeAggregateType: project.AggregateType, - eventstore.FieldTypeAggregateID: userGrant.ProjectID, - eventstore.FieldTypeFieldName: project.ProjectStateSearchField, - eventstore.FieldTypeObjectType: project.ProjectSearchType, - }, - // granted org query - { - eventstore.FieldTypeAggregateType: org.AggregateType, - eventstore.FieldTypeAggregateID: resourceOwner, - eventstore.FieldTypeFieldName: org.OrgStateSearchField, - eventstore.FieldTypeObjectType: org.OrgSearchType, - }, - } - if userGrant.ProjectGrantID != "" { - criteria = append(criteria, map[eventstore.FieldType]any{ - eventstore.FieldTypeAggregateType: project.AggregateType, - eventstore.FieldTypeAggregateID: userGrant.ProjectID, - eventstore.FieldTypeObjectType: project.ProjectGrantSearchType, - eventstore.FieldTypeObjectID: userGrant.ProjectGrantID, - }) - } else { - criteria = append(criteria, map[eventstore.FieldType]any{ - eventstore.FieldTypeAggregateType: project.AggregateType, - eventstore.FieldTypeAggregateID: userGrant.ProjectID, - eventstore.FieldTypeObjectType: project.ProjectRoleSearchType, - eventstore.FieldTypeFieldName: project.ProjectRoleKeySearchField, - }) - } - results, err := c.eventstore.Search(ctx, criteria...) - if err != nil { - return nil, err - } - - var ( - existsProject bool - existsGrantedOrg bool - existsGrant bool - ) - - for _, result := range results { - switch result.Object.Type { - case project.ProjectRoleSearchType: - var role string - err := result.Value.Unmarshal(&role) - if err != nil { - return nil, err - } - existingRoleKeys = append(existingRoleKeys, role) - case org.OrgSearchType: - var state domain.OrgState - err := result.Value.Unmarshal(&state) - if err != nil { - return nil, err - } - existsGrantedOrg = state.Valid() && state != domain.OrgStateRemoved - case project.ProjectSearchType: - var state domain.ProjectState - err := result.Value.Unmarshal(&state) - if err != nil { - return nil, err - } - existsProject = state.Valid() && state != domain.ProjectStateRemoved - case project.ProjectGrantSearchType: - switch result.FieldName { - case project.ProjectGrantGrantedOrgIDSearchField: - var orgID string - err := result.Value.Unmarshal(&orgID) - if err != nil || orgID != resourceOwner { - return nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-3m9gg", "Errors.Org.NotFound") - } - case project.ProjectGrantStateSearchField: - var state domain.ProjectGrantState - err := result.Value.Unmarshal(&state) - if err != nil { - return nil, err - } - existsGrant = state.Valid() && state != domain.ProjectGrantStateRemoved - case project.ProjectGrantRoleKeySearchField: - var role string - err := result.Value.Unmarshal(&role) - if err != nil { - return nil, err - } - existingRoleKeys = append(existingRoleKeys, role) - case project.ProjectGrantGrantIDSearchField: - var grantID string - err := result.Value.Unmarshal(&grantID) - if err != nil || grantID != userGrant.ProjectGrantID { - return nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-huvKF", "Errors.Project.Grant.NotFound") - } - } - } - } - - if !existsProject { - return nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-m9gsd", "Errors.Project.NotFound") - } - if !existsGrantedOrg { - return nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-3m9gg", "Errors.Org.NotFound") - } - if userGrant.ProjectGrantID != "" && !existsGrant { - return nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-huvKF", "Errors.Project.Grant.NotFound") - } - return existingRoleKeys, nil -} - -func (c *Commands) checkUserGrantPreConditionOld(ctx context.Context, usergrant *domain.UserGrant, resourceOwner string) (err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() diff --git a/internal/eventstore/config.go b/internal/eventstore/config.go index 6fc3f79f2c..fd39556676 100644 --- a/internal/eventstore/config.go +++ b/internal/eventstore/config.go @@ -8,7 +8,6 @@ type Config struct { PushTimeout time.Duration MaxRetries uint32 - Pusher Pusher - Querier Querier - Searcher Searcher + Pusher Pusher + Querier Querier } diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index 8a8d32bc43..db5e3d5967 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -12,7 +12,6 @@ import ( "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/zerrors" ) func init() { @@ -27,9 +26,8 @@ type Eventstore struct { PushTimeout time.Duration maxRetries int - pusher Pusher - querier Querier - searcher Searcher + pusher Pusher + querier Querier } var ( @@ -67,9 +65,8 @@ func NewEventstore(config *Config) *Eventstore { PushTimeout: config.PushTimeout, maxRetries: int(config.MaxRetries), - pusher: config.Pusher, - querier: config.Querier, - searcher: config.Searcher, + pusher: config.Pusher, + querier: config.Querier, } } @@ -149,20 +146,6 @@ func (es *Eventstore) AggregateTypes() []string { return aggregateTypes } -// FillFields implements the [Searcher] interface -func (es *Eventstore) FillFields(ctx context.Context, events ...FillFieldsEvent) error { - return es.searcher.FillFields(ctx, events...) -} - -// Search implements the [Searcher] interface -func (es *Eventstore) Search(ctx context.Context, conditions ...map[FieldType]any) ([]*SearchResult, error) { - if len(conditions) == 0 { - return nil, zerrors.ThrowInvalidArgument(nil, "V3-5Xbr1", "no search conditions") - } - - return es.searcher.Search(ctx, conditions...) -} - // Filter filters the stored events based on the searchQuery // and maps the events to the defined event structs // @@ -289,22 +272,6 @@ type Pusher interface { Client() *database.DB } -type FillFieldsEvent interface { - Event - Fields() []*FieldOperation -} - -type Searcher interface { - // Search allows to search for specific fields of objects - // The instance id is taken from the context - // The list of conditions are combined with AND - // The search fields are combined with OR - // At least one must be defined - Search(ctx context.Context, conditions ...map[FieldType]any) (result []*SearchResult, err error) - // FillFields is to insert the fields of previously stored events - FillFields(ctx context.Context, events ...FillFieldsEvent) error -} - func appendEventType(typ EventType) { i := sort.SearchStrings(eventTypes, string(typ)) if i < len(eventTypes) && eventTypes[i] == string(typ) { diff --git a/internal/eventstore/handler/v2/field_handler.go b/internal/eventstore/handler/v2/field_handler.go deleted file mode 100644 index 3c25731c83..0000000000 --- a/internal/eventstore/handler/v2/field_handler.go +++ /dev/null @@ -1,213 +0,0 @@ -package handler - -import ( - "context" - "database/sql" - "errors" - "sync" - "time" - - "github.com/jackc/pgx/v5/pgconn" - "github.com/shopspring/decimal" - - "github.com/zitadel/zitadel/internal/eventstore" -) - -type FieldHandler struct { - Handler -} - -type fieldProjection struct { - name string -} - -// Name implements Projection. -func (f *fieldProjection) Name() string { - return f.name -} - -// Reducers implements Projection. -func (f *fieldProjection) Reducers() []AggregateReducer { - return nil -} - -var _ Projection = (*fieldProjection)(nil) - -// NewFieldHandler returns a projection handler which backfills the `eventstore.fields` table with historic events which -// might have existed before they had and Field Operations defined. -// The events are filtered by the mapped aggregate types and each event type for that aggregate. -func NewFieldHandler(config *Config, name string, eventTypes map[eventstore.AggregateType][]eventstore.EventType) *FieldHandler { - return &FieldHandler{ - Handler: Handler{ - projection: &fieldProjection{name: name}, - client: config.Client, - es: config.Eventstore, - bulkLimit: config.BulkLimit, - eventTypes: eventTypes, - requeueEvery: config.RequeueEvery, - now: time.Now, - maxFailureCount: config.MaxFailureCount, - retryFailedAfter: config.RetryFailedAfter, - triggeredInstancesSync: sync.Map{}, - triggerWithoutEvents: config.TriggerWithoutEvents, - txDuration: config.TransactionDuration, - }, - } -} - -// Trigger executes the backfill job of events for the instance currently in the context. -func (h *FieldHandler) Trigger(ctx context.Context, opts ...TriggerOpt) (err error) { - config := new(triggerConfig) - for _, opt := range opts { - opt(config) - } - - cancel := h.lockInstance(ctx, config) - if cancel == nil { - return nil - } - defer cancel() - - for i := 0; ; i++ { - additionalIteration, err := h.processEvents(ctx, config) - h.log().OnError(err).Info("process events failed") - h.log().WithField("iteration", i).Debug("trigger iteration") - if !additionalIteration || err != nil { - return err - } - } -} - -func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig) (additionalIteration bool, err error) { - defer func() { - pgErr := new(pgconn.PgError) - if errors.As(err, &pgErr) { - // error returned if the row is currently locked by another connection - if pgErr.Code == "55P03" { - h.log().Debug("state already locked") - err = nil - additionalIteration = false - } - } - }() - - txCtx := ctx - if h.txDuration > 0 { - var cancel, cancelTx func() - // add 100ms to store current state if iteration takes too long - txCtx, cancelTx = context.WithTimeout(ctx, h.txDuration+100*time.Millisecond) - defer cancelTx() - ctx, cancel = context.WithTimeout(ctx, h.txDuration) - defer cancel() - } - - tx, err := h.client.BeginTx(txCtx, &sql.TxOptions{Isolation: sql.LevelReadCommitted}) - if err != nil { - return false, err - } - defer func() { - if err != nil && !errors.Is(err, &executionError{}) { - rollbackErr := tx.Rollback() - h.log().OnError(rollbackErr).Debug("unable to rollback tx") - return - } - commitErr := tx.Commit() - if err == nil { - err = commitErr - } - }() - - // always await currently running transactions - config.awaitRunning = true - currentState, err := h.currentState(ctx, tx, config) - if err != nil { - if errors.Is(err, errJustUpdated) { - return false, nil - } - return additionalIteration, err - } - // stop execution if currentState.eventTimestamp >= config.maxCreatedAt - if !config.maxPosition.IsZero() && currentState.position.GreaterThanOrEqual(config.maxPosition) { - return false, nil - } - - if config.minPosition.GreaterThan(decimal.NewFromInt(0)) { - currentState.position = config.minPosition - currentState.offset = 0 - } - - events, additionalIteration, err := h.fetchEvents(ctx, tx, currentState) - if err != nil { - return additionalIteration, err - } - if len(events) == 0 { - err = h.setState(tx, currentState) - return additionalIteration, err - } - - err = h.es.FillFields(ctx, events...) - if err != nil { - return false, err - } - - err = h.setState(tx, currentState) - - return additionalIteration, err -} - -func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState *state) (_ []eventstore.FillFieldsEvent, additionalIteration bool, err error) { - events, err := h.es.Filter(ctx, h.eventQuery(currentState).SetTx(tx)) - if err != nil || len(events) == 0 { - h.log().OnError(err).Debug("filter eventstore failed") - return nil, false, err - } - eventAmount := len(events) - - idx, offset := skipPreviouslyReducedEvents(events, currentState) - - if currentState.position.Equal(events[len(events)-1].Position()) { - offset += currentState.offset - } - currentState.position = events[len(events)-1].Position() - currentState.offset = offset - currentState.aggregateID = events[len(events)-1].Aggregate().ID - currentState.aggregateType = events[len(events)-1].Aggregate().Type - currentState.sequence = events[len(events)-1].Sequence() - currentState.eventTimestamp = events[len(events)-1].CreatedAt() - - if idx+1 == len(events) { - return nil, false, nil - } - events = events[idx+1:] - - additionalIteration = eventAmount == int(h.bulkLimit) - - fillFieldsEvents := make([]eventstore.FillFieldsEvent, len(events)) - highestPosition := events[len(events)-1].Position() - for i, event := range events { - if event.Position().Equal(highestPosition) { - offset++ - } - fillFieldsEvents[i] = event.(eventstore.FillFieldsEvent) - } - - return fillFieldsEvents, additionalIteration, nil -} - -func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state) (index int, offset uint32) { - var position decimal.Decimal - for i, event := range events { - if !event.Position().Equal(position) { - offset = 0 - position = event.Position() - } - offset++ - if event.Position().Equal(currentState.position) && - event.Aggregate().ID == currentState.aggregateID && - event.Aggregate().Type == currentState.aggregateType && - event.Sequence() == currentState.sequence { - return i, offset - } - } - return -1, 0 -} diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index fd8b206b38..91b78cb0ef 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -27,7 +27,6 @@ type EventStore interface { FilterToQueryReducer(ctx context.Context, reducer eventstore.QueryReducer) error Filter(ctx context.Context, queryFactory *eventstore.SearchQueryBuilder) ([]eventstore.Event, error) Push(ctx context.Context, cmds ...eventstore.Command) ([]eventstore.Event, error) - FillFields(ctx context.Context, events ...eventstore.FillFieldsEvent) error } type Config struct { diff --git a/internal/eventstore/v3/field.go b/internal/eventstore/v3/field.go deleted file mode 100644 index e8f761d410..0000000000 --- a/internal/eventstore/v3/field.go +++ /dev/null @@ -1,369 +0,0 @@ -package eventstore - -import ( - "context" - "database/sql" - _ "embed" - "encoding/json" - "reflect" - "slices" - "strconv" - "strings" - - "github.com/zitadel/zitadel/internal/api/authz" - "github.com/zitadel/zitadel/internal/database" - "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/telemetry/tracing" - "github.com/zitadel/zitadel/internal/zerrors" -) - -type fieldValue struct { - value []byte -} - -func (value *fieldValue) Unmarshal(ptr any) error { - return json.Unmarshal(value.value, ptr) -} - -func (es *Eventstore) FillFields(ctx context.Context, events ...eventstore.FillFieldsEvent) (err error) { - ctx, span := tracing.NewSpan(ctx) - defer span.End() - - tx, err := es.client.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted}) - if err != nil { - return err - } - defer func() { - if err != nil { - _ = tx.Rollback() - return - } - err = tx.Commit() - }() - - return handleFieldFillEvents(ctx, tx, events) -} - -// Search implements the [eventstore.Search] method -func (es *Eventstore) Search(ctx context.Context, conditions ...map[eventstore.FieldType]any) (result []*eventstore.SearchResult, err error) { - ctx, span := tracing.NewSpan(ctx) - defer func() { span.EndWithError(err) }() - - var builder strings.Builder - args := buildSearchStatement(ctx, &builder, conditions...) - - err = es.client.QueryContext( - ctx, - func(rows *sql.Rows) error { - for rows.Next() { - var ( - res eventstore.SearchResult - value fieldValue - ) - err = rows.Scan( - &res.Aggregate.InstanceID, - &res.Aggregate.ResourceOwner, - &res.Aggregate.Type, - &res.Aggregate.ID, - &res.Object.Type, - &res.Object.ID, - &res.Object.Revision, - &res.FieldName, - &value.value, - ) - if err != nil { - return err - } - res.Value = &value - - result = append(result, &res) - } - return nil - }, - builder.String(), - args..., - ) - if err != nil { - return nil, err - } - - return result, nil -} - -const searchQueryPrefix = `SELECT instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value FROM eventstore.fields WHERE instance_id = $1` - -func buildSearchStatement(ctx context.Context, builder *strings.Builder, conditions ...map[eventstore.FieldType]any) []any { - args := make([]any, 0, len(conditions)*4+1) - args = append(args, authz.GetInstance(ctx).InstanceID()) - - builder.WriteString(searchQueryPrefix) - - builder.WriteString(" AND ") - if len(conditions) > 1 { - builder.WriteRune('(') - } - for i, condition := range conditions { - if i > 0 { - builder.WriteString(" OR ") - } - if len(condition) > 1 { - builder.WriteRune('(') - } - args = append(args, buildSearchCondition(builder, len(args)+1, condition)...) - if len(condition) > 1 { - builder.WriteRune(')') - } - } - if len(conditions) > 1 { - builder.WriteRune(')') - } - - return args -} - -func buildSearchCondition(builder *strings.Builder, index int, conditions map[eventstore.FieldType]any) []any { - args := make([]any, 0, len(conditions)) - - orderedCondition := make([]eventstore.FieldType, 0, len(conditions)) - for field := range conditions { - orderedCondition = append(orderedCondition, field) - } - slices.Sort(orderedCondition) - - for _, field := range orderedCondition { - if len(args) > 0 { - builder.WriteString(" AND ") - } - builder.WriteString(fieldNameByType(field, conditions[field])) - builder.WriteString(" = $") - builder.WriteString(strconv.Itoa(index + len(args))) - args = append(args, conditions[field]) - } - - return args -} - -func (es *Eventstore) handleFieldCommands(ctx context.Context, tx database.Tx, commands []eventstore.Command) error { - for _, command := range commands { - if len(command.Fields()) > 0 { - if err := handleFieldOperations(ctx, tx, command.Fields()); err != nil { - return err - } - } - } - return nil -} - -func handleFieldFillEvents(ctx context.Context, tx database.Tx, events []eventstore.FillFieldsEvent) error { - for _, event := range events { - if len(event.Fields()) == 0 { - continue - } - if err := handleFieldOperations(ctx, tx, event.Fields()); err != nil { - return err - } - } - return nil -} - -func handleFieldOperations(ctx context.Context, tx database.Tx, operations []*eventstore.FieldOperation) error { - for _, operation := range operations { - if operation.Set != nil { - if err := handleFieldSet(ctx, tx, operation.Set); err != nil { - return err - } - continue - } - if operation.Remove != nil { - if err := handleSearchDelete(ctx, tx, operation.Remove); err != nil { - return err - } - } - } - - return nil -} - -func handleFieldSet(ctx context.Context, tx database.Tx, field *eventstore.Field) error { - if len(field.UpsertConflictFields) == 0 { - return handleSearchInsert(ctx, tx, field) - } - return handleSearchUpsert(ctx, tx, field) -} - -const ( - insertField = `INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)` -) - -func handleSearchInsert(ctx context.Context, tx database.Tx, field *eventstore.Field) error { - value, err := json.Marshal(field.Value.Value) - if err != nil { - return zerrors.ThrowInvalidArgument(err, "V3-fcrW1", "unable to marshal field value") - } - _, err = tx.ExecContext( - ctx, - insertField, - - field.Aggregate.InstanceID, - field.Aggregate.ResourceOwner, - field.Aggregate.Type, - field.Aggregate.ID, - field.Object.Type, - field.Object.ID, - field.Object.Revision, - field.FieldName, - value, - field.Value.MustBeUnique, - field.Value.ShouldIndex, - ) - return err -} - -const ( - fieldsUpsertPrefix = `WITH upsert AS (UPDATE eventstore.fields SET (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) = ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) WHERE ` - fieldsUpsertSuffix = ` RETURNING * ) INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 WHERE NOT EXISTS (SELECT 1 FROM upsert)` -) - -func handleSearchUpsert(ctx context.Context, tx database.Tx, field *eventstore.Field) error { - value, err := json.Marshal(field.Value.Value) - if err != nil { - return zerrors.ThrowInvalidArgument(err, "V3-fcrW1", "unable to marshal field value") - } - - _, err = tx.ExecContext( - ctx, - writeUpsertField(field.UpsertConflictFields), - - field.Aggregate.InstanceID, - field.Aggregate.ResourceOwner, - field.Aggregate.Type, - field.Aggregate.ID, - field.Object.Type, - field.Object.ID, - field.Object.Revision, - field.FieldName, - value, - field.Value.MustBeUnique, - field.Value.ShouldIndex, - ) - return err -} - -func writeUpsertField(fields []eventstore.FieldType) string { - var builder strings.Builder - - builder.WriteString(fieldsUpsertPrefix) - for i, fieldName := range fields { - if i > 0 { - builder.WriteString(" AND ") - } - name, index := searchFieldNameAndIndexByTypeForPush(fieldName) - - builder.WriteString(name) - builder.WriteString(" = ") - builder.WriteString(index) - } - builder.WriteString(fieldsUpsertSuffix) - - return builder.String() -} - -const removeSearch = `DELETE FROM eventstore.fields WHERE ` - -func handleSearchDelete(ctx context.Context, tx database.Tx, clauses map[eventstore.FieldType]any) error { - if len(clauses) == 0 { - return zerrors.ThrowInvalidArgument(nil, "V3-oqlBZ", "no conditions") - } - stmt, args := writeDeleteField(clauses) - _, err := tx.ExecContext(ctx, stmt, args...) - return err -} - -func writeDeleteField(clauses map[eventstore.FieldType]any) (string, []any) { - var ( - builder strings.Builder - args = make([]any, 0, len(clauses)) - ) - builder.WriteString(removeSearch) - - orderedCondition := make([]eventstore.FieldType, 0, len(clauses)) - for field := range clauses { - orderedCondition = append(orderedCondition, field) - } - slices.Sort(orderedCondition) - - for _, fieldName := range orderedCondition { - if len(args) > 0 { - builder.WriteString(" AND ") - } - builder.WriteString(fieldNameByType(fieldName, clauses[fieldName])) - - builder.WriteString(" = $") - builder.WriteString(strconv.Itoa(len(args) + 1)) - - args = append(args, clauses[fieldName]) - } - - return builder.String(), args -} - -func fieldNameByType(typ eventstore.FieldType, value any) string { - switch typ { - case eventstore.FieldTypeAggregateID: - return "aggregate_id" - case eventstore.FieldTypeAggregateType: - return "aggregate_type" - case eventstore.FieldTypeInstanceID: - return "instance_id" - case eventstore.FieldTypeResourceOwner: - return "resource_owner" - case eventstore.FieldTypeFieldName: - return "field_name" - case eventstore.FieldTypeObjectType: - return "object_type" - case eventstore.FieldTypeObjectID: - return "object_id" - case eventstore.FieldTypeObjectRevision: - return "object_revision" - case eventstore.FieldTypeValue: - return valueColumn(value) - } - return "" -} - -func searchFieldNameAndIndexByTypeForPush(typ eventstore.FieldType) (string, string) { - switch typ { - case eventstore.FieldTypeInstanceID: - return "instance_id", "$1" - case eventstore.FieldTypeResourceOwner: - return "resource_owner", "$2" - case eventstore.FieldTypeAggregateType: - return "aggregate_type", "$3" - case eventstore.FieldTypeAggregateID: - return "aggregate_id", "$4" - case eventstore.FieldTypeObjectType: - return "object_type", "$5" - case eventstore.FieldTypeObjectID: - return "object_id", "$6" - case eventstore.FieldTypeObjectRevision: - return "object_revision", "$7" - case eventstore.FieldTypeFieldName: - return "field_name", "$8" - case eventstore.FieldTypeValue: - return "value", "$9" - } - return "", "" -} - -func valueColumn(value any) string { - //nolint: exhaustive - switch reflect.TypeOf(value).Kind() { - case reflect.Bool: - return "bool_value" - case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Float32, reflect.Float64: - return "number_value" - case reflect.String: - return "text_value" - } - return "" -} diff --git a/internal/eventstore/v3/field_test.go b/internal/eventstore/v3/field_test.go deleted file mode 100644 index 0847c1b8ad..0000000000 --- a/internal/eventstore/v3/field_test.go +++ /dev/null @@ -1,260 +0,0 @@ -package eventstore - -import ( - "context" - _ "embed" - "reflect" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/zitadel/zitadel/internal/api/authz" - "github.com/zitadel/zitadel/internal/eventstore" -) - -func Test_handleSearchDelete(t *testing.T) { - type args struct { - clauses map[eventstore.FieldType]any - } - type want struct { - stmt string - args []any - } - tests := []struct { - name string - args args - want want - }{ - { - name: "1 condition", - args: args{ - clauses: map[eventstore.FieldType]any{ - eventstore.FieldTypeInstanceID: "i_id", - }, - }, - want: want{ - stmt: "DELETE FROM eventstore.fields WHERE instance_id = $1", - args: []any{"i_id"}, - }, - }, - { - name: "2 conditions", - args: args{ - clauses: map[eventstore.FieldType]any{ - eventstore.FieldTypeInstanceID: "i_id", - eventstore.FieldTypeAggregateID: "a_id", - }, - }, - want: want{ - stmt: "DELETE FROM eventstore.fields WHERE aggregate_id = $1 AND instance_id = $2", - args: []any{"a_id", "i_id"}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - stmt, args := writeDeleteField(tt.args.clauses) - if stmt != tt.want.stmt { - t.Errorf("handleSearchDelete() stmt = %q, want %q", stmt, tt.want.stmt) - } - assert.Equal(t, tt.want.args, args) - }) - } -} - -func Test_writeUpsertField(t *testing.T) { - type args struct { - fields []eventstore.FieldType - } - tests := []struct { - name string - args args - want string - }{ - { - name: "1 field", - args: args{ - fields: []eventstore.FieldType{ - eventstore.FieldTypeInstanceID, - }, - }, - want: "WITH upsert AS (UPDATE eventstore.fields SET (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) = ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) WHERE instance_id = $1 RETURNING * ) INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 WHERE NOT EXISTS (SELECT 1 FROM upsert)", - }, - { - name: "2 fields", - args: args{ - fields: []eventstore.FieldType{ - eventstore.FieldTypeInstanceID, - eventstore.FieldTypeAggregateType, - }, - }, - want: "WITH upsert AS (UPDATE eventstore.fields SET (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) = ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) WHERE instance_id = $1 AND aggregate_type = $3 RETURNING * ) INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 WHERE NOT EXISTS (SELECT 1 FROM upsert)", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := writeUpsertField(tt.args.fields); got != tt.want { - t.Errorf("writeUpsertField() = %q, want %q", got, tt.want) - } - }) - } -} - -func Test_buildSearchCondition(t *testing.T) { - type args struct { - index int - conditions map[eventstore.FieldType]any - } - type want struct { - stmt string - args []any - } - tests := []struct { - name string - args args - want want - }{ - { - name: "1 condition", - args: args{ - index: 1, - conditions: map[eventstore.FieldType]any{ - eventstore.FieldTypeAggregateID: "a_id", - }, - }, - want: want{ - stmt: "aggregate_id = $1", - args: []any{"a_id"}, - }, - }, - { - name: "3 condition", - args: args{ - index: 1, - conditions: map[eventstore.FieldType]any{ - eventstore.FieldTypeAggregateID: "a_id", - eventstore.FieldTypeInstanceID: "i_id", - eventstore.FieldTypeAggregateType: "a_type", - }, - }, - want: want{ - stmt: "aggregate_type = $1 AND aggregate_id = $2 AND instance_id = $3", - args: []any{"a_type", "a_id", "i_id"}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var builder strings.Builder - - if got := buildSearchCondition(&builder, tt.args.index, tt.args.conditions); !reflect.DeepEqual(got, tt.want.args) { - t.Errorf("buildSearchCondition() = %v, want %v", got, tt.want) - } - if tt.want.stmt != builder.String() { - t.Errorf("buildSearchCondition() stmt = %q, want %q", builder.String(), tt.want.stmt) - } - }) - } -} - -func Test_buildSearchStatement(t *testing.T) { - type args struct { - index int - conditions []map[eventstore.FieldType]any - } - type want struct { - stmt string - args []any - } - tests := []struct { - name string - args args - want want - }{ - { - name: "1 condition with 1 field", - args: args{ - index: 1, - conditions: []map[eventstore.FieldType]any{ - { - eventstore.FieldTypeAggregateID: "a_id", - }, - }, - }, - want: want{ - stmt: "SELECT instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value FROM eventstore.fields WHERE instance_id = $1 AND aggregate_id = $2", - args: []any{"a_id"}, - }, - }, - { - name: "1 condition with 3 fields", - args: args{ - index: 1, - conditions: []map[eventstore.FieldType]any{ - { - eventstore.FieldTypeAggregateID: "a_id", - eventstore.FieldTypeInstanceID: "i_id", - eventstore.FieldTypeAggregateType: "a_type", - }, - }, - }, - want: want{ - stmt: "SELECT instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value FROM eventstore.fields WHERE instance_id = $1 AND (aggregate_type = $2 AND aggregate_id = $3 AND instance_id = $4)", - args: []any{"a_type", "a_id", "i_id"}, - }, - }, - { - name: "2 condition with 1 field", - args: args{ - index: 1, - conditions: []map[eventstore.FieldType]any{ - { - eventstore.FieldTypeAggregateID: "a_id", - }, - { - eventstore.FieldTypeAggregateType: "a_type", - }, - }, - }, - want: want{ - stmt: "SELECT instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value FROM eventstore.fields WHERE instance_id = $1 AND (aggregate_id = $2 OR aggregate_type = $3)", - args: []any{"a_id", "a_type"}, - }, - }, - { - name: "2 condition with 2 fields", - args: args{ - index: 1, - conditions: []map[eventstore.FieldType]any{ - { - eventstore.FieldTypeAggregateID: "a_id1", - eventstore.FieldTypeAggregateType: "a_type1", - }, - { - eventstore.FieldTypeAggregateID: "a_id2", - eventstore.FieldTypeAggregateType: "a_type2", - }, - }, - }, - want: want{ - stmt: "SELECT instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value FROM eventstore.fields WHERE instance_id = $1 AND ((aggregate_type = $2 AND aggregate_id = $3) OR (aggregate_type = $4 AND aggregate_id = $5))", - args: []any{"a_type1", "a_id1", "a_type2", "a_id2"}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var builder strings.Builder - tt.want.args = append([]any{"i_id"}, tt.want.args...) - ctx := authz.WithInstanceID(context.Background(), "i_id") - - if got := buildSearchStatement(ctx, &builder, tt.args.conditions...); !reflect.DeepEqual(got, tt.want.args) { - t.Errorf("buildSearchStatement() = %v, want %v", got, tt.want) - } - if tt.want.stmt != builder.String() { - t.Errorf("buildSearchStatement() stmt = %q, want %q", builder.String(), tt.want.stmt) - } - }) - } -} diff --git a/internal/eventstore/v3/push.go b/internal/eventstore/v3/push.go index 6497b96ed8..a4994f5810 100644 --- a/internal/eventstore/v3/push.go +++ b/internal/eventstore/v3/push.go @@ -71,11 +71,6 @@ func (es *Eventstore) writeCommands(ctx context.Context, client database.Context return nil, err } - err = es.handleFieldCommands(ctx, tx, commands) - if err != nil { - return nil, err - } - return events, nil } diff --git a/internal/eventstore/v3/push_without_func.go b/internal/eventstore/v3/push_without_func.go index b94a9e8f54..dc72665635 100644 --- a/internal/eventstore/v3/push_without_func.go +++ b/internal/eventstore/v3/push_without_func.go @@ -61,11 +61,6 @@ func (es *Eventstore) pushWithoutFunc(ctx context.Context, client database.Conte return nil, err } - err = es.handleFieldCommands(ctx, tx, commands) - if err != nil { - return nil, err - } - return events, nil } diff --git a/internal/feature/feature.go b/internal/feature/feature.go index b5f5a901d4..c8d2864f62 100644 --- a/internal/feature/feature.go +++ b/internal/feature/feature.go @@ -64,10 +64,6 @@ type ImprovedPerformanceType int32 const ( ImprovedPerformanceTypeUnspecified ImprovedPerformanceType = iota ImprovedPerformanceTypeOrgByID - ImprovedPerformanceTypeProjectGrant - ImprovedPerformanceTypeProject - ImprovedPerformanceTypeUserGrant - ImprovedPerformanceTypeOrgDomainVerified ) func (f Features) ShouldUseImprovedPerformance(typ ImprovedPerformanceType) bool { diff --git a/internal/query/projection/eventstore_field.go b/internal/query/projection/eventstore_field.go deleted file mode 100644 index 73e3ac2c82..0000000000 --- a/internal/query/projection/eventstore_field.go +++ /dev/null @@ -1,100 +0,0 @@ -package projection - -import ( - "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/eventstore/handler/v2" - "github.com/zitadel/zitadel/internal/repository/instance" - "github.com/zitadel/zitadel/internal/repository/org" - "github.com/zitadel/zitadel/internal/repository/permission" - "github.com/zitadel/zitadel/internal/repository/project" -) - -const ( - fieldsProjectGrant = "project_grant_fields" - fieldsOrgDomainVerified = "org_domain_verified_fields" - fieldsInstanceDomain = "instance_domain_fields" - fieldsMemberships = "membership_fields" - fieldsPermission = "permission_fields" -) - -func newFillProjectGrantFields(config handler.Config) *handler.FieldHandler { - return handler.NewFieldHandler( - &config, - fieldsProjectGrant, - map[eventstore.AggregateType][]eventstore.EventType{ - org.AggregateType: nil, - project.AggregateType: nil, - }, - ) -} - -func newFillOrgDomainVerifiedFields(config handler.Config) *handler.FieldHandler { - return handler.NewFieldHandler( - &config, - fieldsOrgDomainVerified, - map[eventstore.AggregateType][]eventstore.EventType{ - org.AggregateType: { - org.OrgDomainAddedEventType, - org.OrgDomainVerifiedEventType, - org.OrgDomainRemovedEventType, - }, - }, - ) -} - -func newFillInstanceDomainFields(config handler.Config) *handler.FieldHandler { - return handler.NewFieldHandler( - &config, - fieldsInstanceDomain, - map[eventstore.AggregateType][]eventstore.EventType{ - instance.AggregateType: { - instance.InstanceDomainAddedEventType, - instance.InstanceDomainRemovedEventType, - instance.InstanceRemovedEventType, - }, - }, - ) -} - -func newFillMembershipFields(config handler.Config) *handler.FieldHandler { - return handler.NewFieldHandler( - &config, - fieldsMemberships, - map[eventstore.AggregateType][]eventstore.EventType{ - instance.AggregateType: { - instance.MemberAddedEventType, - instance.MemberChangedEventType, - instance.MemberRemovedEventType, - instance.MemberCascadeRemovedEventType, - instance.InstanceRemovedEventType, - }, - org.AggregateType: { - org.MemberAddedEventType, - org.MemberChangedEventType, - org.MemberRemovedEventType, - org.MemberCascadeRemovedEventType, - org.OrgRemovedEventType, - }, - project.AggregateType: { - project.MemberAddedEventType, - project.MemberChangedEventType, - project.MemberRemovedEventType, - project.MemberCascadeRemovedEventType, - project.ProjectRemovedType, - }, - }, - ) -} - -func newFillPermissionFields(config handler.Config) *handler.FieldHandler { - return handler.NewFieldHandler( - &config, - permission.PermissionSearchField, - map[eventstore.AggregateType][]eventstore.EventType{ - permission.AggregateType: { - permission.AddedType, - permission.RemovedType, - }, - }, - ) -} diff --git a/internal/query/projection/eventstore_mock_test.go b/internal/query/projection/eventstore_mock_test.go index 4e280dce21..17961e996f 100644 --- a/internal/query/projection/eventstore_mock_test.go +++ b/internal/query/projection/eventstore_mock_test.go @@ -48,7 +48,3 @@ func (m *mockEventStore) Push(ctx context.Context, cmds ...eventstore.Command) ( m.pushCounter++ return m.pushResponse[m.pushCounter-1], nil } - -func (m *mockEventStore) FillFields(ctx context.Context, events ...eventstore.FillFieldsEvent) error { - return nil -} diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index 77a28ac79a..99ec5ea4b5 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -86,12 +86,6 @@ var ( UserSchemaProjection *handler.Handler WebKeyProjection *handler.Handler DebugEventsProjection *handler.Handler - - ProjectGrantFields *handler.FieldHandler - OrgDomainVerifiedFields *handler.FieldHandler - InstanceDomainFields *handler.FieldHandler - MembershipFields *handler.FieldHandler - PermissionFields *handler.FieldHandler ) type projection interface { @@ -102,10 +96,7 @@ type projection interface { migration.Migration } -var ( - projections []projection - fields []*handler.FieldHandler -) +var projections []projection func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore, config Config, keyEncryptionAlgorithm crypto.EncryptionAlgorithm, certEncryptionAlgorithm crypto.EncryptionAlgorithm, systemUsers map[string]*internal_authz.SystemAPIUser) error { projectionConfig = handler.Config{ @@ -180,15 +171,7 @@ func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore, WebKeyProjection = newWebKeyProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["web_keys"])) DebugEventsProjection = newDebugEventsProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["debug_events"])) - ProjectGrantFields = newFillProjectGrantFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsProjectGrant])) - OrgDomainVerifiedFields = newFillOrgDomainVerifiedFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsOrgDomainVerified])) - InstanceDomainFields = newFillInstanceDomainFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsInstanceDomain])) - MembershipFields = newFillMembershipFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsMemberships])) - PermissionFields = newFillPermissionFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsPermission])) - // Don't forget to add the new field handler to [ProjectInstanceFields] - newProjectionsList() - newFieldsList() return nil } @@ -231,26 +214,6 @@ func ProjectInstance(ctx context.Context) error { return nil } -func ProjectInstanceFields(ctx context.Context) error { - for i, fieldProjection := range fields { - logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("starting fields projection") - for { - err := fieldProjection.Trigger(ctx) - if err == nil { - break - } - var pgErr *pgconn.PgError - errors.As(err, &pgErr) - if pgErr.Code != database.PgUniqueConstraintErrorCode { - return err - } - logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("fields projection failed because of unique constraint, retrying") - } - logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("fields projection done") - } - return nil -} - func ApplyCustomConfig(customConfig CustomConfig) handler.Config { return applyCustomConfig(projectionConfig, customConfig) } @@ -275,20 +238,6 @@ func applyCustomConfig(config handler.Config, customConfig CustomConfig) handler return config } -// we know this is ugly, but we need to have a singleton slice of all projections -// and are only able to initialize it after all projections are created -// as setup and start currently create them individually, we make sure we get the right one -// will be refactored when changing to new id based projections -func newFieldsList() { - fields = []*handler.FieldHandler{ - ProjectGrantFields, - OrgDomainVerifiedFields, - InstanceDomainFields, - MembershipFields, - PermissionFields, - } -} - // we know this is ugly, but we need to have a singleton slice of all projections // and are only able to initialize it after all projections are created // as setup and start currently create them individually, we make sure we get the right one diff --git a/proto/zitadel/feature/v2/instance.proto b/proto/zitadel/feature/v2/instance.proto index fe8d3f7a39..3e265fc219 100644 --- a/proto/zitadel/feature/v2/instance.proto +++ b/proto/zitadel/feature/v2/instance.proto @@ -11,8 +11,9 @@ import "zitadel/feature/v2/feature.proto"; option go_package = "github.com/zitadel/zitadel/pkg/grpc/feature/v2;feature"; message SetInstanceFeaturesRequest{ - reserved 6; - reserved "actions"; + reserved 6, 7; + reserved "actions", "improved_performance"; + optional bool login_default_org = 1 [ (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { example: "true"; @@ -46,15 +47,6 @@ message SetInstanceFeaturesRequest{ } ]; - repeated ImprovedPerformance improved_performance = 7 [ - (validate.rules).repeated.unique = true, - (validate.rules).repeated.items.enum = {defined_only: true, not_in: [0]}, - (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { - example: "[1]"; - description: "Improves performance of specified execution paths."; - } - ]; - optional bool web_key = 8 [ (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { example: "true"; diff --git a/proto/zitadel/feature/v2/system.proto b/proto/zitadel/feature/v2/system.proto index d222e2a90c..77ee692a23 100644 --- a/proto/zitadel/feature/v2/system.proto +++ b/proto/zitadel/feature/v2/system.proto @@ -11,8 +11,8 @@ import "zitadel/feature/v2/feature.proto"; option go_package = "github.com/zitadel/zitadel/pkg/grpc/feature/v2;feature"; message SetSystemFeaturesRequest{ - reserved 6; - reserved "actions"; + reserved 6, 7; + reserved "actions", "improved_performance"; optional bool login_default_org = 1 [ (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { example: "true"; @@ -48,15 +48,6 @@ message SetSystemFeaturesRequest{ } ]; - repeated ImprovedPerformance improved_performance = 7 [ - (validate.rules).repeated.unique = true, - (validate.rules).repeated.items.enum = {defined_only: true, not_in: [0]}, - (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { - example: "[1]"; - description: "Improves performance of specified execution paths."; - } - ]; - optional bool oidc_single_v1_session_termination = 8 [ (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = { example: "true";