feat(crdb): use crdb native backup and add s3 backup (#1915)

* fix(zitadelctl): implement takedown command

* fix(zitadelctl): correct destroy flow

* fix(zitadelctl): correct backup commands to read crds beforehand

* fix: add destroyfile

* fix: cleanup for user list

* fix: change backup and restore to crdb native

* fix: timeout for deleting pvc for cockroachdb

* fix: corrected unit tests

* fix: add ignored file for scale

* fix: correct handling of gitops in backup command

* feat: add s3 backup kind

* fix: backuplist for s3 and timeout for pv deletion

* fix(database): fix nil pointer with binary version

* fix(database): clean up errors that came with merging the s3 logic

* fix: correct unit tests

* fix: cleanup monitor output

Co-authored-by: Elio Bischof <eliobischof@gmail.com>

* fix: set backup imagePullPolicy to IfNotPresent

Co-authored-by: Elio Bischof <eliobischof@gmail.com>
Stefan Benz 2021-10-13 14:34:03 +02:00 committed by GitHub
parent 591a460450
commit 425a8b5fd5
67 changed files with 3867 additions and 626 deletions
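In essence, backup and restore jobs now shell out to cockroach sql and run CockroachDB's native BACKUP/RESTORE statements (e.g. BACKUP TO "gs://<bucket>/<backup>/<timestamp>?AUTH=specified&CREDENTIALS=${SAJSON}") against the configured database host and port, replacing the per-database backup.sh, restore.sh and clean-*.sh scripts. A new databases.caos.ch/S3Backup kind is registered alongside the existing BucketBackup kind, carrying the access key ID, secret access key and session token as secrets.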


@ -2,13 +2,11 @@ package cmds
import (
"fmt"
"github.com/caos/orbos/pkg/kubernetes/cli"
"sort"
"github.com/spf13/cobra"
"github.com/caos/orbos/pkg/kubernetes/cli"
"github.com/caos/zitadel/pkg/databases"
"github.com/spf13/cobra"
)
func BackupListCommand(getRv GetRootValues) *cobra.Command {


@ -3,11 +3,11 @@ package cmds
import (
"errors"
"github.com/caos/zitadel/pkg/zitadel"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes/cli"
"github.com/caos/zitadel/operator/crtlcrd"
"github.com/caos/zitadel/operator/crtlgitops"
"github.com/caos/zitadel/pkg/databases"
"github.com/manifoldco/promptui"
@ -82,17 +82,17 @@ func RestoreCommand(getRv GetRootValues) *cobra.Command {
return mntr.ToUserError(errors.New("chosen backup is not existing"))
}
ensure := func() error { return nil }
if rv.Gitops {
if err := zitadel.GitOpsClearMigrateRestore(monitor, gitClient, orbConfig, k8sClient, backup, &version); err != nil {
return err
ensure = func() error {
return crtlgitops.Restore(monitor, gitClient, k8sClient, backup)
}
} else {
if err := zitadel.CrdClearMigrateRestore(monitor, k8sClient, backup, &version); err != nil {
return err
ensure = func() error {
return crtlcrd.Restore(monitor, k8sClient, backup)
}
}
return nil
return scaleForFunction(monitor, gitClient, orbConfig, k8sClient, &version, rv.Gitops, ensure)
}
return cmd
}


@ -0,0 +1,85 @@
package cmds
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/git"
"github.com/caos/orbos/pkg/kubernetes"
orbconfig "github.com/caos/orbos/pkg/orb"
"github.com/caos/zitadel/operator/crtlcrd/zitadel"
"github.com/caos/zitadel/operator/crtlgitops"
kubernetes2 "github.com/caos/zitadel/pkg/kubernetes"
macherrs "k8s.io/apimachinery/pkg/api/errors"
)
func scaleForFunction(
monitor mntr.Monitor,
gitClient *git.Client,
orbCfg *orbconfig.Orb,
k8sClient *kubernetes.Client,
version *string,
gitops bool,
ensureFunc func() error,
) error {
noOperator := false
if err := kubernetes2.ScaleZitadelOperator(monitor, k8sClient, 0); err != nil {
if macherrs.IsNotFound(err) {
noOperator = true
} else {
return err
}
}
noZitadel := false
if gitops {
noZitadelT, err := crtlgitops.ScaleDown(monitor, gitClient, k8sClient, orbCfg, version, gitops)
if err != nil {
return err
}
noZitadel = noZitadelT
} else {
noZitadelT, err := zitadel.ScaleDown(monitor, k8sClient, version)
if err != nil {
return err
}
noZitadel = noZitadelT
}
noDatabase := false
if err := kubernetes2.ScaleDatabaseOperator(monitor, k8sClient, 0); err != nil {
if macherrs.IsNotFound(err) {
noDatabase = true
} else {
return err
}
}
if err := ensureFunc(); err != nil {
return err
}
if !noDatabase {
if err := kubernetes2.ScaleDatabaseOperator(monitor, k8sClient, 1); err != nil {
return err
}
}
if !noZitadel {
if gitops {
if err := crtlgitops.ScaleUp(monitor, gitClient, k8sClient, orbCfg, version, gitops); err != nil {
return err
}
} else {
if err := zitadel.ScaleUp(monitor, k8sClient, version); err != nil {
return err
}
}
}
if !noOperator {
if err := kubernetes2.ScaleZitadelOperator(monitor, k8sClient, 1); err != nil {
return err
}
}
return nil
}
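The helper above scales the zitadel operator, zitadel itself and the database operator down to zero, runs the supplied ensureFunc as a one-off, and scales back up only what actually existed beforehand. A minimal sketch of how a caller wires it in, mirroring the restore command above (names as in this diff):

	ensure := func() error {
		// one-off work that must run while everything is scaled down
		return crtlcrd.Restore(monitor, k8sClient, backup)
	}
	if err := scaleForFunction(monitor, gitClient, orbConfig, k8sClient, &version, rv.Gitops, ensure); err != nil {
		return err
	}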

go.mod

@ -15,6 +15,11 @@ require (
github.com/VictoriaMetrics/fastcache v1.7.0
github.com/ajstarks/svgo v0.0.0-20210406150507-75cfd577ce75
github.com/allegro/bigcache v1.2.1
github.com/aws/aws-sdk-go-v2 v1.6.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.2.1 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.3.1 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.10.0 // indirect
github.com/boombuler/barcode v1.0.1
github.com/caos/logging v0.0.2
github.com/caos/oidc v0.15.10

go.sum

File diff suppressed because it is too large


@ -19,6 +19,7 @@ func (s *Step4) Step() domain.Step {
func (s *Step4) execute(ctx context.Context, commandSide *Commands) error {
return commandSide.SetupStep4(ctx, s)
}
// This step should not be executed when a new instance is set up, because it's not used anymore.
// SetupStep4 is a no-op in favour of step 18.
// Password lockout policy is replaced by lockout policy.


@ -3,28 +3,23 @@ package crtlcrd
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator/api/database"
orbdb "github.com/caos/zitadel/operator/database/kinds/orb"
"github.com/caos/zitadel/pkg/databases"
)
func Restore(monitor mntr.Monitor, k8sClient *kubernetes.Client, backup string, binaryVersion *string) error {
desired, err := database.ReadCrd(k8sClient)
if err != nil {
func Restore(
monitor mntr.Monitor,
k8sClient *kubernetes.Client,
backup string,
) error {
if err := databases.CrdClear(monitor, k8sClient); err != nil {
return err
}
query, _, _, _, _, _, err := orbdb.AdaptFunc(backup, binaryVersion, false, "restore")(monitor, desired, &tree.Tree{})
if err != nil {
return err
}
ensure, err := query(k8sClient, map[string]interface{}{})
if err != nil {
return err
}
if err := ensure(k8sClient); err != nil {
if err := databases.CrdRestore(
monitor,
k8sClient,
backup,
); err != nil {
return err
}


@ -0,0 +1,32 @@
package zitadel
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/zitadel/operator/zitadel/kinds/orb"
macherrs "k8s.io/apimachinery/pkg/api/errors"
)
func ScaleDown(
monitor mntr.Monitor,
k8sClient *kubernetes.Client,
version *string,
) (bool, error) {
noZitadel := false
if err := Takeoff(monitor, k8sClient, orb.AdaptFunc(nil, "scaledown", version, false, []string{"scaledown"})); err != nil {
if macherrs.IsNotFound(err) {
noZitadel = true
} else {
return noZitadel, err
}
}
return noZitadel, nil
}
func ScaleUp(
monitor mntr.Monitor,
k8sClient *kubernetes.Client,
version *string,
) error {
return Takeoff(monitor, k8sClient, orb.AdaptFunc(nil, "scaleup", version, false, []string{"scaleup"}))
}


@ -3,6 +3,7 @@ package zitadel
import (
"context"
"fmt"
"github.com/caos/zitadel/operator"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
@ -35,31 +36,40 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.
return res, fmt.Errorf("resource must be named %s and namespaced in %s", zitadel.Name, zitadel.Namespace)
}
desired, err := zitadel.ReadCrd(r.ClientInt)
if err != nil {
return res, err
}
query, _, _, _, _, _, err := orbz.AdaptFunc(nil, "ensure", &r.Version, false, []string{"operator", "iam"})(internalMonitor, desired, &tree.Tree{})
if err != nil {
internalMonitor.Error(err)
return res, err
}
ensure, err := query(r.ClientInt, map[string]interface{}{})
if err != nil {
internalMonitor.Error(err)
return res, err
}
if err := ensure(r.ClientInt); err != nil {
internalMonitor.Error(err)
if err := Takeoff(internalMonitor, r.ClientInt, orbz.AdaptFunc(nil, "ensure", &r.Version, false, []string{"operator", "iam"})); err != nil {
return res, err
}
return res, nil
}
func Takeoff(
monitor mntr.Monitor,
k8sClient kubernetes.ClientInt,
adaptFunc operator.AdaptFunc,
) error {
desired, err := zitadel.ReadCrd(k8sClient)
if err != nil {
return err
}
query, _, _, _, _, _, err := adaptFunc(monitor, desired, &tree.Tree{})
if err != nil {
return err
}
ensure, err := query(k8sClient, map[string]interface{}{})
if err != nil {
return err
}
if err := ensure(k8sClient); err != nil {
return err
}
return nil
}
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1.Zitadel{}).


@ -0,0 +1,29 @@
package crtlgitops
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/git"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/zitadel/pkg/databases"
)
func Restore(
monitor mntr.Monitor,
gitClient *git.Client,
k8sClient *kubernetes.Client,
backup string,
) error {
if err := databases.GitOpsClear(monitor, k8sClient, gitClient); err != nil {
return err
}
if err := databases.GitOpsRestore(
monitor,
k8sClient,
gitClient,
backup,
); err != nil {
return err
}
return nil
}


@ -0,0 +1,41 @@
package crtlgitops
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/git"
"github.com/caos/orbos/pkg/kubernetes"
orbconfig "github.com/caos/orbos/pkg/orb"
"github.com/caos/zitadel/operator/zitadel"
"github.com/caos/zitadel/operator/zitadel/kinds/orb"
macherrs "k8s.io/apimachinery/pkg/api/errors"
)
func ScaleDown(
monitor mntr.Monitor,
gitClient *git.Client,
k8sClient *kubernetes.Client,
orbCfg *orbconfig.Orb,
version *string,
gitops bool,
) (bool, error) {
noZitadel := false
if err := zitadel.Takeoff(monitor, gitClient, orb.AdaptFunc(orbCfg, "scaledown", version, gitops, []string{"scaledown"}), k8sClient)(); err != nil {
if macherrs.IsNotFound(err) {
noZitadel = true
} else {
return noZitadel, err
}
}
return noZitadel, nil
}
func ScaleUp(
monitor mntr.Monitor,
gitClient *git.Client,
k8sClient *kubernetes.Client,
orbCfg *orbconfig.Orb,
version *string,
gitops bool,
) error {
return zitadel.Takeoff(monitor, gitClient, orb.AdaptFunc(orbCfg, "scaleup", version, gitops, []string{"scaleup"}), k8sClient)()
}


@ -3,8 +3,6 @@ package backups
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/labels"
@ -12,6 +10,8 @@ import (
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket"
"github.com/caos/zitadel/operator/database/kinds/backups/s3"
corev1 "k8s.io/api/core/v1"
)
func Adapt(
@ -26,6 +26,8 @@ func Adapt(
nodeselector map[string]string,
tolerations []corev1.Toleration,
version string,
dbURL string,
dbPort int32,
features []string,
customImageRegistry string,
) (
@ -54,6 +56,29 @@ func Adapt(
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
customImageRegistry,
)(monitor, desiredTree, currentTree)
case "databases.caos.ch/S3Backup":
return s3.AdaptFunc(
name,
namespace,
labels.MustForComponent(
labels.MustReplaceAPI(
labels.GetAPIFromComponent(componentLabels),
"S3Backup",
desiredTree.Common.Version(),
),
"backup"),
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
customImageRegistry,
)(monitor, desiredTree, currentTree)
@ -74,6 +99,8 @@ func GetBackupList(
switch desiredTree.Common.Kind {
case "databases.caos.ch/BucketBackup":
return bucket.BackupList()(monitor, k8sClient, name, desiredTree)
case "databases.caos.ch/S3Backup":
return s3.BackupList()(monitor, k8sClient, name, desiredTree)
default:
return nil, mntr.ToUserError(fmt.Errorf("unknown database kind %s", desiredTree.Common.Kind))
}


@ -12,12 +12,9 @@ import (
secretpkg "github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/secret/read"
"github.com/caos/orbos/pkg/tree"
coreDB "github.com/caos/zitadel/operator/database/kinds/databases/core"
"github.com/caos/zitadel/operator"
"github.com/caos/zitadel/operator/common"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/backup"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/clean"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/restore"
)
@ -35,6 +32,8 @@ func AdaptFunc(
nodeselector map[string]string,
tolerations []corev1.Toleration,
version string,
dbURL string,
dbPort int32,
features []string,
customImageRegistry string,
) operator.AdaptFunc {
@ -78,7 +77,6 @@ func AdaptFunc(
name,
namespace,
componentLabels,
[]string{},
checkDBReady,
desiredKind.Spec.Bucket,
desiredKind.Spec.Cron,
@ -87,6 +85,8 @@ func AdaptFunc(
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
image,
)
@ -99,7 +99,6 @@ func AdaptFunc(
name,
namespace,
componentLabels,
[]string{},
desiredKind.Spec.Bucket,
timestamp,
nodeselector,
@ -107,13 +106,15 @@ func AdaptFunc(
checkDBReady,
secretName,
secretKey,
dbURL,
dbPort,
image,
)
if err != nil {
return nil, nil, nil, nil, nil, false, err
}
_, destroyC, err := clean.AdaptFunc(
/*_, destroyC, err := clean.AdaptFunc(
monitor,
name,
namespace,
@ -129,7 +130,7 @@ func AdaptFunc(
)
if err != nil {
return nil, nil, nil, nil, nil, false, err
}
}*/
destroyers := make([]operator.DestroyFunc, 0)
for _, feature := range features {
@ -139,10 +140,10 @@ func AdaptFunc(
operator.ResourceDestroyToZitadelDestroy(destroyS),
destroyB,
)
case clean.Instant:
destroyers = append(destroyers,
destroyC,
)
/*case clean.Instant:
destroyers = append(destroyers,
destroyC,
)*/
case restore.Instant:
destroyers = append(destroyers,
destroyR,
@ -156,21 +157,6 @@ func AdaptFunc(
return nil, err
}
currentDB, err := coreDB.ParseQueriedForDatabase(queried)
if err != nil {
return nil, err
}
databases, err := currentDB.GetListDatabasesFunc()(k8sClient)
if err != nil {
databases = []string{}
}
users, err := currentDB.GetListUsersFunc()(k8sClient)
if err != nil {
users = []string{}
}
value, err := read.GetSecretValue(k8sClient, desiredKind.Spec.ServiceAccountJSON, desiredKind.Spec.ExistingServiceAccountJSON)
if err != nil {
return nil, err
@ -186,7 +172,6 @@ func AdaptFunc(
name,
namespace,
componentLabels,
databases,
checkDBReady,
desiredKind.Spec.Bucket,
desiredKind.Spec.Cron,
@ -195,6 +180,8 @@ func AdaptFunc(
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
image,
)
@ -207,7 +194,6 @@ func AdaptFunc(
name,
namespace,
componentLabels,
databases,
desiredKind.Spec.Bucket,
timestamp,
nodeselector,
@ -215,13 +201,15 @@ func AdaptFunc(
checkDBReady,
secretName,
secretKey,
dbURL,
dbPort,
image,
)
if err != nil {
return nil, err
}
queryC, _, err := clean.AdaptFunc(
/*queryC, _, err := clean.AdaptFunc(
monitor,
name,
namespace,
@ -237,43 +225,41 @@ func AdaptFunc(
)
if err != nil {
return nil, err
}
}*/
queriers := make([]operator.QueryFunc, 0)
cleanupQueries := make([]operator.QueryFunc, 0)
if databases != nil && len(databases) != 0 {
for _, feature := range features {
switch feature {
case backup.Normal:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(queryS),
queryB,
)
case backup.Instant:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(queryS),
queryB,
)
cleanupQueries = append(cleanupQueries,
operator.EnsureFuncToQueryFunc(backup.GetCleanupFunc(monitor, namespace, name)),
)
case clean.Instant:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(queryS),
queryC,
)
cleanupQueries = append(cleanupQueries,
operator.EnsureFuncToQueryFunc(clean.GetCleanupFunc(monitor, namespace, name)),
)
case restore.Instant:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(queryS),
queryR,
)
cleanupQueries = append(cleanupQueries,
operator.EnsureFuncToQueryFunc(restore.GetCleanupFunc(monitor, namespace, name)),
)
}
for _, feature := range features {
switch feature {
case backup.Normal:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(queryS),
queryB,
)
case backup.Instant:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(queryS),
queryB,
)
cleanupQueries = append(cleanupQueries,
operator.EnsureFuncToQueryFunc(backup.GetCleanupFunc(monitor, namespace, name)),
)
/*case clean.Instant:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(queryS),
queryC,
)
cleanupQueries = append(cleanupQueries,
operator.EnsureFuncToQueryFunc(clean.GetCleanupFunc(monitor, namespace, name)),
)*/
case restore.Instant:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(queryS),
queryR,
)
cleanupQueries = append(cleanupQueries,
operator.EnsureFuncToQueryFunc(restore.GetCleanupFunc(monitor, namespace, name)),
)
}
}


@ -10,7 +10,6 @@ import (
"github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/backup"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/clean"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/restore"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
@ -27,6 +26,8 @@ func TestBucket_Secrets(t *testing.T) {
monitor := mntr.Monitor{}
namespace := "testNs2"
dbURL := "testDB"
dbPort := int32(80)
kindVersion := "v0"
kind := "BucketBackup"
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", kindVersion), "testComponent")
@ -67,6 +68,8 @@ func TestBucket_Secrets(t *testing.T) {
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
"",
)(
@ -88,6 +91,8 @@ func TestBucket_AdaptBackup(t *testing.T) {
features := []string{backup.Normal}
saJson := "testSA"
dbURL := "testDB"
dbPort := int32(80)
bucketName := "testBucket2"
cron := "testCron2"
monitor := mntr.Monitor{}
@ -137,6 +142,8 @@ func TestBucket_AdaptBackup(t *testing.T) {
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
"",
)(
@ -163,6 +170,8 @@ func TestBucket_AdaptInstantBackup(t *testing.T) {
cron := "testCron"
monitor := mntr.Monitor{}
namespace := "testNs"
dbURL := "testDB"
dbPort := int32(80)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", "v0"), "testComponent")
k8sLabels := map[string]string{
@ -209,6 +218,8 @@ func TestBucket_AdaptInstantBackup(t *testing.T) {
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
"",
)(
@ -254,6 +265,8 @@ func TestBucket_AdaptRestore(t *testing.T) {
backupName := "testName"
version := "testVersion"
saJson := "testSA"
dbURL := "testDB"
dbPort := int32(80)
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: tree.NewCommon("databases.caos.ch/BucketBackup", "v0", false),
@ -282,6 +295,8 @@ func TestBucket_AdaptRestore(t *testing.T) {
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
"",
)(
@ -299,6 +314,7 @@ func TestBucket_AdaptRestore(t *testing.T) {
assert.NoError(t, ensure(client))
}
/*
func TestBucket_AdaptClean(t *testing.T) {
masterkey := "testMk"
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
@ -327,6 +343,8 @@ func TestBucket_AdaptClean(t *testing.T) {
backupName := "testName"
version := "testVersion"
saJson := "testSA"
dbURL := "testDB"
dbPort := int32(80)
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: tree.NewCommon("databases.caos.ch/BucketBackup", "v0", false),
@ -355,6 +373,8 @@ func TestBucket_AdaptClean(t *testing.T) {
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
"",
)(
@ -371,4 +391,4 @@ func TestBucket_AdaptClean(t *testing.T) {
assert.NotNil(t, ensure)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
}*/


@ -19,6 +19,7 @@ const (
secretPath = "/secrets/sa.json"
backupPath = "/cockroach"
backupNameEnv = "BACKUP_NAME"
saJsonBase64Env = "SAJSON"
cronJobNamePrefix = "backup-"
internalSecretName = "client-certs"
rootSecretName = "cockroachdb.client.root"
@ -32,7 +33,6 @@ func AdaptFunc(
backupName string,
namespace string,
componentLabels *labels.Component,
databases []string,
checkDBReady operator.EnsureFunc,
bucketName string,
cron string,
@ -41,6 +41,8 @@ func AdaptFunc(
timestamp string,
nodeselector map[string]string,
tolerations []corev1.Toleration,
dbURL string,
dbPort int32,
features []string,
image string,
) (
@ -51,9 +53,12 @@ func AdaptFunc(
command := getBackupCommand(
timestamp,
databases,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
)
jobSpecDef := getJobSpecDef(


@ -21,7 +21,6 @@ func TestBackup_AdaptInstantBackup1(t *testing.T) {
monitor := mntr.Monitor{}
namespace := "testNs"
databases := []string{"testDb"}
bucketName := "testBucket"
cron := "testCron"
timestamp := "test"
@ -32,6 +31,8 @@ func TestBackup_AdaptInstantBackup1(t *testing.T) {
image := "testImage"
secretKey := "testKey"
secretName := "testSecretName"
dbURL := "testDB"
dbPort := int32(80)
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
@ -51,9 +52,12 @@ func TestBackup_AdaptInstantBackup1(t *testing.T) {
backupName,
getBackupCommand(
timestamp,
databases,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
),
image,
),
@ -67,7 +71,6 @@ func TestBackup_AdaptInstantBackup1(t *testing.T) {
backupName,
namespace,
componentLabels,
databases,
checkDBReady,
bucketName,
cron,
@ -76,6 +79,8 @@ func TestBackup_AdaptInstantBackup1(t *testing.T) {
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
image,
)
@ -93,7 +98,8 @@ func TestBackup_AdaptInstantBackup2(t *testing.T) {
features := []string{Instant}
monitor := mntr.Monitor{}
namespace := "testNs2"
databases := []string{"testDb2"}
dbURL := "testDB"
dbPort := int32(80)
bucketName := "testBucket2"
cron := "testCron2"
timestamp := "test2"
@ -123,9 +129,12 @@ func TestBackup_AdaptInstantBackup2(t *testing.T) {
backupName,
getBackupCommand(
timestamp,
databases,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
),
image,
),
@ -139,7 +148,6 @@ func TestBackup_AdaptInstantBackup2(t *testing.T) {
backupName,
namespace,
componentLabels,
databases,
checkDBReady,
bucketName,
cron,
@ -148,6 +156,8 @@ func TestBackup_AdaptInstantBackup2(t *testing.T) {
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
image,
)
@ -165,10 +175,11 @@ func TestBackup_AdaptBackup1(t *testing.T) {
features := []string{Normal}
monitor := mntr.Monitor{}
namespace := "testNs"
databases := []string{"testDb"}
bucketName := "testBucket"
cron := "testCron"
timestamp := "test"
dbURL := "testDB"
dbPort := int32(80)
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
@ -196,9 +207,12 @@ func TestBackup_AdaptBackup1(t *testing.T) {
backupName,
getBackupCommand(
timestamp,
databases,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
),
image,
),
@ -211,7 +225,6 @@ func TestBackup_AdaptBackup1(t *testing.T) {
backupName,
namespace,
componentLabels,
databases,
checkDBReady,
bucketName,
cron,
@ -220,6 +233,8 @@ func TestBackup_AdaptBackup1(t *testing.T) {
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
image,
)
@ -237,7 +252,8 @@ func TestBackup_AdaptBackup2(t *testing.T) {
features := []string{Normal}
monitor := mntr.Monitor{}
namespace := "testNs2"
databases := []string{"testDb2"}
dbURL := "testDB"
dbPort := int32(80)
bucketName := "testBucket2"
cron := "testCron2"
timestamp := "test2"
@ -268,9 +284,12 @@ func TestBackup_AdaptBackup2(t *testing.T) {
backupName,
getBackupCommand(
timestamp,
databases,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
),
image,
),
@ -283,7 +302,6 @@ func TestBackup_AdaptBackup2(t *testing.T) {
backupName,
namespace,
componentLabels,
databases,
checkDBReady,
bucketName,
cron,
@ -292,6 +310,8 @@ func TestBackup_AdaptBackup2(t *testing.T) {
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
image,
)


@ -23,7 +23,7 @@ func GetCleanupFunc(
if err := k8sClient.DeleteJob(namespace, GetJobName(backupName)); err != nil {
return fmt.Errorf("error while trying to cleanup backup: %w", err)
}
monitor.Info("restore backup is completed")
monitor.Info("cleanup backup is completed")
return nil
}
}


@ -1,12 +1,18 @@
package backup
import "strings"
import (
"strconv"
"strings"
)
func getBackupCommand(
timestamp string,
databases []string,
bucketName string,
backupName string,
certsFolder string,
serviceAccountPath string,
dbURL string,
dbPort int32,
) string {
backupCommands := make([]string, 0)
@ -16,18 +22,20 @@ func getBackupCommand(
backupCommands = append(backupCommands, "export "+backupNameEnv+"=$(date +%Y-%m-%dT%H:%M:%SZ)")
}
for _, database := range databases {
backupCommands = append(backupCommands,
strings.Join([]string{
"/scripts/backup.sh",
backupName,
bucketName,
database,
backupPath,
secretPath,
certPath,
"${" + backupNameEnv + "}",
}, " "))
}
backupCommands = append(backupCommands, "export "+saJsonBase64Env+"=$(cat "+serviceAccountPath+" | base64 | tr -d '\n' )")
backupCommands = append(backupCommands,
strings.Join([]string{
"cockroach",
"sql",
"--certs-dir=" + certsFolder,
"--host=" + dbURL,
"--port=" + strconv.Itoa(int(dbPort)),
"-e",
"\"BACKUP TO \\\"gs://" + bucketName + "/" + backupName + "/${" + backupNameEnv + "}?AUTH=specified&CREDENTIALS=${" + saJsonBase64Env + "}\\\";\"",
}, " ",
),
)
return strings.Join(backupCommands, " && ")
}
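A sketch of the single pipeline this builds, with angle-bracket placeholders for the parameters (the tests below assert concrete instances). The service-account JSON is base64-encoded into $SAJSON and handed to the BACKUP statement as inline credentials; when a timestamp is given, BACKUP_NAME is exported as that fixed value instead of the current date:

	export BACKUP_NAME=$(date +%Y-%m-%dT%H:%M:%SZ) && export SAJSON=$(cat <serviceAccountPath> | base64 | tr -d '\n' ) && cockroach sql --certs-dir=<certsFolder> --host=<dbURL> --port=<dbPort> -e "BACKUP TO \"gs://<bucketName>/<backupName>/${BACKUP_NAME}?AUTH=specified&CREDENTIALS=${SAJSON}\";"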


@ -7,47 +7,40 @@ import (
func TestBackup_Command1(t *testing.T) {
timestamp := ""
databases := []string{}
bucketName := "test"
backupName := "test"
dbURL := "testDB"
dbPort := int32(80)
cmd := getBackupCommand(timestamp, databases, bucketName, backupName)
equals := "export " + backupNameEnv + "=$(date +%Y-%m-%dT%H:%M:%SZ)"
cmd := getBackupCommand(
timestamp,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
)
equals := "export " + backupNameEnv + "=$(date +%Y-%m-%dT%H:%M:%SZ) && export SAJSON=$(cat /secrets/sa.json | base64 | tr -d '\n' ) && cockroach sql --certs-dir=/cockroach/cockroach-certs --host=testDB --port=80 -e \"BACKUP TO \\\"gs://test/test/${BACKUP_NAME}?AUTH=specified&CREDENTIALS=${SAJSON}\\\";\""
assert.Equal(t, equals, cmd)
}
func TestBackup_Command2(t *testing.T) {
timestamp := "test"
databases := []string{}
bucketName := "test"
backupName := "test"
dbURL := "testDB"
dbPort := int32(80)
cmd := getBackupCommand(timestamp, databases, bucketName, backupName)
equals := "export " + backupNameEnv + "=test"
assert.Equal(t, equals, cmd)
}
func TestBackup_Command3(t *testing.T) {
timestamp := ""
databases := []string{"testDb"}
bucketName := "testBucket"
backupName := "testBackup"
cmd := getBackupCommand(timestamp, databases, bucketName, backupName)
equals := "export " + backupNameEnv + "=$(date +%Y-%m-%dT%H:%M:%SZ) && /scripts/backup.sh testBackup testBucket testDb " + backupPath + " " + secretPath + " " + certPath + " ${" + backupNameEnv + "}"
assert.Equal(t, equals, cmd)
}
func TestBackup_Command4(t *testing.T) {
timestamp := "test"
databases := []string{"test1", "test2", "test3"}
bucketName := "testBucket"
backupName := "testBackup"
cmd := getBackupCommand(timestamp, databases, bucketName, backupName)
equals := "export " + backupNameEnv + "=test && " +
"/scripts/backup.sh testBackup testBucket test1 " + backupPath + " " + secretPath + " " + certPath + " ${" + backupNameEnv + "} && " +
"/scripts/backup.sh testBackup testBucket test2 " + backupPath + " " + secretPath + " " + certPath + " ${" + backupNameEnv + "} && " +
"/scripts/backup.sh testBackup testBucket test3 " + backupPath + " " + secretPath + " " + certPath + " ${" + backupNameEnv + "}"
cmd := getBackupCommand(
timestamp,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
)
equals := "export " + backupNameEnv + "=test && export SAJSON=$(cat /secrets/sa.json | base64 | tr -d '\n' ) && cockroach sql --certs-dir=/cockroach/cockroach-certs --host=testDB --port=80 -e \"BACKUP TO \\\"gs://test/test/${BACKUP_NAME}?AUTH=specified&CREDENTIALS=${SAJSON}\\\";\""
assert.Equal(t, equals, cmd)
}


@ -77,7 +77,7 @@ func getJobSpecDef(
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,


@ -41,7 +41,7 @@ func TestBackup_JobSpec1(t *testing.T) {
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
@ -98,7 +98,7 @@ func TestBackup_JobSpec2(t *testing.T) {
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,


@ -1,33 +0,0 @@
package clean
import "strings"
func getCommand(
databases []string,
users []string,
) string {
backupCommands := make([]string, 0)
for _, database := range databases {
backupCommands = append(backupCommands,
strings.Join([]string{
"/scripts/clean-db.sh",
certPath,
database,
}, " "))
}
for _, user := range users {
backupCommands = append(backupCommands,
strings.Join([]string{
"/scripts/clean-user.sh",
certPath,
user,
}, " "))
}
backupCommands = append(backupCommands,
strings.Join([]string{
"/scripts/clean-migration.sh",
certPath,
}, " "))
return strings.Join(backupCommands, " && ")
}


@ -1,38 +0,0 @@
package clean
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestClean_Command1(t *testing.T) {
databases := []string{}
users := []string{}
cmd := getCommand(databases, users)
equals := "/scripts/clean-migration.sh " + certPath
assert.Equal(t, equals, cmd)
}
func TestClean_Command2(t *testing.T) {
databases := []string{"test"}
users := []string{"test"}
cmd := getCommand(databases, users)
equals := "/scripts/clean-db.sh " + certPath + " test && /scripts/clean-user.sh " + certPath + " test && /scripts/clean-migration.sh " + certPath
assert.Equal(t, equals, cmd)
}
func TestClean_Command3(t *testing.T) {
databases := []string{"test1", "test2", "test3"}
users := []string{"test1", "test2", "test3"}
cmd := getCommand(databases, users)
equals := "/scripts/clean-db.sh " + certPath + " test1 && /scripts/clean-db.sh " + certPath + " test2 && /scripts/clean-db.sh " + certPath + " test3 && " +
"/scripts/clean-user.sh " + certPath + " test1 && /scripts/clean-user.sh " + certPath + " test2 && /scripts/clean-user.sh " + certPath + " test3 && " +
"/scripts/clean-migration.sh " + certPath
assert.Equal(t, equals, cmd)
}


@ -60,6 +60,9 @@ func listFilesWithFilter(serviceAccountJSON string, bucketName, name string) ([]
parts := strings.Split(attrs.Name, "/")
found := false
for _, name := range names {
if len(parts) < 2 {
continue
}
if name == parts[1] {
found = true
}


@ -3,7 +3,6 @@ package bucket
import (
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/backup"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/clean"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/restore"
"github.com/caos/zitadel/operator/database/kinds/databases/core"
"github.com/golang/mock/gomock"
@ -61,29 +60,6 @@ func SetBackup(
k8sClient.EXPECT().ApplyCronJob(gomock.Any()).Times(1).Return(nil)
}
func SetClean(
k8sClient *kubernetesmock.MockClientInt,
namespace string,
backupName string,
labels map[string]string,
saJson string,
) {
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Namespace: namespace,
Labels: labels,
},
StringData: map[string]string{secretKey: saJson},
Type: "Opaque",
}).Times(1).Return(nil)
k8sClient.EXPECT().ApplyJob(gomock.Any()).Times(1).Return(nil)
k8sClient.EXPECT().GetJob(namespace, clean.GetJobName(backupName)).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{"batch", "jobs"}, clean.GetJobName(backupName)))
k8sClient.EXPECT().WaitUntilJobCompleted(namespace, clean.GetJobName(backupName), gomock.Any()).Times(1).Return(nil)
k8sClient.EXPECT().DeleteJob(namespace, clean.GetJobName(backupName)).Times(1).Return(nil)
}
func SetRestore(
k8sClient *kubernetesmock.MockClientInt,
namespace string,


@ -22,6 +22,7 @@ const (
internalSecretName = "client-certs"
rootSecretName = "cockroachdb.client.root"
timeout = 45 * time.Minute
saJsonBase64Env = "SAJSON"
)
func AdaptFunc(
@ -29,7 +30,6 @@ func AdaptFunc(
backupName string,
namespace string,
componentLabels *labels.Component,
databases []string,
bucketName string,
timestamp string,
nodeselector map[string]string,
@ -37,6 +37,8 @@ func AdaptFunc(
checkDBReady operator.EnsureFunc,
secretName string,
secretKey string,
dbURL string,
dbPort int32,
image string,
) (
queryFunc operator.QueryFunc,
@ -47,9 +49,12 @@ func AdaptFunc(
jobName := jobPrefix + backupName + jobSuffix
command := getCommand(
timestamp,
databases,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
)
jobdef := getJob(
@ -60,7 +65,8 @@ func AdaptFunc(
secretName,
secretKey,
command,
image)
image,
)
destroyJ, err := job.AdaptFuncToDestroy(jobName, namespace)
if err != nil {


@ -19,7 +19,6 @@ func TestBackup_Adapt1(t *testing.T) {
monitor := mntr.Monitor{}
namespace := "testNs"
databases := []string{"testDb"}
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
@ -29,6 +28,8 @@ func TestBackup_Adapt1(t *testing.T) {
image := "testImage2"
secretKey := "testKey"
secretName := "testSecretName"
dbURL := "testDB"
dbPort := int32(80)
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "testKind", "testVersion"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
@ -46,9 +47,12 @@ func TestBackup_Adapt1(t *testing.T) {
secretKey,
getCommand(
timestamp,
databases,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
),
image,
)
@ -61,7 +65,6 @@ func TestBackup_Adapt1(t *testing.T) {
backupName,
namespace,
componentLabels,
databases,
bucketName,
timestamp,
nodeselector,
@ -69,6 +72,8 @@ func TestBackup_Adapt1(t *testing.T) {
checkDBReady,
secretName,
secretKey,
dbURL,
dbPort,
image,
)
@ -84,7 +89,6 @@ func TestBackup_Adapt2(t *testing.T) {
monitor := mntr.Monitor{}
namespace := "testNs2"
databases := []string{"testDb1", "testDb2"}
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
@ -94,6 +98,8 @@ func TestBackup_Adapt2(t *testing.T) {
image := "testImage2"
secretKey := "testKey2"
secretName := "testSecretName2"
dbURL := "testDB"
dbPort := int32(80)
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent2")
nameLabels := labels.MustForName(componentLabels, jobName)
@ -111,9 +117,12 @@ func TestBackup_Adapt2(t *testing.T) {
secretKey,
getCommand(
timestamp,
databases,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
),
image,
)
@ -126,7 +135,6 @@ func TestBackup_Adapt2(t *testing.T) {
backupName,
namespace,
componentLabels,
databases,
bucketName,
timestamp,
nodeselector,
@ -134,6 +142,8 @@ func TestBackup_Adapt2(t *testing.T) {
checkDBReady,
secretName,
secretKey,
dbURL,
dbPort,
image,
)


@ -1,28 +1,36 @@
package restore
import "strings"
import (
"strconv"
"strings"
)
func getCommand(
timestamp string,
databases []string,
bucketName string,
backupName string,
certsFolder string,
serviceAccountPath string,
dbURL string,
dbPort int32,
) string {
backupCommands := make([]string, 0)
for _, database := range databases {
backupCommands = append(backupCommands,
strings.Join([]string{
"/scripts/restore.sh",
bucketName,
backupName,
timestamp,
database,
secretPath,
certPath,
}, " "))
}
backupCommands = append(backupCommands, "export "+saJsonBase64Env+"=$(cat "+serviceAccountPath+" | base64 | tr -d '\n' )")
backupCommands = append(backupCommands,
strings.Join([]string{
"cockroach",
"sql",
"--certs-dir=" + certsFolder,
"--host=" + dbURL,
"--port=" + strconv.Itoa(int(dbPort)),
"-e",
"\"RESTORE FROM \\\"gs://" + bucketName + "/" + backupName + "/" + timestamp + "?AUTH=specified&CREDENTIALS=${" + saJsonBase64Env + "}\\\";\"",
}, " ",
),
)
return strings.Join(backupCommands, " && ")
}
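The restore builder mirrors the backup one, except that the caller-supplied timestamp selects which backup to read, so no BACKUP_NAME export is needed. A sketch with placeholder values:

	export SAJSON=$(cat <serviceAccountPath> | base64 | tr -d '\n' ) && cockroach sql --certs-dir=<certsFolder> --host=<dbURL> --port=<dbPort> -e "RESTORE FROM \"gs://<bucketName>/<backupName>/<timestamp>?AUTH=specified&CREDENTIALS=${SAJSON}\";"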


@ -6,45 +6,42 @@ import (
)
func TestBackup_Command1(t *testing.T) {
timestamp := ""
databases := []string{}
timestamp := "test1"
bucketName := "testBucket"
backupName := "testBackup"
dbURL := "testDB"
dbPort := int32(80)
cmd := getCommand(timestamp, databases, bucketName, backupName)
equals := ""
cmd := getCommand(
timestamp,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
)
equals := "export SAJSON=$(cat /secrets/sa.json | base64 | tr -d '\n' ) && cockroach sql --certs-dir=/cockroach/cockroach-certs --host=testDB --port=80 -e \"RESTORE FROM \\\"gs://testBucket/testBackup/test1?AUTH=specified&CREDENTIALS=${SAJSON}\\\";\""
assert.Equal(t, equals, cmd)
}
func TestBackup_Command2(t *testing.T) {
timestamp := ""
databases := []string{"testDb"}
timestamp := "test2"
bucketName := "testBucket"
backupName := "testBackup"
dbURL := "testDB2"
dbPort := int32(81)
cmd := getCommand(timestamp, databases, bucketName, backupName)
equals := "/scripts/restore.sh testBucket testBackup testDb /secrets/sa.json /cockroach/cockroach-certs"
assert.Equal(t, equals, cmd)
}
func TestBackup_Command3(t *testing.T) {
timestamp := "test"
databases := []string{"testDb"}
bucketName := "testBucket"
backupName := "testBackup"
cmd := getCommand(timestamp, databases, bucketName, backupName)
equals := "/scripts/restore.sh testBucket testBackup test testDb /secrets/sa.json /cockroach/cockroach-certs"
assert.Equal(t, equals, cmd)
}
func TestBackup_Command4(t *testing.T) {
timestamp := ""
databases := []string{}
bucketName := "test"
backupName := "test"
cmd := getCommand(timestamp, databases, bucketName, backupName)
equals := ""
cmd := getCommand(
timestamp,
bucketName,
backupName,
certPath,
secretPath,
dbURL,
dbPort,
)
equals := "export SAJSON=$(cat /secrets/sa.json | base64 | tr -d '\n' ) && cockroach sql --certs-dir=/cockroach/cockroach-certs --host=testDB2 --port=81 -e \"RESTORE FROM \\\"gs://testBucket/testBackup/test2?AUTH=specified&CREDENTIALS=${SAJSON}\\\";\""
assert.Equal(t, equals, cmd)
}


@ -46,7 +46,7 @@ func getJob(
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,


@ -61,7 +61,7 @@ func TestBackup_Job1(t *testing.T) {
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
@ -137,7 +137,7 @@ func TestBackup_Job2(t *testing.T) {
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,


@ -0,0 +1,296 @@
package s3
import (
"fmt"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/kubernetes/resources/secret"
"github.com/caos/orbos/pkg/labels"
secretpkg "github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/secret/read"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator"
"github.com/caos/zitadel/operator/common"
"github.com/caos/zitadel/operator/database/kinds/backups/s3/backup"
"github.com/caos/zitadel/operator/database/kinds/backups/s3/restore"
corev1 "k8s.io/api/core/v1"
)
const (
accessKeyIDName = "backup-accessaccountkey"
accessKeyIDKey = "accessaccountkey"
secretAccessKeyName = "backup-secretaccesskey"
secretAccessKeyKey = "secretaccesskey"
sessionTokenName = "backup-sessiontoken"
sessionTokenKey = "sessiontoken"
)
func AdaptFunc(
name string,
namespace string,
componentLabels *labels.Component,
checkDBReady operator.EnsureFunc,
timestamp string,
nodeselector map[string]string,
tolerations []corev1.Toleration,
version string,
dbURL string,
dbPort int32,
features []string,
customImageRegistry string,
) operator.AdaptFunc {
return func(
monitor mntr.Monitor,
desired *tree.Tree,
current *tree.Tree,
) (
operator.QueryFunc,
operator.DestroyFunc,
operator.ConfigureFunc,
map[string]*secretpkg.Secret,
map[string]*secretpkg.Existing,
bool,
error,
) {
internalMonitor := monitor.WithField("component", "backup")
desiredKind, err := ParseDesiredV0(desired)
if err != nil {
return nil, nil, nil, nil, nil, false, fmt.Errorf("parsing desired state failed: %s", err)
}
desired.Parsed = desiredKind
secrets, existing := getSecretsMap(desiredKind)
if !monitor.IsVerbose() && desiredKind.Spec.Verbose {
internalMonitor.Verbose()
}
destroySAKI, err := secret.AdaptFuncToDestroy(namespace, accessKeyIDName)
if err != nil {
return nil, nil, nil, nil, nil, false, err
}
destroySSAK, err := secret.AdaptFuncToDestroy(namespace, secretAccessKeyName)
if err != nil {
return nil, nil, nil, nil, nil, false, err
}
destroySSTK, err := secret.AdaptFuncToDestroy(namespace, sessionTokenName)
if err != nil {
return nil, nil, nil, nil, nil, false, err
}
image := common.BackupImage.Reference(customImageRegistry, version)
_, destroyB, err := backup.AdaptFunc(
internalMonitor,
name,
namespace,
componentLabels,
checkDBReady,
desiredKind.Spec.Bucket,
desiredKind.Spec.Cron,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
desiredKind.Spec.Region,
desiredKind.Spec.Endpoint,
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
image,
)
if err != nil {
return nil, nil, nil, nil, nil, false, err
}
_, destroyR, err := restore.AdaptFunc(
monitor,
name,
namespace,
componentLabels,
desiredKind.Spec.Bucket,
timestamp,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
desiredKind.Spec.Region,
desiredKind.Spec.Endpoint,
nodeselector,
tolerations,
checkDBReady,
dbURL,
dbPort,
image,
)
if err != nil {
return nil, nil, nil, nil, nil, false, err
}
destroyers := make([]operator.DestroyFunc, 0)
for _, feature := range features {
switch feature {
case backup.Normal, backup.Instant:
destroyers = append(destroyers,
operator.ResourceDestroyToZitadelDestroy(destroySSAK),
operator.ResourceDestroyToZitadelDestroy(destroySAKI),
operator.ResourceDestroyToZitadelDestroy(destroySSTK),
destroyB,
)
case restore.Instant:
destroyers = append(destroyers,
destroyR,
)
}
}
return func(k8sClient kubernetes.ClientInt, queried map[string]interface{}) (operator.EnsureFunc, error) {
if err := desiredKind.validateSecrets(); err != nil {
return nil, err
}
valueAKI, err := read.GetSecretValue(k8sClient, desiredKind.Spec.AccessKeyID, desiredKind.Spec.ExistingAccessKeyID)
if err != nil {
return nil, err
}
querySAKI, err := secret.AdaptFuncToEnsure(namespace, labels.MustForName(componentLabels, accessKeyIDName), map[string]string{accessKeyIDKey: valueAKI})
if err != nil {
return nil, err
}
valueSAK, err := read.GetSecretValue(k8sClient, desiredKind.Spec.SecretAccessKey, desiredKind.Spec.ExistingSecretAccessKey)
if err != nil {
return nil, err
}
querySSAK, err := secret.AdaptFuncToEnsure(namespace, labels.MustForName(componentLabels, secretAccessKeyName), map[string]string{secretAccessKeyKey: valueSAK})
if err != nil {
return nil, err
}
valueST, err := read.GetSecretValue(k8sClient, desiredKind.Spec.SessionToken, desiredKind.Spec.ExistingSessionToken)
if err != nil {
return nil, err
}
querySST, err := secret.AdaptFuncToEnsure(namespace, labels.MustForName(componentLabels, sessionTokenName), map[string]string{sessionTokenKey: valueST})
if err != nil {
return nil, err
}
queryB, _, err := backup.AdaptFunc(
internalMonitor,
name,
namespace,
componentLabels,
checkDBReady,
desiredKind.Spec.Bucket,
desiredKind.Spec.Cron,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
desiredKind.Spec.Region,
desiredKind.Spec.Endpoint,
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
image,
)
if err != nil {
return nil, err
}
queryR, _, err := restore.AdaptFunc(
monitor,
name,
namespace,
componentLabels,
desiredKind.Spec.Bucket,
timestamp,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
desiredKind.Spec.Region,
desiredKind.Spec.Endpoint,
nodeselector,
tolerations,
checkDBReady,
dbURL,
dbPort,
image,
)
if err != nil {
return nil, err
}
queriers := make([]operator.QueryFunc, 0)
cleanupQueries := make([]operator.QueryFunc, 0)
for _, feature := range features {
switch feature {
case backup.Normal:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(querySAKI),
operator.ResourceQueryToZitadelQuery(querySSAK),
operator.ResourceQueryToZitadelQuery(querySST),
queryB,
)
case backup.Instant:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(querySAKI),
operator.ResourceQueryToZitadelQuery(querySSAK),
operator.ResourceQueryToZitadelQuery(querySST),
queryB,
)
cleanupQueries = append(cleanupQueries,
operator.EnsureFuncToQueryFunc(backup.GetCleanupFunc(monitor, namespace, name)),
)
case restore.Instant:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(querySAKI),
operator.ResourceQueryToZitadelQuery(querySSAK),
operator.ResourceQueryToZitadelQuery(querySST),
queryR,
)
cleanupQueries = append(cleanupQueries,
operator.EnsureFuncToQueryFunc(restore.GetCleanupFunc(monitor, namespace, name)),
)
}
}
for _, cleanup := range cleanupQueries {
queriers = append(queriers, cleanup)
}
return operator.QueriersToEnsureFunc(internalMonitor, false, queriers, k8sClient, queried)
},
operator.DestroyersToDestroyFunc(internalMonitor, destroyers),
func(kubernetes.ClientInt, map[string]interface{}, bool) error { return nil },
secrets,
existing,
false,
nil
}
}
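A minimal sketch of a desired spec for the new S3Backup kind, assembled from the test fixtures below (field names are taken from this diff; all values are placeholders):

	desired := &DesiredV0{
		Common: tree.NewCommon("databases.caos.ch/S3Backup", "v0", false),
		Spec: &Spec{
			Verbose:         false,
			Cron:            "0 3 * * *",       // backup schedule
			Bucket:          "zitadel-backups", // target bucket
			Endpoint:        "s3.example.com",  // S3-compatible endpoint
			Region:          "example-region",
			AccessKeyID:     &secret.Secret{Value: "<access-key-id>"},
			SecretAccessKey: &secret.Secret{Value: "<secret-access-key>"},
			SessionToken:    &secret.Secret{Value: "<session-token>"},
		},
	}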


@ -0,0 +1,500 @@
package s3
import (
"testing"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/caos/orbos/pkg/labels"
"github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/backup"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/restore"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
)
func TestBucket_Secrets(t *testing.T) {
masterkey := "testMk"
features := []string{backup.Normal}
region := "testRegion"
endpoint := "testEndpoint"
akid := "testAKID"
sak := "testSAK"
st := "testST"
bucketName := "testBucket2"
cron := "testCron2"
monitor := mntr.Monitor{}
namespace := "testNs2"
dbURL := "testDB"
dbPort := int32(80)
kindVersion := "v0"
kind := "BucketBackup"
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", kindVersion), "testComponent")
timestamp := "test2"
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
backupName := "testName2"
version := "testVersion2"
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: tree.NewCommon("databases.caos.ch/"+kind, kindVersion, false),
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
Endpoint: endpoint,
Region: region,
AccessKeyID: &secret.Secret{
Value: akid,
},
SecretAccessKey: &secret.Secret{
Value: sak,
},
SessionToken: &secret.Secret{
Value: st,
},
},
})
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
allSecrets := map[string]string{
"accesskeyid": "testAKID",
"secretaccesskey": "testSAK",
"sessiontoken": "testST",
}
_, _, _, secrets, existing, _, err := AdaptFunc(
backupName,
namespace,
componentLabels,
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
"",
)(
monitor,
desired,
&tree.Tree{},
)
assert.NoError(t, err)
for key, value := range allSecrets {
assert.Contains(t, secrets, key)
assert.Contains(t, existing, key)
assert.Equal(t, value, secrets[key].Value)
}
}
func TestBucket_AdaptBackup(t *testing.T) {
masterkey := "testMk"
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{backup.Normal}
region := "testRegion"
endpoint := "testEndpoint"
akid := "testAKID"
sak := "testSAK"
st := "testST"
dbURL := "testDB"
dbPort := int32(80)
bucketName := "testBucket2"
cron := "testCron2"
monitor := mntr.Monitor{}
namespace := "testNs2"
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", "v0"), "testComponent")
k8sLabelsAKID := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": accessKeyIDName,
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
k8sLabelsSAK := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": secretAccessKeyName,
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
k8sLabelsST := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": sessionTokenName,
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
timestamp := "test2"
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
backupName := "testName2"
version := "testVersion2"
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: tree.NewCommon("databases.caos.ch/BucketBackup", "v0", false),
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
Endpoint: endpoint,
Region: region,
AccessKeyID: &secret.Secret{
Value: akid,
},
SecretAccessKey: &secret.Secret{
Value: sak,
},
SessionToken: &secret.Secret{
Value: st,
},
},
})
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
SetBackup(client, namespace, k8sLabelsAKID, k8sLabelsSAK, k8sLabelsST, akid, sak, st)
query, _, _, _, _, _, err := AdaptFunc(
backupName,
namespace,
componentLabels,
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
"",
)(
monitor,
desired,
&tree.Tree{},
)
assert.NoError(t, err)
databases := []string{"test1", "test2"}
queried := SetQueriedForDatabases(databases, []string{})
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NotNil(t, ensure)
assert.NoError(t, ensure(client))
}
func TestBucket_AdaptInstantBackup(t *testing.T) {
masterkey := "testMk"
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{backup.Instant}
bucketName := "testBucket1"
cron := "testCron"
monitor := mntr.Monitor{}
namespace := "testNs"
dbURL := "testDB"
dbPort := int32(80)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", "v0"), "testComponent")
k8sLabelsAKID := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": accessKeyIDName,
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
k8sLabelsSAK := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": secretAccessKeyName,
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
k8sLabelsST := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": sessionTokenName,
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
timestamp := "test"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
region := "testRegion"
endpoint := "testEndpoint"
akid := "testAKID"
sak := "testSAK"
st := "testST"
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: tree.NewCommon("databases.caos.ch/BucketBackup", "v0", false),
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
Endpoint: endpoint,
Region: region,
AccessKeyID: &secret.Secret{
Value: akid,
},
SecretAccessKey: &secret.Secret{
Value: sak,
},
SessionToken: &secret.Secret{
Value: st,
},
},
})
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
SetInstantBackup(client, namespace, backupName, k8sLabelsAKID, k8sLabelsSAK, k8sLabelsST, akid, sak, st)
query, _, _, _, _, _, err := AdaptFunc(
backupName,
namespace,
componentLabels,
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
"",
)(
monitor,
desired,
&tree.Tree{},
)
assert.NoError(t, err)
databases := []string{"test1", "test2"}
queried := SetQueriedForDatabases(databases, []string{})
ensure, err := query(client, queried)
assert.NotNil(t, ensure)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
func TestBucket_AdaptRestore(t *testing.T) {
masterkey := "testMk"
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{restore.Instant}
bucketName := "testBucket1"
cron := "testCron"
monitor := mntr.Monitor{}
namespace := "testNs"
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", "v0"), "testComponent")
k8sLabelsAKID := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": accessKeyIDName,
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
k8sLabelsSAK := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": secretAccessKeyName,
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
k8sLabelsST := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": sessionTokenName,
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
timestamp := "test"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
region := "testRegion"
endpoint := "testEndpoint"
akid := "testAKID"
sak := "testSAK"
st := "testST"
dbURL := "testDB"
dbPort := int32(80)
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: tree.NewCommon("databases.caos.ch/BucketBackup", "v0", false),
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
Endpoint: endpoint,
Region: region,
AccessKeyID: &secret.Secret{
Value: akid,
},
SecretAccessKey: &secret.Secret{
Value: sak,
},
SessionToken: &secret.Secret{
Value: st,
},
},
})
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
SetRestore(client, namespace, backupName, k8sLabelsAKID, k8sLabelsSAK, k8sLabelsST, akid, sak, st)
query, _, _, _, _, _, err := AdaptFunc(
backupName,
namespace,
componentLabels,
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
"",
)(
monitor,
desired,
&tree.Tree{},
)
assert.NoError(t, err)
databases := []string{"test1", "test2"}
queried := SetQueriedForDatabases(databases, []string{})
ensure, err := query(client, queried)
assert.NotNil(t, ensure)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
/*
func TestBucket_AdaptClean(t *testing.T) {
masterkey := "testMk"
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{clean.Instant}
bucketName := "testBucket1"
cron := "testCron"
monitor := mntr.Monitor{}
namespace := "testNs"
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", "v0"), "testComponent")
k8sLabels := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": "backup-serviceaccountjson",
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
timestamp := "test"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
saJson := "testSA"
dbURL := "testDB"
dbPort := int32(80)
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: &tree.Common{
Kind: "databases.caos.ch/BucketBackup",
Version: "v0",
},
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
ServiceAccountJSON: &secret.Secret{
Value: saJson,
},
},
})
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
SetClean(client, namespace, backupName, k8sLabels, saJson)
query, _, _, _, _, _, err := AdaptFunc(
backupName,
namespace,
componentLabels,
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
dbURL,
dbPort,
features,
)(
monitor,
desired,
&tree.Tree{},
)
assert.NoError(t, err)
databases := []string{"test1", "test2"}
users := []string{"test1", "test2"}
queried := SetQueriedForDatabases(databases, users)
ensure, err := query(client, queried)
assert.NotNil(t, ensure)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}*/

View File

@ -0,0 +1,154 @@
package backup
import (
"time"
"github.com/caos/zitadel/operator"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/kubernetes/resources/cronjob"
"github.com/caos/orbos/pkg/kubernetes/resources/job"
"github.com/caos/orbos/pkg/labels"
corev1 "k8s.io/api/core/v1"
)
const (
defaultMode int32 = 256
certPath = "/cockroach/cockroach-certs"
accessKeyIDPath = "/secrets/accessaccountkey"
secretAccessKeyPath = "/secrets/secretaccesskey"
sessionTokenPath = "/secrets/sessiontoken"
backupNameEnv = "BACKUP_NAME"
cronJobNamePrefix = "backup-"
internalSecretName = "client-certs"
rootSecretName = "cockroachdb.client.root"
timeout = 15 * time.Minute
Normal = "backup"
Instant = "instantbackup"
)
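// AdaptFunc wires up the S3 backup resources. Depending on the requested
// features it ensures either a CronJob ("backup") running on the given cron
// schedule or a one-off Job ("instantbackup"); both share a job spec that
// mounts the CockroachDB client certificates and the three S3 credential
// secrets and runs the generated backup command.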
func AdaptFunc(
monitor mntr.Monitor,
backupName string,
namespace string,
componentLabels *labels.Component,
checkDBReady operator.EnsureFunc,
bucketName string,
cron string,
accessKeyIDName string,
accessKeyIDKey string,
secretAccessKeyName string,
secretAccessKeyKey string,
sessionTokenName string,
sessionTokenKey string,
region string,
endpoint string,
timestamp string,
nodeselector map[string]string,
tolerations []corev1.Toleration,
dbURL string,
dbPort int32,
features []string,
image string,
) (
queryFunc operator.QueryFunc,
destroyFunc operator.DestroyFunc,
err error,
) {
command := getBackupCommand(
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
)
jobSpecDef := getJobSpecDef(
nodeselector,
tolerations,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
backupName,
image,
command,
)
destroyers := []operator.DestroyFunc{}
queriers := []operator.QueryFunc{}
cronJobDef := getCronJob(
namespace,
labels.MustForName(componentLabels, GetJobName(backupName)),
cron,
jobSpecDef,
)
destroyCJ, err := cronjob.AdaptFuncToDestroy(cronJobDef.Namespace, cronJobDef.Name)
if err != nil {
return nil, nil, err
}
queryCJ, err := cronjob.AdaptFuncToEnsure(cronJobDef)
if err != nil {
return nil, nil, err
}
jobDef := getJob(
namespace,
labels.MustForName(componentLabels, cronJobNamePrefix+backupName),
jobSpecDef,
)
destroyJ, err := job.AdaptFuncToDestroy(jobDef.Namespace, jobDef.Name)
if err != nil {
return nil, nil, err
}
queryJ, err := job.AdaptFuncToEnsure(jobDef)
if err != nil {
return nil, nil, err
}
for _, feature := range features {
switch feature {
case Normal:
destroyers = append(destroyers,
operator.ResourceDestroyToZitadelDestroy(destroyCJ),
)
queriers = append(queriers,
operator.EnsureFuncToQueryFunc(checkDBReady),
operator.ResourceQueryToZitadelQuery(queryCJ),
)
case Instant:
destroyers = append(destroyers,
operator.ResourceDestroyToZitadelDestroy(destroyJ),
)
queriers = append(queriers,
operator.EnsureFuncToQueryFunc(checkDBReady),
operator.ResourceQueryToZitadelQuery(queryJ),
)
}
}
return func(k8sClient kubernetes.ClientInt, queried map[string]interface{}) (operator.EnsureFunc, error) {
return operator.QueriersToEnsureFunc(monitor, false, queriers, k8sClient, queried)
},
operator.DestroyersToDestroyFunc(monitor, destroyers),
nil
}
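// GetJobName returns the name shared by the backup CronJob and the instant
// backup Job for the given backup configuration.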
func GetJobName(backupName string) string {
return cronJobNamePrefix + backupName
}

View File

@ -0,0 +1,404 @@
package backup
import (
"testing"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/caos/orbos/pkg/labels"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
macherrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func TestBackup_AdaptInstantBackup1(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{Instant}
monitor := mntr.Monitor{}
namespace := "testNs"
bucketName := "testBucket"
cron := "testCron"
timestamp := "test"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
accessKeyIDName := "testAKIN"
accessKeyIDKey := "testAKIK"
secretAccessKeyName := "testSAKN"
secretAccessKeyKey := "testSAKK"
sessionTokenName := "testSTN"
sessionTokenKey := "testSTK"
region := "region"
endpoint := "endpoint"
dbURL := "testDB"
dbPort := int32(80)
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getJob(
namespace,
nameLabels,
getJobSpecDef(
nodeselector,
tolerations,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
backupName,
version,
getBackupCommand(
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
),
),
)
client.EXPECT().ApplyJob(jobDef).Times(1).Return(nil)
client.EXPECT().GetJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, jobName))
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
checkDBReady,
bucketName,
cron,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
region,
endpoint,
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
func TestBackup_AdaptInstantBackup2(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{Instant}
monitor := mntr.Monitor{}
namespace := "testNs2"
dbURL := "testDB"
dbPort := int32(80)
bucketName := "testBucket2"
cron := "testCron2"
timestamp := "test2"
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
backupName := "testName2"
version := "testVersion2"
accessKeyIDName := "testAKIN2"
accessKeyIDKey := "testAKIK2"
secretAccessKeyName := "testSAKN2"
secretAccessKeyKey := "testSAKK2"
sessionTokenName := "testSTN2"
sessionTokenKey := "testSTK2"
region := "region2"
endpoint := "endpoint2"
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getJob(
namespace,
nameLabels,
getJobSpecDef(
nodeselector,
tolerations,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
backupName,
version,
getBackupCommand(
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
),
),
)
client.EXPECT().ApplyJob(jobDef).Times(1).Return(nil)
client.EXPECT().GetJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, jobName))
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
checkDBReady,
bucketName,
cron,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
region,
endpoint,
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
func TestBackup_AdaptBackup1(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{Normal}
monitor := mntr.Monitor{}
namespace := "testNs"
bucketName := "testBucket"
cron := "testCron"
timestamp := "test"
dbURL := "testDB"
dbPort := int32(80)
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
accessKeyIDName := "testAKIN"
accessKeyIDKey := "testAKIK"
secretAccessKeyName := "testSAKN"
secretAccessKeyKey := "testSAKK"
sessionTokenName := "testSTN"
sessionTokenKey := "testSTK"
region := "region"
endpoint := "endpoint"
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getCronJob(
namespace,
nameLabels,
cron,
getJobSpecDef(
nodeselector,
tolerations,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
backupName,
version,
getBackupCommand(
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
),
),
)
client.EXPECT().ApplyCronJob(jobDef).Times(1).Return(nil)
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
checkDBReady,
bucketName,
cron,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
region,
endpoint,
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
func TestBackup_AdaptBackup2(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{Normal}
monitor := mntr.Monitor{}
namespace := "testNs2"
dbURL := "testDB"
dbPort := int32(80)
bucketName := "testBucket2"
cron := "testCron2"
timestamp := "test2"
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
backupName := "testName2"
version := "testVersion2"
accessKeyIDName := "testAKIN2"
accessKeyIDKey := "testAKIK2"
secretAccessKeyName := "testSAKN2"
secretAccessKeyKey := "testSAKK2"
sessionTokenName := "testSTN2"
sessionTokenKey := "testSTK2"
region := "region2"
endpoint := "endpoint2"
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getCronJob(
namespace,
nameLabels,
cron,
getJobSpecDef(
nodeselector,
tolerations,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
backupName,
version,
getBackupCommand(
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
),
),
)
client.EXPECT().ApplyCronJob(jobDef).Times(1).Return(nil)
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
checkDBReady,
bucketName,
cron,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
region,
endpoint,
timestamp,
nodeselector,
tolerations,
dbURL,
dbPort,
features,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}

View File

@ -1,11 +1,9 @@
package clean
package backup
import (
"fmt"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/zitadel/operator"
)
@ -15,15 +13,15 @@ func GetCleanupFunc(
backupName string,
) operator.EnsureFunc {
return func(k8sClient kubernetes.ClientInt) error {
monitor.Info("waiting for clean to be completed")
monitor.Info("waiting for backup to be completed")
if err := k8sClient.WaitUntilJobCompleted(namespace, GetJobName(backupName), timeout); err != nil {
return fmt.Errorf("error while waiting for clean to be completed: %w", err)
return fmt.Errorf("error while waiting for backup to be completed: %w", err)
}
monitor.Info("clean is completed, cleanup")
monitor.Info("backup is completed, cleanup")
if err := k8sClient.DeleteJob(namespace, GetJobName(backupName)); err != nil {
return fmt.Errorf("error while trying to cleanup clean: %w", err)
return fmt.Errorf("error while trying to cleanup backup: %w", err)
}
monitor.Info("clean cleanup is completed")
monitor.Info("cleanup backup is completed")
return nil
}
}

View File

@ -0,0 +1,41 @@
package backup
import (
"fmt"
"testing"
"github.com/caos/orbos/mntr"
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)
func TestBackup_Cleanup1(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
name := "test"
namespace := "testNs"
cleanupFunc := GetCleanupFunc(monitor, namespace, name)
client.EXPECT().WaitUntilJobCompleted(namespace, GetJobName(name), timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(namespace, GetJobName(name)).Times(1)
assert.NoError(t, cleanupFunc(client))
client.EXPECT().WaitUntilJobCompleted(namespace, GetJobName(name), timeout).Times(1).Return(fmt.Errorf("fail"))
assert.Error(t, cleanupFunc(client))
}
func TestBackup_Cleanup2(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
name := "test2"
namespace := "testNs2"
cleanupFunc := GetCleanupFunc(monitor, namespace, name)
client.EXPECT().WaitUntilJobCompleted(namespace, GetJobName(name), timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(namespace, GetJobName(name)).Times(1)
assert.NoError(t, cleanupFunc(client))
client.EXPECT().WaitUntilJobCompleted(namespace, GetJobName(name), timeout).Times(1).Return(fmt.Errorf("fail"))
assert.Error(t, cleanupFunc(client))
}

View File

@ -0,0 +1,53 @@
package backup
import (
"strconv"
"strings"
)
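// getBackupCommand assembles the shell command run inside the backup
// container: it exports BACKUP_NAME (the fixed timestamp if given, otherwise
// the current time) and then executes `cockroach sql -e "BACKUP TO ..."`
// against an s3:// destination whose query string carries the AWS
// credentials read from the mounted secret files.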
func getBackupCommand(
timestamp string,
bucketName string,
backupName string,
certsFolder string,
accessKeyIDPath string,
secretAccessKeyPath string,
sessionTokenPath string,
region string,
endpoint string,
dbURL string,
dbPort int32,
) string {
backupCommands := make([]string, 0)
if timestamp != "" {
backupCommands = append(backupCommands, "export "+backupNameEnv+"="+timestamp)
} else {
backupCommands = append(backupCommands, "export "+backupNameEnv+"=$(date +%Y-%m-%dT%H:%M:%SZ)")
}
parameters := []string{
"AWS_ACCESS_KEY_ID=$(cat " + accessKeyIDPath + ")",
"AWS_SECRET_ACCESS_KEY=$(cat " + secretAccessKeyPath + ")",
"AWS_SESSION_TOKEN=$(cat " + sessionTokenPath + ")",
"AWS_ENDPOINT=" + endpoint,
}
if region != "" {
parameters = append(parameters, "AWS_REGION="+region)
}
backupCommands = append(backupCommands,
strings.Join([]string{
"cockroach",
"sql",
"--certs-dir=" + certsFolder,
"--host=" + dbURL,
"--port=" + strconv.Itoa(int(dbPort)),
"-e",
"\"BACKUP TO \\\"s3://" + bucketName + "/" + backupName + "/${" + backupNameEnv + "}?" + strings.Join(parameters, "&") + "\\\";\"",
}, " ",
),
)
return strings.Join(backupCommands, " && ")
}

View File

@ -0,0 +1,58 @@
package backup
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestBackup_Command1(t *testing.T) {
timestamp := ""
bucketName := "test"
backupName := "test"
dbURL := "testDB"
dbPort := int32(80)
region := "region"
endpoint := "endpoint"
cmd := getBackupCommand(
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
)
equals := "export " + backupNameEnv + "=$(date +%Y-%m-%dT%H:%M:%SZ) && cockroach sql --certs-dir=" + certPath + " --host=testDB --port=80 -e \"BACKUP TO \\\"s3://test/test/${BACKUP_NAME}?AWS_ACCESS_KEY_ID=$(cat " + accessKeyIDPath + ")&AWS_SECRET_ACCESS_KEY=$(cat " + secretAccessKeyPath + ")&AWS_SESSION_TOKEN=$(cat " + sessionTokenPath + ")&AWS_ENDPOINT=endpoint&AWS_REGION=region\\\";\""
assert.Equal(t, equals, cmd)
}
func TestBackup_Command2(t *testing.T) {
timestamp := "test"
bucketName := "test"
backupName := "test"
dbURL := "testDB"
dbPort := int32(80)
region := "region"
endpoint := "endpoint"
cmd := getBackupCommand(
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
)
equals := "export " + backupNameEnv + "=test && cockroach sql --certs-dir=" + certPath + " --host=testDB --port=80 -e \"BACKUP TO \\\"s3://test/test/${BACKUP_NAME}?AWS_ACCESS_KEY_ID=$(cat " + accessKeyIDPath + ")&AWS_SECRET_ACCESS_KEY=$(cat " + secretAccessKeyPath + ")&AWS_SESSION_TOKEN=$(cat " + sessionTokenPath + ")&AWS_ENDPOINT=endpoint&AWS_REGION=region\\\";\""
assert.Equal(t, equals, cmd)
}

View File

@ -0,0 +1,127 @@
package backup
import (
"github.com/caos/orbos/pkg/labels"
"github.com/caos/zitadel/operator/helpers"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
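// getCronJob wraps the job spec in a CronJob with the given schedule;
// ConcurrencyPolicy Forbid skips a new run while a previous backup is still
// in progress.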
func getCronJob(
namespace string,
nameLabels *labels.Name,
cron string,
jobSpecDef batchv1.JobSpec,
) *v1beta1.CronJob {
return &v1beta1.CronJob{
ObjectMeta: v1.ObjectMeta{
Name: nameLabels.Name(),
Namespace: namespace,
Labels: labels.MustK8sMap(nameLabels),
},
Spec: v1beta1.CronJobSpec{
Schedule: cron,
ConcurrencyPolicy: v1beta1.ForbidConcurrent,
JobTemplate: v1beta1.JobTemplateSpec{
Spec: jobSpecDef,
},
},
}
}
func getJob(
namespace string,
nameLabels *labels.Name,
jobSpecDef batchv1.JobSpec,
) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: v1.ObjectMeta{
Name: nameLabels.Name(),
Namespace: namespace,
Labels: labels.MustK8sMap(nameLabels),
},
Spec: jobSpecDef,
}
}
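// getJobSpecDef builds the pod spec shared by the CronJob and the instant
// Job: a single never-restarting container that runs the backup command with
// the client certificates and the access key ID, secret access key and
// session token mounted as files.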
func getJobSpecDef(
nodeselector map[string]string,
tolerations []corev1.Toleration,
accessKeyIDName string,
accessKeyIDKey string,
secretAccessKeyName string,
secretAccessKeyKey string,
sessionTokenName string,
sessionTokenKey string,
backupName string,
image string,
command string,
) batchv1.JobSpec {
return batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
NodeSelector: nodeselector,
Tolerations: tolerations,
Containers: []corev1.Container{{
Name: backupName,
Image: image,
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: accessKeyIDKey,
SubPath: accessKeyIDKey,
MountPath: accessKeyIDPath,
}, {
Name: secretAccessKeyKey,
SubPath: secretAccessKeyKey,
MountPath: secretAccessKeyPath,
}, {
Name: sessionTokenKey,
SubPath: sessionTokenKey,
MountPath: sessionTokenPath,
}},
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: accessKeyIDKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: accessKeyIDName,
},
},
}, {
Name: secretAccessKeyKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretAccessKeyName,
},
},
}, {
Name: sessionTokenKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: sessionTokenName,
},
},
}},
},
},
}
}

View File

@ -0,0 +1,199 @@
package backup
import (
"github.com/caos/zitadel/operator/common"
"github.com/caos/zitadel/operator/helpers"
"github.com/stretchr/testify/assert"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"testing"
)
func TestBackup_JobSpec1(t *testing.T) {
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
command := "test"
accessKeyIDName := "testAKIN"
accessKeyIDKey := "testAKIK"
secretAccessKeyName := "testSAKN"
secretAccessKeyKey := "testSAKK"
sessionTokenName := "testSTN"
sessionTokenKey := "testSTK"
equals := batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
NodeSelector: nodeselector,
Tolerations: tolerations,
Containers: []corev1.Container{{
Name: backupName,
Image: common.BackupImage.Reference("", version),
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: accessKeyIDKey,
SubPath: accessKeyIDKey,
MountPath: accessKeyIDPath,
}, {
Name: secretAccessKeyKey,
SubPath: secretAccessKeyKey,
MountPath: secretAccessKeyPath,
}, {
Name: sessionTokenKey,
SubPath: sessionTokenKey,
MountPath: sessionTokenPath,
}},
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: accessKeyIDKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: accessKeyIDName,
},
},
}, {
Name: secretAccessKeyKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretAccessKeyName,
},
},
}, {
Name: sessionTokenKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: sessionTokenName,
},
},
}},
},
},
}
assert.Equal(t, equals, getJobSpecDef(
nodeselector,
tolerations,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
backupName,
common.BackupImage.Reference("", version),
command))
}
func TestBackup_JobSpec2(t *testing.T) {
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
backupName := "testName2"
version := "testVersion2"
command := "test2"
accessKeyIDName := "testAKIN2"
accessKeyIDKey := "testAKIK2"
secretAccessKeyName := "testSAKN2"
secretAccessKeyKey := "testSAKK2"
sessionTokenName := "testSTN2"
sessionTokenKey := "testSTK2"
equals := batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
NodeSelector: nodeselector,
Tolerations: tolerations,
Containers: []corev1.Container{{
Name: backupName,
Image: common.BackupImage.Reference("", version),
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: accessKeyIDKey,
SubPath: accessKeyIDKey,
MountPath: accessKeyIDPath,
}, {
Name: secretAccessKeyKey,
SubPath: secretAccessKeyKey,
MountPath: secretAccessKeyPath,
}, {
Name: sessionTokenKey,
SubPath: sessionTokenKey,
MountPath: sessionTokenPath,
}},
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: accessKeyIDKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: accessKeyIDName,
},
},
}, {
Name: secretAccessKeyKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretAccessKeyName,
},
},
}, {
Name: sessionTokenKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: sessionTokenName,
},
},
}},
},
},
}
assert.Equal(t, equals, getJobSpecDef(
nodeselector,
tolerations,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
backupName,
common.BackupImage.Reference("", version),
command,
))
}

View File

@ -0,0 +1,65 @@
package s3
import (
"fmt"
"github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/tree"
)
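// DesiredV0 describes the desired state of an S3 bucket backup. Credentials
// can be provided inline as encrypted secrets or referenced from existing
// Kubernetes secrets.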
type DesiredV0 struct {
Common *tree.Common `yaml:",inline"`
Spec *Spec
}
type Spec struct {
Verbose bool
Cron string `yaml:"cron,omitempty"`
Bucket string `yaml:"bucket,omitempty"`
Endpoint string `yaml:"endpoint,omitempty"`
Region string `yaml:"region,omitempty"`
AccessKeyID *secret.Secret `yaml:"accessKeyID,omitempty"`
ExistingAccessKeyID *secret.Existing `yaml:"existingAccessKeyID,omitempty"`
SecretAccessKey *secret.Secret `yaml:"secretAccessKey,omitempty"`
ExistingSecretAccessKey *secret.Existing `yaml:"existingSecretAccessKey,omitempty"`
SessionToken *secret.Secret `yaml:"sessionToken,omitempty"`
ExistingSessionToken *secret.Existing `yaml:"existingSessionToken,omitempty"`
}
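// IsZero reports whether the spec is empty, i.e. neither inline nor existing
// secrets are set and all remaining fields hold their zero values.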
func (s *Spec) IsZero() bool {
if ((s.AccessKeyID == nil || s.AccessKeyID.IsZero()) && (s.ExistingAccessKeyID == nil || s.ExistingAccessKeyID.IsZero())) &&
((s.SecretAccessKey == nil || s.SecretAccessKey.IsZero()) && (s.ExistingSecretAccessKey == nil || s.ExistingSecretAccessKey.IsZero())) &&
((s.SessionToken == nil || s.SessionToken.IsZero()) && (s.ExistingSessionToken == nil || s.ExistingSessionToken.IsZero())) &&
!s.Verbose &&
s.Bucket == "" &&
s.Cron == "" &&
s.Endpoint == "" &&
s.Region == "" {
return true
}
return false
}
func ParseDesiredV0(desiredTree *tree.Tree) (*DesiredV0, error) {
desiredKind := &DesiredV0{
Common: desiredTree.Common,
Spec: &Spec{},
}
if err := desiredTree.Original.Decode(desiredKind); err != nil {
return nil, fmt.Errorf("parsing desired state failed: %s", err)
}
return desiredKind, nil
}
func (d *DesiredV0) validateSecrets() error {
if err := secret.ValidateSecret(d.Spec.AccessKeyID, d.Spec.ExistingAccessKeyID); err != nil {
return fmt.Errorf("validating access key id failed: %w", err)
}
if err := secret.ValidateSecret(d.Spec.SecretAccessKey, d.Spec.ExistingSecretAccessKey); err != nil {
return fmt.Errorf("validating secret access key failed: %w", err)
}
return nil
}

View File

@ -0,0 +1,155 @@
package s3
import (
"github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/tree"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v3"
"testing"
)
const (
masterkey = "testMk"
cron = "testCron"
bucketName = "testBucket"
region = "testRegion"
endpoint = "testEndpoint"
akid = "testAKID"
sak = "testSAK"
st = "testST"
yamlFile = `kind: databases.caos.ch/BucketBackup
version: v0
spec:
verbose: true
cron: testCron
bucket: testBucket
region: testRegion
endpoint: testEndpoint
accessKeyID:
encryption: AES256
encoding: Base64
value: l7GEXvmCT8hBXereT4FIG4j5vKQIycjS
secretAccessKey:
encryption: AES256
encoding: Base64
value: NWYnOpFpME-9FESqWi0bFQ3M6e0iNQw=
sessionToken:
encryption: AES256
encoding: Base64
value: xVY9pEXuh0Wbf2P2X_yThXwqRX08sA==
`
yamlFileWithoutSecret = `kind: databases.caos.ch/BucketBackup
version: v0
spec:
verbose: true
cron: testCron
bucket: testBucket
endpoint: testEndpoint
region: testRegion
`
yamlEmpty = `kind: databases.caos.ch/BucketBackup
version: v0`
)
var (
desired = DesiredV0{
Common: tree.NewCommon("databases.caos.ch/BucketBackup", "v0", false),
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
Endpoint: endpoint,
Region: region,
AccessKeyID: &secret.Secret{
Value: akid,
Encryption: "AES256",
Encoding: "Base64",
},
SecretAccessKey: &secret.Secret{
Value: sak,
Encryption: "AES256",
Encoding: "Base64",
},
SessionToken: &secret.Secret{
Value: st,
Encryption: "AES256",
Encoding: "Base64",
},
},
}
desiredWithoutSecret = DesiredV0{
Common: tree.NewCommon("databases.caos.ch/BucketBackup", "v0", false),
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
Region: region,
Endpoint: endpoint,
},
}
desiredEmpty = DesiredV0{
Common: tree.NewCommon("databases.caos.ch/BucketBackup", "v0", false),
Spec: &Spec{
Verbose: false,
Cron: "",
Bucket: "",
Endpoint: "",
Region: "",
AccessKeyID: &secret.Secret{
Value: "",
},
SecretAccessKey: &secret.Secret{
Value: "",
},
SessionToken: &secret.Secret{
Value: "",
},
},
}
desiredNil = DesiredV0{
Common: tree.NewCommon("databases.caos.ch/BucketBackup", "v0", false),
}
)
func marshalYaml(t *testing.T, masterkey string, struc *DesiredV0) []byte {
secret.Masterkey = masterkey
data, err := yaml.Marshal(struc)
assert.NoError(t, err)
return data
}
func unmarshalYaml(t *testing.T, masterkey string, yamlFile []byte) *tree.Tree {
secret.Masterkey = masterkey
desiredTree := &tree.Tree{}
assert.NoError(t, yaml.Unmarshal(yamlFile, desiredTree))
return desiredTree
}
func getDesiredTree(t *testing.T, masterkey string, desired *DesiredV0) *tree.Tree {
return unmarshalYaml(t, masterkey, marshalYaml(t, masterkey, desired))
}
func TestBucket_DesiredParse(t *testing.T) {
assert.Equal(t, yamlFileWithoutSecret, string(marshalYaml(t, masterkey, &desiredWithoutSecret)))
desiredTree := unmarshalYaml(t, masterkey, []byte(yamlFile))
desiredKind, err := ParseDesiredV0(desiredTree)
assert.NoError(t, err)
assert.Equal(t, &desired, desiredKind)
}
func TestBucket_DesiredNotZero(t *testing.T) {
desiredTree := unmarshalYaml(t, masterkey, []byte(yamlFile))
desiredKind, err := ParseDesiredV0(desiredTree)
assert.NoError(t, err)
assert.False(t, desiredKind.Spec.IsZero())
}
func TestBucket_DesiredZero(t *testing.T) {
desiredTree := unmarshalYaml(t, masterkey, []byte(yamlEmpty))
desiredKind, err := ParseDesiredV0(desiredTree)
assert.NoError(t, err)
assert.True(t, desiredKind.Spec.IsZero())
}

View File

@ -0,0 +1,105 @@
package s3
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/secret/read"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator/database/kinds/backups/core"
"strings"
)
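// BackupList returns a function that reads the S3 credentials from the
// desired state and lists the names of the backups stored underneath the
// given name in the configured bucket.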
func BackupList() core.BackupListFunc {
return func(monitor mntr.Monitor, k8sClient kubernetes.ClientInt, name string, desired *tree.Tree) ([]string, error) {
desiredKind, err := ParseDesiredV0(desired)
if err != nil {
return nil, fmt.Errorf("parsing desired state failed: %s", err)
}
desired.Parsed = desiredKind
if !monitor.IsVerbose() && desiredKind.Spec.Verbose {
monitor.Verbose()
}
valueAKI, err := read.GetSecretValue(k8sClient, desiredKind.Spec.AccessKeyID, desiredKind.Spec.ExistingAccessKeyID)
if err != nil {
return nil, err
}
valueSAK, err := read.GetSecretValue(k8sClient, desiredKind.Spec.SecretAccessKey, desiredKind.Spec.ExistingSecretAccessKey)
if err != nil {
return nil, err
}
valueST, err := read.GetSecretValue(k8sClient, desiredKind.Spec.SessionToken, desiredKind.Spec.ExistingSessionToken)
if err != nil {
return nil, err
}
return listFilesWithFilter(valueAKI, valueSAK, valueST, desiredKind.Spec.Region, desiredKind.Spec.Endpoint, desiredKind.Spec.Bucket, name)
}
}
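// listFilesWithFilter pages through all objects below "<name>/" in the
// bucket and collects each distinct path segment directly after that prefix,
// i.e. the timestamps of the existing backups.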
func listFilesWithFilter(akid, secretkey, sessionToken, region, endpoint, bucketName, name string) ([]string, error) {
customResolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
return aws.Endpoint{
URL: "https://" + endpoint,
SigningRegion: region,
}, nil
})
cfg, err := config.LoadDefaultConfig(
context.Background(),
config.WithRegion(region),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(akid, secretkey, sessionToken)),
config.WithEndpointResolver(customResolver),
)
if err != nil {
return nil, err
}
client := s3.NewFromConfig(cfg)
prefix := name + "/"
params := &s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
Prefix: aws.String(prefix),
}
paginator := s3.NewListObjectsV2Paginator(client, params, func(o *s3.ListObjectsV2PaginatorOptions) {
o.Limit = 10
})
names := make([]string, 0)
for paginator.HasMorePages() {
output, err := paginator.NextPage(context.Background())
if err != nil {
return nil, err
}
for _, value := range output.Contents {
if strings.HasPrefix(*value.Key, prefix) {
parts := strings.Split(*value.Key, "/")
if len(parts) < 2 {
continue
}
found := false
for _, n := range names {
if n == parts[1] {
found = true
break
}
}
if !found {
names = append(names, parts[1])
}
}
}
}
return names, nil
}

View File

@ -0,0 +1,149 @@
package s3
import (
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/backup"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/restore"
"github.com/caos/zitadel/operator/database/kinds/databases/core"
"github.com/golang/mock/gomock"
corev1 "k8s.io/api/core/v1"
macherrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
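// The helpers below register the Kubernetes API calls the mock client is
// expected to receive for the instant backup, cron backup and restore flows:
// applying the three credential secrets plus the Job or CronJob, and, where
// a Job is created, waiting for its completion and deleting it afterwards.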
func SetQueriedForDatabases(databases, users []string) map[string]interface{} {
queried := map[string]interface{}{}
core.SetQueriedForDatabaseDBList(queried, databases, users)
return queried
}
func SetInstantBackup(
k8sClient *kubernetesmock.MockClientInt,
namespace string,
backupName string,
labelsAKID map[string]string,
labelsSAK map[string]string,
labelsST map[string]string,
akid, sak, st string,
) {
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: accessKeyIDName,
Namespace: namespace,
Labels: labelsAKID,
},
StringData: map[string]string{accessKeyIDKey: akid},
Type: "Opaque",
}).MinTimes(1).MaxTimes(1).Return(nil)
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretAccessKeyName,
Namespace: namespace,
Labels: labelsSAK,
},
StringData: map[string]string{secretAccessKeyKey: sak},
Type: "Opaque",
}).MinTimes(1).MaxTimes(1).Return(nil)
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: sessionTokenName,
Namespace: namespace,
Labels: labelsST,
},
StringData: map[string]string{sessionTokenKey: st},
Type: "Opaque",
}).MinTimes(1).MaxTimes(1).Return(nil)
k8sClient.EXPECT().ApplyJob(gomock.Any()).Times(1).Return(nil)
k8sClient.EXPECT().GetJob(namespace, backup.GetJobName(backupName)).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, backup.GetJobName(backupName)))
k8sClient.EXPECT().WaitUntilJobCompleted(namespace, backup.GetJobName(backupName), gomock.Any()).Times(1).Return(nil)
k8sClient.EXPECT().DeleteJob(namespace, backup.GetJobName(backupName)).Times(1).Return(nil)
}
func SetBackup(
k8sClient *kubernetesmock.MockClientInt,
namespace string,
labelsAKID map[string]string,
labelsSAK map[string]string,
labelsST map[string]string,
akid, sak, st string,
) {
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: accessKeyIDName,
Namespace: namespace,
Labels: labelsAKID,
},
StringData: map[string]string{accessKeyIDKey: akid},
Type: "Opaque",
}).MinTimes(1).MaxTimes(1).Return(nil)
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretAccessKeyName,
Namespace: namespace,
Labels: labelsSAK,
},
StringData: map[string]string{secretAccessKeyKey: sak},
Type: "Opaque",
}).MinTimes(1).MaxTimes(1).Return(nil)
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: sessionTokenName,
Namespace: namespace,
Labels: labelsST,
},
StringData: map[string]string{sessionTokenKey: st},
Type: "Opaque",
}).MinTimes(1).MaxTimes(1).Return(nil)
k8sClient.EXPECT().ApplyCronJob(gomock.Any()).Times(1).Return(nil)
}
func SetRestore(
k8sClient *kubernetesmock.MockClientInt,
namespace string,
backupName string,
labelsAKID map[string]string,
labelsSAK map[string]string,
labelsST map[string]string,
akid, sak, st string,
) {
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: accessKeyIDName,
Namespace: namespace,
Labels: labelsAKID,
},
StringData: map[string]string{accessKeyIDKey: akid},
Type: "Opaque",
}).MinTimes(1).MaxTimes(1).Return(nil)
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretAccessKeyName,
Namespace: namespace,
Labels: labelsSAK,
},
StringData: map[string]string{secretAccessKeyKey: sak},
Type: "Opaque",
}).MinTimes(1).MaxTimes(1).Return(nil)
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: sessionTokenName,
Namespace: namespace,
Labels: labelsST,
},
StringData: map[string]string{sessionTokenKey: st},
Type: "Opaque",
}).MinTimes(1).MaxTimes(1).Return(nil)
k8sClient.EXPECT().ApplyJob(gomock.Any()).Times(1).Return(nil)
k8sClient.EXPECT().GetJob(namespace, restore.GetJobName(backupName)).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, restore.GetJobName(backupName)))
k8sClient.EXPECT().WaitUntilJobCompleted(namespace, restore.GetJobName(backupName), gomock.Any()).Times(1).Return(nil)
k8sClient.EXPECT().DeleteJob(namespace, restore.GetJobName(backupName)).Times(1).Return(nil)
}

View File

@ -1,4 +1,4 @@
package clean
package restore
import (
"time"
@ -13,15 +13,17 @@ import (
)
const (
Instant = "clean"
defaultMode = int32(256)
certPath = "/cockroach/cockroach-certs"
secretPath = "/secrets/sa.json"
internalSecretName = "client-certs"
rootSecretName = "cockroachdb.client.root"
jobPrefix = "backup-"
jobSuffix = "-clean"
timeout = 60 * time.Second
Instant = "restore"
defaultMode = int32(256)
certPath = "/cockroach/cockroach-certs"
accessKeyIDPath = "/secrets/accessaccountkey"
secretAccessKeyPath = "/secrets/secretaccesskey"
sessionTokenPath = "/secrets/sessiontoken"
jobPrefix = "backup-"
jobSuffix = "-restore"
internalSecretName = "client-certs"
rootSecretName = "cockroachdb.client.root"
timeout = 15 * time.Minute
)
func AdaptFunc(
@ -29,13 +31,21 @@ func AdaptFunc(
backupName string,
namespace string,
componentLabels *labels.Component,
databases []string,
users []string,
bucketName string,
timestamp string,
accessKeyIDName string,
accessKeyIDKey string,
secretAccessKeyName string,
secretAccessKeyKey string,
sessionTokenName string,
sessionTokenKey string,
region string,
endpoint string,
nodeselector map[string]string,
tolerations []corev1.Toleration,
checkDBReady operator.EnsureFunc,
secretName string,
secretKey string,
dbURL string,
dbPort int32,
image string,
) (
queryFunc operator.QueryFunc,
@ -43,20 +53,37 @@ func AdaptFunc(
err error,
) {
command := getCommand(databases, users)
jobName := jobPrefix + backupName + jobSuffix
command := getCommand(
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
)
jobDef := getJob(
jobdef := getJob(
namespace,
labels.MustForName(componentLabels, GetJobName(backupName)),
nodeselector,
tolerations,
secretName,
secretKey,
command,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
image,
command,
)
destroyJ, err := job.AdaptFuncToDestroy(jobDef.Namespace, jobDef.Name)
destroyJ, err := job.AdaptFuncToDestroy(namespace, jobName)
if err != nil {
return nil, nil, err
}
@ -65,7 +92,7 @@ func AdaptFunc(
operator.ResourceDestroyToZitadelDestroy(destroyJ),
}
queryJ, err := job.AdaptFuncToEnsure(jobDef)
queryJ, err := job.AdaptFuncToEnsure(jobdef)
if err != nil {
return nil, nil, err
}
@ -79,6 +106,7 @@ func AdaptFunc(
return operator.QueriersToEnsureFunc(monitor, false, queriers, k8sClient, queried)
},
operator.DestroyersToDestroyFunc(monitor, destroyers),
nil
}

View File

@ -1,6 +1,7 @@
package clean
package restore
import (
"github.com/caos/zitadel/operator/common"
"testing"
"github.com/caos/orbos/mntr"
@ -19,15 +20,23 @@ func TestBackup_Adapt1(t *testing.T) {
monitor := mntr.Monitor{}
namespace := "testNs"
databases := []string{"testDb"}
users := []string{"testUser"}
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
image := "testImage"
secretKey := "testKey"
secretName := "testSecretName"
timestamp := "testTs"
backupName := "testName2"
bucketName := "testBucket2"
version := "testVersion"
accessKeyIDName := "testAKIN"
accessKeyIDKey := "testAKIK"
secretAccessKeyName := "testSAKN"
secretAccessKeyKey := "testSAKK"
sessionTokenName := "testSTN"
sessionTokenKey := "testSTK"
region := "region"
endpoint := "endpoint"
dbURL := "testDB"
dbPort := int32(80)
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "testKind", "testVersion"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
@ -41,13 +50,26 @@ func TestBackup_Adapt1(t *testing.T) {
nameLabels,
nodeselector,
tolerations,
secretName,
secretKey,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
common.BackupImage.Reference("", version),
getCommand(
databases,
users,
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
),
image,
)
client.EXPECT().ApplyJob(jobDef).Times(1).Return(nil)
@ -58,14 +80,22 @@ func TestBackup_Adapt1(t *testing.T) {
backupName,
namespace,
componentLabels,
databases,
users,
bucketName,
timestamp,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
region,
endpoint,
nodeselector,
tolerations,
checkDBReady,
secretName,
secretKey,
image,
dbURL,
dbPort,
common.BackupImage.Reference("", version),
)
assert.NoError(t, err)
@ -80,15 +110,23 @@ func TestBackup_Adapt2(t *testing.T) {
monitor := mntr.Monitor{}
namespace := "testNs2"
databases := []string{"testDb1", "testDb2"}
users := []string{"testUser1", "testUser2"}
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
timestamp := "testTs"
backupName := "testName2"
image := "testImage2"
secretKey := "testKey2"
secretName := "testSecretName2"
bucketName := "testBucket2"
version := "testVersion2"
accessKeyIDName := "testAKIN2"
accessKeyIDKey := "testAKIK2"
secretAccessKeyName := "testSAKN2"
secretAccessKeyKey := "testSAKK2"
sessionTokenName := "testSTN2"
sessionTokenKey := "testSTK2"
region := "region2"
endpoint := "endpoint2"
dbURL := "testDB"
dbPort := int32(80)
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent2")
nameLabels := labels.MustForName(componentLabels, jobName)
@ -102,13 +140,26 @@ func TestBackup_Adapt2(t *testing.T) {
nameLabels,
nodeselector,
tolerations,
secretName,
secretKey,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
common.BackupImage.Reference("", version),
getCommand(
databases,
users,
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
),
image,
)
client.EXPECT().ApplyJob(jobDef).Times(1).Return(nil)
@ -119,14 +170,22 @@ func TestBackup_Adapt2(t *testing.T) {
backupName,
namespace,
componentLabels,
databases,
users,
bucketName,
timestamp,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
region,
endpoint,
nodeselector,
tolerations,
checkDBReady,
secretName,
secretKey,
image,
dbURL,
dbPort,
common.BackupImage.Reference("", version),
)
assert.NoError(t, err)

View File

@ -0,0 +1,27 @@
package restore
import (
"fmt"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/zitadel/operator"
)
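// GetCleanupFunc waits until the restore job has completed and then deletes
// it, so a subsequent restore can create a job with the same name again.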
func GetCleanupFunc(
monitor mntr.Monitor,
namespace,
backupName string,
) operator.EnsureFunc {
return func(k8sClient kubernetes.ClientInt) error {
monitor.Info("waiting for restore to be completed")
if err := k8sClient.WaitUntilJobCompleted(namespace, GetJobName(backupName), timeout); err != nil {
return fmt.Errorf("error while waiting for restore to be completed: %s", err.Error())
}
monitor.Info("restore is completed, cleanup")
if err := k8sClient.DeleteJob(namespace, GetJobName(backupName)); err != nil {
return fmt.Errorf("error while trying to cleanup restore: %s", err.Error())
}
monitor.Info("restore cleanup is completed")
return nil
}
}

View File

@ -1,14 +1,12 @@
package clean
package restore
import (
"errors"
"testing"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/caos/orbos/mntr"
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"testing"
)
func TestBackup_Cleanup1(t *testing.T) {

View File

@ -0,0 +1,48 @@
package restore
import (
"strconv"
"strings"
)
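// getCommand assembles the restore shell command: `cockroach sql -e
// "RESTORE FROM ..."` pointing at the s3:// location of the chosen backup
// timestamp, with the AWS credentials read from the mounted secret files.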
func getCommand(
timestamp string,
bucketName string,
backupName string,
certsFolder string,
accessKeyIDPath string,
secretAccessKeyPath string,
sessionTokenPath string,
region string,
endpoint string,
dbURL string,
dbPort int32,
) string {
backupCommands := make([]string, 0)
parameters := []string{
"AWS_ACCESS_KEY_ID=$(cat " + accessKeyIDPath + ")",
"AWS_SECRET_ACCESS_KEY=$(cat " + secretAccessKeyPath + ")",
"AWS_SESSION_TOKEN=$(cat " + sessionTokenPath + ")",
"AWS_ENDPOINT=" + endpoint,
}
if region != "" {
parameters = append(parameters, "AWS_REGION="+region)
}
backupCommands = append(backupCommands,
strings.Join([]string{
"cockroach",
"sql",
"--certs-dir=" + certsFolder,
"--host=" + dbURL,
"--port=" + strconv.Itoa(int(dbPort)),
"-e",
"\"RESTORE FROM \\\"s3://" + bucketName + "/" + backupName + "/" + timestamp + "?" + strings.Join(parameters, "&") + "\\\";\"",
}, " ",
),
)
return strings.Join(backupCommands, " && ")
}

View File

@ -0,0 +1,59 @@
package restore
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestBackup_Command1(t *testing.T) {
timestamp := "test1"
bucketName := "testBucket"
backupName := "testBackup"
dbURL := "testDB"
dbPort := int32(80)
region := "region"
endpoint := "endpoint"
cmd := getCommand(
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
)
equals := "cockroach sql --certs-dir=" + certPath + " --host=testDB --port=80 -e \"RESTORE FROM \\\"s3://testBucket/testBackup/test1?AWS_ACCESS_KEY_ID=$(cat " + accessKeyIDPath + ")&AWS_SECRET_ACCESS_KEY=$(cat " + secretAccessKeyPath + ")&AWS_SESSION_TOKEN=$(cat " + sessionTokenPath + ")&AWS_ENDPOINT=endpoint&AWS_REGION=region\\\";\""
assert.Equal(t, equals, cmd)
}
func TestBackup_Command2(t *testing.T) {
timestamp := "test2"
bucketName := "testBucket"
backupName := "testBackup"
dbURL := "testDB2"
dbPort := int32(81)
region := "region2"
endpoint := "endpoint2"
cmd := getCommand(
timestamp,
bucketName,
backupName,
certPath,
accessKeyIDPath,
secretAccessKeyPath,
sessionTokenPath,
region,
endpoint,
dbURL,
dbPort,
)
equals := "cockroach sql --certs-dir=" + certPath + " --host=testDB2 --port=81 -e \"RESTORE FROM \\\"s3://testBucket/testBackup/test2?AWS_ACCESS_KEY_ID=$(cat " + accessKeyIDPath + ")&AWS_SECRET_ACCESS_KEY=$(cat " + secretAccessKeyPath + ")&AWS_SESSION_TOKEN=$(cat " + sessionTokenPath + ")&AWS_ENDPOINT=endpoint2&AWS_REGION=region2\\\";\""
assert.Equal(t, equals, cmd)
}

View File

@ -1,4 +1,4 @@
package clean
package restore
import (
"github.com/caos/orbos/pkg/labels"
@ -13,12 +13,16 @@ func getJob(
nameLabels *labels.Name,
nodeselector map[string]string,
tolerations []corev1.Toleration,
secretName string,
secretKey string,
command string,
accessKeyIDName string,
accessKeyIDKey string,
secretAccessKeyName string,
secretAccessKeyKey string,
sessionTokenName string,
sessionTokenKey string,
image string,
) *batchv1.Job {
command string,
) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: v1.ObjectMeta{
Name: nameLabels.Name(),
@ -43,11 +47,19 @@ func getJob(
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
Name: accessKeyIDKey,
SubPath: accessKeyIDKey,
MountPath: accessKeyIDPath,
}, {
Name: secretAccessKeyKey,
SubPath: secretAccessKeyKey,
MountPath: secretAccessKeyPath,
}, {
Name: sessionTokenKey,
SubPath: sessionTokenKey,
MountPath: sessionTokenPath,
}},
ImagePullPolicy: corev1.PullAlways,
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
@ -58,10 +70,24 @@ func getJob(
},
},
}, {
Name: secretKey,
Name: accessKeyIDKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
SecretName: accessKeyIDName,
},
},
}, {
Name: secretAccessKeyKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretAccessKeyName,
},
},
}, {
Name: sessionTokenKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: sessionTokenName,
},
},
}},
@ -69,5 +95,4 @@ func getJob(
},
},
}
}

View File

@ -1,27 +1,29 @@
package clean
package restore
import (
"testing"
"github.com/caos/orbos/pkg/labels"
"github.com/caos/zitadel/operator/helpers"
"github.com/stretchr/testify/assert"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
)
func TestBackup_Job1(t *testing.T) {
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
image := "testVersion"
command := "test"
secretKey := "testKey"
secretName := "testSecretName"
accessKeyIDName := "testAKIN"
accessKeyIDKey := "testAKIK"
secretAccessKeyName := "testSAKN"
secretAccessKeyKey := "testSAKK"
sessionTokenName := "testSTN"
sessionTokenKey := "testSTK"
jobName := "testJob"
namespace := "testNs"
image := "testImage"
k8sLabels := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
@ -58,11 +60,19 @@ func TestBackup_Job1(t *testing.T) {
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
Name: accessKeyIDKey,
SubPath: accessKeyIDKey,
MountPath: accessKeyIDPath,
}, {
Name: secretAccessKeyKey,
SubPath: secretAccessKeyKey,
MountPath: secretAccessKeyPath,
}, {
Name: sessionTokenKey,
SubPath: sessionTokenKey,
MountPath: sessionTokenPath,
}},
ImagePullPolicy: corev1.PullAlways,
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
@ -73,10 +83,24 @@ func TestBackup_Job1(t *testing.T) {
},
},
}, {
Name: secretKey,
Name: accessKeyIDKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
SecretName: accessKeyIDName,
},
},
}, {
Name: secretAccessKeyKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretAccessKeyName,
},
},
}, {
Name: sessionTokenKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: sessionTokenName,
},
},
}},
@ -85,19 +109,36 @@ func TestBackup_Job1(t *testing.T) {
},
}
assert.Equal(t, equals, getJob(namespace, nameLabels, nodeselector, tolerations, secretName, secretKey, command, image))
assert.Equal(t, equals, getJob(
namespace,
nameLabels,
nodeselector,
tolerations,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
image,
command,
))
}
func TestBackup_Job2(t *testing.T) {
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
image := "testVersion2"
command := "test2"
secretKey := "testKey2"
secretName := "testSecretName2"
accessKeyIDName := "testAKIN2"
accessKeyIDKey := "testAKIK2"
secretAccessKeyName := "testSAKN2"
secretAccessKeyKey := "testSAKK2"
sessionTokenName := "testSTN2"
sessionTokenKey := "testSTK2"
jobName := "testJob2"
namespace := "testNs2"
image := "testImage2"
k8sLabels := map[string]string{
"app.kubernetes.io/component": "testComponent2",
"app.kubernetes.io/managed-by": "testOp2",
@ -134,11 +175,19 @@ func TestBackup_Job2(t *testing.T) {
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
Name: accessKeyIDKey,
SubPath: accessKeyIDKey,
MountPath: accessKeyIDPath,
}, {
Name: secretAccessKeyKey,
SubPath: secretAccessKeyKey,
MountPath: secretAccessKeyPath,
}, {
Name: sessionTokenKey,
SubPath: sessionTokenKey,
MountPath: sessionTokenPath,
}},
ImagePullPolicy: corev1.PullAlways,
ImagePullPolicy: corev1.PullIfNotPresent,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
@ -149,10 +198,24 @@ func TestBackup_Job2(t *testing.T) {
},
},
}, {
Name: secretKey,
Name: accessKeyIDKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
SecretName: accessKeyIDName,
},
},
}, {
Name: secretAccessKeyKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretAccessKeyName,
},
},
}, {
Name: sessionTokenKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: sessionTokenName,
},
},
}},
@ -161,5 +224,17 @@ func TestBackup_Job2(t *testing.T) {
},
}
assert.Equal(t, equals, getJob(namespace, nameLabels, nodeselector, tolerations, secretName, secretKey, command, image))
assert.Equal(t, equals, getJob(
namespace,
nameLabels,
nodeselector,
tolerations,
accessKeyIDName,
accessKeyIDKey,
secretAccessKeyName,
secretAccessKeyKey,
sessionTokenName,
sessionTokenKey,
image,
command))
}

View File

@ -0,0 +1,54 @@
package s3
import (
"github.com/caos/orbos/pkg/secret"
)
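// getSecretsMap exposes the S3 credentials under stable keys
// ("accesskeyid", "secretaccesskey", "sessiontoken"), initializing nil
// entries first so callers can always address both the inline and the
// existing secret for each key.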
func getSecretsMap(desiredKind *DesiredV0) (map[string]*secret.Secret, map[string]*secret.Existing) {
var (
secrets = make(map[string]*secret.Secret, 0)
existing = make(map[string]*secret.Existing, 0)
)
if desiredKind.Spec == nil {
desiredKind.Spec = &Spec{}
}
if desiredKind.Spec.AccessKeyID == nil {
desiredKind.Spec.AccessKeyID = &secret.Secret{}
}
if desiredKind.Spec.ExistingAccessKeyID == nil {
desiredKind.Spec.ExistingAccessKeyID = &secret.Existing{}
}
akikey := "accesskeyid"
secrets[akikey] = desiredKind.Spec.AccessKeyID
existing[akikey] = desiredKind.Spec.ExistingAccessKeyID
if desiredKind.Spec.SecretAccessKey == nil {
desiredKind.Spec.SecretAccessKey = &secret.Secret{}
}
if desiredKind.Spec.ExistingSecretAccessKey == nil {
desiredKind.Spec.ExistingSecretAccessKey = &secret.Existing{}
}
sakkey := "secretaccesskey"
secrets[sakkey] = desiredKind.Spec.SecretAccessKey
existing[sakkey] = desiredKind.Spec.ExistingSecretAccessKey
if desiredKind.Spec.SessionToken == nil {
desiredKind.Spec.SessionToken = &secret.Secret{}
}
if desiredKind.Spec.ExistingSessionToken == nil {
desiredKind.Spec.ExistingSessionToken = &secret.Existing{}
}
stkey := "sessiontoken"
secrets[stkey] = desiredKind.Spec.SessionToken
existing[stkey] = desiredKind.Spec.ExistingSessionToken
return secrets, existing
}

View File

@ -0,0 +1,38 @@
package s3
import (
"testing"
"github.com/caos/orbos/pkg/secret"
"github.com/stretchr/testify/assert"
)
func TestBucket_getSecretsFull(t *testing.T) {
secrets, existing := getSecretsMap(&desired)
assert.Equal(t, desired.Spec.AccessKeyID, secrets["accesskeyid"])
assert.Equal(t, desired.Spec.ExistingAccessKeyID, existing["accesskeyid"])
assert.Equal(t, desired.Spec.SecretAccessKey, secrets["secretaccesskey"])
assert.Equal(t, desired.Spec.ExistingSecretAccessKey, existing["secretaccesskey"])
assert.Equal(t, desired.Spec.SessionToken, secrets["sessiontoken"])
assert.Equal(t, desired.Spec.ExistingSessionToken, existing["sessiontoken"])
}
func TestBucket_getSecretsEmpty(t *testing.T) {
secrets, existing := getSecretsMap(&desiredWithoutSecret)
assert.Equal(t, &secret.Secret{}, secrets["accesskeyid"])
assert.Equal(t, &secret.Existing{}, existing["accesskeyid"])
assert.Equal(t, &secret.Secret{}, secrets["secretaccesskey"])
assert.Equal(t, &secret.Existing{}, existing["secretaccesskey"])
assert.Equal(t, &secret.Secret{}, secrets["sessiontoken"])
assert.Equal(t, &secret.Existing{}, existing["sessiontoken"])
}
func TestBucket_getSecretsNil(t *testing.T) {
secrets, existing := getSecretsMap(&desiredNil)
assert.Equal(t, &secret.Secret{}, secrets["accesskeyid"])
assert.Equal(t, &secret.Existing{}, existing["accesskeyid"])
assert.Equal(t, &secret.Secret{}, secrets["secretaccesskey"])
assert.Equal(t, &secret.Existing{}, existing["secretaccesskey"])
assert.Equal(t, &secret.Secret{}, secrets["sessiontoken"])
assert.Equal(t, &secret.Existing{}, existing["sessiontoken"])
}

View File

@ -10,7 +10,6 @@ import (
"github.com/caos/orbos/pkg/labels"
"github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator"
"github.com/caos/zitadel/operator/database/kinds/databases/managed"
"github.com/caos/zitadel/operator/database/kinds/databases/provided"
@ -50,16 +49,7 @@ func Adapt(
switch desiredTree.Common.Kind {
case "databases.caos.ch/CockroachDB":
-return managed.Adapter(
-componentLabels,
-namespace,
-timestamp,
-nodeselector,
-tolerations,
-version,
-features,
-customImageRegistry,
-)(internalMonitor, desiredTree, currentTree)
+return managed.Adapter(componentLabels, namespace, timestamp, nodeselector, tolerations, version, features, customImageRegistry)(internalMonitor, desiredTree, currentTree)
case "databases.caos.ch/ProvidedDatabase":
return provided.Adapter()(internalMonitor, desiredTree, currentTree)
default:

View File

@ -33,6 +33,8 @@ const (
privateServiceName = SfsName
cockroachPort = int32(26257)
cockroachHTTPPort = int32(8080)
Clean = "clean"
DBReady = "dbready"
)
func Adapter(
@ -89,14 +91,14 @@ func Adapter(
var (
isFeatureDatabase bool
isFeatureRestore bool
isFeatureClean bool
)
for _, feature := range features {
switch feature {
case "database":
isFeatureDatabase = true
case "restore":
isFeatureRestore = true
case Clean:
isFeatureClean = true
}
}
@ -179,9 +181,6 @@ func Adapter(
queryS,
operator.EnsureFuncToQueryFunc(ensureInit),
)
}
if isFeatureDatabase {
destroyers = append(destroyers,
destroyS,
operator.ResourceDestroyToZitadelDestroy(destroySFS),
@ -191,6 +190,21 @@ func Adapter(
)
}
if isFeatureClean {
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(
statefulset.CleanPVCs(
monitor,
namespace,
cockroachSelectabel,
desiredKind.Spec.ReplicaCount,
),
),
operator.EnsureFuncToQueryFunc(ensureInit),
operator.EnsureFuncToQueryFunc(checkDBReady),
)
}
if desiredKind.Spec.Backups != nil {
oneBackup := false
@ -215,6 +229,8 @@ func Adapter(
nodeselector,
tolerations,
version,
PublicServiceName,
cockroachPort,
features,
customImageRegistry,
)
@ -233,21 +249,19 @@ func Adapter(
}
return func(k8sClient kubernetes.ClientInt, queried map[string]interface{}) (operator.EnsureFunc, error) {
if !isFeatureRestore {
queriedCurrentDB, err := core.ParseQueriedForDatabase(queried)
if err != nil || queriedCurrentDB == nil {
// TODO: query system state
currentDB.Current.Port = strconv.Itoa(int(cockroachPort))
currentDB.Current.URL = PublicServiceName
currentDB.Current.ReadyFunc = checkDBReady
currentDB.Current.AddUserFunc = addUser
currentDB.Current.DeleteUserFunc = deleteUser
currentDB.Current.ListUsersFunc = listUsers
currentDB.Current.ListDatabasesFunc = listDatabases
queriedCurrentDB, err := core.ParseQueriedForDatabase(queried)
if err != nil || queriedCurrentDB == nil {
// TODO: query system state
currentDB.Current.Port = strconv.Itoa(int(cockroachPort))
currentDB.Current.URL = PublicServiceName
currentDB.Current.ReadyFunc = checkDBReady
currentDB.Current.AddUserFunc = addUser
currentDB.Current.DeleteUserFunc = deleteUser
currentDB.Current.ListUsersFunc = listUsers
currentDB.Current.ListDatabasesFunc = listDatabases
core.SetQueriedForDatabase(queried, current)
internalMonitor.Info("set current state of managed database")
}
core.SetQueriedForDatabase(queried, current)
internalMonitor.Info("set current state of managed database")
}
ensure, err := operator.QueriersToEnsureFunc(internalMonitor, true, queriers, k8sClient, queried)
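
Exporting Clean lets callers outside the package request a PVC wipe through the same adapter that handles normal reconciliation. A hedged sketch of such a call, mirroring the call shape used in this package's tests (the surrounding variables are placeholders, and operator.EnsureFunc is assumed to take the client):

    // Sketch: run the managed-database adapter with only the clean feature.
    query, _, _, _, _, _, err := managed.Adapter(
        componentLabels, namespace, timestamp,
        nodeselector, tolerations, version,
        []string{managed.Clean}, customImageRegistry,
    )(monitor, desiredTree, &tree.Tree{})
    if err != nil {
        return err
    }
    ensure, err := query(k8sClient, map[string]interface{}{})
    if err != nil {
        return err
    }
    return ensure(k8sClient)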

View File

@ -11,7 +11,6 @@ import (
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/backup"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/clean"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/restore"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
@ -66,11 +65,11 @@ func TestManaged_AdaptBucketBackup(t *testing.T) {
timestamp := "testTs"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{}
version := "testVersion"
k8sClient := kubernetesmock.NewMockClientInt(gomock.NewController(t))
backupName := "testBucket"
saJson := "testSA"
masterkey := "testMk"
version := "test"
desired := getTreeWithDBAndBackup(t, masterkey, saJson, backupName)
@ -106,11 +105,11 @@ func TestManaged_AdaptBucketInstantBackup(t *testing.T) {
timestamp := "testTs"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{}
version := "testVersion"
masterkey := "testMk"
k8sClient := kubernetesmock.NewMockClientInt(gomock.NewController(t))
saJson := "testSA"
backupName := "testBucket"
version := "test"
features := []string{backup.Instant}
bucket.SetInstantBackup(k8sClient, namespace, backupName, labels, saJson)
@ -152,10 +151,10 @@ func TestManaged_AdaptBucketCleanAndRestore(t *testing.T) {
saJson := "testSA"
backupName := "testBucket"
-features := []string{restore.Instant, clean.Instant}
+features := []string{restore.Instant}
bucket.SetRestore(k8sClient, namespace, backupName, labels, saJson)
-bucket.SetClean(k8sClient, namespace, backupName, labels, saJson)
-k8sClient.EXPECT().WaitUntilStatefulsetIsReady(namespace, SfsName, true, true, 60*time.Second).Times(2)
+//SetClean(k8sClient, namespace, 1)
+k8sClient.EXPECT().WaitUntilStatefulsetIsReady(namespace, SfsName, true, true, 60*time.Second).Times(1)
desired := getTreeWithDBAndBackup(t, masterkey, saJson, backupName)

View File

@ -66,12 +66,12 @@ func TestManaged_Adapt1(t *testing.T) {
timestamp := "testTs"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{}
version := "testVersion"
features := []string{"database"}
masterkey := "testMk"
k8sClient := kubernetesmock.NewMockClientInt(gomock.NewController(t))
dbCurrent := coremock.NewMockDatabaseCurrent(gomock.NewController(t))
queried := map[string]interface{}{}
version := "test"
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: tree.NewCommon("databases.caos.ch/CockroachDB", "v0", false),
@ -130,16 +130,7 @@ func TestManaged_Adapt1(t *testing.T) {
dbCurrent.EXPECT().SetCertificateKey(gomock.Any()).MinTimes(1).MaxTimes(1)
k8sClient.EXPECT().ApplySecret(gomock.Any()).MinTimes(1).MaxTimes(1)
-query, _, _, _, _, _, err := Adapter(
-componentLabels,
-namespace,
-timestamp,
-nodeselector,
-tolerations,
-version,
-features,
-"",
-)(monitor, desired, &tree.Tree{})
+query, _, _, _, _, _, err := Adapter(componentLabels, namespace, timestamp, nodeselector, tolerations, version, features, "")(monitor, desired, &tree.Tree{})
assert.NoError(t, err)
ensure, err := query(k8sClient, queried)
@ -184,12 +175,12 @@ func TestManaged_Adapt2(t *testing.T) {
nodeselector := map[string]string{"test2": "test2"}
var tolerations []corev1.Toleration
version := "testVersion2"
features := []string{"database"}
masterkey := "testMk2"
k8sClient := kubernetesmock.NewMockClientInt(gomock.NewController(t))
dbCurrent := coremock.NewMockDatabaseCurrent(gomock.NewController(t))
queried := map[string]interface{}{}
version := "test"
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: tree.NewCommon("databases.caos.ch/CockroachDB", "v0", false),
@ -248,16 +239,7 @@ func TestManaged_Adapt2(t *testing.T) {
dbCurrent.EXPECT().SetCertificateKey(gomock.Any()).MinTimes(1).MaxTimes(1)
k8sClient.EXPECT().ApplySecret(gomock.Any()).MinTimes(1).MaxTimes(1)
-query, _, _, _, _, _, err := Adapter(
-componentLabels,
-namespace,
-timestamp,
-nodeselector,
-tolerations,
-version,
-features,
-"",
-)(monitor, desired, &tree.Tree{})
+query, _, _, _, _, _, err := Adapter(componentLabels, namespace, timestamp, nodeselector, tolerations, version, features, "")(monitor, desired, &tree.Tree{})
assert.NoError(t, err)
ensure, err := query(k8sClient, queried)

View File

@ -0,0 +1,35 @@
package managed
import (
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/golang/mock/gomock"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"
)
func SetClean(
k8sClient *kubernetesmock.MockClientInt,
namespace string,
replicas int,
) {
k8sClient.EXPECT().ScaleStatefulset(namespace, gomock.Any(), 0).Return(nil)
k8sClient.EXPECT().ListPersistentVolumeClaims(namespace).Return(&core.PersistentVolumeClaimList{
Items: []core.PersistentVolumeClaim{
{ObjectMeta: metav1.ObjectMeta{
Name: "datadir-cockroachdb-0",
}},
},
}, nil)
k8sClient.EXPECT().ScaleStatefulset(namespace, gomock.Any(), 1).Return(nil)
k8sClient.EXPECT().DeletePersistentVolumeClaim(namespace, gomock.Any(), gomock.Any()).Times(replicas).Return(nil)
k8sClient.EXPECT().WaitUntilStatefulsetIsReady(namespace, gomock.Any(), true, false, gomock.Any())
k8sClient.EXPECT().WaitUntilStatefulsetIsReady(namespace, gomock.Any(), true, true, time.Second*1)
k8sClient.EXPECT().WaitUntilStatefulsetIsReady(namespace, gomock.Any(), true, true, gomock.Any())
/*
k8sClient.EXPECT().ApplyJob(gomock.Any()).Times(1).Return(nil)
k8sClient.EXPECT().GetJob(namespace, clean.GetJobName(backupName)).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{"batch", "jobs"}, clean.GetJobName(backupName)))
k8sClient.EXPECT().WaitUntilJobCompleted(namespace, clean.GetJobName(backupName), gomock.Any()).Times(1).Return(nil)
k8sClient.EXPECT().DeleteJob(namespace, clean.GetJobName(backupName)).Times(1).Return(nil)*/
}
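
SetClean pre-registers every client call the PVC clean is expected to make (scale to zero, list PVCs, delete them, scale back up, readiness waits), so a test only has to wire it in. A hedged usage sketch:

    // Sketch: arrange the clean expectations, then drive the feature under test.
    k8sClient := kubernetesmock.NewMockClientInt(gomock.NewController(t))
    SetClean(k8sClient, namespace, 1)
    // ... run the adapter with the clean feature and assert no error ...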

View File

@ -34,6 +34,7 @@ const (
defaultMode = int32(256)
nodeSecret = "cockroachdb.node"
rootSecret = "cockroachdb.client.root"
cleanTimeout = time.Minute * 5
)
type Affinity struct {

View File

@ -0,0 +1,60 @@
package statefulset
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/kubernetes/resources"
"github.com/caos/orbos/pkg/labels"
macherrs "k8s.io/apimachinery/pkg/api/errors"
"strings"
"time"
)
func CleanPVCs(
monitor mntr.Monitor,
namespace string,
sfsSelectable *labels.Selectable,
replicaCount int,
) resources.QueryFunc {
name := sfsSelectable.Name()
return func(k8sClient kubernetes.ClientInt) (resources.EnsureFunc, error) {
pvcs, err := k8sClient.ListPersistentVolumeClaims(namespace)
if err != nil {
return nil, err
}
internalPvcs := []string{}
for _, pvc := range pvcs.Items {
if strings.HasPrefix(pvc.Name, datadirInternal+"-"+name) {
internalPvcs = append(internalPvcs, pvc.Name)
}
}
return func(k8sClient kubernetes.ClientInt) error {
noSFS := false
monitor.Info("Scale down statefulset")
if err := k8sClient.ScaleStatefulset(namespace, name, 0); err != nil {
if macherrs.IsNotFound(err) {
noSFS = true
} else {
return err
}
}
time.Sleep(2 * time.Second)
monitor.Info("Delete persistent volume claims")
for _, pvcName := range internalPvcs {
if err := k8sClient.DeletePersistentVolumeClaim(namespace, pvcName, cleanTimeout); err != nil {
return err
}
}
time.Sleep(2 * time.Second)
if !noSFS {
monitor.Info("Scale up statefulset")
if err := k8sClient.ScaleStatefulset(namespace, name, replicaCount); err != nil {
return err
}
}
return nil
}, nil
}
}
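
CleanPVCs separates reading from writing: the query phase snapshots the matching datadir PVC names while the cluster is untouched, and the returned EnsureFunc scales the statefulset down, deletes those PVCs, and scales back up (skipping the scale-up when the statefulset never existed). A sketch of driving it directly; inside the operator it is wrapped by operator.ResourceQueryToZitadelQuery instead. Here sfsSelectable stands in for a *labels.Selectable of the cockroachdb statefulset:

    // Sketch only: destructive, deletes the datadir PVCs.
    queryFn := statefulset.CleanPVCs(monitor, namespace, sfsSelectable, 3)
    ensureFn, err := queryFn(k8sClient)
    if err != nil {
        return err
    }
    return ensureFn(k8sClient)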

View File

@ -2,6 +2,7 @@ package orb
import (
"fmt"
"github.com/caos/zitadel/operator/database/kinds/databases/managed"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
@ -13,7 +14,6 @@ import (
"github.com/caos/zitadel/operator"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/backup"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/clean"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/restore"
"github.com/caos/zitadel/operator/database/kinds/databases"
)
@ -77,6 +77,10 @@ func AdaptFunc(
databaseCurrent := &tree.Tree{}
operatorLabels := mustDatabaseOperator(binaryVersion)
version := ""
if binaryVersion != nil {
version = *binaryVersion
}
queryDB, destroyDB, configureDB, secrets, existing, migrate, err := databases.Adapt(
orbMonitor,
@ -87,7 +91,7 @@ func AdaptFunc(
timestamp,
desiredKind.Spec.NodeSelector,
desiredKind.Spec.Tolerations,
-desiredKind.Spec.Version,
+version,
features,
desiredKind.Spec.CustomImageRegistry,
)
@ -102,7 +106,7 @@ func AdaptFunc(
dbOrBackup := false
for _, feature := range features {
switch feature {
case "database", backup.Instant, backup.Normal, restore.Instant, clean.Instant:
case "database", backup.Instant, backup.Normal, restore.Instant, managed.Clean:
if !dbOrBackup {
dbOrBackup = true
queriers = append(queriers,

View File

@ -6,7 +6,7 @@ import (
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator/api/database"
"github.com/caos/zitadel/operator/database/kinds/databases/core"
"github.com/caos/zitadel/operator/database/kinds/databases/managed"
orbdb "github.com/caos/zitadel/operator/database/kinds/orb"
)
@ -14,51 +14,42 @@ func GitOpsClear(
monitor mntr.Monitor,
k8sClient kubernetes.ClientInt,
gitClient *git.Client,
-databases []string,
-users []string,
) error {
desired, err := gitClient.ReadTree(git.DatabaseFile)
if err != nil {
return err
}
-return clear(monitor, k8sClient, databases, users, desired)
+return clear(monitor, k8sClient, desired, true)
}
func CrdClear(
monitor mntr.Monitor,
k8sClient kubernetes.ClientInt,
-databases []string,
-users []string,
) error {
desired, err := database.ReadCrd(k8sClient)
if err != nil {
return err
}
-return clear(monitor, k8sClient, databases, users, desired)
+return clear(monitor, k8sClient, desired, false)
}
func clear(
monitor mntr.Monitor,
k8sClient kubernetes.ClientInt,
-databases []string,
-users []string,
desired *tree.Tree,
gitops bool,
) error {
current := &tree.Tree{}
-query, _, _, _, _, _, err := orbdb.AdaptFunc("", nil, false, "clean")(monitor, desired, current)
+query, _, _, _, _, _, err := orbdb.AdaptFunc("", nil, gitops, managed.Clean)(monitor, desired, current)
if err != nil {
monitor.Error(err)
return err
}
queried := map[string]interface{}{}
-core.SetQueriedForDatabaseDBList(queried, databases, users)
ensure, err := query(k8sClient, queried)
if err != nil {
monitor.Error(err)
return err
}

View File

@ -6,7 +6,6 @@ import (
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator/api/database"
"github.com/caos/zitadel/operator/database/kinds/databases/core"
orbdb "github.com/caos/zitadel/operator/database/kinds/orb"
)
@ -15,26 +14,24 @@ func GitOpsRestore(
k8sClient kubernetes.ClientInt,
gitClient *git.Client,
name string,
-databases []string,
) error {
desired, err := gitClient.ReadTree(git.DatabaseFile)
if err != nil {
return err
}
-return restore(monitor, k8sClient, desired, name, databases)
+return restore(monitor, k8sClient, desired, name)
}
func CrdRestore(
monitor mntr.Monitor,
k8sClient kubernetes.ClientInt,
name string,
-databases []string,
) error {
desired, err := database.ReadCrd(k8sClient)
if err != nil {
return err
}
-return restore(monitor, k8sClient, desired, name, databases)
+return restore(monitor, k8sClient, desired, name)
}
func restore(
@ -42,7 +39,6 @@ func restore(
k8sClient kubernetes.ClientInt,
desired *tree.Tree,
name string,
-databases []string,
) error {
current := &tree.Tree{}
@ -52,8 +48,6 @@ func restore(
return err
}
queried := map[string]interface{}{}
-core.SetQueriedForDatabaseDBList(queried, databases, []string{})
ensure, err := query(k8sClient, queried)
if err != nil {
monitor.Error(err)
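
restore loses its databases parameter the same way, presumably because the crdb-native restore operates on the backup as a whole, so callers pass only the backup name. Sketch of the new call shapes (error handling elided):

    // Sketch: restore call sites after the databases parameter was dropped.
    if gitops {
        err = databases.GitOpsRestore(monitor, k8sClient, gitClient, backupName)
    } else {
        err = databases.CrdRestore(monitor, k8sClient, backupName)
    }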

View File

@ -354,7 +354,7 @@ func DestroyZitadelOperator(
func ScaleZitadelOperator(
monitor mntr.Monitor,
-client *kubernetes.Client,
+client kubernetes.ClientInt,
replicaCount int,
) error {
monitor.Debug("Scaling zitadel-operator")
@ -363,7 +363,7 @@ func ScaleZitadelOperator(
func ScaleDatabaseOperator(
monitor mntr.Monitor,
-client *kubernetes.Client,
+client kubernetes.ClientInt,
replicaCount int,
) error {
monitor.Debug("Scaling database-operator")

View File

@ -1,105 +0,0 @@
package zitadel
import (
"time"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/git"
"github.com/caos/orbos/pkg/kubernetes"
orbconfig "github.com/caos/orbos/pkg/orb"
"github.com/caos/zitadel/pkg/databases"
kubernetes2 "github.com/caos/zitadel/pkg/kubernetes"
)
var (
databasesList = []string{
"notification",
"adminapi",
"auth",
"authz",
"eventstore",
"management",
"zitadel",
}
userList = []string{
"notification",
"adminapi",
"auth",
"authz",
"eventstore",
"management",
"queries",
}
)
func GitOpsClearMigrateRestore(
monitor mntr.Monitor,
gitClient *git.Client,
orbCfg *orbconfig.Orb,
k8sClient *kubernetes.Client,
backup string,
version *string,
) error {
if err := kubernetes2.ScaleZitadelOperator(monitor, k8sClient, 0); err != nil {
return err
}
time.Sleep(5 * time.Second)
if err := GitOpsScaleDown(monitor, orbCfg, gitClient, k8sClient, version); err != nil {
return err
}
if err := databases.GitOpsClear(monitor, k8sClient, gitClient, databasesList, userList); err != nil {
return err
}
if err := GitOpsMigrations(monitor, orbCfg, gitClient, k8sClient, version); err != nil {
return err
}
if err := databases.GitOpsRestore(monitor, k8sClient, gitClient, backup, databasesList); err != nil {
return err
}
if err := kubernetes2.ScaleZitadelOperator(monitor, k8sClient, 1); err != nil {
return err
}
return nil
}
func CrdClearMigrateRestore(
monitor mntr.Monitor,
k8sClient *kubernetes.Client,
backup string,
version *string,
) error {
if err := kubernetes2.ScaleZitadelOperator(monitor, k8sClient, 0); err != nil {
return err
}
time.Sleep(5 * time.Second)
if err := CrdScaleDown(monitor, k8sClient, version); err != nil {
return err
}
if err := databases.CrdClear(monitor, k8sClient, databasesList, userList); err != nil {
return err
}
if err := CrdMigrations(monitor, k8sClient, version); err != nil {
return err
}
if err := databases.CrdRestore(monitor, k8sClient, backup, databasesList); err != nil {
return err
}
if err := kubernetes2.ScaleZitadelOperator(monitor, k8sClient, 1); err != nil {
return err
}
return nil
}