fix(eventstore): consider IsGlobal-flag of constraints (#7518)

* fix(eventstore): consider `IsGlobal`-flag of constraints

* fix(setup): set `instance_domain`-constraint global

(cherry picked from commit 60ee2610f2)
This commit is contained in:
Silvan 2024-03-08 14:33:53 +01:00 committed by Livio Spring
parent 3c0cd30afe
commit e0e5665e17
No known key found for this signature in database
GPG Key ID: 26BB1C2FA5952CF0
5 changed files with 92 additions and 63 deletions

27
cmd/setup/23.go Normal file
View File

@ -0,0 +1,27 @@
package setup
import (
"context"
_ "embed"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
)
var (
//go:embed 23.sql
correctGlobalUniqueConstraints string
)
type CorrectGlobalUniqueConstraints struct {
dbClient *database.DB
}
func (mig *CorrectGlobalUniqueConstraints) Execute(ctx context.Context, _ eventstore.Event) error {
_, err := mig.dbClient.ExecContext(ctx, correctGlobalUniqueConstraints)
return err
}
func (mig *CorrectGlobalUniqueConstraints) String() string {
return "23_correct_global_unique_constraints"
}

1
cmd/setup/23.sql Normal file
View File

@ -0,0 +1 @@
UPDATE eventstore.unique_constraints SET instance_id = '' WHERE unique_type = 'instance_domain';

View File

@ -101,6 +101,7 @@ type Steps struct {
s20AddByUserSessionIndex *AddByUserIndexToSession s20AddByUserSessionIndex *AddByUserIndexToSession
s21AddBlockFieldToLimits *AddBlockFieldToLimits s21AddBlockFieldToLimits *AddBlockFieldToLimits
s22ActiveInstancesIndex *ActiveInstanceEvents s22ActiveInstancesIndex *ActiveInstanceEvents
s23CorrectGlobalUniqueConstraints *CorrectGlobalUniqueConstraints
} }
func MustNewSteps(v *viper.Viper) *Steps { func MustNewSteps(v *viper.Viper) *Steps {

View File

@ -135,6 +135,7 @@ func Setup(config *Config, steps *Steps, masterKey string) {
steps.s20AddByUserSessionIndex = &AddByUserIndexToSession{dbClient: queryDBClient} steps.s20AddByUserSessionIndex = &AddByUserIndexToSession{dbClient: queryDBClient}
steps.s21AddBlockFieldToLimits = &AddBlockFieldToLimits{dbClient: queryDBClient} steps.s21AddBlockFieldToLimits = &AddBlockFieldToLimits{dbClient: queryDBClient}
steps.s22ActiveInstancesIndex = &ActiveInstanceEvents{dbClient: queryDBClient} steps.s22ActiveInstancesIndex = &ActiveInstanceEvents{dbClient: queryDBClient}
steps.s23CorrectGlobalUniqueConstraints = &CorrectGlobalUniqueConstraints{dbClient: esPusherDBClient}
err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil) err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil)
logging.OnError(err).Fatal("unable to start projections") logging.OnError(err).Fatal("unable to start projections")
@ -153,49 +154,39 @@ func Setup(config *Config, steps *Steps, masterKey string) {
}, },
} }
err = migration.Migrate(ctx, eventstoreClient, steps.s14NewEventsTable) for _, step := range []migration.Migration{
logging.WithFields("name", steps.s14NewEventsTable.String()).OnError(err).Fatal("migration failed") steps.s14NewEventsTable,
err = migration.Migrate(ctx, eventstoreClient, steps.s1ProjectionTable) steps.s1ProjectionTable,
logging.WithFields("name", steps.s1ProjectionTable.String()).OnError(err).Fatal("migration failed") steps.s2AssetsTable,
err = migration.Migrate(ctx, eventstoreClient, steps.s2AssetsTable) steps.FirstInstance,
logging.WithFields("name", steps.s2AssetsTable.String()).OnError(err).Fatal("migration failed") steps.s5LastFailed,
err = migration.Migrate(ctx, eventstoreClient, steps.FirstInstance) steps.s6OwnerRemoveColumns,
logging.WithFields("name", steps.FirstInstance.String()).OnError(err).Fatal("migration failed") steps.s7LogstoreTables,
err = migration.Migrate(ctx, eventstoreClient, steps.s5LastFailed) steps.s8AuthTokens,
logging.WithFields("name", steps.s5LastFailed.String()).OnError(err).Fatal("migration failed") steps.s12AddOTPColumns,
err = migration.Migrate(ctx, eventstoreClient, steps.s6OwnerRemoveColumns) steps.s13FixQuotaProjection,
logging.WithFields("name", steps.s6OwnerRemoveColumns.String()).OnError(err).Fatal("migration failed") steps.s15CurrentStates,
err = migration.Migrate(ctx, eventstoreClient, steps.s7LogstoreTables) steps.s16UniqueConstraintsLower,
logging.WithFields("name", steps.s7LogstoreTables.String()).OnError(err).Fatal("migration failed") steps.s17AddOffsetToUniqueConstraints,
err = migration.Migrate(ctx, eventstoreClient, steps.s8AuthTokens) steps.s19AddCurrentStatesIndex,
logging.WithFields("name", steps.s8AuthTokens.String()).OnError(err).Fatal("migration failed") steps.s20AddByUserSessionIndex,
err = migration.Migrate(ctx, eventstoreClient, steps.s12AddOTPColumns) steps.s22ActiveInstancesIndex,
logging.WithFields("name", steps.s12AddOTPColumns.String()).OnError(err).Fatal("migration failed") steps.s23CorrectGlobalUniqueConstraints,
err = migration.Migrate(ctx, eventstoreClient, steps.s13FixQuotaProjection) } {
logging.WithFields("name", steps.s13FixQuotaProjection.String()).OnError(err).Fatal("migration failed") mustExecuteMigration(ctx, eventstoreClient, step, "migration failed")
err = migration.Migrate(ctx, eventstoreClient, steps.s15CurrentStates) }
logging.WithFields("name", steps.s15CurrentStates.String()).OnError(err).Fatal("migration failed")
err = migration.Migrate(ctx, eventstoreClient, steps.s16UniqueConstraintsLower)
logging.WithFields("name", steps.s16UniqueConstraintsLower.String()).OnError(err).Fatal("migration failed")
err = migration.Migrate(ctx, eventstoreClient, steps.s17AddOffsetToUniqueConstraints)
logging.WithFields("name", steps.s17AddOffsetToUniqueConstraints.String()).OnError(err).Fatal("migration failed")
err = migration.Migrate(ctx, eventstoreClient, steps.s19AddCurrentStatesIndex)
logging.WithFields("name", steps.s19AddCurrentStatesIndex.String()).OnError(err).Fatal("migration failed")
err = migration.Migrate(ctx, eventstoreClient, steps.s20AddByUserSessionIndex)
logging.WithFields("name", steps.s20AddByUserSessionIndex.String()).OnError(err).Fatal("migration failed")
err = migration.Migrate(ctx, eventstoreClient, steps.s22ActiveInstancesIndex)
logging.WithFields("name", steps.s22ActiveInstancesIndex.String()).OnError(err).Fatal("migration failed")
for _, repeatableStep := range repeatableSteps { for _, repeatableStep := range repeatableSteps {
err = migration.Migrate(ctx, eventstoreClient, repeatableStep) mustExecuteMigration(ctx, eventstoreClient, repeatableStep, "unable to migrate repeatable step")
logging.OnError(err).Fatalf("unable to migrate repeatable step: %s", repeatableStep.String())
} }
// These steps are executed after the repeatable steps because they add fields projections // These steps are executed after the repeatable steps because they add fields projections
err = migration.Migrate(ctx, eventstoreClient, steps.s18AddLowerFieldsToLoginNames) for _, step := range []migration.Migration{
logging.WithFields("name", steps.s18AddLowerFieldsToLoginNames.String()).OnError(err).Fatal("migration failed") steps.s18AddLowerFieldsToLoginNames,
err = migration.Migrate(ctx, eventstoreClient, steps.s21AddBlockFieldToLimits) steps.s21AddBlockFieldToLimits,
logging.WithFields("name", steps.s21AddBlockFieldToLimits.String()).OnError(err).Fatal("migration failed") } {
mustExecuteMigration(ctx, eventstoreClient, step, "migration failed")
}
// projection initialization must be done last, since the steps above might add required columns to the projections // projection initialization must be done last, since the steps above might add required columns to the projections
if config.InitProjections.Enabled { if config.InitProjections.Enabled {
@ -210,6 +201,11 @@ func Setup(config *Config, steps *Steps, masterKey string) {
} }
} }
func mustExecuteMigration(ctx context.Context, eventstoreClient *eventstore.Eventstore, step migration.Migration, errorMsg string) {
err := migration.Migrate(ctx, eventstoreClient, step)
logging.WithFields("name", step.String()).OnError(err).Fatal(errorMsg)
}
func readStmt(fs embed.FS, folder, typ, filename string) (string, error) { func readStmt(fs embed.FS, folder, typ, filename string) (string, error) {
stmt, err := fs.ReadFile(folder + "/" + typ + "/" + filename) stmt, err := fs.ReadFile(folder + "/" + typ + "/" + filename)
return string(stmt), err return string(stmt), err

View File

@ -35,20 +35,24 @@ func handleUniqueConstraints(ctx context.Context, tx *sql.Tx, commands []eventst
for _, command := range commands { for _, command := range commands {
for _, constraint := range command.UniqueConstraints() { for _, constraint := range command.UniqueConstraints() {
instanceID := command.Aggregate().InstanceID
if constraint.IsGlobal {
instanceID = ""
}
switch constraint.Action { switch constraint.Action {
case eventstore.UniqueConstraintAdd: case eventstore.UniqueConstraintAdd:
constraint.UniqueField = strings.ToLower(constraint.UniqueField) constraint.UniqueField = strings.ToLower(constraint.UniqueField)
addPlaceholders = append(addPlaceholders, fmt.Sprintf("($%d, $%d, $%d)", len(addArgs)+1, len(addArgs)+2, len(addArgs)+3)) addPlaceholders = append(addPlaceholders, fmt.Sprintf("($%d, $%d, $%d)", len(addArgs)+1, len(addArgs)+2, len(addArgs)+3))
addArgs = append(addArgs, command.Aggregate().InstanceID, constraint.UniqueType, constraint.UniqueField) addArgs = append(addArgs, instanceID, constraint.UniqueType, constraint.UniqueField)
addConstraints[fmt.Sprintf(uniqueConstraintPlaceholderFmt, command.Aggregate().InstanceID, constraint.UniqueType, constraint.UniqueField)] = constraint addConstraints[fmt.Sprintf(uniqueConstraintPlaceholderFmt, instanceID, constraint.UniqueType, constraint.UniqueField)] = constraint
case eventstore.UniqueConstraintRemove: case eventstore.UniqueConstraintRemove:
deletePlaceholders = append(deletePlaceholders, fmt.Sprintf(deleteConstraintPlaceholdersStmt, len(deleteArgs)+1, len(deleteArgs)+2, len(deleteArgs)+3)) deletePlaceholders = append(deletePlaceholders, fmt.Sprintf(deleteConstraintPlaceholdersStmt, len(deleteArgs)+1, len(deleteArgs)+2, len(deleteArgs)+3))
deleteArgs = append(deleteArgs, command.Aggregate().InstanceID, constraint.UniqueType, constraint.UniqueField) deleteArgs = append(deleteArgs, instanceID, constraint.UniqueType, constraint.UniqueField)
deleteConstraints[fmt.Sprintf(uniqueConstraintPlaceholderFmt, command.Aggregate().InstanceID, constraint.UniqueType, constraint.UniqueField)] = constraint deleteConstraints[fmt.Sprintf(uniqueConstraintPlaceholderFmt, instanceID, constraint.UniqueType, constraint.UniqueField)] = constraint
case eventstore.UniqueConstraintInstanceRemove: case eventstore.UniqueConstraintInstanceRemove:
deletePlaceholders = append(deletePlaceholders, fmt.Sprintf("(instance_id = $%d)", len(deleteArgs)+1)) deletePlaceholders = append(deletePlaceholders, fmt.Sprintf("(instance_id = $%d)", len(deleteArgs)+1))
deleteArgs = append(deleteArgs, command.Aggregate().InstanceID) deleteArgs = append(deleteArgs, instanceID)
deleteConstraints[fmt.Sprintf(uniqueConstraintPlaceholderFmt, command.Aggregate().InstanceID, constraint.UniqueType, constraint.UniqueField)] = constraint deleteConstraints[fmt.Sprintf(uniqueConstraintPlaceholderFmt, instanceID, constraint.UniqueType, constraint.UniqueField)] = constraint
} }
} }
} }