From e670b9126c98dacf4861b5fd8232a45307dde348 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20M=C3=B6hlmann?= Date: Wed, 26 Feb 2025 18:06:50 +0200 Subject: [PATCH] fix(permissions): chunked synchronization of role permission events (#9403) # Which Problems Are Solved Setup fails to push all role permission events when running Zitadel with CockroachDB. `TransactionRetryError`s were visible in the logs, and the setup job finally timed out with `timeout: context deadline exceeded`. # How the Problems Are Solved As suggested in the Cockroach documentation, _"break down larger transactions"_. The commands to be pushed for the role permissions are chunked into batches of 50 events per push. This chunking is only done with CockroachDB. # Additional Changes - gci run fixed some unrelated imports - access to `command.Commands` for the setup job, so we can reuse the sync logic. # Additional Context Closes #9293 --------- Co-authored-by: Silvan <27845747+adlerhurst@users.noreply.github.com> --- cmd/setup/setup.go | 139 ++++++++++-------- cmd/setup/sync_role_permissions.go | 70 +-------- .../user/v2/integration_test/user_test.go | 5 +- internal/command/command.go | 27 ++++ internal/command/command_test.go | 89 +++++++++++ internal/command/converter.go | 3 + internal/command/instance.go | 17 ++- internal/command/instance_permissions.go | 29 ---- internal/command/instance_role_permissions.go | 101 +++++++++++++ .../instance_role_permissions_sync.sql | 0 .../command/instance_role_permissions_test.go | 139 ++++++++++++++++++ internal/eventstore/handler/v2/handler.go | 8 +- internal/integration/sink/server.go | 3 +- 13 files changed, 461 insertions(+), 169 deletions(-) delete mode 100644 internal/command/instance_permissions.go create mode 100644 internal/command/instance_role_permissions.go rename cmd/setup/sync_role_permissions.sql => internal/command/instance_role_permissions_sync.sql (100%) create mode 100644 
internal/command/instance_role_permissions_test.go diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index a4bfc42403..c123247e46 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -180,33 +180,6 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil, nil) logging.OnError(err).Fatal("unable to start projections") - repeatableSteps := []migration.RepeatableMigration{ - &externalConfigChange{ - es: eventstoreClient, - ExternalDomain: config.ExternalDomain, - ExternalPort: config.ExternalPort, - ExternalSecure: config.ExternalSecure, - defaults: config.SystemDefaults, - }, - &projectionTables{ - es: eventstoreClient, - Version: build.Version(), - }, - &DeleteStaleOrgFields{ - eventstore: eventstoreClient, - }, - &FillFieldsForInstanceDomains{ - eventstore: eventstoreClient, - }, - &SyncRolePermissions{ - eventstore: eventstoreClient, - rolePermissionMappings: config.InternalAuthZ.RolePermissionMappings, - }, - &RiverMigrateRepeatable{ - client: dbClient, - }, - } - for _, step := range []migration.Migration{ steps.s14NewEventsTable, steps.s40InitPushFunc, @@ -214,6 +187,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s2AssetsTable, steps.s28AddFieldTable, steps.s31AddAggregateIndexToFields, + steps.s46InitPermissionFunctions, steps.FirstInstance, steps.s5LastFailed, steps.s6OwnerRemoveColumns, @@ -238,7 +212,6 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s38BackChannelLogoutNotificationStart, steps.s44ReplaceCurrentSequencesIndex, steps.s45CorrectProjectOwners, - steps.s46InitPermissionFunctions, steps.s47FillMembershipFields, steps.s49InitPermittedOrgsFunction, steps.s50IDPTemplate6UsePKCE, @@ -246,6 +219,36 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) mustExecuteMigration(ctx, eventstoreClient, step, "migration failed") 
} + commands, _, _, _ := startCommandsQueries(ctx, eventstoreClient, eventstoreV4, dbClient, masterKey, config) + + repeatableSteps := []migration.RepeatableMigration{ + &externalConfigChange{ + es: eventstoreClient, + ExternalDomain: config.ExternalDomain, + ExternalPort: config.ExternalPort, + ExternalSecure: config.ExternalSecure, + defaults: config.SystemDefaults, + }, + &projectionTables{ + es: eventstoreClient, + Version: build.Version(), + }, + &DeleteStaleOrgFields{ + eventstore: eventstoreClient, + }, + &FillFieldsForInstanceDomains{ + eventstore: eventstoreClient, + }, + &SyncRolePermissions{ + commands: commands, + eventstore: eventstoreClient, + rolePermissionMappings: config.InternalAuthZ.RolePermissionMappings, + }, + &RiverMigrateRepeatable{ + client: dbClient, + }, + } + for _, repeatableStep := range repeatableSteps { mustExecuteMigration(ctx, eventstoreClient, repeatableStep, "unable to migrate repeatable step") } @@ -271,11 +274,6 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) initProjections( ctx, eventstoreClient, - eventstoreV4, - dbClient, - dbClient, - masterKey, - config, ) } } @@ -336,18 +334,20 @@ func readStatements(fs embed.FS, folder, typ string) ([]statement, error) { return statements, nil } -func initProjections( +func startCommandsQueries( ctx context.Context, eventstoreClient *eventstore.Eventstore, eventstoreV4 *es_v4.EventStore, - queryDBClient, - projectionDBClient *database.DB, + dbClient *database.DB, masterKey string, config *Config, +) ( + *command.Commands, + *query.Queries, + *admin_view.View, + *auth_view.View, ) { - logging.Info("init-projections is currently in beta") - - keyStorage, err := cryptoDB.NewKeyStorage(queryDBClient, masterKey) + keyStorage, err := cryptoDB.NewKeyStorage(dbClient, masterKey) logging.OnError(err).Fatal("unable to start key storage") keys, err := encryption.EnsureEncryptionKeys(ctx, config.EncryptionKeys, keyStorage) @@ -355,7 +355,7 @@ func initProjections( 
err = projection.Create( ctx, - queryDBClient, + dbClient, eventstoreClient, projection.Config{ RetryFailedAfter: config.InitProjections.RetryFailedAfter, @@ -367,19 +367,15 @@ func initProjections( config.SystemAPIUsers, ) logging.OnError(err).Fatal("unable to start projections") - for _, p := range projection.Projections() { - err := migration.Migrate(ctx, eventstoreClient, p) - logging.WithFields("name", p.String()).OnError(err).Fatal("migration failed") - } - staticStorage, err := config.AssetStorage.NewStorage(queryDBClient.DB) + staticStorage, err := config.AssetStorage.NewStorage(dbClient.DB) logging.OnError(err).Fatal("unable to start asset storage") - adminView, err := admin_view.StartView(queryDBClient) + adminView, err := admin_view.StartView(dbClient) logging.OnError(err).Fatal("unable to start admin view") admin_handler.Register(ctx, admin_handler.Config{ - Client: queryDBClient, + Client: dbClient, Eventstore: eventstoreClient, BulkLimit: config.InitProjections.BulkLimit, FailureCountUntilSkip: uint64(config.InitProjections.MaxFailureCount), @@ -387,22 +383,18 @@ func initProjections( adminView, staticStorage, ) - for _, p := range admin_handler.Projections() { - err := migration.Migrate(ctx, eventstoreClient, p) - logging.WithFields("name", p.String()).OnError(err).Fatal("migration failed") - } sessionTokenVerifier := internal_authz.SessionTokenVerifier(keys.OIDC) - cacheConnectors, err := connector.StartConnectors(config.Caches, queryDBClient) + cacheConnectors, err := connector.StartConnectors(config.Caches, dbClient) logging.OnError(err).Fatal("unable to start caches") queries, err := query.StartQueries( ctx, eventstoreClient, eventstoreV4.Querier, - queryDBClient, - projectionDBClient, + dbClient, + dbClient, cacheConnectors, config.Projections, config.SystemDefaults, @@ -424,11 +416,11 @@ func initProjections( ) logging.OnError(err).Fatal("unable to start queries") - authView, err := auth_view.StartView(queryDBClient, keys.OIDC, queries, 
eventstoreClient) + authView, err := auth_view.StartView(dbClient, keys.OIDC, queries, eventstoreClient) logging.OnError(err).Fatal("unable to start admin view") auth_handler.Register(ctx, auth_handler.Config{ - Client: queryDBClient, + Client: dbClient, Eventstore: eventstoreClient, BulkLimit: config.InitProjections.BulkLimit, FailureCountUntilSkip: uint64(config.InitProjections.MaxFailureCount), @@ -436,16 +428,13 @@ func initProjections( authView, queries, ) - for _, p := range auth_handler.Projections() { - err := migration.Migrate(ctx, eventstoreClient, p) - logging.WithFields("name", p.String()).OnError(err).Fatal("migration failed") - } - authZRepo, err := authz.Start(queries, eventstoreClient, queryDBClient, keys.OIDC, config.ExternalSecure) + authZRepo, err := authz.Start(queries, eventstoreClient, dbClient, keys.OIDC, config.ExternalSecure) logging.OnError(err).Fatal("unable to start authz repo") permissionCheck := func(ctx context.Context, permission, orgID, resourceID string) (err error) { return internal_authz.CheckPermission(ctx, authZRepo, config.InternalAuthZ.RolePermissionMappings, permission, orgID, resourceID) } + commands, err := command.StartCommands(ctx, eventstoreClient, cacheConnectors, @@ -477,6 +466,7 @@ func initProjections( config.DefaultInstance.SecretGenerators, ) logging.OnError(err).Fatal("unable to start commands") + notify_handler.Register( ctx, config.Projections.Customizations["notifications"], @@ -498,8 +488,33 @@ func initProjections( keys.SMS, keys.OIDC, config.OIDC.DefaultBackChannelLogoutLifetime, - queryDBClient, + dbClient, ) + + return commands, queries, adminView, authView +} + +func initProjections( + ctx context.Context, + eventstoreClient *eventstore.Eventstore, +) { + logging.Info("init-projections is currently in beta") + + for _, p := range projection.Projections() { + err := migration.Migrate(ctx, eventstoreClient, p) + logging.WithFields("name", p.String()).OnError(err).Fatal("migration failed") + } + + for _, p 
:= range admin_handler.Projections() { + err := migration.Migrate(ctx, eventstoreClient, p) + logging.WithFields("name", p.String()).OnError(err).Fatal("migration failed") + } + + for _, p := range auth_handler.Projections() { + err := migration.Migrate(ctx, eventstoreClient, p) + logging.WithFields("name", p.String()).OnError(err).Fatal("migration failed") + } + for _, p := range notify_handler.Projections() { err := migration.Migrate(ctx, eventstoreClient, p) logging.WithFields("name", p.String()).OnError(err).Fatal("migration failed") diff --git a/cmd/setup/sync_role_permissions.go b/cmd/setup/sync_role_permissions.go index b38b075d82..5c380265b5 100644 --- a/cmd/setup/sync_role_permissions.go +++ b/cmd/setup/sync_role_permissions.go @@ -2,29 +2,22 @@ package setup import ( "context" - "database/sql" _ "embed" "fmt" - "strings" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" - "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/repository/instance" - "github.com/zitadel/zitadel/internal/repository/permission" -) - -var ( - //go:embed sync_role_permissions.sql - getRolePermissionOperationsQuery string ) // SyncRolePermissions is a repeatable step which synchronizes the InternalAuthZ // RolePermissionMappings from the configuration to the database. // This is needed until role permissions are manageable over the API. 
type SyncRolePermissions struct { + commands *command.Commands eventstore *eventstore.Eventstore rolePermissionMappings []authz.RoleMapping } @@ -38,18 +31,11 @@ func (mig *SyncRolePermissions) Execute(ctx context.Context, _ eventstore.Event) func (mig *SyncRolePermissions) executeSystem(ctx context.Context) error { logging.WithFields("migration", mig.String()).Info("prepare system role permission sync events") - - target := rolePermissionMappingsToDatabaseMap(mig.rolePermissionMappings, true) - cmds, err := mig.synchronizeCommands(ctx, "SYSTEM", target) + details, err := mig.commands.SynchronizeRolePermission(ctx, "SYSTEM", mig.rolePermissionMappings) if err != nil { return err } - events, err := mig.eventstore.Push(ctx, cmds...) - if err != nil { - return err - } - - logging.WithFields("migration", mig.String(), "pushed_events", len(events)).Info("pushed system role permission sync events") + logging.WithFields("migration", mig.String(), "sequence", details.Sequence).Info("pushed system role permission sync events") return nil } @@ -70,51 +56,17 @@ func (mig *SyncRolePermissions) executeInstances(ctx context.Context) error { if err != nil { return err } - target := rolePermissionMappingsToDatabaseMap(mig.rolePermissionMappings, false) for i, instanceID := range instances { logging.WithFields("instance_id", instanceID, "migration", mig.String(), "progress", fmt.Sprintf("%d/%d", i+1, len(instances))).Info("prepare instance role permission sync events") - cmds, err := mig.synchronizeCommands(ctx, instanceID, target) + details, err := mig.commands.SynchronizeRolePermission(ctx, instanceID, mig.rolePermissionMappings) if err != nil { return err } - events, err := mig.eventstore.Push(ctx, cmds...) 
- if err != nil { - return err - } - logging.WithFields("instance_id", instanceID, "migration", mig.String(), "pushed_events", len(events)).Info("pushed instance role permission sync events") + logging.WithFields("instance_id", instanceID, "migration", mig.String(), "sequence", details.Sequence).Info("pushed instance role permission sync events") } return nil } -// synchronizeCommands checks the current state of role permissions in the eventstore for the aggregate. -// It returns the commands required to reach the desired state passed in target. -// For system level permissions aggregateID must be set to `SYSTEM`, -// else it is the instance ID. -func (mig *SyncRolePermissions) synchronizeCommands(ctx context.Context, aggregateID string, target database.Map[[]string]) (cmds []eventstore.Command, err error) { - aggregate := permission.NewAggregate(aggregateID) - err = mig.eventstore.Client().QueryContext(ctx, func(rows *sql.Rows) error { - for rows.Next() { - var operation, role, perm string - if err := rows.Scan(&operation, &role, &perm); err != nil { - return err - } - logging.WithFields("aggregate_id", aggregateID, "migration", mig.String(), "operation", operation, "role", role, "permission", perm).Debug("sync role permission") - switch operation { - case "add": - cmds = append(cmds, permission.NewAddedEvent(ctx, aggregate, role, perm)) - case "remove": - cmds = append(cmds, permission.NewRemovedEvent(ctx, aggregate, role, perm)) - } - } - return rows.Close() - - }, getRolePermissionOperationsQuery, aggregateID, target) - if err != nil { - return nil, err - } - return cmds, err -} - func (*SyncRolePermissions) String() string { return "repeatable_sync_role_permissions" } @@ -122,13 +74,3 @@ func (*SyncRolePermissions) String() string { func (*SyncRolePermissions) Check(lastRun map[string]interface{}) bool { return true } - -func rolePermissionMappingsToDatabaseMap(mappings []authz.RoleMapping, system bool) database.Map[[]string] { - out := 
make(database.Map[[]string], len(mappings)) - for _, m := range mappings { - if system == strings.HasPrefix(m.Role, "SYSTEM") { - out[m.Role] = m.Permissions - } - } - return out -} diff --git a/internal/api/grpc/user/v2/integration_test/user_test.go b/internal/api/grpc/user/v2/integration_test/user_test.go index 6d5d112e98..0293fd925d 100644 --- a/internal/api/grpc/user/v2/integration_test/user_test.go +++ b/internal/api/grpc/user/v2/integration_test/user_test.go @@ -10,15 +10,14 @@ import ( "testing" "time" - "github.com/zitadel/logging" - "google.golang.org/protobuf/types/known/structpb" - "github.com/brianvoe/gofakeit/v6" "github.com/muhlemmer/gu" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/zitadel/logging" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/zitadel/zitadel/internal/api/grpc" diff --git a/internal/command/command.go b/internal/command/command.go index ab047fccdb..17f6641caf 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -218,6 +218,33 @@ func (c *Commands) pushAppendAndReduce(ctx context.Context, object AppendReducer return AppendAndReduce(object, events...) } +// pushChunked pushes the commands in chunks of size to the eventstore. +// This can be used to reduce the amount of events in a single transaction. +// When an error occurs, the events that have been pushed so far will be returned. +// +// Warning: chunks are pushed in separate transactions. +// Successful pushes will not be rolled back if a later chunk fails. +// Only use this function when the caller is able to handle partial success +// and is able to consolidate the state on errors. 
+func (c *Commands) pushChunked(ctx context.Context, size uint16, cmds ...eventstore.Command) (_ []eventstore.Event, err error) { + ctx, span := tracing.NewSpan(ctx) + defer func() { span.EndWithError(err) }() + + events := make([]eventstore.Event, 0, len(cmds)) + for i := 0; i < len(cmds); i += int(size) { + end := i + int(size) + if end > len(cmds) { + end = len(cmds) + } + chunk, err := c.eventstore.Push(ctx, cmds[i:end]...) + if err != nil { + return events, err + } + events = append(events, chunk...) + } + return events, nil +} + type AppendReducerDetails interface { AppendEvents(...eventstore.Event) // TODO: Why is it allowed to return an error here? diff --git a/internal/command/command_test.go b/internal/command/command_test.go index 2367930b89..7224f047b5 100644 --- a/internal/command/command_test.go +++ b/internal/command/command_test.go @@ -2,6 +2,7 @@ package command import ( "context" + "fmt" "io" "os" "testing" @@ -13,6 +14,7 @@ import ( "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/i18n" + "github.com/zitadel/zitadel/internal/repository/permission" "github.com/zitadel/zitadel/internal/repository/user" ) @@ -29,6 +31,93 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +func TestCommands_pushChunked(t *testing.T) { + aggregate := permission.NewAggregate("instanceID") + cmds := make([]eventstore.Command, 100) + for i := 0; i < 100; i++ { + cmds[i] = permission.NewAddedEvent(context.Background(), aggregate, "role", fmt.Sprintf("permission%d", i)) + } + type args struct { + size uint16 + } + tests := []struct { + name string + args args + eventstore func(*testing.T) *eventstore.Eventstore + wantEvents int + wantErr error + }{ + { + name: "push error", + args: args{ + size: 100, + }, + eventstore: expectEventstore( + expectPushFailed(io.ErrClosedPipe, cmds...), + ), + wantEvents: 0, + wantErr: io.ErrClosedPipe, + }, + { + name: "single chunk", + args: args{ + size: 100, + }, + eventstore: expectEventstore( + 
expectPush(cmds...), + ), + wantEvents: len(cmds), + }, + { + name: "aligned chunks", + args: args{ + size: 50, + }, + eventstore: expectEventstore( + expectPush(cmds[0:50]...), + expectPush(cmds[50:100]...), + ), + wantEvents: len(cmds), + }, + { + name: "odd chunks", + args: args{ + size: 30, + }, + eventstore: expectEventstore( + expectPush(cmds[0:30]...), + expectPush(cmds[30:60]...), + expectPush(cmds[60:90]...), + expectPush(cmds[90:100]...), + ), + wantEvents: len(cmds), + }, + { + name: "partial error", + args: args{ + size: 30, + }, + eventstore: expectEventstore( + expectPush(cmds[0:30]...), + expectPush(cmds[30:60]...), + expectPushFailed(io.ErrClosedPipe, cmds[60:90]...), + ), + wantEvents: len(cmds[0:60]), + wantErr: io.ErrClosedPipe, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Commands{ + eventstore: tt.eventstore(t), + } + gotEvents, err := c.pushChunked(context.Background(), tt.args.size, cmds...) + require.ErrorIs(t, err, tt.wantErr) + assert.Len(t, gotEvents, tt.wantEvents) + }) + } +} + func TestCommands_asyncPush(t *testing.T) { // make sure the test terminates on deadlock background := context.Background() diff --git a/internal/command/converter.go b/internal/command/converter.go index e4309a54c1..292de21aa9 100644 --- a/internal/command/converter.go +++ b/internal/command/converter.go @@ -15,6 +15,9 @@ func writeModelToObjectDetails(writeModel *eventstore.WriteModel) *domain.Object } func pushedEventsToObjectDetails(events []eventstore.Event) *domain.ObjectDetails { + if len(events) == 0 { + return &domain.ObjectDetails{} + } return &domain.ObjectDetails{ Sequence: events[len(events)-1].Sequence(), EventDate: events[len(events)-1].CreatedAt(), diff --git a/internal/command/instance.go b/internal/command/instance.go index 99075ccfad..ba22d4fb31 100644 --- a/internal/command/instance.go +++ b/internal/command/instance.go @@ -233,21 +233,25 @@ func (c *Commands) SetUpInstance(ctx context.Context, setup 
*InstanceSetup) (str return "", "", nil, nil, err } - events, err := c.eventstore.Push(ctx, cmds...) + _, err = c.eventstore.Push(ctx, cmds...) if err != nil { return "", "", nil, nil, err } + // RolePermissions need to be pushed in separate transaction. + // https://github.com/zitadel/zitadel/issues/9293 + details, err := c.SynchronizeRolePermission(ctx, setup.zitadel.instanceID, setup.RolePermissionMappings) + if err != nil { + return "", "", nil, nil, err + } + details.ResourceOwner = setup.zitadel.orgID + var token string if pat != nil { token = pat.Token } - return setup.zitadel.instanceID, token, machineKey, &domain.ObjectDetails{ - Sequence: events[len(events)-1].Sequence(), - EventDate: events[len(events)-1].CreatedAt(), - ResourceOwner: setup.zitadel.orgID, - }, nil + return setup.zitadel.instanceID, token, machineKey, details, nil } func contextWithInstanceSetupInfo(ctx context.Context, instanceID, projectID, consoleAppID, externalDomain string) context.Context { @@ -380,7 +384,6 @@ func setupInstanceElements(instanceAgg *instance.Aggregate, setup *InstanceSetup setup.LabelPolicy.ThemeMode, ), prepareAddDefaultEmailTemplate(instanceAgg, setup.EmailTemplate), - prepareAddRolePermissions(instanceAgg, setup.RolePermissionMappings), } } diff --git a/internal/command/instance_permissions.go b/internal/command/instance_permissions.go deleted file mode 100644 index c46c8f7c4a..0000000000 --- a/internal/command/instance_permissions.go +++ /dev/null @@ -1,29 +0,0 @@ -package command - -import ( - "context" - "strings" - - "github.com/zitadel/zitadel/internal/api/authz" - "github.com/zitadel/zitadel/internal/command/preparation" - "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/repository/instance" - "github.com/zitadel/zitadel/internal/repository/permission" -) - -func prepareAddRolePermissions(a *instance.Aggregate, roles []authz.RoleMapping) preparation.Validation { - return func() (preparation.CreateCommands, error) { - 
return func(ctx context.Context, _ preparation.FilterToQueryReducer) (cmds []eventstore.Command, _ error) { - aggregate := permission.NewAggregate(a.InstanceID) - for _, r := range roles { - if strings.HasPrefix(r.Role, "SYSTEM") { - continue - } - for _, p := range r.Permissions { - cmds = append(cmds, permission.NewAddedEvent(ctx, aggregate, r.Role, p)) - } - } - return cmds, nil - }, nil - } -} diff --git a/internal/command/instance_role_permissions.go b/internal/command/instance_role_permissions.go new file mode 100644 index 0000000000..c0c6355dd6 --- /dev/null +++ b/internal/command/instance_role_permissions.go @@ -0,0 +1,101 @@ +package command + +import ( + "context" + "database/sql" + _ "embed" + "strings" + + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/repository/permission" + "github.com/zitadel/zitadel/internal/telemetry/tracing" + "github.com/zitadel/zitadel/internal/zerrors" +) + +const ( + CockroachRollPermissionChunkSize uint16 = 50 +) + +// SynchronizeRolePermission checks the current state of role permissions in the eventstore for the aggregate. +// It pushes the commands required to reach the desired state passed in target. +// For system level permissions aggregateID must be set to `SYSTEM`, else it is the instance ID. +// +// In case cockroachDB is used, the commands are pushed in chunks of CockroachRollPermissionChunkSize. 
+func (c *Commands) SynchronizeRolePermission(ctx context.Context, aggregateID string, target []authz.RoleMapping) (_ *domain.ObjectDetails, err error) { + ctx, span := tracing.NewSpan(ctx) + defer func() { span.EndWithError(err) }() + + cmds, err := synchronizeRolePermissionCommands(ctx, c.eventstore.Client(), aggregateID, + rolePermissionMappingsToDatabaseMap(target, aggregateID == "SYSTEM"), + ) + if err != nil { + return nil, zerrors.ThrowInternal(err, "COMMA-Iej2r", "Errors.Internal") + } + var events []eventstore.Event + if c.eventstore.Client().Database.Type() == "cockroach" { + events, err = c.pushChunked(ctx, CockroachRollPermissionChunkSize, cmds...) + } else { + events, err = c.eventstore.Push(ctx, cmds...) + } + if err != nil { + return nil, zerrors.ThrowInternal(err, "COMMA-AiV3u", "Errors.Internal") + } + return pushedEventsToObjectDetails(events), nil +} + +func rolePermissionMappingsToDatabaseMap(mappings []authz.RoleMapping, system bool) database.Map[[]string] { + out := make(database.Map[[]string], len(mappings)) + for _, m := range mappings { + if system == strings.HasPrefix(m.Role, "SYSTEM") { + out[m.Role] = m.Permissions + } + } + return out +} + +var ( + //go:embed instance_role_permissions_sync.sql + instanceRolePermissionsSyncQuery string +) + +// synchronizeRolePermissionCommands checks the current state of role permissions in the eventstore for the aggregate. +// It returns the commands required to reach the desired state passed in target. +// For system level permissions aggregateID must be set to `SYSTEM`, else it is the instance ID. 
+func synchronizeRolePermissionCommands(ctx context.Context, db *database.DB, aggregateID string, target database.Map[[]string]) (cmds []eventstore.Command, err error) { + ctx, span := tracing.NewSpan(ctx) + defer func() { span.EndWithError(err) }() + + err = db.QueryContext(ctx, + rolePermissionScanner(ctx, permission.NewAggregate(aggregateID), &cmds), + instanceRolePermissionsSyncQuery, + aggregateID, target) + if err != nil { + return nil, err + } + return cmds, nil +} + +func rolePermissionScanner(ctx context.Context, aggregate *eventstore.Aggregate, cmds *[]eventstore.Command) func(rows *sql.Rows) error { + return func(rows *sql.Rows) error { + for rows.Next() { + var operation, role, perm string + if err := rows.Scan(&operation, &role, &perm); err != nil { + return err + } + logging.WithFields("aggregate_id", aggregate.ID, "operation", operation, "role", role, "permission", perm).Debug("sync role permission") + switch operation { + case "add": + *cmds = append(*cmds, permission.NewAddedEvent(ctx, aggregate, role, perm)) + case "remove": + *cmds = append(*cmds, permission.NewRemovedEvent(ctx, aggregate, role, perm)) + } + } + return rows.Close() + + } +} diff --git a/cmd/setup/sync_role_permissions.sql b/internal/command/instance_role_permissions_sync.sql similarity index 100% rename from cmd/setup/sync_role_permissions.sql rename to internal/command/instance_role_permissions_sync.sql diff --git a/internal/command/instance_role_permissions_test.go b/internal/command/instance_role_permissions_test.go new file mode 100644 index 0000000000..0a8d84b9e7 --- /dev/null +++ b/internal/command/instance_role_permissions_test.go @@ -0,0 +1,139 @@ +package command + +import ( + "context" + "database/sql" + "database/sql/driver" + _ "embed" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/database" + 
"github.com/zitadel/zitadel/internal/database/mock" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/repository/permission" +) + +func Test_rolePermissionMappingsToDatabaseMap(t *testing.T) { + type args struct { + mappings []authz.RoleMapping + system bool + } + tests := []struct { + name string + args args + want database.Map[[]string] + }{ + { + name: "instance", + args: args{ + mappings: []authz.RoleMapping{ + {Role: "role1", Permissions: []string{"permission1", "permission2"}}, + {Role: "role2", Permissions: []string{"permission3", "permission4"}}, + {Role: "SYSTEM_ROLE", Permissions: []string{"permission5", "permission6"}}, + }, + system: false, + }, + want: database.Map[[]string]{ + "role1": []string{"permission1", "permission2"}, + "role2": []string{"permission3", "permission4"}, + }, + }, + { + name: "system", + args: args{ + mappings: []authz.RoleMapping{ + {Role: "role1", Permissions: []string{"permission1", "permission2"}}, + {Role: "role2", Permissions: []string{"permission3", "permission4"}}, + {Role: "SYSTEM_ROLE", Permissions: []string{"permission5", "permission6"}}, + }, + system: true, + }, + want: database.Map[[]string]{ + "SYSTEM_ROLE": []string{"permission5", "permission6"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := rolePermissionMappingsToDatabaseMap(tt.args.mappings, tt.args.system) + assert.Equal(t, tt.want, got) + }) + } +} + +func Test_synchronizeRolePermissionCommands(t *testing.T) { + const aggregateID = "aggregateID" + aggregate := permission.NewAggregate(aggregateID) + target := database.Map[[]string]{ + "role1": []string{"permission1", "permission2"}, + "role2": []string{"permission3", "permission4"}, + } + tests := []struct { + name string + mock func(*testing.T) *mock.SQLMock + wantCmds []eventstore.Command + wantErr error + }{ + { + name: "query error", + mock: func(t *testing.T) *mock.SQLMock { + return mock.NewSQLMock(t, + 
mock.ExpectQuery(instanceRolePermissionsSyncQuery, + mock.WithQueryArgs(aggregateID, target), + mock.WithQueryErr(sql.ErrConnDone), + ), + ) + }, + wantErr: sql.ErrConnDone, + }, + { + name: "no rows", + mock: func(t *testing.T) *mock.SQLMock { + return mock.NewSQLMock(t, + mock.ExpectQuery(instanceRolePermissionsSyncQuery, + mock.WithQueryArgs(aggregateID, target), + mock.WithQueryResult([]string{"operation", "role", "permission"}, [][]driver.Value{}), + ), + ) + }, + }, + { + name: "add and remove operations", + mock: func(t *testing.T) *mock.SQLMock { + return mock.NewSQLMock(t, + mock.ExpectQuery(instanceRolePermissionsSyncQuery, + mock.WithQueryArgs(aggregateID, target), + mock.WithQueryResult([]string{"operation", "role", "permission"}, [][]driver.Value{ + {"add", "role1", "permission1"}, + {"add", "role1", "permission2"}, + {"remove", "role3", "permission5"}, + {"remove", "role3", "permission6"}, + }), + ), + ) + }, + wantCmds: []eventstore.Command{ + permission.NewAddedEvent(context.Background(), aggregate, "role1", "permission1"), + permission.NewAddedEvent(context.Background(), aggregate, "role1", "permission2"), + permission.NewRemovedEvent(context.Background(), aggregate, "role3", "permission5"), + permission.NewRemovedEvent(context.Background(), aggregate, "role3", "permission6"), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mock := tt.mock(t) + defer mock.Assert(t) + db := &database.DB{ + DB: mock.DB, + } + gotCmds, err := synchronizeRolePermissionCommands(context.Background(), db, aggregateID, target) + require.ErrorIs(t, err, tt.wantErr) + assert.Equal(t, tt.wantCmds, gotCmds) + }) + } +} diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index 2c2f88f8a0..03805f360b 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -263,12 +263,16 @@ func (h *Handler) triggerInstances(ctx context.Context, instances []string, 
trig // simple implementation of do while _, err := h.Trigger(instanceCtx, triggerOpts...) - h.log().WithField("instance", instance).OnError(err).Debug("trigger failed") + // skip retry if everything is fine + if err == nil { + continue + } + h.log().WithField("instance", instance).WithError(err).Debug("trigger failed") time.Sleep(h.retryFailedAfter) // retry if trigger failed for ; err != nil; _, err = h.Trigger(instanceCtx, triggerOpts...) { time.Sleep(h.retryFailedAfter) - h.log().WithField("instance", instance).OnError(err).Debug("trigger failed") + h.log().WithField("instance", instance).WithError(err).Debug("trigger failed") } } } diff --git a/internal/integration/sink/server.go b/internal/integration/sink/server.go index aee40cad02..2c79081e98 100644 --- a/internal/integration/sink/server.go +++ b/internal/integration/sink/server.go @@ -15,6 +15,7 @@ import ( "sync/atomic" "time" + crewjam_saml "github.com/crewjam/saml" "github.com/go-chi/chi/v5" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" @@ -23,8 +24,6 @@ import ( "golang.org/x/oauth2" "golang.org/x/text/language" - crewjam_saml "github.com/crewjam/saml" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/idp/providers/ldap"