zitadel/operator/database/kinds/databases/managed/statefulset/adapt.go
Livio Amstutz bacfc3b099
fix: operator picks (#1463)
* feat(crd): add crd mode for operators (#1329)

* feat(operator): add base for zitadel operator

* fix(operator): changed pipeline to release operator

* fix(operator): fmt with only one parameter

* fix(operator): corrected workflow job name

* fix(zitadelctl): added restore and backuplist command

* fix(zitadelctl): scale for restore

* chore(container): use scratch for deploy container

* fix(zitadelctl): limit image to scratch

* fix(migration): added migration scripts for newer version

* fix(operator): changed handling of kubeconfig in operator logic

* fix(operator): changed handling of secrets in operator logic

* fix(operator): use new version of zitadel

* fix(operator): added path for migrations

* fix(operator): delete doublets of migration scripts

* fix(operator): delete subpaths and integrate logic into init container

* fix(operator): corrected path in dockerfile for local migrations

* fix(operator): added migrations for cockroachdb-secure

* fix(operator): delete logic for ambassador module

* fix(operator): added read and write secret commands

* fix(operator): correct and align operator pipeline with zitadel pipeline

* fix(operator): correct yaml error in operator pipeline

* fix(operator): correct action name in operator pipeline

* fix(operator): correct case-sensitive filename in operator pipeline

* fix(operator): upload artifacts from buildx output

* fix(operator): corrected attribute spelling error

* fix(operator): combined jobs for operator binary and image

* fix(operator): added missing comma in operator pipeline

* fix(operator): added codecov for operator image

* fix(operator): added codecov for operator image

* fix(testing): code changes for testing and several unit-tests (#1009)

* fix(operator): usage of interface of kubernetes client for testing and several unit-tests

* fix(operator): several unit-tests

* fix(operator): several unit-tests

* fix(operator): changed order for the operator logic

* fix(operator): added version of zitadelctl from semantic release

* fix(operator): corrected function call with version of zitadelctl

* fix(operator): corrected function call with version of zitadelctl

* fix(operator): add check output to operator release pipeline

* fix(operator): set --short length everywhere to 12

* fix(operator): zitadel setup in job instead of exec with several unit tests

* fix(operator): fixes to combine newest zitadel and testing branch

* fix(operator): corrected path in Dockerfile

* fix(operator): fixed unit-test that was ignored during changes

* fix(operator): fixed unit-test that was ignored during changes

* fix(operator): corrected Dockerfile to correctly use env variable

* fix(operator): quickfix takeoff deployment

* fix(operator): corrected the clusterrolename in the applied artifacts

* fix: update secure migrations

* fix(operator): migrations (#1057)

* fix(operator): copied migrations from orbos repository

* fix(operator): newest migrations

* chore: use cockroach-secure

* fix: rename migration

* fix: remove insecure cockroach migrations

Co-authored-by: Stefan Benz <stefan@caos.ch>

* fix: finalize labels

* fix(operator): cli logging concurrently and fixed deployment of operator during restore

* fix: finalize labels and cli commands

* fix: restore

* chore: cockroachdb is always secure

* chore: use orbos consistent-labels latest commit

* test: make tests compatible with new labels

* fix: default to sa token for start command

* fix: use cockroachdb v12.02

* fix: don't delete flyway user

* test: fix migration test

* fix: use correct table qualifiers

* fix: don't alter sequence ownership

* fix: upgrade flyway

* fix: change ownership of all dbs and tables to admin user

* fix: change defaultdb user

* fix: treat clientid status codes >= 400 as errors

* fix: reconcile specified ZITADEL version, not binary version

* fix: add ca-certs

* fix: use latest orbos code

* fix: use orbos with fixed race condition

* fix: use latest ORBOS code

* fix: use latest ORBOS code

* fix: make migration and scaling around restoring work

* fix(operator): move zitadel operator

* chore(migrations): include owner change migration

* feat(db): add code base for database operator

* fix(db): change used image registry for database operator

* fix(db): generated mock

* fix(db): add accidentally ignored file

* fix(db): add cockroachdb backup image to pipeline

* fix(db): correct pipeline and image versions

* fix(db): correct version of used orbos

* fix(db): correct database import

* fix(db): go mod tidy

* fix(db): use new version for orbos

* fix(migrations): include migrations into zitadelctl binary (#1211)

* fix(db): use statik to integrate migrations into binary

* fix(migrations): corrections unit tests and pipeline for integrated migrations into zitadelctl binary

* fix(migrations): correction in dockerfile for pipeline build

* fix(migrations): correction in dockerfile for pipeline build

* fix(migrations): dockerfile changes for cache optimization

* fix(database): correct used part-of label in database operator

* fix(database): correct used selectable label in zitadel operator

* fix(operator): correct labels for user secrets in zitadel operator

* fix(operator): correct labels for service test in zitadel operator

* fix: don't enable database features for user operations (#1227)

* fix: don't enable database features for user operations

* fix: omit database feature for connection info adapter

* fix: use latest orbos version

* fix(crd): corrected logic to get database connection and other info

* fix(crd): corrected yaml tags and start for zitadel operator

* fix(crd): move some dependencies and use consistent structure

* fix(crd): corrected unit-tests

* fix(crd): corrected main files for debug starts

* chore(pipeline): use correct version for zitadelctl build

* fix(crd): correct calculating of current db state for zitadel operator

* fix(crd): use binary version for deployment of crd mode operators

* fix(crd): add gitops attribute for reconciling

* fix(crd): corrected crd with newest version

* fix(migration): collect cleanup functions and only use them if all jobs are successful

* fix(zitadelctl): import gcp auth to connect to gke cluster

* feat: Add read and writesecret options for crd mode (#1435)

* fix: don't require orbconfig for crd mode

* test: pass

* fix(zitadelctl): import gcp auth to connect to gke cluster

* feat: add read and writesecret option for crd mode

* test: fix

* fix: make all crd secrets writable

* fix: use in-cluster configs for in-cluster operators

* chore: remove unnecessary debug files

Co-authored-by: Stefan Benz <stefan@caos.ch>

* fix: Crdoperatormerge review (#1385)

* fix: don't require orbconfig for crd mode

* test: pass

* fix(zitadelctl): import gcp auth to connect to gke cluster

* fix: ensure caos-system namespace

* fix: apply orbconfig at takeoff

* docs: improve help for creating an orbconfig

* docs: describe orbconfig properties

* docs: add --gitops to help message example

* fix(pipeline): correct upload of artifacts in dev releases

* test: pass

Co-authored-by: Stefan Benz <stefan@caos.ch>

* fix(test): corrected falsely merged tests

* chore: update orbos library

* fix: only handle exactly named and namespaced crd resource

* fix: print errors, check correct crd namespace

* fix: validate bucket secret

* chore: compile

* fix(operator): corrected secret handling when unused secrets are not defined

* fix(operator): corrected handling of jobs

* fix: don't print logs when readsecret path is provided

* fix(operator): corrected handling of jobs and sort for mounted volumes

* fix(operator): sort for volumes

* fix(operator): change orbos import to newest release

Co-authored-by: Florian Forster <florian@caos.ch>
Co-authored-by: Elio Bischof <eliobischof@gmail.com>

(cherry picked from commit fa9bd5a8e7)

* fix(operator): Standard timeout handling (#1458)

* fix: always use standard time.Duration

* fix: give backup and restore more time

* fix: give backup and restore jobs more time

(cherry picked from commit 7468b7d1e8)

* fix go mod

Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com>
Co-authored-by: Elio Bischof <eliobischof@gmail.com>
2021-03-24 10:31:19 +01:00


package statefulset

import (
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/caos/orbos/mntr"
	"github.com/caos/orbos/pkg/kubernetes"
	"github.com/caos/orbos/pkg/kubernetes/k8s"
	"github.com/caos/orbos/pkg/kubernetes/resources"
	"github.com/caos/orbos/pkg/kubernetes/resources/statefulset"
	"github.com/caos/orbos/pkg/labels"
	"github.com/caos/zitadel/operator"
	"github.com/caos/zitadel/operator/helpers"
	"github.com/pkg/errors"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

const (
	certPath            = "/cockroach/cockroach-certs"
	clientCertPath      = "/cockroach/cockroach-client-certs"
	datadirPath         = "/cockroach/cockroach-data"
	datadirInternal     = "datadir"
	certsInternal       = "certs"
	clientCertsInternal = "client-certs"
	// 256 decimal is 0400 octal, i.e. the mounted certificate files are
	// readable by the owner only
	defaultMode = int32(256)
	nodeSecret  = "cockroachdb.node"
	rootSecret  = "cockroachdb.client.root"
)

type Affinity struct {
	key   string
	value string
}

// Affinitys implements sort.Interface so that the generated label selector
// requirements are emitted in a deterministic order.
type Affinitys []metav1.LabelSelectorRequirement

func (a Affinitys) Len() int           { return len(a) }
func (a Affinitys) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a Affinitys) Less(i, j int) bool { return a[i].Key < a[j].Key }

// AdaptFunc returns the query and destroy funcs for the CockroachDB
// statefulset, an ensure func that initializes the cluster, an ensure func
// that waits for it to become ready, and a func that lists all non-system
// databases.
func AdaptFunc(
	monitor mntr.Monitor,
	sfsSelectable *labels.Selectable,
	podSelector *labels.Selector,
	force bool,
	namespace string,
	image string,
	serviceAccountName string,
	replicaCount int,
	storageCapacity string,
	dbPort int32,
	httpPort int32,
	storageClass string,
	nodeSelector map[string]string,
	tolerations []corev1.Toleration,
	resourcesSFS *k8s.Resources,
) (
	resources.QueryFunc,
	resources.DestroyFunc,
	operator.EnsureFunc,
	operator.EnsureFunc,
	func(k8sClient kubernetes.ClientInt) ([]string, error),
	error,
) {
	internalMonitor := monitor.WithField("component", "statefulset")

	quantity, err := resource.ParseQuantity(storageCapacity)
	if err != nil {
		return nil, nil, nil, nil, nil, err
	}

	name := sfsSelectable.Name()
	k8sSelectable := labels.MustK8sMap(sfsSelectable)

	statefulsetDef := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			Labels:    k8sSelectable,
		},
		Spec: appsv1.StatefulSetSpec{
			ServiceName: name,
			Replicas:    helpers.PointerInt32(int32(replicaCount)),
			Selector: &metav1.LabelSelector{
				MatchLabels: labels.MustK8sMap(podSelector),
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: k8sSelectable,
				},
				Spec: corev1.PodSpec{
					NodeSelector:       nodeSelector,
					Tolerations:        tolerations,
					ServiceAccountName: serviceAccountName,
					Affinity:           getAffinity(k8sSelectable),
					Containers: []corev1.Container{{
						Name:            name,
						Image:           image,
						ImagePullPolicy: "IfNotPresent",
						Ports: []corev1.ContainerPort{
							{ContainerPort: dbPort, Name: "grpc"},
							{ContainerPort: httpPort, Name: "http"},
						},
						LivenessProbe: &corev1.Probe{
							Handler: corev1.Handler{
								HTTPGet: &corev1.HTTPGetAction{
									Path:   "/health",
									Port:   intstr.Parse("http"),
									Scheme: "HTTPS",
								},
							},
							InitialDelaySeconds: 30,
							PeriodSeconds:       5,
						},
						ReadinessProbe: &corev1.Probe{
							Handler: corev1.Handler{
								HTTPGet: &corev1.HTTPGetAction{
									Path:   "/health?ready=1",
									Port:   intstr.Parse("http"),
									Scheme: "HTTPS",
								},
							},
							InitialDelaySeconds: 10,
							PeriodSeconds:       5,
							FailureThreshold:    2,
						},
						VolumeMounts: []corev1.VolumeMount{{
							Name:      datadirInternal,
							MountPath: datadirPath,
						}, {
							Name:      certsInternal,
							MountPath: certPath,
						}, {
							Name:      clientCertsInternal,
							MountPath: clientCertPath,
						}},
						Env: []corev1.EnvVar{{
							Name:  "COCKROACH_CHANNEL",
							Value: "kubernetes-multiregion",
						}},
						Command: []string{
							"/bin/bash",
							"-ecx",
							getJoinExec(
								namespace,
								name,
								int(dbPort),
								replicaCount,
							),
						},
						Resources: getResources(resourcesSFS),
					}},
					Volumes: []corev1.Volume{{
						Name: datadirInternal,
						VolumeSource: corev1.VolumeSource{
							PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
								ClaimName: datadirInternal,
							},
						},
					}, {
						Name: certsInternal,
						VolumeSource: corev1.VolumeSource{
							Secret: &corev1.SecretVolumeSource{
								SecretName:  nodeSecret,
								DefaultMode: helpers.PointerInt32(defaultMode),
							},
						},
					}, {
						Name: clientCertsInternal,
						VolumeSource: corev1.VolumeSource{
							Secret: &corev1.SecretVolumeSource{
								SecretName:  rootSecret,
								DefaultMode: helpers.PointerInt32(defaultMode),
							},
						},
					}},
				},
			},
			PodManagementPolicy: appsv1.ParallelPodManagement,
			UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
				Type: "RollingUpdate",
			},
			VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
				ObjectMeta: metav1.ObjectMeta{
					Name: datadirInternal,
				},
				Spec: corev1.PersistentVolumeClaimSpec{
					AccessModes: []corev1.PersistentVolumeAccessMode{
						corev1.ReadWriteOnce,
					},
					Resources: corev1.ResourceRequirements{
						Requests: corev1.ResourceList{
							"storage": quantity,
						},
					},
					StorageClassName: &storageClass,
				},
			}},
		},
	}

	query, err := statefulset.AdaptFuncToEnsure(statefulsetDef, force)
	if err != nil {
		return nil, nil, nil, nil, nil, err
	}

	destroy, err := statefulset.AdaptFuncToDestroy(namespace, name)
	if err != nil {
		return nil, nil, nil, nil, nil, err
	}

	// an error from WrapFuncs is propagated through the final return below
	wrappedQuery, wrappedDestroy, err := resources.WrapFuncs(internalMonitor, query, destroy)

	checkDBRunning := func(k8sClient kubernetes.ClientInt) error {
		internalMonitor.Info("waiting for statefulset to be running")
		if err := k8sClient.WaitUntilStatefulsetIsReady(namespace, name, true, false, 60*time.Second); err != nil {
			internalMonitor.Error(errors.Wrap(err, "error while waiting for statefulset to be running"))
			return err
		}
		internalMonitor.Info("statefulset is running")
		return nil
	}

	checkDBNotReady := func(k8sClient kubernetes.ClientInt) error {
		internalMonitor.Info("checking for statefulset to not be ready")
		if err := k8sClient.WaitUntilStatefulsetIsReady(namespace, name, true, true, 1*time.Second); err != nil {
			internalMonitor.Info("statefulset is not ready")
			return nil
		}
		internalMonitor.Info("statefulset is ready")
		return errors.New("statefulset is ready")
	}

	ensureInit := func(k8sClient kubernetes.ClientInt) error {
		if err := checkDBRunning(k8sClient); err != nil {
			return err
		}
		// if the statefulset is already ready, the cluster is initialized
		// and `cockroach init` must not run again
		if err := checkDBNotReady(k8sClient); err != nil {
			return nil
		}
		command := "/cockroach/cockroach init --certs-dir=" + clientCertPath + " --host=" + name + "-0." + name
		if err := k8sClient.ExecInPod(namespace, name+"-0", name, command); err != nil {
			return err
		}
		return nil
	}

	checkDBReady := func(k8sClient kubernetes.ClientInt) error {
		internalMonitor.Info("waiting for statefulset to be ready")
		if err := k8sClient.WaitUntilStatefulsetIsReady(namespace, name, true, true, 60*time.Second); err != nil {
			internalMonitor.Error(errors.Wrap(err, "error while waiting for statefulset to be ready"))
			return err
		}
		internalMonitor.Info("statefulset is ready")
		return nil
	}

	getAllDBs := func(k8sClient kubernetes.ClientInt) ([]string, error) {
		if err := checkDBRunning(k8sClient); err != nil {
			return nil, err
		}
		if err := checkDBReady(k8sClient); err != nil {
			return nil, err
		}

		command := "/cockroach/cockroach sql --certs-dir=" + clientCertPath + " --host=" + name + "-0." + name + " -e 'SHOW DATABASES;'"
		databasesStr, err := k8sClient.ExecInPodWithOutput(namespace, name+"-0", name, command)
		if err != nil {
			return nil, err
		}

		// the output is one line per row: a tab-separated header followed by
		// the databases and a trailing empty line. Strip header and trailing
		// line, then skip system databases, which are owned by the "node" user
		databases := strings.Split(databasesStr, "\n")
		dbAndOwners := databases[1 : len(databases)-1]
		dbs := []string{}
		for _, dbAndOwner := range dbAndOwners {
			parts := strings.Split(dbAndOwner, "\t")
			if parts[1] != "node" {
				dbs = append(dbs, parts[0])
			}
		}
		return dbs, nil
	}

	return wrappedQuery, wrappedDestroy, ensureInit, checkDBReady, getAllDBs, err
}
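
// getJoinExec builds the shell command that starts a CockroachDB node and
// joins it to its peers through the headless service. A sketch of the
// generated command (wrapped here for readability), assuming the hypothetical
// inputs namespace "db", name "cockroachdb", dbPort 26257 and replicaCount 3:
//
//	exec /cockroach/cockroach start --logtostderr \
//	  --certs-dir /cockroach/cockroach-certs \
//	  --advertise-host $(hostname -f) --http-addr 0.0.0.0 \
//	  --join cockroachdb-0.cockroachdb.db:26257,cockroachdb-1.cockroachdb.db:26257,cockroachdb-2.cockroachdb.db:26257 \
//	  --locality zone=db --cache 25% --max-sql-memory 25%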
func getJoinExec(namespace string, name string, dbPort int, replicaCount int) string {
	joinList := make([]string, 0)
	for i := 0; i < replicaCount; i++ {
		joinList = append(joinList, fmt.Sprintf("%s-%d.%s.%s:%d", name, i, name, namespace, dbPort))
	}
	joinListStr := strings.Join(joinList, ",")

	locality := "zone=" + namespace

	return "exec /cockroach/cockroach start --logtostderr --certs-dir " + certPath + " --advertise-host $(hostname -f) --http-addr 0.0.0.0 --join " + joinListStr + " --locality " + locality + " --cache 25% --max-sql-memory 25%"
}
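
// getResources picks the container resource requirements. Note the
// all-or-nothing semantics: with a nil resourcesSFS, requests and limits both
// default to 100m CPU / 512Mi memory; with a non-nil resourcesSFS, those
// defaults are dropped entirely and only the explicitly provided requests
// and limits are set.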
func getResources(resourcesSFS *k8s.Resources) corev1.ResourceRequirements {
	internalResources := corev1.ResourceRequirements{
		Requests: corev1.ResourceList{
			"cpu":    resource.MustParse("100m"),
			"memory": resource.MustParse("512Mi"),
		},
		Limits: corev1.ResourceList{
			"cpu":    resource.MustParse("100m"),
			"memory": resource.MustParse("512Mi"),
		},
	}

	if resourcesSFS != nil {
		internalResources = corev1.ResourceRequirements{}
		if resourcesSFS.Requests != nil {
			internalResources.Requests = resourcesSFS.Requests
		}
		if resourcesSFS.Limits != nil {
			internalResources.Limits = resourcesSFS.Limits
		}
	}
	return internalResources
}
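
// getAffinity derives a hard pod anti-affinity from the statefulset's own
// labels: no two pods matching these labels may be scheduled onto the same
// node (topology key kubernetes.io/hostname). The match expressions are
// sorted so the generated spec stays stable across reconciliations.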
func getAffinity(labels map[string]string) *corev1.Affinity {
	affinity := Affinitys{}
	for k, v := range labels {
		affinity = append(affinity, metav1.LabelSelectorRequirement{
			Key:      k,
			Operator: metav1.LabelSelectorOpIn,
			Values: []string{
				v,
			}})
	}
	sort.Sort(affinity)

	return &corev1.Affinity{
		PodAntiAffinity: &corev1.PodAntiAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{{
				LabelSelector: &metav1.LabelSelector{
					MatchExpressions: affinity,
				},
				TopologyKey: "kubernetes.io/hostname",
			}},
		},
	}
}
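
// A minimal usage sketch for the functions returned by AdaptFunc (hypothetical
// caller code, not part of this package; k8sClient is assumed to be a
// kubernetes.ClientInt):
//
//	query, destroy, ensureInit, checkDBReady, getAllDBs, err := AdaptFunc(...)
//	// ...after the statefulset has been ensured via query/destroy:
//	if err := ensureInit(k8sClient); err != nil {   // runs `cockroach init` once
//		// handle error
//	}
//	if err := checkDBReady(k8sClient); err != nil { // waits up to 60 seconds
//		// handle error
//	}
//	dbs, err := getAllDBs(k8sClient)                // lists non-system databases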