feat(operator): zitadel and database operator (#1208)

* feat(operator): add base for zitadel operator

* fix(operator): changed pipeline to release operator

* fix(operator): fmt with only one parameter

* fix(operator): corrected workflow job name

* fix(zitadelctl): added restore and backuplist command

* fix(zitadelctl): scale for restore

* chore(container): use scratch for deploy container

* fix(zitadelctl): limit image to scratch

* fix(migration): added migration scripts for newer version

* fix(operator): changed handling of kubeconfig in operator logic

* fix(operator): changed handling of secrets in operator logic

* fix(operator): use new version of zitadel

* fix(operator): added path for migrations

* fix(operator): delete duplicates of migration scripts

* fix(operator): delete subpaths and integrate logic into init container

* fix(operator): corrected path in dockerfile for local migrations

* fix(operator): added migrations for cockroachdb-secure

* fix(operator): delete logic for ambassador module

* fix(operator): added read and write secret commands

* fix(operator): correct and align operator pipeline with zitadel pipeline

* fix(operator): correct yaml error in operator pipeline

* fix(operator): correct action name in operator pipeline

* fix(operator): correct case-sensitive filename in operator pipeline

* fix(operator): upload artifacts from buildx output

* fix(operator): corrected attribute spelling error

* fix(operator): combined jobs for operator binary and image

* fix(operator): added missing comma in operator pipeline

* fix(operator): added codecov for operator image

* fix(operator): added codecov for operator image

* fix(testing): code changes for testing and several unit-tests (#1009)

* fix(operator): use the kubernetes client interface for testing and add several unit-tests

* fix(operator): several unit-tests

* fix(operator): several unit-tests

* fix(operator): changed order for the operator logic

* fix(operator): added version of zitadelctl from semantic release

* fix(operator): corrected function call with version of zitadelctl

* fix(operator): corrected function call with version of zitadelctl

* fix(operator): add check output to operator release pipeline

* fix(operator): set --short length everywhere to 12

* fix(operator): zitadel setup in job instead of exec with several unit tests

* fix(operator): fixes to combine newest zitadel and testing branch

* fix(operator): corrected path in Dockerfile

* fix(operator): fixed unit-test that was ignored during changes

* fix(operator): fixed unit-test that was ignored during changes

* fix(operator): corrected Dockerfile to correctly use env variable

* fix(operator): quickfix takeoff deployment

* fix(operator): corrected the clusterrolename in the applied artifacts

* fix: update secure migrations

* fix(operator): migrations (#1057)

* fix(operator): copied migrations from orbos repository

* fix(operator): newest migrations

* chore: use cockroach-secure

* fix: rename migration

* fix: remove insecure cockroach migrations

Co-authored-by: Stefan Benz <stefan@caos.ch>

* fix: finalize labels

* fix(operator): concurrent cli logging and fixed deployment of operator during restore

* fix: finalize labels and cli commands

* fix: restore

* chore: cockroachdb is always secure

* chore: use orbos consistent-labels latest commit

* test: make tests compatible with new labels

* fix: default to sa token for start command

* fix: use cockroachdb v12.02

* fix: don't delete flyway user

* test: fix migration test

* fix: use correct table qualifiers

* fix: don't alter sequence ownership

* fix: upgrade flyway

* fix: change ownership of all dbs and tables to admin user

* fix: change defaultdb user

* fix: treat clientid status codes >= 400 as errors

* fix: reconcile specified ZITADEL version, not binary version

* fix: add ca-certs

* fix: use latest orbos code

* fix: use orbos with fixed race condition

* fix: use latest ORBOS code

* fix: use latest ORBOS code

* fix: make migration and scaling around restoring work

* fix(operator): move zitadel operator

* chore(migrations): include owner change migration

* feat(db): add code base for database operator

* fix(db): change used image registry for database operator

* fix(db): generated mock

* fix(db): add accidentally ignored file

* fix(db): add cockroachdb backup image to pipeline

* fix(db): correct pipeline and image versions

* fix(db): correct version of used orbos

* fix(db): correct database import

* fix(db): go mod tidy

* fix(db): use new version for orbos

* fix(migrations): include migrations into zitadelctl binary (#1211)

* fix(db): use statik to integrate migrations into binary

* fix(migrations): corrected unit tests and pipeline for migrations integrated into the zitadelctl binary

* fix(migrations): correction in dockerfile for pipeline build

* fix(migrations): correction in dockerfile for pipeline build

* fix(migrations): dockerfile changes for cache optimization

* fix(database): correct used part-of label in database operator

* fix(database): correct used selectable label in zitadel operator

* fix(operator): correct labels for user secrets in zitadel operator

* fix(operator): correct labels for service test in zitadel operator

* fix: don't enable database features for user operations (#1227)

* fix: don't enable database features for user operations

* fix: omit database feature for connection info adapter

* fix: use latest orbos version

* fix: update ORBOS (#1240)

Co-authored-by: Florian Forster <florian@caos.ch>
Co-authored-by: Elio Bischof <eliobischof@gmail.com>
Authored by Stefan Benz on 2021-02-05 19:28:12 +01:00, committed by GitHub
parent 3428046ffa
commit ad25f35539
210 changed files with 18515 additions and 148 deletions


@@ -0,0 +1,71 @@
package backups
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/labels"
"github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
)
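// GetQueryAndDestroyFuncs dispatches on the kind of the desired tree and returns
// the query and destroy functions as well as the secrets of the matching backup
// implementation. Unknown kinds result in an error.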
func GetQueryAndDestroyFuncs(
monitor mntr.Monitor,
desiredTree *tree.Tree,
currentTree *tree.Tree,
name string,
namespace string,
componentLabels *labels.Component,
checkDBReady operator.EnsureFunc,
timestamp string,
nodeselector map[string]string,
tolerations []corev1.Toleration,
version string,
features []string,
) (
operator.QueryFunc,
operator.DestroyFunc,
map[string]*secret.Secret,
error,
) {
switch desiredTree.Common.Kind {
case "databases.caos.ch/BucketBackup":
return bucket.AdaptFunc(
name,
namespace,
labels.MustForComponent(
labels.MustReplaceAPI(
labels.GetAPIFromComponent(componentLabels),
"BucketBackup",
desiredTree.Common.Version,
),
"backup"),
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
features,
)(monitor, desiredTree, currentTree)
default:
return nil, nil, nil, errors.Errorf("unknown database kind %s", desiredTree.Common.Kind)
}
}
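// GetBackupList returns the names of the existing backups for the given backup kind.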
func GetBackupList(
monitor mntr.Monitor,
name string,
desiredTree *tree.Tree,
) (
[]string,
error,
) {
switch desiredTree.Common.Kind {
case "databases.caos.ch/BucketBackup":
return bucket.BackupList()(monitor, name, desiredTree)
default:
return nil, errors.Errorf("unknown database kind %s", desiredTree.Common.Kind)
}
}


@@ -0,0 +1,230 @@
package bucket
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/kubernetes/resources/secret"
"github.com/caos/orbos/pkg/labels"
secretpkg "github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/backup"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/clean"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/restore"
coreDB "github.com/caos/zitadel/operator/database/kinds/databases/core"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
)
const (
secretName = "backup-serviceaccountjson"
secretKey = "serviceaccountjson"
)
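// AdaptFunc wires the service account secret together with the backup, restore
// and clean resources of a bucket backup. Which queriers and destroyers are
// returned is driven by the requested features.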
func AdaptFunc(
name string,
namespace string,
componentLabels *labels.Component,
checkDBReady operator.EnsureFunc,
timestamp string,
nodeselector map[string]string,
tolerations []corev1.Toleration,
version string,
features []string,
) operator.AdaptFunc {
return func(monitor mntr.Monitor, desired *tree.Tree, current *tree.Tree) (queryFunc operator.QueryFunc, destroyFunc operator.DestroyFunc, secrets map[string]*secretpkg.Secret, err error) {
internalMonitor := monitor.WithField("component", "backup")
desiredKind, err := ParseDesiredV0(desired)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "parsing desired state failed")
}
desired.Parsed = desiredKind
if !monitor.IsVerbose() && desiredKind.Spec.Verbose {
internalMonitor.Verbose()
}
destroyS, err := secret.AdaptFuncToDestroy(namespace, secretName)
if err != nil {
return nil, nil, nil, err
}
queryS, err := secret.AdaptFuncToEnsure(namespace, labels.MustForName(componentLabels, secretName), map[string]string{secretKey: desiredKind.Spec.ServiceAccountJSON.Value})
if err != nil {
return nil, nil, nil, err
}
_, destroyB, err := backup.AdaptFunc(
internalMonitor,
name,
namespace,
componentLabels,
[]string{},
checkDBReady,
desiredKind.Spec.Bucket,
desiredKind.Spec.Cron,
secretName,
secretKey,
timestamp,
nodeselector,
tolerations,
features,
version,
)
if err != nil {
return nil, nil, nil, err
}
_, destroyR, err := restore.AdaptFunc(
monitor,
name,
namespace,
componentLabels,
[]string{},
desiredKind.Spec.Bucket,
timestamp,
nodeselector,
tolerations,
checkDBReady,
secretName,
secretKey,
version,
)
if err != nil {
return nil, nil, nil, err
}
_, destroyC, err := clean.AdaptFunc(
monitor,
name,
namespace,
componentLabels,
[]string{},
nodeselector,
tolerations,
checkDBReady,
secretName,
secretKey,
version,
)
if err != nil {
return nil, nil, nil, err
}
destroyers := make([]operator.DestroyFunc, 0)
for _, feature := range features {
switch feature {
case backup.Normal, backup.Instant:
destroyers = append(destroyers,
operator.ResourceDestroyToZitadelDestroy(destroyS),
destroyB,
)
case clean.Instant:
destroyers = append(destroyers,
destroyC,
)
case restore.Instant:
destroyers = append(destroyers,
destroyR,
)
}
}
return func(k8sClient kubernetes.ClientInt, queried map[string]interface{}) (operator.EnsureFunc, error) {
currentDB, err := coreDB.ParseQueriedForDatabase(queried)
if err != nil {
return nil, err
}
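// Errors while listing the databases are ignored; with an empty list no
// feature queriers are appended below.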
databases, err := currentDB.GetListDatabasesFunc()(k8sClient)
if err != nil {
databases = []string{}
}
queryB, _, err := backup.AdaptFunc(
internalMonitor,
name,
namespace,
componentLabels,
databases,
checkDBReady,
desiredKind.Spec.Bucket,
desiredKind.Spec.Cron,
secretName,
secretKey,
timestamp,
nodeselector,
tolerations,
features,
version,
)
if err != nil {
return nil, err
}
queryR, _, err := restore.AdaptFunc(
monitor,
name,
namespace,
componentLabels,
databases,
desiredKind.Spec.Bucket,
timestamp,
nodeselector,
tolerations,
checkDBReady,
secretName,
secretKey,
version,
)
if err != nil {
return nil, err
}
queryC, _, err := clean.AdaptFunc(
monitor,
name,
namespace,
componentLabels,
databases,
nodeselector,
tolerations,
checkDBReady,
secretName,
secretKey,
version,
)
if err != nil {
return nil, err
}
queriers := make([]operator.QueryFunc, 0)
if len(databases) != 0 {
for _, feature := range features {
switch feature {
case backup.Normal, backup.Instant:
queriers = append(queriers,
operator.ResourceQueryToZitadelQuery(queryS),
queryB,
)
case clean.Instant:
queriers = append(queriers,
queryC,
)
case restore.Instant:
queriers = append(queriers,
queryR,
)
}
}
}
return operator.QueriersToEnsureFunc(internalMonitor, false, queriers, k8sClient, queried)
},
operator.DestroyersToDestroyFunc(internalMonitor, destroyers),
getSecretsMap(desiredKind),
nil
}
}


@@ -0,0 +1,372 @@
package bucket
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/caos/orbos/pkg/labels"
"github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/backup"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/clean"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/restore"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"testing"
)
func TestBucket_Secrets(t *testing.T) {
masterkey := "testMk"
features := []string{backup.Normal}
saJson := "testSA"
bucketName := "testBucket2"
cron := "testCron2"
monitor := mntr.Monitor{}
namespace := "testNs2"
kindVersion := "v0"
kind := "BucketBackup"
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", kindVersion), "testComponent")
timestamp := "test2"
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
backupName := "testName2"
version := "testVersion2"
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: &tree.Common{
Kind: "databases.caos.ch/" + kind,
Version: kindVersion,
},
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
ServiceAccountJSON: &secret.Secret{
Value: saJson,
},
},
})
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
allSecrets := map[string]string{
"serviceaccountjson": saJson,
}
_, _, secrets, err := AdaptFunc(
backupName,
namespace,
componentLabels,
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
features,
)(
monitor,
desired,
&tree.Tree{},
)
assert.NoError(t, err)
for key, value := range allSecrets {
assert.Contains(t, secrets, key)
assert.Equal(t, value, secrets[key].Value)
}
}
func TestBucket_AdaptBackup(t *testing.T) {
masterkey := "testMk"
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{backup.Normal}
saJson := "testSA"
bucketName := "testBucket2"
cron := "testCron2"
monitor := mntr.Monitor{}
namespace := "testNs2"
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", "v0"), "testComponent")
k8sLabels := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": "backup-serviceaccountjson",
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
timestamp := "test2"
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
backupName := "testName2"
version := "testVersion2"
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: &tree.Common{
Kind: "databases.caos.ch/BucketBackup",
Version: "v0",
},
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
ServiceAccountJSON: &secret.Secret{
Value: saJson,
},
},
})
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
SetBackup(client, namespace, k8sLabels, saJson)
query, _, _, err := AdaptFunc(
backupName,
namespace,
componentLabels,
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
features,
)(
monitor,
desired,
&tree.Tree{},
)
assert.NoError(t, err)
databases := []string{"test1", "test2"}
queried := SetQueriedForDatabases(databases)
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NotNil(t, ensure)
assert.NoError(t, ensure(client))
}
func TestBucket_AdaptInstantBackup(t *testing.T) {
masterkey := "testMk"
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{backup.Instant}
bucketName := "testBucket1"
cron := "testCron"
monitor := mntr.Monitor{}
namespace := "testNs"
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", "v0"), "testComponent")
k8sLabels := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": "backup-serviceaccountjson",
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
timestamp := "test"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
saJson := "testSA"
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: &tree.Common{
Kind: "databases.caos.ch/BucketBackup",
Version: "v0",
},
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
ServiceAccountJSON: &secret.Secret{
Value: saJson,
},
},
})
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
SetInstantBackup(client, namespace, backupName, k8sLabels, saJson)
query, _, _, err := AdaptFunc(
backupName,
namespace,
componentLabels,
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
features,
)(
monitor,
desired,
&tree.Tree{},
)
assert.NoError(t, err)
databases := []string{"test1", "test2"}
queried := SetQueriedForDatabases(databases)
ensure, err := query(client, queried)
assert.NotNil(t, ensure)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
func TestBucket_AdaptRestore(t *testing.T) {
masterkey := "testMk"
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{restore.Instant}
bucketName := "testBucket1"
cron := "testCron"
monitor := mntr.Monitor{}
namespace := "testNs"
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", "v0"), "testComponent")
k8sLabels := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": "backup-serviceaccountjson",
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testVersion",
"caos.ch/apiversion": "v0",
"caos.ch/kind": "BucketBackup",
}
timestamp := "test"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
saJson := "testSA"
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: &tree.Common{
Kind: "databases.caos.ch/BucketBackup",
Version: "v0",
},
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
ServiceAccountJSON: &secret.Secret{
Value: saJson,
},
},
})
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
SetRestore(client, namespace, backupName, k8sLabels, saJson)
query, _, _, err := AdaptFunc(
backupName,
namespace,
componentLabels,
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
features,
)(
monitor,
desired,
&tree.Tree{},
)
assert.NoError(t, err)
databases := []string{"test1", "test2"}
queried := SetQueriedForDatabases(databases)
ensure, err := query(client, queried)
assert.NotNil(t, ensure)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
func TestBucket_AdaptClean(t *testing.T) {
masterkey := "testMk"
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{clean.Instant}
bucketName := "testBucket1"
cron := "testCron"
monitor := mntr.Monitor{}
namespace := "testNs"
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "BucketBackup", "v0"), "testComponent")
timestamp := "test"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
saJson := "testSA"
desired := getDesiredTree(t, masterkey, &DesiredV0{
Common: &tree.Common{
Kind: "databases.caos.ch/BucketBackup",
Version: "v0",
},
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
ServiceAccountJSON: &secret.Secret{
Value: saJson,
},
},
})
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
SetClean(client, namespace, backupName)
query, _, _, err := AdaptFunc(
backupName,
namespace,
componentLabels,
checkDBReady,
timestamp,
nodeselector,
tolerations,
version,
features,
)(
monitor,
desired,
&tree.Tree{},
)
assert.NoError(t, err)
databases := []string{"test1", "test2"}
queried := SetQueriedForDatabases(databases)
ensure, err := query(client, queried)
assert.NotNil(t, ensure)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}


@@ -0,0 +1,136 @@
package backup
import (
"github.com/caos/zitadel/operator"
"time"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/kubernetes/resources/cronjob"
"github.com/caos/orbos/pkg/kubernetes/resources/job"
"github.com/caos/orbos/pkg/labels"
corev1 "k8s.io/api/core/v1"
)
const (
defaultMode int32 = 256
certPath = "/cockroach/cockroach-certs"
secretPath = "/secrets/sa.json"
backupPath = "/cockroach"
backupNameEnv = "BACKUP_NAME"
cronJobNamePrefix = "backup-"
internalSecretName = "client-certs"
image = "ghcr.io/caos/zitadel-crbackup"
rootSecretName = "cockroachdb.client.root"
timeout time.Duration = 60
Normal = "backup"
Instant = "instantbackup"
)
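// AdaptFunc returns the query and destroy functions for the backup: a CronJob
// for the Normal feature, and a one-off Job that is waited for and cleaned up
// afterwards for the Instant feature.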
func AdaptFunc(
monitor mntr.Monitor,
backupName string,
namespace string,
componentLabels *labels.Component,
databases []string,
checkDBReady operator.EnsureFunc,
bucketName string,
cron string,
secretName string,
secretKey string,
timestamp string,
nodeselector map[string]string,
tolerations []corev1.Toleration,
features []string,
version string,
) (
queryFunc operator.QueryFunc,
destroyFunc operator.DestroyFunc,
err error,
) {
command := getBackupCommand(
timestamp,
databases,
bucketName,
backupName,
)
jobSpecDef := getJobSpecDef(
nodeselector,
tolerations,
secretName,
secretKey,
backupName,
version,
command,
)
destroyers := []operator.DestroyFunc{}
queriers := []operator.QueryFunc{}
cronJobDef := getCronJob(
namespace,
labels.MustForName(componentLabels, GetJobName(backupName)),
cron,
jobSpecDef,
)
destroyCJ, err := cronjob.AdaptFuncToDestroy(cronJobDef.Namespace, cronJobDef.Name)
if err != nil {
return nil, nil, err
}
queryCJ, err := cronjob.AdaptFuncToEnsure(cronJobDef)
if err != nil {
return nil, nil, err
}
jobDef := getJob(
namespace,
labels.MustForName(componentLabels, cronJobNamePrefix+backupName),
jobSpecDef,
)
destroyJ, err := job.AdaptFuncToDestroy(jobDef.Namespace, jobDef.Name)
if err != nil {
return nil, nil, err
}
queryJ, err := job.AdaptFuncToEnsure(jobDef)
if err != nil {
return nil, nil, err
}
for _, feature := range features {
switch feature {
case Normal:
destroyers = append(destroyers,
operator.ResourceDestroyToZitadelDestroy(destroyCJ),
)
queriers = append(queriers,
operator.EnsureFuncToQueryFunc(checkDBReady),
operator.ResourceQueryToZitadelQuery(queryCJ),
)
case Instant:
destroyers = append(destroyers,
operator.ResourceDestroyToZitadelDestroy(destroyJ),
)
queriers = append(queriers,
operator.EnsureFuncToQueryFunc(checkDBReady),
operator.ResourceQueryToZitadelQuery(queryJ),
operator.EnsureFuncToQueryFunc(getCleanupFunc(monitor, jobDef.Namespace, jobDef.Name)),
)
}
}
return func(k8sClient kubernetes.ClientInt, queried map[string]interface{}) (operator.EnsureFunc, error) {
return operator.QueriersToEnsureFunc(monitor, false, queriers, k8sClient, queried)
},
operator.DestroyersToDestroyFunc(monitor, destroyers),
nil
}
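// GetJobName returns the name of the job resource created for the given backup.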
func GetJobName(backupName string) string {
return cronJobNamePrefix + backupName
}


@@ -0,0 +1,307 @@
package backup
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/caos/orbos/pkg/labels"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
macherrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"testing"
)
func TestBackup_AdaptInstantBackup1(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{Instant}
monitor := mntr.Monitor{}
namespace := "testNs"
databases := []string{"testDb"}
bucketName := "testBucket"
cron := "testCron"
timestamp := "test"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
secretKey := "testKey"
secretName := "testSecretName"
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getJob(
namespace,
nameLabels,
getJobSpecDef(
nodeselector,
tolerations,
secretName,
secretKey,
backupName,
version,
getBackupCommand(
timestamp,
databases,
bucketName,
backupName,
),
),
)
client.EXPECT().ApplyJob(jobDef).Times(1).Return(nil)
client.EXPECT().GetJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, jobName))
client.EXPECT().WaitUntilJobCompleted(jobDef.Namespace, jobDef.Name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil)
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
databases,
checkDBReady,
bucketName,
cron,
secretName,
secretKey,
timestamp,
nodeselector,
tolerations,
features,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
func TestBackup_AdaptInstantBackup2(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{Instant}
monitor := mntr.Monitor{}
namespace := "testNs2"
databases := []string{"testDb2"}
bucketName := "testBucket2"
cron := "testCron2"
timestamp := "test2"
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
backupName := "testName2"
version := "testVersion2"
secretKey := "testKey2"
secretName := "testSecretName2"
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getJob(
namespace,
nameLabels,
getJobSpecDef(
nodeselector,
tolerations,
secretName,
secretKey,
backupName,
version,
getBackupCommand(
timestamp,
databases,
bucketName,
backupName,
),
),
)
client.EXPECT().ApplyJob(jobDef).Times(1).Return(nil)
client.EXPECT().GetJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, jobName))
client.EXPECT().WaitUntilJobCompleted(jobDef.Namespace, jobDef.Name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil)
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
databases,
checkDBReady,
bucketName,
cron,
secretName,
secretKey,
timestamp,
nodeselector,
tolerations,
features,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
func TestBackup_AdaptBackup1(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{Normal}
monitor := mntr.Monitor{}
namespace := "testNs"
databases := []string{"testDb"}
bucketName := "testBucket"
cron := "testCron"
timestamp := "test"
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
secretKey := "testKey"
secretName := "testSecretName"
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getCronJob(
namespace,
nameLabels,
cron,
getJobSpecDef(
nodeselector,
tolerations,
secretName,
secretKey,
backupName,
version,
getBackupCommand(
timestamp,
databases,
bucketName,
backupName,
),
),
)
client.EXPECT().ApplyCronJob(jobDef).Times(1).Return(nil)
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
databases,
checkDBReady,
bucketName,
cron,
secretName,
secretKey,
timestamp,
nodeselector,
tolerations,
features,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
func TestBackup_AdaptBackup2(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
features := []string{Normal}
monitor := mntr.Monitor{}
namespace := "testNs2"
databases := []string{"testDb2"}
bucketName := "testBucket2"
cron := "testCron2"
timestamp := "test2"
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
backupName := "testName2"
version := "testVersion2"
secretKey := "testKey2"
secretName := "testSecretName2"
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getCronJob(
namespace,
nameLabels,
cron,
getJobSpecDef(
nodeselector,
tolerations,
secretName,
secretKey,
backupName,
version,
getBackupCommand(
timestamp,
databases,
bucketName,
backupName,
),
),
)
client.EXPECT().ApplyCronJob(jobDef).Times(1).Return(nil)
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
databases,
checkDBReady,
bucketName,
cron,
secretName,
secretKey,
timestamp,
nodeselector,
tolerations,
features,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}


@@ -0,0 +1,25 @@
package backup
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/zitadel/operator"
"github.com/pkg/errors"
)
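// getCleanupFunc waits until the backup job has completed and deletes it afterwards.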
func getCleanupFunc(monitor mntr.Monitor, namespace string, name string) operator.EnsureFunc {
return func(k8sClient kubernetes.ClientInt) error {
monitor.Info("waiting for backup to be completed")
if err := k8sClient.WaitUntilJobCompleted(namespace, name, timeout); err != nil {
monitor.Error(errors.Wrap(err, "error while waiting for backup to be completed"))
return err
}
monitor.Info("backup is completed, cleanup")
if err := k8sClient.DeleteJob(namespace, name); err != nil {
monitor.Error(errors.Wrap(err, "error while trying to cleanup backup"))
return err
}
monitor.Info("restore backup is completed")
return nil
}
}


@@ -0,0 +1,40 @@
package backup
import (
"github.com/caos/orbos/mntr"
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/golang/mock/gomock"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"testing"
)
func TestBackup_Cleanup1(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
name := "test"
namespace := "testNs"
cleanupFunc := getCleanupFunc(monitor, namespace, name)
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(namespace, name).Times(1)
assert.NoError(t, cleanupFunc(client))
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(errors.New("fail"))
assert.Error(t, cleanupFunc(client))
}
func TestBackup_Cleanup2(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
name := "test2"
namespace := "testNs2"
cleanupFunc := getCleanupFunc(monitor, namespace, name)
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(namespace, name).Times(1)
assert.NoError(t, cleanupFunc(client))
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(errors.New("fail"))
assert.Error(t, cleanupFunc(client))
}


@@ -0,0 +1,33 @@
package backup
import "strings"
func getBackupCommand(
timestamp string,
databases []string,
bucketName string,
backupName string,
) string {
backupCommands := make([]string, 0)
if timestamp != "" {
backupCommands = append(backupCommands, "export "+backupNameEnv+"="+timestamp)
} else {
backupCommands = append(backupCommands, "export "+backupNameEnv+"=$(date +%Y-%m-%dT%H:%M:%SZ)")
}
for _, database := range databases {
backupCommands = append(backupCommands,
strings.Join([]string{
"/scripts/backup.sh",
backupName,
bucketName,
database,
backupPath,
secretPath,
certPath,
"${" + backupNameEnv + "}",
}, " "))
}
return strings.Join(backupCommands, " && ")
}


@@ -0,0 +1,53 @@
package backup
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestBackup_Command1(t *testing.T) {
timestamp := ""
databases := []string{}
bucketName := "test"
backupName := "test"
cmd := getBackupCommand(timestamp, databases, bucketName, backupName)
equals := "export " + backupNameEnv + "=$(date +%Y-%m-%dT%H:%M:%SZ)"
assert.Equal(t, equals, cmd)
}
func TestBackup_Command2(t *testing.T) {
timestamp := "test"
databases := []string{}
bucketName := "test"
backupName := "test"
cmd := getBackupCommand(timestamp, databases, bucketName, backupName)
equals := "export " + backupNameEnv + "=test"
assert.Equal(t, equals, cmd)
}
func TestBackup_Command3(t *testing.T) {
timestamp := ""
databases := []string{"testDb"}
bucketName := "testBucket"
backupName := "testBackup"
cmd := getBackupCommand(timestamp, databases, bucketName, backupName)
equals := "export " + backupNameEnv + "=$(date +%Y-%m-%dT%H:%M:%SZ) && /scripts/backup.sh testBackup testBucket testDb " + backupPath + " " + secretPath + " " + certPath + " ${" + backupNameEnv + "}"
assert.Equal(t, equals, cmd)
}
func TestBackup_Command4(t *testing.T) {
timestamp := "test"
databases := []string{"test1", "test2", "test3"}
bucketName := "testBucket"
backupName := "testBackup"
cmd := getBackupCommand(timestamp, databases, bucketName, backupName)
equals := "export " + backupNameEnv + "=test && " +
"/scripts/backup.sh testBackup testBucket test1 " + backupPath + " " + secretPath + " " + certPath + " ${" + backupNameEnv + "} && " +
"/scripts/backup.sh testBackup testBucket test2 " + backupPath + " " + secretPath + " " + certPath + " ${" + backupNameEnv + "} && " +
"/scripts/backup.sh testBackup testBucket test3 " + backupPath + " " + secretPath + " " + certPath + " ${" + backupNameEnv + "}"
assert.Equal(t, equals, cmd)
}


@@ -0,0 +1,101 @@
package backup
import (
"github.com/caos/orbos/pkg/labels"
"github.com/caos/zitadel/operator/helpers"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
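// getCronJob wraps the job spec in a CronJob with the given schedule and
// forbids concurrent runs.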
func getCronJob(
namespace string,
nameLabels *labels.Name,
cron string,
jobSpecDef batchv1.JobSpec,
) *v1beta1.CronJob {
return &v1beta1.CronJob{
ObjectMeta: v1.ObjectMeta{
Name: nameLabels.Name(),
Namespace: namespace,
Labels: labels.MustK8sMap(nameLabels),
},
Spec: v1beta1.CronJobSpec{
Schedule: cron,
ConcurrencyPolicy: v1beta1.ForbidConcurrent,
JobTemplate: v1beta1.JobTemplateSpec{
Spec: jobSpecDef,
},
},
}
}
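// getJob wraps the job spec in a one-off Job.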
func getJob(
namespace string,
nameLabels *labels.Name,
jobSpecDef batchv1.JobSpec,
) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: v1.ObjectMeta{
Name: nameLabels.Name(),
Namespace: namespace,
Labels: labels.MustK8sMap(nameLabels),
},
Spec: jobSpecDef,
}
}
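// getJobSpecDef defines the pod that runs the backup command, mounting the
// CockroachDB client certificates and the service account JSON.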
func getJobSpecDef(
nodeselector map[string]string,
tolerations []corev1.Toleration,
secretName string,
secretKey string,
backupName string,
version string,
command string,
) batchv1.JobSpec {
return batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
NodeSelector: nodeselector,
Tolerations: tolerations,
Containers: []corev1.Container{{
Name: backupName,
Image: image + ":" + version,
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: secretKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
},
},
}},
},
},
}
}


@@ -0,0 +1,123 @@
package backup
import (
"github.com/caos/zitadel/operator/helpers"
"github.com/stretchr/testify/assert"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"testing"
)
func TestBackup_JobSpec1(t *testing.T) {
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
command := "test"
secretKey := "testKey"
secretName := "testSecretName"
equals := batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
NodeSelector: nodeselector,
Tolerations: tolerations,
Containers: []corev1.Container{{
Name: backupName,
Image: image + ":" + version,
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: secretKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
},
},
}},
},
},
}
assert.Equal(t, equals, getJobSpecDef(nodeselector, tolerations, secretName, secretKey, backupName, version, command))
}
func TestBackup_JobSpec2(t *testing.T) {
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
backupName := "testName2"
version := "testVersion2"
command := "test2"
secretKey := "testKey2"
secretName := "testSecretName2"
equals := batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
NodeSelector: nodeselector,
Tolerations: tolerations,
Containers: []corev1.Container{{
Name: backupName,
Image: image + ":" + version,
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: secretKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
},
},
}},
},
},
}
assert.Equal(t, equals, getJobSpecDef(nodeselector, tolerations, secretName, secretKey, backupName, version, command))
}


@@ -0,0 +1,86 @@
package clean
import (
"github.com/caos/zitadel/operator"
"time"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/kubernetes/resources/job"
"github.com/caos/orbos/pkg/labels"
corev1 "k8s.io/api/core/v1"
)
const (
Instant = "clean"
defaultMode = int32(256)
certPath = "/cockroach/cockroach-certs"
secretPath = "/secrets/sa.json"
internalSecretName = "client-certs"
image = "ghcr.io/caos/zitadel-crbackup"
rootSecretName = "cockroachdb.client.root"
jobPrefix = "backup-"
jobSuffix = "-clean"
timeout time.Duration = 60
)
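// AdaptFunc returns the query and destroy functions for the clean job, which
// runs the clean scripts against the given databases and removes the job once
// it has completed.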
func AdaptFunc(
monitor mntr.Monitor,
backupName string,
namespace string,
componentLabels *labels.Component,
databases []string,
nodeselector map[string]string,
tolerations []corev1.Toleration,
checkDBReady operator.EnsureFunc,
secretName string,
secretKey string,
version string,
) (
queryFunc operator.QueryFunc,
destroyFunc operator.DestroyFunc,
err error,
) {
command := getCommand(databases)
jobDef := getJob(
namespace,
labels.MustForName(componentLabels, GetJobName(backupName)),
nodeselector,
tolerations,
secretName,
secretKey,
version,
command)
destroyJ, err := job.AdaptFuncToDestroy(jobDef.Namespace, jobDef.Name)
if err != nil {
return nil, nil, err
}
destroyers := []operator.DestroyFunc{
operator.ResourceDestroyToZitadelDestroy(destroyJ),
}
queryJ, err := job.AdaptFuncToEnsure(jobDef)
if err != nil {
return nil, nil, err
}
queriers := []operator.QueryFunc{
operator.EnsureFuncToQueryFunc(checkDBReady),
operator.ResourceQueryToZitadelQuery(queryJ),
operator.EnsureFuncToQueryFunc(getCleanupFunc(monitor, jobDef.Namespace, jobDef.Name)),
}
return func(k8sClient kubernetes.ClientInt, queried map[string]interface{}) (operator.EnsureFunc, error) {
return operator.QueriersToEnsureFunc(monitor, false, queriers, k8sClient, queried)
},
operator.DestroyersToDestroyFunc(monitor, destroyers),
nil
}
func GetJobName(backupName string) string {
return jobPrefix + backupName + jobSuffix
}


@@ -0,0 +1,134 @@
package clean
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/caos/orbos/pkg/labels"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
macherrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"testing"
)
func TestClean_Adapt1(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
namespace := "testNs"
databases := []string{"testDb"}
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
backupName := "testName"
version := "testVersion"
secretKey := "testKey"
secretName := "testSecretName"
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "testKind", "testVersion"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getJob(
namespace,
nameLabels,
nodeselector,
tolerations,
secretName,
secretKey,
version,
getCommand(
databases,
),
)
client.EXPECT().ApplyJob(jobDef).Times(1).Return(nil)
client.EXPECT().GetJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, jobName))
client.EXPECT().WaitUntilJobCompleted(jobDef.Namespace, jobDef.Name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil)
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
databases,
nodeselector,
tolerations,
checkDBReady,
secretName,
secretKey,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
func TestClean_Adapt2(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
namespace := "testNs2"
databases := []string{"testDb1", "testDb2"}
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
backupName := "testName2"
version := "testVersion2"
secretKey := "testKey2"
secretName := "testSecretName2"
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent2")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getJob(
namespace,
nameLabels,
nodeselector,
tolerations,
secretName,
secretKey,
version,
getCommand(
databases,
),
)
client.EXPECT().ApplyJob(jobDef).Times(1).Return(nil)
client.EXPECT().GetJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, jobName))
client.EXPECT().WaitUntilJobCompleted(jobDef.Namespace, jobDef.Name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil)
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
databases,
nodeselector,
tolerations,
checkDBReady,
secretName,
secretKey,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}


@@ -0,0 +1,29 @@
package clean
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/zitadel/operator"
"github.com/pkg/errors"
)
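// getCleanupFunc waits until the clean job has completed and deletes it afterwards.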
func getCleanupFunc(
monitor mntr.Monitor,
namespace string,
jobName string,
) operator.EnsureFunc {
return func(k8sClient kubernetes.ClientInt) error {
monitor.Info("waiting for clean to be completed")
if err := k8sClient.WaitUntilJobCompleted(namespace, jobName, timeout); err != nil {
monitor.Error(errors.Wrap(err, "error while waiting for clean to be completed"))
return err
}
monitor.Info("clean is completed, cleanup")
if err := k8sClient.DeleteJob(namespace, jobName); err != nil {
monitor.Error(errors.Wrap(err, "error while trying to cleanup clean"))
return err
}
monitor.Info("clean cleanup is completed")
return nil
}
}


@@ -0,0 +1,40 @@
package clean
import (
"github.com/caos/orbos/mntr"
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/golang/mock/gomock"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"testing"
)
func TestClean_Cleanup1(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
name := "test"
namespace := "testNs"
cleanupFunc := getCleanupFunc(monitor, namespace, name)
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(namespace, name).Times(1)
assert.NoError(t, cleanupFunc(client))
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(errors.New("fail"))
assert.Error(t, cleanupFunc(client))
}
func TestClean_Cleanup2(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
name := "test2"
namespace := "testNs2"
cleanupFunc := getCleanupFunc(monitor, namespace, name)
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(namespace, name).Times(1)
assert.NoError(t, cleanupFunc(client))
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(errors.New("fail"))
assert.Error(t, cleanupFunc(client))
}


@@ -0,0 +1,32 @@
package clean
import "strings"
func getCommand(
databases []string,
) string {
backupCommands := make([]string, 0)
for _, database := range databases {
backupCommands = append(backupCommands,
strings.Join([]string{
"/scripts/clean-db.sh",
certPath,
database,
}, " "))
}
for _, database := range databases {
backupCommands = append(backupCommands,
strings.Join([]string{
"/scripts/clean-user.sh",
certPath,
database,
}, " "))
}
backupCommands = append(backupCommands,
strings.Join([]string{
"/scripts/clean-migration.sh",
certPath,
}, " "))
return strings.Join(backupCommands, " && ")
}


@@ -0,0 +1,35 @@
package clean
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestClean_Command1(t *testing.T) {
databases := []string{}
cmd := getCommand(databases)
equals := "/scripts/clean-migration.sh " + certPath
assert.Equal(t, equals, cmd)
}
func TestClean_Command2(t *testing.T) {
databases := []string{"test"}
cmd := getCommand(databases)
equals := "/scripts/clean-db.sh " + certPath + " test && /scripts/clean-user.sh " + certPath + " test && /scripts/clean-migration.sh " + certPath
assert.Equal(t, equals, cmd)
}
func TestClean_Command3(t *testing.T) {
databases := []string{"test1", "test2", "test3"}
cmd := getCommand(databases)
equals := "/scripts/clean-db.sh " + certPath + " test1 && /scripts/clean-db.sh " + certPath + " test2 && /scripts/clean-db.sh " + certPath + " test3 && " +
"/scripts/clean-user.sh " + certPath + " test1 && /scripts/clean-user.sh " + certPath + " test2 && /scripts/clean-user.sh " + certPath + " test3 && " +
"/scripts/clean-migration.sh " + certPath
assert.Equal(t, equals, cmd)
}


@@ -0,0 +1,73 @@
package clean
import (
"github.com/caos/orbos/pkg/labels"
"github.com/caos/zitadel/operator/helpers"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func getJob(
namespace string,
nameLabels *labels.Name,
nodeselector map[string]string,
tolerations []corev1.Toleration,
secretName string,
secretKey string,
version string,
command string,
) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: v1.ObjectMeta{
Name: nameLabels.Name(),
Namespace: namespace,
Labels: labels.MustK8sMap(nameLabels),
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
NodeSelector: nodeselector,
Tolerations: tolerations,
RestartPolicy: corev1.RestartPolicyNever,
Containers: []corev1.Container{{
Name: nameLabels.Name(),
Image: image + ":" + version,
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: secretKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
},
},
}},
},
},
},
}
}


@@ -0,0 +1,164 @@
package clean
import (
"github.com/caos/orbos/pkg/labels"
"github.com/caos/zitadel/operator/helpers"
"github.com/stretchr/testify/assert"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
)
func TestClean_Job1(t *testing.T) {
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
version := "testVersion"
command := "test"
secretKey := "testKey"
secretName := "testSecretName"
jobName := "testJob"
namespace := "testNs"
k8sLabels := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": jobName,
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testOpVersion",
"caos.ch/apiversion": "testVersion",
"caos.ch/kind": "testKind"}
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testOpVersion"), "testKind", "testVersion"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
equals :=
&batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: namespace,
Labels: k8sLabels,
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
NodeSelector: nodeselector,
Tolerations: tolerations,
Containers: []corev1.Container{{
Name: jobName,
Image: image + ":" + version,
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: secretKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
},
},
}},
},
},
},
}
assert.Equal(t, equals, getJob(namespace, nameLabels, nodeselector, tolerations, secretName, secretKey, version, command))
}
func TestClean_Job2(t *testing.T) {
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
version := "testVersion2"
command := "test2"
secretKey := "testKey2"
secretName := "testSecretName2"
jobName := "testJob2"
namespace := "testNs2"
k8sLabels := map[string]string{
"app.kubernetes.io/component": "testComponent2",
"app.kubernetes.io/managed-by": "testOp2",
"app.kubernetes.io/name": jobName,
"app.kubernetes.io/part-of": "testProd2",
"app.kubernetes.io/version": "testOpVersion2",
"caos.ch/apiversion": "testVersion2",
"caos.ch/kind": "testKind2"}
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testOpVersion2"), "testKind2", "testVersion2"), "testComponent2")
nameLabels := labels.MustForName(componentLabels, jobName)
equals :=
&batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: namespace,
Labels: k8sLabels,
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
NodeSelector: nodeselector,
Tolerations: tolerations,
Containers: []corev1.Container{{
Name: jobName,
Image: image + ":" + version,
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: secretKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
},
},
}},
},
},
},
}
assert.Equal(t, equals, getJob(namespace, nameLabels, nodeselector, tolerations, secretName, secretKey, version, command))
}


@@ -0,0 +1,42 @@
package bucket
import (
secret2 "github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/tree"
"github.com/pkg/errors"
)
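// DesiredV0 is the v0 desired state for a bucket backup.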
type DesiredV0 struct {
Common *tree.Common `yaml:",inline"`
Spec *Spec
}
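// Spec configures the backup schedule, the target bucket and the service
// account used to access it.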
type Spec struct {
Verbose bool
Cron string `yaml:"cron,omitempty"`
Bucket string `yaml:"bucket,omitempty"`
ServiceAccountJSON *secret2.Secret `yaml:"serviceAccountJSON,omitempty"`
}
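// IsZero reports whether the spec is empty.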
func (s *Spec) IsZero() bool {
if (s.ServiceAccountJSON == nil || s.ServiceAccountJSON.IsZero()) &&
!s.Verbose &&
s.Cron == "" &&
s.Bucket == "" {
return true
}
return false
}
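// ParseDesiredV0 decodes the desired tree into a DesiredV0.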
func ParseDesiredV0(desiredTree *tree.Tree) (*DesiredV0, error) {
desiredKind := &DesiredV0{
Common: desiredTree.Common,
Spec: &Spec{},
}
if err := desiredTree.Original.Decode(desiredKind); err != nil {
return nil, errors.Wrap(err, "parsing desired state failed")
}
return desiredKind, nil
}


@@ -0,0 +1,129 @@
package bucket
import (
"github.com/caos/orbos/pkg/secret"
"github.com/caos/orbos/pkg/tree"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v3"
"testing"
)
const (
masterkey = "testMk"
cron = "testCron"
bucketName = "testBucket"
saJson = "testSa"
yamlFile = `kind: databases.caos.ch/BucketBackup
version: v0
spec:
verbose: true
cron: testCron
bucket: testBucket
serviceAccountJSON:
encryption: AES256
encoding: Base64
value: luyAqtopzwLcaIhJj7KhWmbUsA7cQg==
`
yamlFileWithoutSecret = `kind: databases.caos.ch/BucketBackup
version: v0
spec:
verbose: true
cron: testCron
bucket: testBucket
`
yamlEmpty = `kind: databases.caos.ch/BucketBackup
version: v0`
)
var (
desired = DesiredV0{
Common: &tree.Common{
Kind: "databases.caos.ch/BucketBackup",
Version: "v0",
},
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
ServiceAccountJSON: &secret.Secret{
Value: saJson,
Encryption: "AES256",
Encoding: "Base64",
},
},
}
desiredWithoutSecret = DesiredV0{
Common: &tree.Common{
Kind: "databases.caos.ch/BucketBackup",
Version: "v0",
},
Spec: &Spec{
Verbose: true,
Cron: cron,
Bucket: bucketName,
},
}
desiredEmpty = DesiredV0{
Common: &tree.Common{
Kind: "databases.caos.ch/BucketBackup",
Version: "v0",
},
Spec: &Spec{
Verbose: false,
Cron: "",
Bucket: "",
ServiceAccountJSON: &secret.Secret{
Value: "",
},
},
}
desiredNil = DesiredV0{
Common: &tree.Common{
Kind: "databases.caos.ch/BucketBackup",
Version: "v0",
},
}
)
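// marshalYaml and unmarshalYaml round-trip a desired state through YAML with
// the given masterkey, mirroring how the operator serializes encrypted secrets.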
func marshalYaml(t *testing.T, masterkey string, struc *DesiredV0) []byte {
secret.Masterkey = masterkey
data, err := yaml.Marshal(struc)
assert.NoError(t, err)
return data
}
func unmarshalYaml(t *testing.T, masterkey string, yamlFile []byte) *tree.Tree {
secret.Masterkey = masterkey
desiredTree := &tree.Tree{}
assert.NoError(t, yaml.Unmarshal(yamlFile, desiredTree))
return desiredTree
}
func getDesiredTree(t *testing.T, masterkey string, desired *DesiredV0) *tree.Tree {
return unmarshalYaml(t, masterkey, marshalYaml(t, masterkey, desired))
}
func TestBucket_DesiredParse(t *testing.T) {
assert.Equal(t, yamlFileWithoutSecret, string(marshalYaml(t, masterkey, &desiredWithoutSecret)))
desiredTree := unmarshalYaml(t, masterkey, []byte(yamlFile))
desiredKind, err := ParseDesiredV0(desiredTree)
assert.NoError(t, err)
assert.Equal(t, &desired, desiredKind)
}
func TestBucket_DesiredNotZero(t *testing.T) {
desiredTree := unmarshalYaml(t, masterkey, []byte(yamlFile))
desiredKind, err := ParseDesiredV0(desiredTree)
assert.NoError(t, err)
assert.False(t, desiredKind.Spec.IsZero())
}
func TestBucket_DesiredZero(t *testing.T) {
desiredTree := unmarshalYaml(t, masterkey, []byte(yamlEmpty))
desiredKind, err := ParseDesiredV0(desiredTree)
assert.NoError(t, err)
assert.True(t, desiredKind.Spec.IsZero())
}


@@ -0,0 +1,63 @@
package bucket
import (
"cloud.google.com/go/storage"
"context"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/tree"
"github.com/caos/zitadel/operator/database/kinds/backups/core"
"github.com/pkg/errors"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"strings"
)
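// BackupList returns a function that parses the desired state and lists the
// backups currently stored in the configured bucket.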
func BackupList() core.BackupListFunc {
return func(monitor mntr.Monitor, name string, desired *tree.Tree) ([]string, error) {
desiredKind, err := ParseDesiredV0(desired)
if err != nil {
return nil, errors.Wrap(err, "parsing desired state failed")
}
desired.Parsed = desiredKind
if !monitor.IsVerbose() && desiredKind.Spec.Verbose {
monitor.Verbose()
}
return listFilesWithFilter(desiredKind.Spec.ServiceAccountJSON.Value, desiredKind.Spec.Bucket, name)
}
}
// listFilesWithFilter lists the distinct backup folders stored under
// <name>/ in the bucket, closing the storage client when done.
func listFilesWithFilter(serviceAccountJSON string, bucketName, name string) ([]string, error) {
	ctx := context.Background()
	client, err := storage.NewClient(ctx, option.WithCredentialsJSON([]byte(serviceAccountJSON)))
	if err != nil {
		return nil, err
	}
	defer client.Close()
	bkt := client.Bucket(bucketName)
	names := make([]string, 0)
	it := bkt.Objects(ctx, &storage.Query{Prefix: name + "/"})
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}
		parts := strings.Split(attrs.Name, "/")
		if len(parts) < 2 {
			continue
		}
		found := false
		for _, existing := range names {
			if existing == parts[1] {
				found = true
				break
			}
		}
		if !found {
			names = append(names, parts[1])
		}
	}
	return names, nil
}


@@ -0,0 +1,97 @@
package bucket
import (
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/backup"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/clean"
"github.com/caos/zitadel/operator/database/kinds/backups/bucket/restore"
"github.com/caos/zitadel/operator/database/kinds/databases/core"
"github.com/golang/mock/gomock"
corev1 "k8s.io/api/core/v1"
macherrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
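// SetQueriedForDatabases seeds the queried map with the given database list,
// as the database kind would during a real query.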
func SetQueriedForDatabases(databases []string) map[string]interface{} {
queried := map[string]interface{}{}
core.SetQueriedForDatabaseDBList(queried, databases)
return queried
}
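// SetInstantBackup registers the mock expectations for a one-off backup run:
// the service-account secret is applied, then the backup job is applied,
// awaited and deleted again.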
func SetInstantBackup(
k8sClient *kubernetesmock.MockClientInt,
namespace string,
backupName string,
labels map[string]string,
saJson string,
) {
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Namespace: namespace,
Labels: labels,
},
StringData: map[string]string{secretKey: saJson},
Type: "Opaque",
}).Times(1).Return(nil)
k8sClient.EXPECT().ApplyJob(gomock.Any()).Times(1).Return(nil)
	k8sClient.EXPECT().GetJob(namespace, backup.GetJobName(backupName)).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, backup.GetJobName(backupName)))
k8sClient.EXPECT().WaitUntilJobCompleted(namespace, backup.GetJobName(backupName), gomock.Any()).Times(1).Return(nil)
k8sClient.EXPECT().DeleteJob(namespace, backup.GetJobName(backupName)).Times(1).Return(nil)
}
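// SetBackup registers the mock expectations for the cron-based backup variant.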
func SetBackup(
k8sClient *kubernetesmock.MockClientInt,
namespace string,
labels map[string]string,
saJson string,
) {
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Namespace: namespace,
Labels: labels,
},
StringData: map[string]string{secretKey: saJson},
Type: "Opaque",
}).Times(1).Return(nil)
k8sClient.EXPECT().ApplyCronJob(gomock.Any()).Times(1).Return(nil)
}
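// SetClean registers the mock expectations for the clean job lifecycle.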
func SetClean(
k8sClient *kubernetesmock.MockClientInt,
namespace string,
backupName string,
) {
k8sClient.EXPECT().ApplyJob(gomock.Any()).Times(1).Return(nil)
	k8sClient.EXPECT().GetJob(namespace, clean.GetJobName(backupName)).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, clean.GetJobName(backupName)))
k8sClient.EXPECT().WaitUntilJobCompleted(namespace, clean.GetJobName(backupName), gomock.Any()).Times(1).Return(nil)
k8sClient.EXPECT().DeleteJob(namespace, clean.GetJobName(backupName)).Times(1).Return(nil)
}
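// SetRestore registers the mock expectations for the restore job lifecycle.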
func SetRestore(
k8sClient *kubernetesmock.MockClientInt,
namespace string,
backupName string,
labels map[string]string,
saJson string,
) {
k8sClient.EXPECT().ApplySecret(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Namespace: namespace,
Labels: labels,
},
StringData: map[string]string{secretKey: saJson},
Type: "Opaque",
}).Times(1).Return(nil)
k8sClient.EXPECT().ApplyJob(gomock.Any()).Times(1).Return(nil)
	k8sClient.EXPECT().GetJob(namespace, restore.GetJobName(backupName)).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, restore.GetJobName(backupName)))
k8sClient.EXPECT().WaitUntilJobCompleted(namespace, restore.GetJobName(backupName), gomock.Any()).Times(1).Return(nil)
k8sClient.EXPECT().DeleteJob(namespace, restore.GetJobName(backupName)).Times(1).Return(nil)
}


@@ -0,0 +1,95 @@
package restore
import (
"github.com/caos/zitadel/operator"
"time"
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/orbos/pkg/kubernetes/resources/job"
"github.com/caos/orbos/pkg/labels"
corev1 "k8s.io/api/core/v1"
)
const (
Instant = "restore"
defaultMode = int32(256)
certPath = "/cockroach/cockroach-certs"
secretPath = "/secrets/sa.json"
jobPrefix = "backup-"
jobSuffix = "-restore"
image = "ghcr.io/caos/zitadel-crbackup"
internalSecretName = "client-certs"
rootSecretName = "cockroachdb.client.root"
	timeout time.Duration = 60 * time.Second // assuming WaitUntilJobCompleted expects an absolute duration; a bare 60 would be 60ns
)
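// AdaptFunc wires up a one-off restore job: it waits for the database to be
// ready, applies the job and removes it again once it has completed.
//
// A minimal usage sketch (all names and values are placeholders):
//
//	query, destroy, err := AdaptFunc(monitor, "backup", "caos-zitadel", componentLabels,
//		databases, "bucket", "timestamp", nil, nil, checkDBReady, "secret", "sa.json", "v1")
//	ensure, err := query(k8sClient, map[string]interface{}{})
//	err = ensure(k8sClient) // applies the restore job; destroy deletes it again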
func AdaptFunc(
monitor mntr.Monitor,
backupName string,
namespace string,
componentLabels *labels.Component,
databases []string,
bucketName string,
timestamp string,
nodeselector map[string]string,
tolerations []corev1.Toleration,
checkDBReady operator.EnsureFunc,
secretName string,
secretKey string,
version string,
) (
queryFunc operator.QueryFunc,
destroyFunc operator.DestroyFunc,
err error,
) {
	jobName := GetJobName(backupName)
command := getCommand(
timestamp,
databases,
bucketName,
backupName,
)
jobdef := getJob(
namespace,
labels.MustForName(componentLabels, GetJobName(backupName)),
nodeselector,
tolerations,
secretName,
secretKey,
version,
command)
destroyJ, err := job.AdaptFuncToDestroy(jobName, namespace)
if err != nil {
return nil, nil, err
}
destroyers := []operator.DestroyFunc{
operator.ResourceDestroyToZitadelDestroy(destroyJ),
}
queryJ, err := job.AdaptFuncToEnsure(jobdef)
if err != nil {
return nil, nil, err
}
queriers := []operator.QueryFunc{
operator.EnsureFuncToQueryFunc(checkDBReady),
operator.ResourceQueryToZitadelQuery(queryJ),
operator.EnsureFuncToQueryFunc(getCleanupFunc(monitor, jobdef.Namespace, jobdef.Name)),
}
return func(k8sClient kubernetes.ClientInt, queried map[string]interface{}) (operator.EnsureFunc, error) {
return operator.QueriersToEnsureFunc(monitor, false, queriers, k8sClient, queried)
},
operator.DestroyersToDestroyFunc(monitor, destroyers),
nil
}
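// GetJobName returns the deterministic name of the restore job for a backup.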
func GetJobName(backupName string) string {
return jobPrefix + backupName + jobSuffix
}


@@ -0,0 +1,148 @@
package restore
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/caos/orbos/pkg/labels"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
macherrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"testing"
)
func TestRestore_Adapt1(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
namespace := "testNs"
databases := []string{"testDb"}
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
timestamp := "testTs"
backupName := "testName2"
bucketName := "testBucket2"
version := "testVersion"
secretKey := "testKey"
secretName := "testSecretName"
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testVersion"), "testKind", "testVersion"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getJob(
namespace,
nameLabels,
nodeselector,
tolerations,
secretName,
secretKey,
version,
getCommand(
timestamp,
databases,
bucketName,
backupName,
),
)
client.EXPECT().ApplyJob(jobDef).Times(1).Return(nil)
	client.EXPECT().GetJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, jobName))
client.EXPECT().WaitUntilJobCompleted(jobDef.Namespace, jobDef.Name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil)
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
databases,
bucketName,
timestamp,
nodeselector,
tolerations,
checkDBReady,
secretName,
secretKey,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}
func TestRestore_Adapt2(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
namespace := "testNs2"
databases := []string{"testDb1", "testDb2"}
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
timestamp := "testTs"
backupName := "testName2"
bucketName := "testBucket2"
version := "testVersion2"
secretKey := "testKey2"
secretName := "testSecretName2"
jobName := GetJobName(backupName)
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testVersion2"), "testKind2", "testVersion2"), "testComponent2")
nameLabels := labels.MustForName(componentLabels, jobName)
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
return nil
}
jobDef := getJob(
namespace,
nameLabels,
nodeselector,
tolerations,
secretName,
secretKey,
version,
getCommand(
timestamp,
databases,
bucketName,
backupName,
),
)
client.EXPECT().ApplyJob(jobDef).Times(1).Return(nil)
	client.EXPECT().GetJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil, macherrs.NewNotFound(schema.GroupResource{Group: "batch", Resource: "jobs"}, jobName))
client.EXPECT().WaitUntilJobCompleted(jobDef.Namespace, jobDef.Name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(jobDef.Namespace, jobDef.Name).Times(1).Return(nil)
query, _, err := AdaptFunc(
monitor,
backupName,
namespace,
componentLabels,
databases,
bucketName,
timestamp,
nodeselector,
tolerations,
checkDBReady,
secretName,
secretKey,
version,
)
assert.NoError(t, err)
queried := map[string]interface{}{}
ensure, err := query(client, queried)
assert.NoError(t, err)
assert.NoError(t, ensure(client))
}


@@ -0,0 +1,25 @@
package restore
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/kubernetes"
"github.com/caos/zitadel/operator"
"github.com/pkg/errors"
)
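// getCleanupFunc returns an EnsureFunc that blocks until the restore job has
// completed and then deletes it, so a later run can apply a fresh job.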
func getCleanupFunc(monitor mntr.Monitor, namespace, jobName string) operator.EnsureFunc {
return func(k8sClient kubernetes.ClientInt) error {
monitor.Info("waiting for restore to be completed")
if err := k8sClient.WaitUntilJobCompleted(namespace, jobName, timeout); err != nil {
monitor.Error(errors.Wrap(err, "error while waiting for restore to be completed"))
return err
}
monitor.Info("restore is completed, cleanup")
if err := k8sClient.DeleteJob(namespace, jobName); err != nil {
monitor.Error(errors.Wrap(err, "error while trying to cleanup restore"))
return err
}
monitor.Info("restore cleanup is completed")
return nil
}
}


@@ -0,0 +1,40 @@
package restore
import (
"errors"
"github.com/caos/orbos/mntr"
kubernetesmock "github.com/caos/orbos/pkg/kubernetes/mock"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"testing"
)
func TestRestore_Cleanup1(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
name := "test"
namespace := "testNs"
cleanupFunc := getCleanupFunc(monitor, namespace, name)
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(namespace, name).Times(1)
assert.NoError(t, cleanupFunc(client))
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(errors.New("fail"))
assert.Error(t, cleanupFunc(client))
}
func TestRestore_Cleanup2(t *testing.T) {
client := kubernetesmock.NewMockClientInt(gomock.NewController(t))
monitor := mntr.Monitor{}
name := "test2"
namespace := "testNs2"
cleanupFunc := getCleanupFunc(monitor, namespace, name)
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(nil)
client.EXPECT().DeleteJob(namespace, name).Times(1)
assert.NoError(t, cleanupFunc(client))
client.EXPECT().WaitUntilJobCompleted(namespace, name, timeout).Times(1).Return(errors.New("fail"))
assert.Error(t, cleanupFunc(client))
}


@@ -0,0 +1,28 @@
package restore
import "strings"
// getCommand chains one restore.sh invocation per database. An empty
// timestamp is omitted entirely so the joined command contains no empty
// argument, matching the expected commands in the tests.
func getCommand(
	timestamp string,
	databases []string,
	bucketName string,
	backupName string,
) string {
	restoreCommands := make([]string, 0)
	for _, database := range databases {
		parts := []string{
			"/scripts/restore.sh",
			bucketName,
			backupName,
		}
		if timestamp != "" {
			parts = append(parts, timestamp)
		}
		parts = append(parts,
			database,
			secretPath,
			certPath,
		)
		restoreCommands = append(restoreCommands, strings.Join(parts, " "))
	}
	return strings.Join(restoreCommands, " && ")
}


@@ -0,0 +1,50 @@
package restore
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestRestore_Command1(t *testing.T) {
timestamp := ""
databases := []string{}
bucketName := "testBucket"
backupName := "testBackup"
cmd := getCommand(timestamp, databases, bucketName, backupName)
equals := ""
assert.Equal(t, equals, cmd)
}
func TestRestore_Command2(t *testing.T) {
timestamp := ""
databases := []string{"testDb"}
bucketName := "testBucket"
backupName := "testBackup"
cmd := getCommand(timestamp, databases, bucketName, backupName)
equals := "/scripts/restore.sh testBucket testBackup testDb /secrets/sa.json /cockroach/cockroach-certs"
assert.Equal(t, equals, cmd)
}
func TestRestore_Command3(t *testing.T) {
timestamp := "test"
databases := []string{"testDb"}
bucketName := "testBucket"
backupName := "testBackup"
cmd := getCommand(timestamp, databases, bucketName, backupName)
equals := "/scripts/restore.sh testBucket testBackup test testDb /secrets/sa.json /cockroach/cockroach-certs"
assert.Equal(t, equals, cmd)
}
func TestRestore_Command4(t *testing.T) {
timestamp := ""
databases := []string{}
bucketName := "test"
backupName := "test"
cmd := getCommand(timestamp, databases, bucketName, backupName)
equals := ""
assert.Equal(t, equals, cmd)
}


@@ -0,0 +1,72 @@
package restore
import (
"github.com/caos/orbos/pkg/labels"
"github.com/caos/zitadel/operator/helpers"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
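// getJob builds the restore job: a single bash container that runs the given
// restore command with the CockroachDB client certificates and the
// service-account secret mounted.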
func getJob(
namespace string,
nameLabels *labels.Name,
nodeselector map[string]string,
tolerations []corev1.Toleration,
secretName string,
secretKey string,
version string,
command string,
) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: v1.ObjectMeta{
Name: nameLabels.Name(),
Namespace: namespace,
Labels: labels.MustK8sMap(nameLabels),
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
NodeSelector: nodeselector,
Tolerations: tolerations,
RestartPolicy: corev1.RestartPolicyNever,
Containers: []corev1.Container{{
Name: nameLabels.Name(),
Image: image + ":" + version,
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: secretKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
},
},
}},
},
},
},
}
}


@@ -0,0 +1,163 @@
package restore
import (
"github.com/caos/orbos/pkg/labels"
"github.com/caos/zitadel/operator/helpers"
"github.com/stretchr/testify/assert"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
)
func TestRestore_Job1(t *testing.T) {
nodeselector := map[string]string{"test": "test"}
tolerations := []corev1.Toleration{
{Key: "testKey", Operator: "testOp"}}
version := "testVersion"
command := "test"
secretKey := "testKey"
secretName := "testSecretName"
jobName := "testJob"
namespace := "testNs"
k8sLabels := map[string]string{
"app.kubernetes.io/component": "testComponent",
"app.kubernetes.io/managed-by": "testOp",
"app.kubernetes.io/name": jobName,
"app.kubernetes.io/part-of": "testProd",
"app.kubernetes.io/version": "testOpVersion",
"caos.ch/apiversion": "testVersion",
"caos.ch/kind": "testKind"}
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd", "testOp", "testOpVersion"), "testKind", "testVersion"), "testComponent")
nameLabels := labels.MustForName(componentLabels, jobName)
equals :=
&batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: namespace,
Labels: k8sLabels,
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
NodeSelector: nodeselector,
Tolerations: tolerations,
Containers: []corev1.Container{{
Name: jobName,
Image: image + ":" + version,
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: secretKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
},
},
}},
},
},
},
}
assert.Equal(t, equals, getJob(namespace, nameLabels, nodeselector, tolerations, secretName, secretKey, version, command))
}
func TestRestore_Job2(t *testing.T) {
nodeselector := map[string]string{"test2": "test2"}
tolerations := []corev1.Toleration{
{Key: "testKey2", Operator: "testOp2"}}
version := "testVersion2"
command := "test2"
secretKey := "testKey2"
secretName := "testSecretName2"
jobName := "testJob2"
namespace := "testNs2"
k8sLabels := map[string]string{
"app.kubernetes.io/component": "testComponent2",
"app.kubernetes.io/managed-by": "testOp2",
"app.kubernetes.io/name": jobName,
"app.kubernetes.io/part-of": "testProd2",
"app.kubernetes.io/version": "testOpVersion2",
"caos.ch/apiversion": "testVersion2",
"caos.ch/kind": "testKind2"}
componentLabels := labels.MustForComponent(labels.MustForAPI(labels.MustForOperator("testProd2", "testOp2", "testOpVersion2"), "testKind2", "testVersion2"), "testComponent2")
nameLabels := labels.MustForName(componentLabels, jobName)
equals :=
&batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: namespace,
Labels: k8sLabels,
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
NodeSelector: nodeselector,
Tolerations: tolerations,
Containers: []corev1.Container{{
Name: jobName,
Image: image + ":" + version,
Command: []string{
"/bin/bash",
"-c",
command,
},
VolumeMounts: []corev1.VolumeMount{{
Name: internalSecretName,
MountPath: certPath,
}, {
Name: secretKey,
SubPath: secretKey,
MountPath: secretPath,
}},
ImagePullPolicy: corev1.PullAlways,
}},
Volumes: []corev1.Volume{{
Name: internalSecretName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: rootSecretName,
DefaultMode: helpers.PointerInt32(defaultMode),
},
},
}, {
Name: secretKey,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretName,
},
},
}},
},
},
},
}
assert.Equal(t, equals, getJob(namespace, nameLabels, nodeselector, tolerations, secretName, secretKey, version, command))
}


@@ -0,0 +1,19 @@
package bucket
import (
"github.com/caos/orbos/pkg/secret"
)
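// getSecretsMap collects the spec's secrets under stable keys, initializing
// nil parts so callers always receive a non-nil secret.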
func getSecretsMap(desiredKind *DesiredV0) map[string]*secret.Secret {
	secrets := make(map[string]*secret.Secret)
if desiredKind.Spec == nil {
desiredKind.Spec = &Spec{}
}
if desiredKind.Spec.ServiceAccountJSON == nil {
desiredKind.Spec.ServiceAccountJSON = &secret.Secret{}
}
secrets["serviceaccountjson"] = desiredKind.Spec.ServiceAccountJSON
return secrets
}


@@ -0,0 +1,22 @@
package bucket
import (
"github.com/caos/orbos/pkg/secret"
"github.com/stretchr/testify/assert"
"testing"
)
func TestBucket_getSecretsFull(t *testing.T) {
secrets := getSecretsMap(&desired)
assert.Equal(t, desired.Spec.ServiceAccountJSON, secrets["serviceaccountjson"])
}
func TestBucket_getSecretsEmpty(t *testing.T) {
secrets := getSecretsMap(&desiredWithoutSecret)
assert.Equal(t, &secret.Secret{}, secrets["serviceaccountjson"])
}
func TestBucket_getSecretsNil(t *testing.T) {
secrets := getSecretsMap(&desiredNil)
assert.Equal(t, &secret.Secret{}, secrets["serviceaccountjson"])
}


@@ -0,0 +1,8 @@
package core
import (
"github.com/caos/orbos/mntr"
"github.com/caos/orbos/pkg/tree"
)
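// BackupListFunc lists the names of the existing backups for the given backup
// kind from its desired state.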
type BackupListFunc func(monitor mntr.Monitor, name string, desired *tree.Tree) ([]string, error)