remove improved_performance from feature API

remove `eventstore.Searcher` interface

remove code which uses the `eventstore.Searcher` interface

remove implementation of `eventstore.Searcher` interface

cleanup code to use functions which were suffixed with `*Old`
This commit is contained in:
adlerhurst
2025-06-15 08:33:20 +02:00
parent 83839fc2ef
commit 7c9351e893
29 changed files with 50 additions and 1775 deletions

View File

@@ -123,7 +123,6 @@ func projections(
newEventstore := new_es.NewEventstore(client)
config.Eventstore.Querier = old_es.NewPostgres(client)
config.Eventstore.Pusher = newEventstore
config.Eventstore.Searcher = newEventstore
es := eventstore.NewEventstore(config.Eventstore)
esV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(client, &es_v4_pg.Config{
@@ -282,13 +281,6 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc
continue
}
err = projection.ProjectInstanceFields(ctx)
if err != nil {
logging.WithFields("instance", instance).WithError(err).Info("trigger fields failed")
failedInstances <- instance
continue
}
err = admin_handler.ProjectInstance(ctx)
if err != nil {
logging.WithFields("instance", instance).WithError(err).Info("trigger admin handler failed")
@@ -296,13 +288,6 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc
continue
}
err = projection.ProjectInstanceFields(ctx)
if err != nil {
logging.WithFields("instance", instance).WithError(err).Info("trigger fields failed")
failedInstances <- instance
continue
}
err = auth_handler.ProjectInstance(ctx)
if err != nil {
logging.WithFields("instance", instance).WithError(err).Info("trigger auth handler failed")

View File

@@ -1,40 +0,0 @@
package setup
import (
"context"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/instance"
)
type FillFieldsForProjectGrant struct {
eventstore *eventstore.Eventstore
}
func (mig *FillFieldsForProjectGrant) Execute(ctx context.Context, _ eventstore.Event) error {
instances, err := mig.eventstore.InstanceIDs(
ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).
OrderDesc().
AddQuery().
AggregateTypes("instance").
EventTypes(instance.InstanceAddedEventType).
Builder(),
)
if err != nil {
return err
}
for _, instance := range instances {
ctx := authz.WithInstanceID(ctx, instance)
if err := projection.ProjectGrantFields.Trigger(ctx); err != nil {
return err
}
}
return nil
}
func (mig *FillFieldsForProjectGrant) String() string {
return "29_init_fields_for_project_grant"
}

View File

@@ -1,40 +0,0 @@
package setup
import (
"context"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/instance"
)
type FillFieldsForOrgDomainVerified struct {
eventstore *eventstore.Eventstore
}
func (mig *FillFieldsForOrgDomainVerified) Execute(ctx context.Context, _ eventstore.Event) error {
instances, err := mig.eventstore.InstanceIDs(
ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).
OrderDesc().
AddQuery().
AggregateTypes("instance").
EventTypes(instance.InstanceAddedEventType).
Builder(),
)
if err != nil {
return err
}
for _, instance := range instances {
ctx := authz.WithInstanceID(ctx, instance)
if err := projection.OrgDomainVerifiedFields.Trigger(ctx); err != nil {
return err
}
}
return nil
}
func (mig *FillFieldsForOrgDomainVerified) String() string {
return "30_fill_fields_for_org_domain_verified"
}

View File

@@ -1,44 +0,0 @@
package setup
import (
"context"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/instance"
)
type FillFieldsForInstanceDomains struct {
eventstore *eventstore.Eventstore
}
func (mig *FillFieldsForInstanceDomains) Execute(ctx context.Context, _ eventstore.Event) error {
instances, err := mig.eventstore.InstanceIDs(
ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).
OrderDesc().
AddQuery().
AggregateTypes("instance").
EventTypes(instance.InstanceAddedEventType).
Builder(),
)
if err != nil {
return err
}
for _, instance := range instances {
ctx := authz.WithInstanceID(ctx, instance)
if err := projection.InstanceDomainFields.Trigger(ctx); err != nil {
return err
}
}
return nil
}
func (mig *FillFieldsForInstanceDomains) String() string {
return "repeatable_fill_fields_for_instance_domains"
}
func (f *FillFieldsForInstanceDomains) Check(lastRun map[string]interface{}) bool {
return true
}

View File

@@ -1,43 +0,0 @@
package setup
import (
"context"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/instance"
)
type FillMembershipFields struct {
eventstore *eventstore.Eventstore
}
func (mig *FillMembershipFields) Execute(ctx context.Context, _ eventstore.Event) error {
instances, err := mig.eventstore.InstanceIDs(
ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsInstanceIDs).
OrderDesc().
AddQuery().
AggregateTypes("instance").
EventTypes(instance.InstanceAddedEventType).
Builder().ExcludeAggregateIDs().
AggregateTypes("instance").
EventTypes(instance.InstanceRemovedEventType).
Builder(),
)
if err != nil {
return err
}
for _, instance := range instances {
ctx := authz.WithInstanceID(ctx, instance)
if err := projection.MembershipFields.Trigger(ctx); err != nil {
return err
}
}
return nil
}
func (mig *FillMembershipFields) String() string {
return "47_fill_membership_fields"
}

View File

@@ -127,8 +127,6 @@ type Steps struct {
s26AuthUsers3 *AuthUsers3
s27IDPTemplate6SAMLNameIDFormat *IDPTemplate6SAMLNameIDFormat
s28AddFieldTable *AddFieldTable
s29FillFieldsForProjectGrant *FillFieldsForProjectGrant
s30FillFieldsForOrgDomainVerified *FillFieldsForOrgDomainVerified
s31AddAggregateIndexToFields *AddAggregateIndexToFields
s32AddAuthSessionID *AddAuthSessionID
s33SMSConfigs3TwilioAddVerifyServiceSid *SMSConfigs3TwilioAddVerifyServiceSid
@@ -143,7 +141,6 @@ type Steps struct {
s44ReplaceCurrentSequencesIndex *ReplaceCurrentSequencesIndex
s45CorrectProjectOwners *CorrectProjectOwners
s46InitPermissionFunctions *InitPermissionFunctions
s47FillMembershipFields *FillMembershipFields
s48Apps7SAMLConfigsLoginVersion *Apps7SAMLConfigsLoginVersion
s49InitPermittedOrgsFunction *InitPermittedOrgsFunction
s50IDPTemplate6UsePKCE *IDPTemplate6UsePKCE

View File

@@ -142,7 +142,6 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
config.Eventstore.Querier = old_es.NewPostgres(dbClient)
esV3 := new_es.NewEventstore(dbClient)
config.Eventstore.Pusher = esV3
config.Eventstore.Searcher = esV3
eventstoreClient := eventstore.NewEventstore(config.Eventstore)
logging.OnError(err).Fatal("unable to start eventstore")
@@ -189,8 +188,6 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s26AuthUsers3 = &AuthUsers3{dbClient: dbClient}
steps.s27IDPTemplate6SAMLNameIDFormat = &IDPTemplate6SAMLNameIDFormat{dbClient: dbClient}
steps.s28AddFieldTable = &AddFieldTable{dbClient: dbClient}
steps.s29FillFieldsForProjectGrant = &FillFieldsForProjectGrant{eventstore: eventstoreClient}
steps.s30FillFieldsForOrgDomainVerified = &FillFieldsForOrgDomainVerified{eventstore: eventstoreClient}
steps.s31AddAggregateIndexToFields = &AddAggregateIndexToFields{dbClient: dbClient}
steps.s32AddAuthSessionID = &AddAuthSessionID{dbClient: dbClient}
steps.s33SMSConfigs3TwilioAddVerifyServiceSid = &SMSConfigs3TwilioAddVerifyServiceSid{dbClient: dbClient}
@@ -205,7 +202,6 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s44ReplaceCurrentSequencesIndex = &ReplaceCurrentSequencesIndex{dbClient: dbClient}
steps.s45CorrectProjectOwners = &CorrectProjectOwners{eventstore: eventstoreClient}
steps.s46InitPermissionFunctions = &InitPermissionFunctions{eventstoreClient: dbClient}
steps.s47FillMembershipFields = &FillMembershipFields{eventstore: eventstoreClient}
steps.s48Apps7SAMLConfigsLoginVersion = &Apps7SAMLConfigsLoginVersion{dbClient: dbClient}
steps.s49InitPermittedOrgsFunction = &InitPermittedOrgsFunction{eventstoreClient: dbClient}
steps.s50IDPTemplate6UsePKCE = &IDPTemplate6UsePKCE{dbClient: dbClient}
@@ -245,15 +241,12 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s23CorrectGlobalUniqueConstraints,
steps.s24AddActorToAuthTokens,
steps.s26AuthUsers3,
steps.s29FillFieldsForProjectGrant,
steps.s30FillFieldsForOrgDomainVerified,
steps.s34AddCacheSchema,
steps.s35AddPositionToIndexEsWm,
steps.s36FillV2Milestones,
steps.s38BackChannelLogoutNotificationStart,
steps.s44ReplaceCurrentSequencesIndex,
steps.s45CorrectProjectOwners,
steps.s47FillMembershipFields,
steps.s49InitPermittedOrgsFunction,
steps.s50IDPTemplate6UsePKCE,
steps.s51IDPTemplate6RootCA,
@@ -288,9 +281,6 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
&DeleteStaleOrgFields{
eventstore: eventstoreClient,
},
&FillFieldsForInstanceDomains{
eventstore: eventstoreClient,
},
&SyncRolePermissions{
commands: commands,
eventstore: eventstoreClient,

View File

@@ -166,7 +166,6 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server
}
config.Eventstore.Pusher = new_es.NewEventstore(dbClient)
config.Eventstore.Searcher = new_es.NewEventstore(dbClient)
config.Eventstore.Querier = old_es.NewPostgres(dbClient)
eventstoreClient := eventstore.NewEventstore(config.Eventstore)
eventstoreV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(dbClient, &es_v4_pg.Config{

View File

@@ -176,14 +176,6 @@ func improvedPerformanceTypeToPb(typ feature.ImprovedPerformanceType) feature_pb
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_UNSPECIFIED
case feature.ImprovedPerformanceTypeOrgByID:
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_BY_ID
case feature.ImprovedPerformanceTypeProjectGrant:
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT_GRANT
case feature.ImprovedPerformanceTypeProject:
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT
case feature.ImprovedPerformanceTypeUserGrant:
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_USER_GRANT
case feature.ImprovedPerformanceTypeOrgDomainVerified:
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_DOMAIN_VERIFIED
default:
return feature_pb.ImprovedPerformance(typ)
}
@@ -208,14 +200,6 @@ func improvedPerformanceToDomain(typ feature_pb.ImprovedPerformance) feature.Imp
return feature.ImprovedPerformanceTypeUnspecified
case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_BY_ID:
return feature.ImprovedPerformanceTypeOrgByID
case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT_GRANT:
return feature.ImprovedPerformanceTypeProjectGrant
case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT:
return feature.ImprovedPerformanceTypeProject
case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_USER_GRANT:
return feature.ImprovedPerformanceTypeUserGrant
case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_DOMAIN_VERIFIED:
return feature.ImprovedPerformanceTypeOrgDomainVerified
default:
return feature.ImprovedPerformanceTypeUnspecified
}

View File

@@ -113,14 +113,6 @@ func improvedPerformanceTypeToPb(typ feature.ImprovedPerformanceType) feature_pb
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_UNSPECIFIED
case feature.ImprovedPerformanceTypeOrgByID:
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_BY_ID
case feature.ImprovedPerformanceTypeProjectGrant:
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT_GRANT
case feature.ImprovedPerformanceTypeProject:
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT
case feature.ImprovedPerformanceTypeUserGrant:
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_USER_GRANT
case feature.ImprovedPerformanceTypeOrgDomainVerified:
return feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_DOMAIN_VERIFIED
default:
return feature_pb.ImprovedPerformance(typ)
}
@@ -145,14 +137,6 @@ func improvedPerformanceToDomain(typ feature_pb.ImprovedPerformance) feature.Imp
return feature.ImprovedPerformanceTypeUnspecified
case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_BY_ID:
return feature.ImprovedPerformanceTypeOrgByID
case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT_GRANT:
return feature.ImprovedPerformanceTypeProjectGrant
case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_PROJECT:
return feature.ImprovedPerformanceTypeProject
case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_USER_GRANT:
return feature.ImprovedPerformanceTypeUserGrant
case feature_pb.ImprovedPerformance_IMPROVED_PERFORMANCE_ORG_DOMAIN_VERIFIED:
return feature.ImprovedPerformanceTypeOrgDomainVerified
default:
return feature.ImprovedPerformanceTypeUnspecified
}

View File

@@ -7,14 +7,11 @@ import (
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
http_utils "github.com/zitadel/zitadel/internal/api/http"
"github.com/zitadel/zitadel/internal/command/preparation"
"github.com/zitadel/zitadel/internal/crypto"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/feature"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/org"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -353,38 +350,6 @@ func (c *Commands) changeDefaultDomain(ctx context.Context, orgID, newName strin
return nil, nil
}
func (c *Commands) removeCustomDomains(ctx context.Context, orgID string) ([]eventstore.Command, error) {
orgDomains := NewOrgDomainsWriteModel(orgID)
err := c.eventstore.FilterToQueryReducer(ctx, orgDomains)
if err != nil {
return nil, err
}
hasDefault := false
defaultDomain, _ := domain.NewIAMDomainName(orgDomains.OrgName, http_utils.DomainContext(ctx).RequestedDomain())
isPrimary := defaultDomain == orgDomains.PrimaryDomain
orgAgg := OrgAggregateFromWriteModel(&orgDomains.WriteModel)
events := make([]eventstore.Command, 0, len(orgDomains.Domains))
for _, orgDomain := range orgDomains.Domains {
if orgDomain.State == domain.OrgDomainStateActive {
if orgDomain.Domain == defaultDomain {
hasDefault = true
continue
}
events = append(events, org.NewDomainRemovedEvent(ctx, orgAgg, orgDomain.Domain, orgDomain.Verified))
}
}
if !hasDefault {
return append([]eventstore.Command{
org.NewDomainAddedEvent(ctx, orgAgg, defaultDomain),
org.NewDomainPrimarySetEvent(ctx, orgAgg, defaultDomain),
}, events...), nil
}
if !isPrimary {
return append([]eventstore.Command{org.NewDomainPrimarySetEvent(ctx, orgAgg, defaultDomain)}, events...), nil
}
return events, nil
}
func (c *Commands) getOrgDomainWriteModel(ctx context.Context, orgID, domain string) (_ *OrgDomainWriteModel, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
@@ -404,45 +369,6 @@ type OrgDomainVerified struct {
}
func (c *Commands) searchOrgDomainVerifiedByDomain(ctx context.Context, domain string) (_ *OrgDomainVerified, err error) {
if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeOrgDomainVerified) {
return c.searchOrgDomainVerifiedByDomainOld(ctx, domain)
}
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
condition := map[eventstore.FieldType]any{
eventstore.FieldTypeAggregateType: org.AggregateType,
eventstore.FieldTypeObjectType: org.OrgDomainSearchType,
eventstore.FieldTypeObjectID: domain,
eventstore.FieldTypeObjectRevision: org.OrgDomainObjectRevision,
eventstore.FieldTypeFieldName: org.OrgDomainVerifiedSearchField,
}
results, err := c.eventstore.Search(ctx, condition)
if err != nil {
return nil, err
}
if len(results) == 0 {
_ = projection.OrgDomainVerifiedFields.Trigger(ctx)
results, err = c.eventstore.Search(ctx, condition)
if err != nil {
return nil, err
}
}
orgDomain := new(OrgDomainVerified)
for _, result := range results {
orgDomain.OrgID = result.Aggregate.ID
if err = result.Value.Unmarshal(&orgDomain.Verified); err != nil {
return nil, err
}
}
return orgDomain, nil
}
func (c *Commands) searchOrgDomainVerifiedByDomainOld(ctx context.Context, domain string) (_ *OrgDomainVerified, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()

View File

@@ -7,13 +7,10 @@ import (
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/command/preparation"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/feature"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/project"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -138,53 +135,18 @@ func projectWriteModel(ctx context.Context, filter preparation.FilterToQueryRedu
return project, nil
}
func (c *Commands) projectAggregateByID(ctx context.Context, projectID, resourceOwner string) (*eventstore.Aggregate, domain.ProjectState, error) {
result, err := c.projectState(ctx, projectID, resourceOwner)
if err != nil {
return nil, domain.ProjectStateUnspecified, zerrors.ThrowNotFound(err, "COMMA-NDQoF", "Errors.Project.NotFound")
}
if len(result) == 0 {
_ = projection.ProjectGrantFields.Trigger(ctx)
result, err = c.projectState(ctx, projectID, resourceOwner)
if err != nil || len(result) == 0 {
return nil, domain.ProjectStateUnspecified, zerrors.ThrowNotFound(err, "COMMA-U1nza", "Errors.Project.NotFound")
}
}
var state domain.ProjectState
err = result[0].Value.Unmarshal(&state)
if err != nil {
return nil, state, zerrors.ThrowNotFound(err, "COMMA-o4n6F", "Errors.Project.NotFound")
}
return &result[0].Aggregate, state, nil
}
func (c *Commands) projectState(ctx context.Context, projectID, resourceOwner string) ([]*eventstore.SearchResult, error) {
return c.eventstore.Search(
ctx,
map[eventstore.FieldType]any{
eventstore.FieldTypeObjectType: project.ProjectSearchType,
eventstore.FieldTypeObjectID: projectID,
eventstore.FieldTypeObjectRevision: project.ProjectObjectRevision,
eventstore.FieldTypeFieldName: project.ProjectStateSearchField,
eventstore.FieldTypeResourceOwner: resourceOwner,
},
)
}
func (c *Commands) checkProjectExists(ctx context.Context, projectID, resourceOwner string) (_ string, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeProject) {
return c.checkProjectExistsOld(ctx, projectID, resourceOwner)
projectWriteModel, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner)
if err != nil {
return "", err
}
agg, state, err := c.projectAggregateByID(ctx, projectID, resourceOwner)
if err != nil || !state.Valid() {
return "", zerrors.ThrowPreconditionFailed(err, "COMMA-VCnwD", "Errors.Project.NotFound")
if !isProjectStateExists(projectWriteModel.State) {
return "", zerrors.ThrowPreconditionFailed(nil, "COMMAND-EbFMN", "Errors.Project.NotFound")
}
return agg.ResourceOwner, nil
return projectWriteModel.ResourceOwner, nil
}
type ChangeProject struct {
@@ -246,35 +208,31 @@ func (c *Commands) DeactivateProject(ctx context.Context, projectID string, reso
return nil, zerrors.ThrowInvalidArgument(nil, "COMMAND-88iF0", "Errors.Project.ProjectIDMissing")
}
if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeProject) {
return c.deactivateProjectOld(ctx, projectID, resourceOwner)
}
projectAgg, state, err := c.projectAggregateByID(ctx, projectID, resourceOwner)
existingProject, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner)
if err != nil {
return nil, err
}
if !isProjectStateExists(state) {
if !isProjectStateExists(existingProject.State) {
return nil, zerrors.ThrowNotFound(nil, "COMMAND-112M9", "Errors.Project.NotFound")
}
if state != domain.ProjectStateActive {
if existingProject.State != domain.ProjectStateActive {
return nil, zerrors.ThrowPreconditionFailed(nil, "COMMAND-mki55", "Errors.Project.NotActive")
}
if err := c.checkPermissionUpdateProject(ctx, projectAgg.ResourceOwner, projectAgg.ID); err != nil {
if err := c.checkPermissionUpdateProject(ctx, existingProject.ResourceOwner, existingProject.AggregateID); err != nil {
return nil, err
}
//nolint: contextcheck
projectAgg := ProjectAggregateFromWriteModel(&existingProject.WriteModel)
pushedEvents, err := c.eventstore.Push(ctx, project.NewProjectDeactivatedEvent(ctx, projectAgg))
if err != nil {
return nil, err
}
return &domain.ObjectDetails{
ResourceOwner: pushedEvents[0].Aggregate().ResourceOwner,
Sequence: pushedEvents[0].Sequence(),
EventDate: pushedEvents[0].CreatedAt(),
}, nil
err = AppendAndReduce(existingProject, pushedEvents...)
if err != nil {
return nil, err
}
return writeModelToObjectDetails(&existingProject.WriteModel), nil
}
func (c *Commands) ReactivateProject(ctx context.Context, projectID string, resourceOwner string) (*domain.ObjectDetails, error) {
@@ -282,35 +240,31 @@ func (c *Commands) ReactivateProject(ctx context.Context, projectID string, reso
return nil, zerrors.ThrowInvalidArgument(nil, "COMMAND-3ihsF", "Errors.Project.ProjectIDMissing")
}
if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeProject) {
return c.reactivateProjectOld(ctx, projectID, resourceOwner)
}
projectAgg, state, err := c.projectAggregateByID(ctx, projectID, resourceOwner)
existingProject, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner)
if err != nil {
return nil, err
}
if !isProjectStateExists(state) {
if !isProjectStateExists(existingProject.State) {
return nil, zerrors.ThrowNotFound(nil, "COMMAND-3M9sd", "Errors.Project.NotFound")
}
if state != domain.ProjectStateInactive {
if existingProject.State != domain.ProjectStateInactive {
return nil, zerrors.ThrowPreconditionFailed(nil, "COMMAND-5M9bs", "Errors.Project.NotInactive")
}
if err := c.checkPermissionUpdateProject(ctx, projectAgg.ResourceOwner, projectAgg.ID); err != nil {
if err := c.checkPermissionUpdateProject(ctx, existingProject.ResourceOwner, existingProject.AggregateID); err != nil {
return nil, err
}
//nolint: contextcheck
projectAgg := ProjectAggregateFromWriteModel(&existingProject.WriteModel)
pushedEvents, err := c.eventstore.Push(ctx, project.NewProjectReactivatedEvent(ctx, projectAgg))
if err != nil {
return nil, err
}
return &domain.ObjectDetails{
ResourceOwner: pushedEvents[0].Aggregate().ResourceOwner,
Sequence: pushedEvents[0].Sequence(),
EventDate: pushedEvents[0].CreatedAt(),
}, nil
err = AppendAndReduce(existingProject, pushedEvents...)
if err != nil {
return nil, err
}
return writeModelToObjectDetails(&existingProject.WriteModel), nil
}
// Deprecated: use commands.DeleteProject

View File

@@ -6,12 +6,9 @@ import (
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore"
es_models "github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/feature"
"github.com/zitadel/zitadel/internal/repository/org"
"github.com/zitadel/zitadel/internal/repository/project"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -379,94 +376,19 @@ func (c *Commands) projectGrantWriteModelByID(ctx context.Context, grantID, gran
}
func (c *Commands) checkProjectGrantPreCondition(ctx context.Context, projectID, grantedOrgID, resourceOwner string, roles []string) (string, error) {
if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeProjectGrant) {
return c.checkProjectGrantPreConditionOld(ctx, projectID, grantedOrgID, resourceOwner, roles)
}
projectResourceOwner, existingRoleKeys, err := c.searchProjectGrantState(ctx, projectID, grantedOrgID, resourceOwner)
preConditions := NewProjectGrantPreConditionReadModel(projectID, grantedOrgID, resourceOwner)
err := c.eventstore.FilterToQueryReducer(ctx, preConditions)
if err != nil {
return "", err
}
if domain.HasInvalidRoles(existingRoleKeys, roles) {
if !preConditions.ProjectExists {
return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-m9gsd", "Errors.Project.NotFound")
}
if !preConditions.GrantedOrgExists {
return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-3m9gg", "Errors.Org.NotFound")
}
if domain.HasInvalidRoles(preConditions.ExistingRoleKeys, roles) {
return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-6m9gd", "Errors.Project.Role.NotFound")
}
return projectResourceOwner, nil
}
func (c *Commands) searchProjectGrantState(ctx context.Context, projectID, grantedOrgID, resourceOwner string) (_ string, existingRoleKeys []string, err error) {
projectStateQuery := map[eventstore.FieldType]any{
eventstore.FieldTypeAggregateType: project.AggregateType,
eventstore.FieldTypeAggregateID: projectID,
eventstore.FieldTypeFieldName: project.ProjectStateSearchField,
eventstore.FieldTypeObjectType: project.ProjectSearchType,
}
grantedOrgQuery := map[eventstore.FieldType]any{
eventstore.FieldTypeAggregateType: org.AggregateType,
eventstore.FieldTypeAggregateID: grantedOrgID,
eventstore.FieldTypeFieldName: org.OrgStateSearchField,
eventstore.FieldTypeObjectType: org.OrgSearchType,
}
roleQuery := map[eventstore.FieldType]any{
eventstore.FieldTypeAggregateType: project.AggregateType,
eventstore.FieldTypeAggregateID: projectID,
eventstore.FieldTypeFieldName: project.ProjectRoleKeySearchField,
eventstore.FieldTypeObjectType: project.ProjectRoleSearchType,
}
// as resourceowner is not always provided, it has to be separately
if resourceOwner != "" {
projectStateQuery[eventstore.FieldTypeResourceOwner] = resourceOwner
roleQuery[eventstore.FieldTypeResourceOwner] = resourceOwner
}
results, err := c.eventstore.Search(
ctx,
projectStateQuery,
grantedOrgQuery,
roleQuery,
)
if err != nil {
return "", nil, err
}
var (
existsProject bool
existingProjectResourceOwner string
existsGrantedOrg bool
)
for _, result := range results {
switch result.Object.Type {
case project.ProjectRoleSearchType:
var role string
err := result.Value.Unmarshal(&role)
if err != nil {
return "", nil, err
}
existingRoleKeys = append(existingRoleKeys, role)
case org.OrgSearchType:
var state domain.OrgState
err := result.Value.Unmarshal(&state)
if err != nil {
return "", nil, err
}
existsGrantedOrg = state.Valid() && state != domain.OrgStateRemoved
case project.ProjectSearchType:
var state domain.ProjectState
err := result.Value.Unmarshal(&state)
if err != nil {
return "", nil, err
}
existsProject = state.Valid() && state != domain.ProjectStateRemoved
existingProjectResourceOwner = result.Aggregate.ResourceOwner
}
}
if !existsProject {
return "", nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-m9gsd", "Errors.Project.NotFound")
}
if !existsGrantedOrg {
return "", nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-3m9gg", "Errors.Org.NotFound")
}
return existingProjectResourceOwner, existingRoleKeys, nil
return preConditions.ProjectResourceOwner, nil
}

View File

@@ -1,98 +0,0 @@
package command
import (
"context"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/repository/project"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
)
func (c *Commands) checkProjectExistsOld(ctx context.Context, projectID, resourceOwner string) (_ string, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
projectWriteModel, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner)
if err != nil {
return "", err
}
if !isProjectStateExists(projectWriteModel.State) {
return "", zerrors.ThrowPreconditionFailed(nil, "COMMAND-EbFMN", "Errors.Project.NotFound")
}
return projectWriteModel.ResourceOwner, nil
}
func (c *Commands) deactivateProjectOld(ctx context.Context, projectID string, resourceOwner string) (*domain.ObjectDetails, error) {
existingProject, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner)
if err != nil {
return nil, err
}
if !isProjectStateExists(existingProject.State) {
return nil, zerrors.ThrowNotFound(nil, "COMMAND-112M9", "Errors.Project.NotFound")
}
if existingProject.State != domain.ProjectStateActive {
return nil, zerrors.ThrowPreconditionFailed(nil, "COMMAND-mki55", "Errors.Project.NotActive")
}
if err := c.checkPermissionUpdateProject(ctx, existingProject.ResourceOwner, existingProject.AggregateID); err != nil {
return nil, err
}
//nolint: contextcheck
projectAgg := ProjectAggregateFromWriteModel(&existingProject.WriteModel)
pushedEvents, err := c.eventstore.Push(ctx, project.NewProjectDeactivatedEvent(ctx, projectAgg))
if err != nil {
return nil, err
}
err = AppendAndReduce(existingProject, pushedEvents...)
if err != nil {
return nil, err
}
return writeModelToObjectDetails(&existingProject.WriteModel), nil
}
func (c *Commands) reactivateProjectOld(ctx context.Context, projectID string, resourceOwner string) (*domain.ObjectDetails, error) {
existingProject, err := c.getProjectWriteModelByID(ctx, projectID, resourceOwner)
if err != nil {
return nil, err
}
if !isProjectStateExists(existingProject.State) {
return nil, zerrors.ThrowNotFound(nil, "COMMAND-3M9sd", "Errors.Project.NotFound")
}
if existingProject.State != domain.ProjectStateInactive {
return nil, zerrors.ThrowPreconditionFailed(nil, "COMMAND-5M9bs", "Errors.Project.NotInactive")
}
if err := c.checkPermissionUpdateProject(ctx, existingProject.ResourceOwner, existingProject.AggregateID); err != nil {
return nil, err
}
//nolint: contextcheck
projectAgg := ProjectAggregateFromWriteModel(&existingProject.WriteModel)
pushedEvents, err := c.eventstore.Push(ctx, project.NewProjectReactivatedEvent(ctx, projectAgg))
if err != nil {
return nil, err
}
err = AppendAndReduce(existingProject, pushedEvents...)
if err != nil {
return nil, err
}
return writeModelToObjectDetails(&existingProject.WriteModel), nil
}
func (c *Commands) checkProjectGrantPreConditionOld(ctx context.Context, projectID, grantedOrgID, resourceOwner string, roles []string) (string, error) {
preConditions := NewProjectGrantPreConditionReadModel(projectID, grantedOrgID, resourceOwner)
err := c.eventstore.FilterToQueryReducer(ctx, preConditions)
if err != nil {
return "", err
}
if !preConditions.ProjectExists {
return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-m9gsd", "Errors.Project.NotFound")
}
if !preConditions.GrantedOrgExists {
return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-3m9gg", "Errors.Org.NotFound")
}
if domain.HasInvalidRoles(preConditions.ExistingRoleKeys, roles) {
return "", zerrors.ThrowPreconditionFailed(err, "COMMAND-6m9gd", "Errors.Project.Role.NotFound")
}
return preConditions.ProjectResourceOwner, nil
}

View File

@@ -4,12 +4,8 @@ import (
"context"
"reflect"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/feature"
"github.com/zitadel/zitadel/internal/repository/org"
"github.com/zitadel/zitadel/internal/repository/project"
"github.com/zitadel/zitadel/internal/repository/usergrant"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -292,140 +288,6 @@ func (c *Commands) userGrantWriteModelByID(ctx context.Context, userGrantID, res
}
func (c *Commands) checkUserGrantPreCondition(ctx context.Context, usergrant *domain.UserGrant, resourceOwner string) (err error) {
if !authz.GetFeatures(ctx).ShouldUseImprovedPerformance(feature.ImprovedPerformanceTypeUserGrant) {
return c.checkUserGrantPreConditionOld(ctx, usergrant, resourceOwner)
}
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
if err := c.checkUserExists(ctx, usergrant.UserID, ""); err != nil {
return err
}
existingRoleKeys, err := c.searchUserGrantPreConditionState(ctx, usergrant, resourceOwner)
if err != nil {
return err
}
if usergrant.HasInvalidRoles(existingRoleKeys) {
return zerrors.ThrowPreconditionFailed(err, "COMMAND-mm9F4", "Errors.Project.Role.NotFound")
}
return nil
}
// this code needs to be rewritten anyways as soon as we improved the fields handling
//
//nolint:gocognit
func (c *Commands) searchUserGrantPreConditionState(ctx context.Context, userGrant *domain.UserGrant, resourceOwner string) (existingRoleKeys []string, err error) {
criteria := []map[eventstore.FieldType]any{
// project state query
{
eventstore.FieldTypeAggregateType: project.AggregateType,
eventstore.FieldTypeAggregateID: userGrant.ProjectID,
eventstore.FieldTypeFieldName: project.ProjectStateSearchField,
eventstore.FieldTypeObjectType: project.ProjectSearchType,
},
// granted org query
{
eventstore.FieldTypeAggregateType: org.AggregateType,
eventstore.FieldTypeAggregateID: resourceOwner,
eventstore.FieldTypeFieldName: org.OrgStateSearchField,
eventstore.FieldTypeObjectType: org.OrgSearchType,
},
}
if userGrant.ProjectGrantID != "" {
criteria = append(criteria, map[eventstore.FieldType]any{
eventstore.FieldTypeAggregateType: project.AggregateType,
eventstore.FieldTypeAggregateID: userGrant.ProjectID,
eventstore.FieldTypeObjectType: project.ProjectGrantSearchType,
eventstore.FieldTypeObjectID: userGrant.ProjectGrantID,
})
} else {
criteria = append(criteria, map[eventstore.FieldType]any{
eventstore.FieldTypeAggregateType: project.AggregateType,
eventstore.FieldTypeAggregateID: userGrant.ProjectID,
eventstore.FieldTypeObjectType: project.ProjectRoleSearchType,
eventstore.FieldTypeFieldName: project.ProjectRoleKeySearchField,
})
}
results, err := c.eventstore.Search(ctx, criteria...)
if err != nil {
return nil, err
}
var (
existsProject bool
existsGrantedOrg bool
existsGrant bool
)
for _, result := range results {
switch result.Object.Type {
case project.ProjectRoleSearchType:
var role string
err := result.Value.Unmarshal(&role)
if err != nil {
return nil, err
}
existingRoleKeys = append(existingRoleKeys, role)
case org.OrgSearchType:
var state domain.OrgState
err := result.Value.Unmarshal(&state)
if err != nil {
return nil, err
}
existsGrantedOrg = state.Valid() && state != domain.OrgStateRemoved
case project.ProjectSearchType:
var state domain.ProjectState
err := result.Value.Unmarshal(&state)
if err != nil {
return nil, err
}
existsProject = state.Valid() && state != domain.ProjectStateRemoved
case project.ProjectGrantSearchType:
switch result.FieldName {
case project.ProjectGrantGrantedOrgIDSearchField:
var orgID string
err := result.Value.Unmarshal(&orgID)
if err != nil || orgID != resourceOwner {
return nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-3m9gg", "Errors.Org.NotFound")
}
case project.ProjectGrantStateSearchField:
var state domain.ProjectGrantState
err := result.Value.Unmarshal(&state)
if err != nil {
return nil, err
}
existsGrant = state.Valid() && state != domain.ProjectGrantStateRemoved
case project.ProjectGrantRoleKeySearchField:
var role string
err := result.Value.Unmarshal(&role)
if err != nil {
return nil, err
}
existingRoleKeys = append(existingRoleKeys, role)
case project.ProjectGrantGrantIDSearchField:
var grantID string
err := result.Value.Unmarshal(&grantID)
if err != nil || grantID != userGrant.ProjectGrantID {
return nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-huvKF", "Errors.Project.Grant.NotFound")
}
}
}
}
if !existsProject {
return nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-m9gsd", "Errors.Project.NotFound")
}
if !existsGrantedOrg {
return nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-3m9gg", "Errors.Org.NotFound")
}
if userGrant.ProjectGrantID != "" && !existsGrant {
return nil, zerrors.ThrowPreconditionFailed(err, "COMMAND-huvKF", "Errors.Project.Grant.NotFound")
}
return existingRoleKeys, nil
}
func (c *Commands) checkUserGrantPreConditionOld(ctx context.Context, usergrant *domain.UserGrant, resourceOwner string) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()

View File

@@ -10,5 +10,4 @@ type Config struct {
Pusher Pusher
Querier Querier
Searcher Searcher
}

View File

@@ -12,7 +12,6 @@ import (
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/zerrors"
)
func init() {
@@ -29,7 +28,6 @@ type Eventstore struct {
pusher Pusher
querier Querier
searcher Searcher
}
var (
@@ -69,7 +67,6 @@ func NewEventstore(config *Config) *Eventstore {
pusher: config.Pusher,
querier: config.Querier,
searcher: config.Searcher,
}
}
@@ -149,20 +146,6 @@ func (es *Eventstore) AggregateTypes() []string {
return aggregateTypes
}
// FillFields implements the [Searcher] interface
func (es *Eventstore) FillFields(ctx context.Context, events ...FillFieldsEvent) error {
return es.searcher.FillFields(ctx, events...)
}
// Search implements the [Searcher] interface
func (es *Eventstore) Search(ctx context.Context, conditions ...map[FieldType]any) ([]*SearchResult, error) {
if len(conditions) == 0 {
return nil, zerrors.ThrowInvalidArgument(nil, "V3-5Xbr1", "no search conditions")
}
return es.searcher.Search(ctx, conditions...)
}
// Filter filters the stored events based on the searchQuery
// and maps the events to the defined event structs
//
@@ -289,22 +272,6 @@ type Pusher interface {
Client() *database.DB
}
type FillFieldsEvent interface {
Event
Fields() []*FieldOperation
}
type Searcher interface {
// Search allows to search for specific fields of objects
// The instance id is taken from the context
// The list of conditions are combined with AND
// The search fields are combined with OR
// At least one must be defined
Search(ctx context.Context, conditions ...map[FieldType]any) (result []*SearchResult, err error)
// FillFields is to insert the fields of previously stored events
FillFields(ctx context.Context, events ...FillFieldsEvent) error
}
func appendEventType(typ EventType) {
i := sort.SearchStrings(eventTypes, string(typ))
if i < len(eventTypes) && eventTypes[i] == string(typ) {

View File

@@ -1,213 +0,0 @@
package handler
import (
"context"
"database/sql"
"errors"
"sync"
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
"github.com/zitadel/zitadel/internal/eventstore"
)
type FieldHandler struct {
Handler
}
type fieldProjection struct {
name string
}
// Name implements Projection.
func (f *fieldProjection) Name() string {
return f.name
}
// Reducers implements Projection.
func (f *fieldProjection) Reducers() []AggregateReducer {
return nil
}
var _ Projection = (*fieldProjection)(nil)
// NewFieldHandler returns a projection handler which backfills the `eventstore.fields` table with historic events which
// might have existed before they had and Field Operations defined.
// The events are filtered by the mapped aggregate types and each event type for that aggregate.
func NewFieldHandler(config *Config, name string, eventTypes map[eventstore.AggregateType][]eventstore.EventType) *FieldHandler {
return &FieldHandler{
Handler: Handler{
projection: &fieldProjection{name: name},
client: config.Client,
es: config.Eventstore,
bulkLimit: config.BulkLimit,
eventTypes: eventTypes,
requeueEvery: config.RequeueEvery,
now: time.Now,
maxFailureCount: config.MaxFailureCount,
retryFailedAfter: config.RetryFailedAfter,
triggeredInstancesSync: sync.Map{},
triggerWithoutEvents: config.TriggerWithoutEvents,
txDuration: config.TransactionDuration,
},
}
}
// Trigger executes the backfill job of events for the instance currently in the context.
func (h *FieldHandler) Trigger(ctx context.Context, opts ...TriggerOpt) (err error) {
config := new(triggerConfig)
for _, opt := range opts {
opt(config)
}
cancel := h.lockInstance(ctx, config)
if cancel == nil {
return nil
}
defer cancel()
for i := 0; ; i++ {
additionalIteration, err := h.processEvents(ctx, config)
h.log().OnError(err).Info("process events failed")
h.log().WithField("iteration", i).Debug("trigger iteration")
if !additionalIteration || err != nil {
return err
}
}
}
func (h *FieldHandler) processEvents(ctx context.Context, config *triggerConfig) (additionalIteration bool, err error) {
defer func() {
pgErr := new(pgconn.PgError)
if errors.As(err, &pgErr) {
// error returned if the row is currently locked by another connection
if pgErr.Code == "55P03" {
h.log().Debug("state already locked")
err = nil
additionalIteration = false
}
}
}()
txCtx := ctx
if h.txDuration > 0 {
var cancel, cancelTx func()
// add 100ms to store current state if iteration takes too long
txCtx, cancelTx = context.WithTimeout(ctx, h.txDuration+100*time.Millisecond)
defer cancelTx()
ctx, cancel = context.WithTimeout(ctx, h.txDuration)
defer cancel()
}
tx, err := h.client.BeginTx(txCtx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil {
return false, err
}
defer func() {
if err != nil && !errors.Is(err, &executionError{}) {
rollbackErr := tx.Rollback()
h.log().OnError(rollbackErr).Debug("unable to rollback tx")
return
}
commitErr := tx.Commit()
if err == nil {
err = commitErr
}
}()
// always await currently running transactions
config.awaitRunning = true
currentState, err := h.currentState(ctx, tx, config)
if err != nil {
if errors.Is(err, errJustUpdated) {
return false, nil
}
return additionalIteration, err
}
// stop execution if currentState.eventTimestamp >= config.maxCreatedAt
if !config.maxPosition.IsZero() && currentState.position.GreaterThanOrEqual(config.maxPosition) {
return false, nil
}
if config.minPosition.GreaterThan(decimal.NewFromInt(0)) {
currentState.position = config.minPosition
currentState.offset = 0
}
events, additionalIteration, err := h.fetchEvents(ctx, tx, currentState)
if err != nil {
return additionalIteration, err
}
if len(events) == 0 {
err = h.setState(tx, currentState)
return additionalIteration, err
}
err = h.es.FillFields(ctx, events...)
if err != nil {
return false, err
}
err = h.setState(tx, currentState)
return additionalIteration, err
}
func (h *FieldHandler) fetchEvents(ctx context.Context, tx *sql.Tx, currentState *state) (_ []eventstore.FillFieldsEvent, additionalIteration bool, err error) {
events, err := h.es.Filter(ctx, h.eventQuery(currentState).SetTx(tx))
if err != nil || len(events) == 0 {
h.log().OnError(err).Debug("filter eventstore failed")
return nil, false, err
}
eventAmount := len(events)
idx, offset := skipPreviouslyReducedEvents(events, currentState)
if currentState.position.Equal(events[len(events)-1].Position()) {
offset += currentState.offset
}
currentState.position = events[len(events)-1].Position()
currentState.offset = offset
currentState.aggregateID = events[len(events)-1].Aggregate().ID
currentState.aggregateType = events[len(events)-1].Aggregate().Type
currentState.sequence = events[len(events)-1].Sequence()
currentState.eventTimestamp = events[len(events)-1].CreatedAt()
if idx+1 == len(events) {
return nil, false, nil
}
events = events[idx+1:]
additionalIteration = eventAmount == int(h.bulkLimit)
fillFieldsEvents := make([]eventstore.FillFieldsEvent, len(events))
highestPosition := events[len(events)-1].Position()
for i, event := range events {
if event.Position().Equal(highestPosition) {
offset++
}
fillFieldsEvents[i] = event.(eventstore.FillFieldsEvent)
}
return fillFieldsEvents, additionalIteration, nil
}
func skipPreviouslyReducedEvents(events []eventstore.Event, currentState *state) (index int, offset uint32) {
var position decimal.Decimal
for i, event := range events {
if !event.Position().Equal(position) {
offset = 0
position = event.Position()
}
offset++
if event.Position().Equal(currentState.position) &&
event.Aggregate().ID == currentState.aggregateID &&
event.Aggregate().Type == currentState.aggregateType &&
event.Sequence() == currentState.sequence {
return i, offset
}
}
return -1, 0
}

View File

@@ -27,7 +27,6 @@ type EventStore interface {
FilterToQueryReducer(ctx context.Context, reducer eventstore.QueryReducer) error
Filter(ctx context.Context, queryFactory *eventstore.SearchQueryBuilder) ([]eventstore.Event, error)
Push(ctx context.Context, cmds ...eventstore.Command) ([]eventstore.Event, error)
FillFields(ctx context.Context, events ...eventstore.FillFieldsEvent) error
}
type Config struct {

View File

@@ -1,369 +0,0 @@
package eventstore
import (
"context"
"database/sql"
_ "embed"
"encoding/json"
"reflect"
"slices"
"strconv"
"strings"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
)
type fieldValue struct {
value []byte
}
func (value *fieldValue) Unmarshal(ptr any) error {
return json.Unmarshal(value.value, ptr)
}
func (es *Eventstore) FillFields(ctx context.Context, events ...eventstore.FillFieldsEvent) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer span.End()
tx, err := es.client.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil {
return err
}
defer func() {
if err != nil {
_ = tx.Rollback()
return
}
err = tx.Commit()
}()
return handleFieldFillEvents(ctx, tx, events)
}
// Search implements the [eventstore.Search] method
func (es *Eventstore) Search(ctx context.Context, conditions ...map[eventstore.FieldType]any) (result []*eventstore.SearchResult, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
var builder strings.Builder
args := buildSearchStatement(ctx, &builder, conditions...)
err = es.client.QueryContext(
ctx,
func(rows *sql.Rows) error {
for rows.Next() {
var (
res eventstore.SearchResult
value fieldValue
)
err = rows.Scan(
&res.Aggregate.InstanceID,
&res.Aggregate.ResourceOwner,
&res.Aggregate.Type,
&res.Aggregate.ID,
&res.Object.Type,
&res.Object.ID,
&res.Object.Revision,
&res.FieldName,
&value.value,
)
if err != nil {
return err
}
res.Value = &value
result = append(result, &res)
}
return nil
},
builder.String(),
args...,
)
if err != nil {
return nil, err
}
return result, nil
}
const searchQueryPrefix = `SELECT instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value FROM eventstore.fields WHERE instance_id = $1`
func buildSearchStatement(ctx context.Context, builder *strings.Builder, conditions ...map[eventstore.FieldType]any) []any {
args := make([]any, 0, len(conditions)*4+1)
args = append(args, authz.GetInstance(ctx).InstanceID())
builder.WriteString(searchQueryPrefix)
builder.WriteString(" AND ")
if len(conditions) > 1 {
builder.WriteRune('(')
}
for i, condition := range conditions {
if i > 0 {
builder.WriteString(" OR ")
}
if len(condition) > 1 {
builder.WriteRune('(')
}
args = append(args, buildSearchCondition(builder, len(args)+1, condition)...)
if len(condition) > 1 {
builder.WriteRune(')')
}
}
if len(conditions) > 1 {
builder.WriteRune(')')
}
return args
}
func buildSearchCondition(builder *strings.Builder, index int, conditions map[eventstore.FieldType]any) []any {
args := make([]any, 0, len(conditions))
orderedCondition := make([]eventstore.FieldType, 0, len(conditions))
for field := range conditions {
orderedCondition = append(orderedCondition, field)
}
slices.Sort(orderedCondition)
for _, field := range orderedCondition {
if len(args) > 0 {
builder.WriteString(" AND ")
}
builder.WriteString(fieldNameByType(field, conditions[field]))
builder.WriteString(" = $")
builder.WriteString(strconv.Itoa(index + len(args)))
args = append(args, conditions[field])
}
return args
}
func (es *Eventstore) handleFieldCommands(ctx context.Context, tx database.Tx, commands []eventstore.Command) error {
for _, command := range commands {
if len(command.Fields()) > 0 {
if err := handleFieldOperations(ctx, tx, command.Fields()); err != nil {
return err
}
}
}
return nil
}
func handleFieldFillEvents(ctx context.Context, tx database.Tx, events []eventstore.FillFieldsEvent) error {
for _, event := range events {
if len(event.Fields()) == 0 {
continue
}
if err := handleFieldOperations(ctx, tx, event.Fields()); err != nil {
return err
}
}
return nil
}
func handleFieldOperations(ctx context.Context, tx database.Tx, operations []*eventstore.FieldOperation) error {
for _, operation := range operations {
if operation.Set != nil {
if err := handleFieldSet(ctx, tx, operation.Set); err != nil {
return err
}
continue
}
if operation.Remove != nil {
if err := handleSearchDelete(ctx, tx, operation.Remove); err != nil {
return err
}
}
}
return nil
}
func handleFieldSet(ctx context.Context, tx database.Tx, field *eventstore.Field) error {
if len(field.UpsertConflictFields) == 0 {
return handleSearchInsert(ctx, tx, field)
}
return handleSearchUpsert(ctx, tx, field)
}
const (
insertField = `INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`
)
func handleSearchInsert(ctx context.Context, tx database.Tx, field *eventstore.Field) error {
value, err := json.Marshal(field.Value.Value)
if err != nil {
return zerrors.ThrowInvalidArgument(err, "V3-fcrW1", "unable to marshal field value")
}
_, err = tx.ExecContext(
ctx,
insertField,
field.Aggregate.InstanceID,
field.Aggregate.ResourceOwner,
field.Aggregate.Type,
field.Aggregate.ID,
field.Object.Type,
field.Object.ID,
field.Object.Revision,
field.FieldName,
value,
field.Value.MustBeUnique,
field.Value.ShouldIndex,
)
return err
}
const (
fieldsUpsertPrefix = `WITH upsert AS (UPDATE eventstore.fields SET (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) = ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) WHERE `
fieldsUpsertSuffix = ` RETURNING * ) INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 WHERE NOT EXISTS (SELECT 1 FROM upsert)`
)
func handleSearchUpsert(ctx context.Context, tx database.Tx, field *eventstore.Field) error {
value, err := json.Marshal(field.Value.Value)
if err != nil {
return zerrors.ThrowInvalidArgument(err, "V3-fcrW1", "unable to marshal field value")
}
_, err = tx.ExecContext(
ctx,
writeUpsertField(field.UpsertConflictFields),
field.Aggregate.InstanceID,
field.Aggregate.ResourceOwner,
field.Aggregate.Type,
field.Aggregate.ID,
field.Object.Type,
field.Object.ID,
field.Object.Revision,
field.FieldName,
value,
field.Value.MustBeUnique,
field.Value.ShouldIndex,
)
return err
}
func writeUpsertField(fields []eventstore.FieldType) string {
var builder strings.Builder
builder.WriteString(fieldsUpsertPrefix)
for i, fieldName := range fields {
if i > 0 {
builder.WriteString(" AND ")
}
name, index := searchFieldNameAndIndexByTypeForPush(fieldName)
builder.WriteString(name)
builder.WriteString(" = ")
builder.WriteString(index)
}
builder.WriteString(fieldsUpsertSuffix)
return builder.String()
}
const removeSearch = `DELETE FROM eventstore.fields WHERE `
func handleSearchDelete(ctx context.Context, tx database.Tx, clauses map[eventstore.FieldType]any) error {
if len(clauses) == 0 {
return zerrors.ThrowInvalidArgument(nil, "V3-oqlBZ", "no conditions")
}
stmt, args := writeDeleteField(clauses)
_, err := tx.ExecContext(ctx, stmt, args...)
return err
}
func writeDeleteField(clauses map[eventstore.FieldType]any) (string, []any) {
var (
builder strings.Builder
args = make([]any, 0, len(clauses))
)
builder.WriteString(removeSearch)
orderedCondition := make([]eventstore.FieldType, 0, len(clauses))
for field := range clauses {
orderedCondition = append(orderedCondition, field)
}
slices.Sort(orderedCondition)
for _, fieldName := range orderedCondition {
if len(args) > 0 {
builder.WriteString(" AND ")
}
builder.WriteString(fieldNameByType(fieldName, clauses[fieldName]))
builder.WriteString(" = $")
builder.WriteString(strconv.Itoa(len(args) + 1))
args = append(args, clauses[fieldName])
}
return builder.String(), args
}
func fieldNameByType(typ eventstore.FieldType, value any) string {
switch typ {
case eventstore.FieldTypeAggregateID:
return "aggregate_id"
case eventstore.FieldTypeAggregateType:
return "aggregate_type"
case eventstore.FieldTypeInstanceID:
return "instance_id"
case eventstore.FieldTypeResourceOwner:
return "resource_owner"
case eventstore.FieldTypeFieldName:
return "field_name"
case eventstore.FieldTypeObjectType:
return "object_type"
case eventstore.FieldTypeObjectID:
return "object_id"
case eventstore.FieldTypeObjectRevision:
return "object_revision"
case eventstore.FieldTypeValue:
return valueColumn(value)
}
return ""
}
func searchFieldNameAndIndexByTypeForPush(typ eventstore.FieldType) (string, string) {
switch typ {
case eventstore.FieldTypeInstanceID:
return "instance_id", "$1"
case eventstore.FieldTypeResourceOwner:
return "resource_owner", "$2"
case eventstore.FieldTypeAggregateType:
return "aggregate_type", "$3"
case eventstore.FieldTypeAggregateID:
return "aggregate_id", "$4"
case eventstore.FieldTypeObjectType:
return "object_type", "$5"
case eventstore.FieldTypeObjectID:
return "object_id", "$6"
case eventstore.FieldTypeObjectRevision:
return "object_revision", "$7"
case eventstore.FieldTypeFieldName:
return "field_name", "$8"
case eventstore.FieldTypeValue:
return "value", "$9"
}
return "", ""
}
func valueColumn(value any) string {
//nolint: exhaustive
switch reflect.TypeOf(value).Kind() {
case reflect.Bool:
return "bool_value"
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Float32, reflect.Float64:
return "number_value"
case reflect.String:
return "text_value"
}
return ""
}

View File

@@ -1,260 +0,0 @@
package eventstore
import (
"context"
_ "embed"
"reflect"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
)
func Test_handleSearchDelete(t *testing.T) {
type args struct {
clauses map[eventstore.FieldType]any
}
type want struct {
stmt string
args []any
}
tests := []struct {
name string
args args
want want
}{
{
name: "1 condition",
args: args{
clauses: map[eventstore.FieldType]any{
eventstore.FieldTypeInstanceID: "i_id",
},
},
want: want{
stmt: "DELETE FROM eventstore.fields WHERE instance_id = $1",
args: []any{"i_id"},
},
},
{
name: "2 conditions",
args: args{
clauses: map[eventstore.FieldType]any{
eventstore.FieldTypeInstanceID: "i_id",
eventstore.FieldTypeAggregateID: "a_id",
},
},
want: want{
stmt: "DELETE FROM eventstore.fields WHERE aggregate_id = $1 AND instance_id = $2",
args: []any{"a_id", "i_id"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stmt, args := writeDeleteField(tt.args.clauses)
if stmt != tt.want.stmt {
t.Errorf("handleSearchDelete() stmt = %q, want %q", stmt, tt.want.stmt)
}
assert.Equal(t, tt.want.args, args)
})
}
}
func Test_writeUpsertField(t *testing.T) {
type args struct {
fields []eventstore.FieldType
}
tests := []struct {
name string
args args
want string
}{
{
name: "1 field",
args: args{
fields: []eventstore.FieldType{
eventstore.FieldTypeInstanceID,
},
},
want: "WITH upsert AS (UPDATE eventstore.fields SET (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) = ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) WHERE instance_id = $1 RETURNING * ) INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 WHERE NOT EXISTS (SELECT 1 FROM upsert)",
},
{
name: "2 fields",
args: args{
fields: []eventstore.FieldType{
eventstore.FieldTypeInstanceID,
eventstore.FieldTypeAggregateType,
},
},
want: "WITH upsert AS (UPDATE eventstore.fields SET (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) = ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) WHERE instance_id = $1 AND aggregate_type = $3 RETURNING * ) INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 WHERE NOT EXISTS (SELECT 1 FROM upsert)",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := writeUpsertField(tt.args.fields); got != tt.want {
t.Errorf("writeUpsertField() = %q, want %q", got, tt.want)
}
})
}
}
func Test_buildSearchCondition(t *testing.T) {
type args struct {
index int
conditions map[eventstore.FieldType]any
}
type want struct {
stmt string
args []any
}
tests := []struct {
name string
args args
want want
}{
{
name: "1 condition",
args: args{
index: 1,
conditions: map[eventstore.FieldType]any{
eventstore.FieldTypeAggregateID: "a_id",
},
},
want: want{
stmt: "aggregate_id = $1",
args: []any{"a_id"},
},
},
{
name: "3 condition",
args: args{
index: 1,
conditions: map[eventstore.FieldType]any{
eventstore.FieldTypeAggregateID: "a_id",
eventstore.FieldTypeInstanceID: "i_id",
eventstore.FieldTypeAggregateType: "a_type",
},
},
want: want{
stmt: "aggregate_type = $1 AND aggregate_id = $2 AND instance_id = $3",
args: []any{"a_type", "a_id", "i_id"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var builder strings.Builder
if got := buildSearchCondition(&builder, tt.args.index, tt.args.conditions); !reflect.DeepEqual(got, tt.want.args) {
t.Errorf("buildSearchCondition() = %v, want %v", got, tt.want)
}
if tt.want.stmt != builder.String() {
t.Errorf("buildSearchCondition() stmt = %q, want %q", builder.String(), tt.want.stmt)
}
})
}
}
func Test_buildSearchStatement(t *testing.T) {
type args struct {
index int
conditions []map[eventstore.FieldType]any
}
type want struct {
stmt string
args []any
}
tests := []struct {
name string
args args
want want
}{
{
name: "1 condition with 1 field",
args: args{
index: 1,
conditions: []map[eventstore.FieldType]any{
{
eventstore.FieldTypeAggregateID: "a_id",
},
},
},
want: want{
stmt: "SELECT instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value FROM eventstore.fields WHERE instance_id = $1 AND aggregate_id = $2",
args: []any{"a_id"},
},
},
{
name: "1 condition with 3 fields",
args: args{
index: 1,
conditions: []map[eventstore.FieldType]any{
{
eventstore.FieldTypeAggregateID: "a_id",
eventstore.FieldTypeInstanceID: "i_id",
eventstore.FieldTypeAggregateType: "a_type",
},
},
},
want: want{
stmt: "SELECT instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value FROM eventstore.fields WHERE instance_id = $1 AND (aggregate_type = $2 AND aggregate_id = $3 AND instance_id = $4)",
args: []any{"a_type", "a_id", "i_id"},
},
},
{
name: "2 condition with 1 field",
args: args{
index: 1,
conditions: []map[eventstore.FieldType]any{
{
eventstore.FieldTypeAggregateID: "a_id",
},
{
eventstore.FieldTypeAggregateType: "a_type",
},
},
},
want: want{
stmt: "SELECT instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value FROM eventstore.fields WHERE instance_id = $1 AND (aggregate_id = $2 OR aggregate_type = $3)",
args: []any{"a_id", "a_type"},
},
},
{
name: "2 condition with 2 fields",
args: args{
index: 1,
conditions: []map[eventstore.FieldType]any{
{
eventstore.FieldTypeAggregateID: "a_id1",
eventstore.FieldTypeAggregateType: "a_type1",
},
{
eventstore.FieldTypeAggregateID: "a_id2",
eventstore.FieldTypeAggregateType: "a_type2",
},
},
},
want: want{
stmt: "SELECT instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value FROM eventstore.fields WHERE instance_id = $1 AND ((aggregate_type = $2 AND aggregate_id = $3) OR (aggregate_type = $4 AND aggregate_id = $5))",
args: []any{"a_type1", "a_id1", "a_type2", "a_id2"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var builder strings.Builder
tt.want.args = append([]any{"i_id"}, tt.want.args...)
ctx := authz.WithInstanceID(context.Background(), "i_id")
if got := buildSearchStatement(ctx, &builder, tt.args.conditions...); !reflect.DeepEqual(got, tt.want.args) {
t.Errorf("buildSearchStatement() = %v, want %v", got, tt.want)
}
if tt.want.stmt != builder.String() {
t.Errorf("buildSearchStatement() stmt = %q, want %q", builder.String(), tt.want.stmt)
}
})
}
}

View File

@@ -71,11 +71,6 @@ func (es *Eventstore) writeCommands(ctx context.Context, client database.Context
return nil, err
}
err = es.handleFieldCommands(ctx, tx, commands)
if err != nil {
return nil, err
}
return events, nil
}

View File

@@ -61,11 +61,6 @@ func (es *Eventstore) pushWithoutFunc(ctx context.Context, client database.Conte
return nil, err
}
err = es.handleFieldCommands(ctx, tx, commands)
if err != nil {
return nil, err
}
return events, nil
}

View File

@@ -64,10 +64,6 @@ type ImprovedPerformanceType int32
const (
ImprovedPerformanceTypeUnspecified ImprovedPerformanceType = iota
ImprovedPerformanceTypeOrgByID
ImprovedPerformanceTypeProjectGrant
ImprovedPerformanceTypeProject
ImprovedPerformanceTypeUserGrant
ImprovedPerformanceTypeOrgDomainVerified
)
func (f Features) ShouldUseImprovedPerformance(typ ImprovedPerformanceType) bool {

View File

@@ -1,100 +0,0 @@
package projection
import (
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/repository/org"
"github.com/zitadel/zitadel/internal/repository/permission"
"github.com/zitadel/zitadel/internal/repository/project"
)
const (
fieldsProjectGrant = "project_grant_fields"
fieldsOrgDomainVerified = "org_domain_verified_fields"
fieldsInstanceDomain = "instance_domain_fields"
fieldsMemberships = "membership_fields"
fieldsPermission = "permission_fields"
)
func newFillProjectGrantFields(config handler.Config) *handler.FieldHandler {
return handler.NewFieldHandler(
&config,
fieldsProjectGrant,
map[eventstore.AggregateType][]eventstore.EventType{
org.AggregateType: nil,
project.AggregateType: nil,
},
)
}
func newFillOrgDomainVerifiedFields(config handler.Config) *handler.FieldHandler {
return handler.NewFieldHandler(
&config,
fieldsOrgDomainVerified,
map[eventstore.AggregateType][]eventstore.EventType{
org.AggregateType: {
org.OrgDomainAddedEventType,
org.OrgDomainVerifiedEventType,
org.OrgDomainRemovedEventType,
},
},
)
}
func newFillInstanceDomainFields(config handler.Config) *handler.FieldHandler {
return handler.NewFieldHandler(
&config,
fieldsInstanceDomain,
map[eventstore.AggregateType][]eventstore.EventType{
instance.AggregateType: {
instance.InstanceDomainAddedEventType,
instance.InstanceDomainRemovedEventType,
instance.InstanceRemovedEventType,
},
},
)
}
func newFillMembershipFields(config handler.Config) *handler.FieldHandler {
return handler.NewFieldHandler(
&config,
fieldsMemberships,
map[eventstore.AggregateType][]eventstore.EventType{
instance.AggregateType: {
instance.MemberAddedEventType,
instance.MemberChangedEventType,
instance.MemberRemovedEventType,
instance.MemberCascadeRemovedEventType,
instance.InstanceRemovedEventType,
},
org.AggregateType: {
org.MemberAddedEventType,
org.MemberChangedEventType,
org.MemberRemovedEventType,
org.MemberCascadeRemovedEventType,
org.OrgRemovedEventType,
},
project.AggregateType: {
project.MemberAddedEventType,
project.MemberChangedEventType,
project.MemberRemovedEventType,
project.MemberCascadeRemovedEventType,
project.ProjectRemovedType,
},
},
)
}
func newFillPermissionFields(config handler.Config) *handler.FieldHandler {
return handler.NewFieldHandler(
&config,
permission.PermissionSearchField,
map[eventstore.AggregateType][]eventstore.EventType{
permission.AggregateType: {
permission.AddedType,
permission.RemovedType,
},
},
)
}

View File

@@ -48,7 +48,3 @@ func (m *mockEventStore) Push(ctx context.Context, cmds ...eventstore.Command) (
m.pushCounter++
return m.pushResponse[m.pushCounter-1], nil
}
func (m *mockEventStore) FillFields(ctx context.Context, events ...eventstore.FillFieldsEvent) error {
return nil
}

View File

@@ -86,12 +86,6 @@ var (
UserSchemaProjection *handler.Handler
WebKeyProjection *handler.Handler
DebugEventsProjection *handler.Handler
ProjectGrantFields *handler.FieldHandler
OrgDomainVerifiedFields *handler.FieldHandler
InstanceDomainFields *handler.FieldHandler
MembershipFields *handler.FieldHandler
PermissionFields *handler.FieldHandler
)
type projection interface {
@@ -102,10 +96,7 @@ type projection interface {
migration.Migration
}
var (
projections []projection
fields []*handler.FieldHandler
)
var projections []projection
func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore, config Config, keyEncryptionAlgorithm crypto.EncryptionAlgorithm, certEncryptionAlgorithm crypto.EncryptionAlgorithm, systemUsers map[string]*internal_authz.SystemAPIUser) error {
projectionConfig = handler.Config{
@@ -180,15 +171,7 @@ func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore,
WebKeyProjection = newWebKeyProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["web_keys"]))
DebugEventsProjection = newDebugEventsProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["debug_events"]))
ProjectGrantFields = newFillProjectGrantFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsProjectGrant]))
OrgDomainVerifiedFields = newFillOrgDomainVerifiedFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsOrgDomainVerified]))
InstanceDomainFields = newFillInstanceDomainFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsInstanceDomain]))
MembershipFields = newFillMembershipFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsMemberships]))
PermissionFields = newFillPermissionFields(applyCustomConfig(projectionConfig, config.Customizations[fieldsPermission]))
// Don't forget to add the new field handler to [ProjectInstanceFields]
newProjectionsList()
newFieldsList()
return nil
}
@@ -231,26 +214,6 @@ func ProjectInstance(ctx context.Context) error {
return nil
}
func ProjectInstanceFields(ctx context.Context) error {
for i, fieldProjection := range fields {
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("starting fields projection")
for {
err := fieldProjection.Trigger(ctx)
if err == nil {
break
}
var pgErr *pgconn.PgError
errors.As(err, &pgErr)
if pgErr.Code != database.PgUniqueConstraintErrorCode {
return err
}
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("fields projection failed because of unique constraint, retrying")
}
logging.WithFields("name", fieldProjection.ProjectionName(), "instance", internal_authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(fields))).Info("fields projection done")
}
return nil
}
func ApplyCustomConfig(customConfig CustomConfig) handler.Config {
return applyCustomConfig(projectionConfig, customConfig)
}
@@ -275,20 +238,6 @@ func applyCustomConfig(config handler.Config, customConfig CustomConfig) handler
return config
}
// we know this is ugly, but we need to have a singleton slice of all projections
// and are only able to initialize it after all projections are created
// as setup and start currently create them individually, we make sure we get the right one
// will be refactored when changing to new id based projections
func newFieldsList() {
fields = []*handler.FieldHandler{
ProjectGrantFields,
OrgDomainVerifiedFields,
InstanceDomainFields,
MembershipFields,
PermissionFields,
}
}
// we know this is ugly, but we need to have a singleton slice of all projections
// and are only able to initialize it after all projections are created
// as setup and start currently create them individually, we make sure we get the right one

View File

@@ -11,8 +11,9 @@ import "zitadel/feature/v2/feature.proto";
option go_package = "github.com/zitadel/zitadel/pkg/grpc/feature/v2;feature";
message SetInstanceFeaturesRequest{
reserved 6;
reserved "actions";
reserved 6, 7;
reserved "actions", "improved_performance";
optional bool login_default_org = 1 [
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
example: "true";
@@ -46,15 +47,6 @@ message SetInstanceFeaturesRequest{
}
];
repeated ImprovedPerformance improved_performance = 7 [
(validate.rules).repeated.unique = true,
(validate.rules).repeated.items.enum = {defined_only: true, not_in: [0]},
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
example: "[1]";
description: "Improves performance of specified execution paths.";
}
];
optional bool web_key = 8 [
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
example: "true";

View File

@@ -11,8 +11,8 @@ import "zitadel/feature/v2/feature.proto";
option go_package = "github.com/zitadel/zitadel/pkg/grpc/feature/v2;feature";
message SetSystemFeaturesRequest{
reserved 6;
reserved "actions";
reserved 6, 7;
reserved "actions", "improved_performance";
optional bool login_default_org = 1 [
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
example: "true";
@@ -48,15 +48,6 @@ message SetSystemFeaturesRequest{
}
];
repeated ImprovedPerformance improved_performance = 7 [
(validate.rules).repeated.unique = true,
(validate.rules).repeated.items.enum = {defined_only: true, not_in: [0]},
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
example: "[1]";
description: "Improves performance of specified execution paths.";
}
];
optional bool oidc_single_v1_session_termination = 8 [
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
example: "true";