mirror of
https://github.com/zitadel/zitadel.git
synced 2025-01-05 22:52:46 +00:00
bacfc3b099
* feat(crd): add crd mode for operators (#1329) * feat(operator): add base for zitadel operator * fix(operator): changed pipeline to release operator * fix(operator): fmt with only one parameter * fix(operator): corrected workflow job name * fix(zitadelctl): added restore and backuplist command * fix(zitadelctl): scale for restore * chore(container): use scratch for deploy container * fix(zitadelctl): limit image to scratch * fix(migration): added migration scripts for newer version * fix(operator): changed handling of kubeconfig in operator logic * fix(operator): changed handling of secrets in operator logic * fix(operator): use new version of zitadel * fix(operator): added path for migrations * fix(operator): delete doublets of migration scripts * fix(operator): delete subpaths and integrate logic into init container * fix(operator): corrected path in dockerfile for local migrations * fix(operator): added migrations for cockroachdb-secure * fix(operator): delete logic for ambassador module * fix(operator): added read and write secret commands * fix(operator): correct and align operator pipeline with zitadel pipeline * fix(operator): correct yaml error in operator pipeline * fix(operator): correct action name in operator pipeline * fix(operator): correct case-sensitive filename in operator pipeline * fix(operator): upload artifacts from buildx output * fix(operator): corrected attribute spelling error * fix(operator): combined jobs for operator binary and image * fix(operator): added missing comma in operator pipeline * fix(operator): added codecov for operator image * fix(operator): added codecov for operator image * fix(testing): code changes for testing and several unit-tests (#1009) * fix(operator): usage of interface of kubernetes client for testing and several unit-tests * fix(operator): several unit-tests * fix(operator): several unit-tests * fix(operator): changed order for the operator logic * fix(operator): added version of zitadelctl from semantic release * fix(operator): corrected function call with version of zitadelctl * fix(operator): corrected function call with version of zitadelctl * fix(operator): add check output to operator release pipeline * fix(operator): set --short length everywhere to 12 * fix(operator): zitadel setup in job instead of exec with several unit tests * fix(operator): fixes to combine newest zitadel and testing branch * fix(operator): corrected path in Dockerfile * fix(operator): fixed unit-test that was ignored during changes * fix(operator): fixed unit-test that was ignored during changes * fix(operator): corrected Dockerfile to correctly use env variable * fix(operator): quickfix takeoff deployment * fix(operator): corrected the clusterrolename in the applied artifacts * fix: update secure migrations * fix(operator): migrations (#1057) * fix(operator): copied migrations from orbos repository * fix(operator): newest migrations * chore: use cockroach-secure * fix: rename migration * fix: remove insecure cockroach migrations Co-authored-by: Stefan Benz <stefan@caos.ch> * fix: finalize labels * fix(operator): cli logging concurrent and fixe deployment of operator during restore * fix: finalize labels and cli commands * fix: restore * chore: cockroachdb is always secure * chore: use orbos consistent-labels latest commit * test: make tests compatible with new labels * fix: default to sa token for start command * fix: use cockroachdb v12.02 * fix: don't delete flyway user * test: fix migration test * fix: use correct table qualifiers * fix: don't alter sequence ownership * fix: upgrade flyway * fix: change ownership of all dbs and tables to admin user * fix: change defaultdb user * fix: treat clientid status codes >= 400 as errors * fix: reconcile specified ZITADEL version, not binary version * fix: add ca-certs * fix: use latest orbos code * fix: use orbos with fixed race condition * fix: use latest ORBOS code * fix: use latest ORBOS code * fix: make migration and scaling around restoring work * fix(operator): move zitadel operator * chore(migrations): include owner change migration * feat(db): add code base for database operator * fix(db): change used image registry for database operator * fix(db): generated mock * fix(db): add accidentally ignored file * fix(db): add cockroachdb backup image to pipeline * fix(db): correct pipeline and image versions * fix(db): correct version of used orbos * fix(db): correct database import * fix(db): go mod tidy * fix(db): use new version for orbos * fix(migrations): include migrations into zitadelctl binary (#1211) * fix(db): use statik to integrate migrations into binary * fix(migrations): corrections unit tests and pipeline for integrated migrations into zitadelctl binary * fix(migrations): correction in dockerfile for pipeline build * fix(migrations): correction in dockerfile for pipeline build * fix(migrations): dockerfile changes for cache optimization * fix(database): correct used part-of label in database operator * fix(database): correct used selectable label in zitadel operator * fix(operator): correct lables for user secrets in zitadel operator * fix(operator): correct lables for service test in zitadel operator * fix: don't enable database features for user operations (#1227) * fix: don't enable database features for user operations * fix: omit database feature for connection info adapter * fix: use latest orbos version * fix(crd): corrected logic to get database connection and other info * fix(crd): corrected yaml tags and start for zitadel operator * fix(crd): move some dependencies and use consistent structure * fix(crd): corrected unit-tests * fix(crd): corrected main files for debug starts * chore(pipeline): use correct version for zitadelctl build * fix(crd): correct calculating of current db state for zitadel operator * fix(crd): use binary version for deployment of crd mode operators * fix(crd): add gitops attribute for reconciling * fix(crd): corrected crd with newest version * fix(migration): collect cleanup functions and only use them if all jobs are successful * fix(zitadelctl): import gcp auth to connect to gke cluster * feat: Add read and writesecret options for crd mode (#1435) * fix: don't require orbconfig for crd mode * test: pass * fix(zitadelctl): import gcp auth to connect to gke cluster * feat: add read and writesecret option for crd mode * test: fix * fix: make all crd secrets writable * fix: use in-cluster configs for in-cluster operators * chore: remove unnecessary debug files Co-authored-by: Stefan Benz <stefan@caos.ch> * fix: Crdoperatormerge review (#1385) * fix: don't require orbconfig for crd mode * test: pass * fix(zitadelctl): import gcp auth to connect to gke cluster * fix: ensure caos-system namespace * fix: apply orbconfig at takeoff * docs: improve help for creating an orbconfig * docs: describe orbconfig properties * docs: add --gitops to help message example * fix(pipeline): correct upload of artifacts in dev releases * test: pass Co-authored-by: Stefan Benz <stefan@caos.ch> * fix(test): corrected falsely merged tests * chore: update orbos library * fix: only handle exactly named and namespaced crd resource * fix: print errors, check correct crd namespace * fix: validate bucket secret * chore: compile * fix(operator): corrected secret handling when unused secrets are not defined * fix(operator): corrected handling of jobs * fix: dont print logs when readsecret path is provided * fix(operator): corrected handling of jobs and sort for mounted volumes * fix(operator): sort for volumes * fix(operator): change orboos import to newest release Co-authored-by: Florian Forster <florian@caos.ch> Co-authored-by: Elio Bischof <eliobischof@gmail.com> (cherry picked from commit fa9bd5a8e7a5032b21272552fbc4cdf1630db049) * fix(operator): Standard timeout handling (#1458) * fix: always use standard time.Duration * fix: give backup and restore more time * fix: give backup and restore jobs more time (cherry picked from commit 7468b7d1e818102dcd58dbb6d3647f49277a4e65) * fix go mod Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Elio Bischof <eliobischof@gmail.com>
355 lines
10 KiB
Go
355 lines
10 KiB
Go
package statefulset
|
|
|
|
import (
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/caos/orbos/pkg/labels"
|
|
"github.com/caos/zitadel/operator"
|
|
"github.com/caos/zitadel/operator/helpers"
|
|
"k8s.io/apimachinery/pkg/util/intstr"
|
|
|
|
"github.com/caos/orbos/mntr"
|
|
"github.com/caos/orbos/pkg/kubernetes"
|
|
"github.com/caos/orbos/pkg/kubernetes/k8s"
|
|
"github.com/caos/orbos/pkg/kubernetes/resources"
|
|
"github.com/caos/orbos/pkg/kubernetes/resources/statefulset"
|
|
"github.com/pkg/errors"
|
|
appsv1 "k8s.io/api/apps/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
)
|
|
|
|
const (
|
|
certPath = "/cockroach/cockroach-certs"
|
|
clientCertPath = "/cockroach/cockroach-client-certs"
|
|
datadirPath = "/cockroach/cockroach-data"
|
|
datadirInternal = "datadir"
|
|
certsInternal = "certs"
|
|
clientCertsInternal = "client-certs"
|
|
defaultMode = int32(256)
|
|
nodeSecret = "cockroachdb.node"
|
|
rootSecret = "cockroachdb.client.root"
|
|
)
|
|
|
|
type Affinity struct {
|
|
key string
|
|
value string
|
|
}
|
|
|
|
type Affinitys []metav1.LabelSelectorRequirement
|
|
|
|
func (a Affinitys) Len() int { return len(a) }
|
|
func (a Affinitys) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
func (a Affinitys) Less(i, j int) bool { return a[i].Key < a[j].Key }
|
|
|
|
func AdaptFunc(
|
|
monitor mntr.Monitor,
|
|
sfsSelectable *labels.Selectable,
|
|
podSelector *labels.Selector,
|
|
force bool,
|
|
namespace string,
|
|
image string,
|
|
serviceAccountName string,
|
|
replicaCount int,
|
|
storageCapacity string,
|
|
dbPort int32,
|
|
httpPort int32,
|
|
storageClass string,
|
|
nodeSelector map[string]string,
|
|
tolerations []corev1.Toleration,
|
|
resourcesSFS *k8s.Resources,
|
|
) (
|
|
resources.QueryFunc,
|
|
resources.DestroyFunc,
|
|
operator.EnsureFunc,
|
|
operator.EnsureFunc,
|
|
func(k8sClient kubernetes.ClientInt) ([]string, error),
|
|
error,
|
|
) {
|
|
internalMonitor := monitor.WithField("component", "statefulset")
|
|
|
|
quantity, err := resource.ParseQuantity(storageCapacity)
|
|
if err != nil {
|
|
return nil, nil, nil, nil, nil, err
|
|
}
|
|
|
|
name := sfsSelectable.Name()
|
|
k8sSelectable := labels.MustK8sMap(sfsSelectable)
|
|
statefulsetDef := &appsv1.StatefulSet{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: name,
|
|
Namespace: namespace,
|
|
Labels: k8sSelectable,
|
|
},
|
|
Spec: appsv1.StatefulSetSpec{
|
|
ServiceName: name,
|
|
Replicas: helpers.PointerInt32(int32(replicaCount)),
|
|
Selector: &metav1.LabelSelector{
|
|
MatchLabels: labels.MustK8sMap(podSelector),
|
|
},
|
|
Template: corev1.PodTemplateSpec{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Labels: k8sSelectable,
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
NodeSelector: nodeSelector,
|
|
Tolerations: tolerations,
|
|
ServiceAccountName: serviceAccountName,
|
|
Affinity: getAffinity(k8sSelectable),
|
|
Containers: []corev1.Container{{
|
|
Name: name,
|
|
Image: image,
|
|
ImagePullPolicy: "IfNotPresent",
|
|
Ports: []corev1.ContainerPort{
|
|
{ContainerPort: dbPort, Name: "grpc"},
|
|
{ContainerPort: httpPort, Name: "http"},
|
|
},
|
|
LivenessProbe: &corev1.Probe{
|
|
Handler: corev1.Handler{
|
|
HTTPGet: &corev1.HTTPGetAction{
|
|
Path: "/health",
|
|
Port: intstr.Parse("http"),
|
|
Scheme: "HTTPS",
|
|
},
|
|
},
|
|
InitialDelaySeconds: 30,
|
|
PeriodSeconds: 5,
|
|
},
|
|
ReadinessProbe: &corev1.Probe{
|
|
Handler: corev1.Handler{
|
|
HTTPGet: &corev1.HTTPGetAction{
|
|
Path: "/health?ready=1",
|
|
Port: intstr.Parse("http"),
|
|
Scheme: "HTTPS",
|
|
},
|
|
},
|
|
InitialDelaySeconds: 10,
|
|
PeriodSeconds: 5,
|
|
FailureThreshold: 2,
|
|
},
|
|
VolumeMounts: []corev1.VolumeMount{{
|
|
Name: datadirInternal,
|
|
MountPath: datadirPath,
|
|
}, {
|
|
Name: certsInternal,
|
|
MountPath: certPath,
|
|
}, {
|
|
Name: clientCertsInternal,
|
|
MountPath: clientCertPath,
|
|
}},
|
|
Env: []corev1.EnvVar{{
|
|
Name: "COCKROACH_CHANNEL",
|
|
Value: "kubernetes-multiregion",
|
|
}},
|
|
Command: []string{
|
|
"/bin/bash",
|
|
"-ecx",
|
|
getJoinExec(
|
|
namespace,
|
|
name,
|
|
int(dbPort),
|
|
replicaCount,
|
|
),
|
|
},
|
|
Resources: getResources(resourcesSFS),
|
|
}},
|
|
Volumes: []corev1.Volume{{
|
|
Name: datadirInternal,
|
|
VolumeSource: corev1.VolumeSource{
|
|
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
|
|
ClaimName: datadirInternal,
|
|
},
|
|
},
|
|
}, {
|
|
Name: certsInternal,
|
|
VolumeSource: corev1.VolumeSource{
|
|
Secret: &corev1.SecretVolumeSource{
|
|
SecretName: nodeSecret,
|
|
DefaultMode: helpers.PointerInt32(defaultMode),
|
|
},
|
|
},
|
|
}, {
|
|
Name: clientCertsInternal,
|
|
VolumeSource: corev1.VolumeSource{
|
|
Secret: &corev1.SecretVolumeSource{
|
|
SecretName: rootSecret,
|
|
DefaultMode: helpers.PointerInt32(defaultMode),
|
|
},
|
|
},
|
|
}},
|
|
},
|
|
},
|
|
PodManagementPolicy: appsv1.ParallelPodManagement,
|
|
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
|
|
Type: "RollingUpdate",
|
|
},
|
|
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: datadirInternal,
|
|
},
|
|
Spec: corev1.PersistentVolumeClaimSpec{
|
|
AccessModes: []corev1.PersistentVolumeAccessMode{
|
|
corev1.PersistentVolumeAccessMode("ReadWriteOnce"),
|
|
},
|
|
Resources: corev1.ResourceRequirements{
|
|
Requests: corev1.ResourceList{
|
|
"storage": quantity,
|
|
},
|
|
},
|
|
StorageClassName: &storageClass,
|
|
},
|
|
}},
|
|
},
|
|
}
|
|
|
|
query, err := statefulset.AdaptFuncToEnsure(statefulsetDef, force)
|
|
if err != nil {
|
|
return nil, nil, nil, nil, nil, err
|
|
}
|
|
destroy, err := statefulset.AdaptFuncToDestroy(namespace, name)
|
|
if err != nil {
|
|
return nil, nil, nil, nil, nil, err
|
|
}
|
|
|
|
wrapedQuery, wrapedDestroy, err := resources.WrapFuncs(internalMonitor, query, destroy)
|
|
checkDBRunning := func(k8sClient kubernetes.ClientInt) error {
|
|
internalMonitor.Info("waiting for statefulset to be running")
|
|
if err := k8sClient.WaitUntilStatefulsetIsReady(namespace, name, true, false, 60*time.Second); err != nil {
|
|
internalMonitor.Error(errors.Wrap(err, "error while waiting for statefulset to be running"))
|
|
return err
|
|
}
|
|
internalMonitor.Info("statefulset is running")
|
|
return nil
|
|
}
|
|
|
|
checkDBNotReady := func(k8sClient kubernetes.ClientInt) error {
|
|
internalMonitor.Info("checking for statefulset to not be ready")
|
|
if err := k8sClient.WaitUntilStatefulsetIsReady(namespace, name, true, true, 1*time.Second); err != nil {
|
|
internalMonitor.Info("statefulset is not ready")
|
|
return nil
|
|
}
|
|
internalMonitor.Info("statefulset is ready")
|
|
return errors.New("statefulset is ready")
|
|
}
|
|
|
|
ensureInit := func(k8sClient kubernetes.ClientInt) error {
|
|
if err := checkDBRunning(k8sClient); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := checkDBNotReady(k8sClient); err != nil {
|
|
return nil
|
|
}
|
|
|
|
command := "/cockroach/cockroach init --certs-dir=" + clientCertPath + " --host=" + name + "-0." + name
|
|
|
|
if err := k8sClient.ExecInPod(namespace, name+"-0", name, command); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
checkDBReady := func(k8sClient kubernetes.ClientInt) error {
|
|
internalMonitor.Info("waiting for statefulset to be ready")
|
|
if err := k8sClient.WaitUntilStatefulsetIsReady(namespace, name, true, true, 60*time.Second); err != nil {
|
|
internalMonitor.Error(errors.Wrap(err, "error while waiting for statefulset to be ready"))
|
|
return err
|
|
}
|
|
internalMonitor.Info("statefulset is ready")
|
|
return nil
|
|
}
|
|
|
|
getAllDBs := func(k8sClient kubernetes.ClientInt) ([]string, error) {
|
|
if err := checkDBRunning(k8sClient); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := checkDBReady(k8sClient); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
command := "/cockroach/cockroach sql --certs-dir=" + clientCertPath + " --host=" + name + "-0." + name + " -e 'SHOW DATABASES;'"
|
|
|
|
databasesStr, err := k8sClient.ExecInPodWithOutput(namespace, name+"-0", name, command)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
databases := strings.Split(databasesStr, "\n")
|
|
dbAndOwners := databases[1 : len(databases)-1]
|
|
dbs := []string{}
|
|
for _, dbAndOwner := range dbAndOwners {
|
|
parts := strings.Split(dbAndOwner, "\t")
|
|
if parts[1] != "node" {
|
|
dbs = append(dbs, parts[0])
|
|
}
|
|
}
|
|
return dbs, nil
|
|
}
|
|
|
|
return wrapedQuery, wrapedDestroy, ensureInit, checkDBReady, getAllDBs, err
|
|
}
|
|
|
|
func getJoinExec(namespace string, name string, dbPort int, replicaCount int) string {
|
|
joinList := make([]string, 0)
|
|
for i := 0; i < replicaCount; i++ {
|
|
joinList = append(joinList, fmt.Sprintf("%s-%d.%s.%s:%d", name, i, name, namespace, dbPort))
|
|
}
|
|
joinListStr := strings.Join(joinList, ",")
|
|
locality := "zone=" + namespace
|
|
|
|
return "exec /cockroach/cockroach start --logtostderr --certs-dir " + certPath + " --advertise-host $(hostname -f) --http-addr 0.0.0.0 --join " + joinListStr + " --locality " + locality + " --cache 25% --max-sql-memory 25%"
|
|
}
|
|
|
|
func getResources(resourcesSFS *k8s.Resources) corev1.ResourceRequirements {
|
|
internalResources := corev1.ResourceRequirements{
|
|
Requests: corev1.ResourceList{
|
|
"cpu": resource.MustParse("100m"),
|
|
"memory": resource.MustParse("512Mi"),
|
|
},
|
|
Limits: corev1.ResourceList{
|
|
"cpu": resource.MustParse("100m"),
|
|
"memory": resource.MustParse("512Mi"),
|
|
},
|
|
}
|
|
|
|
if resourcesSFS != nil {
|
|
internalResources = corev1.ResourceRequirements{}
|
|
if resourcesSFS.Requests != nil {
|
|
internalResources.Requests = resourcesSFS.Requests
|
|
}
|
|
if resourcesSFS.Limits != nil {
|
|
internalResources.Limits = resourcesSFS.Limits
|
|
}
|
|
}
|
|
|
|
return internalResources
|
|
}
|
|
|
|
func getAffinity(labels map[string]string) *corev1.Affinity {
|
|
affinity := Affinitys{}
|
|
for k, v := range labels {
|
|
affinity = append(affinity, metav1.LabelSelectorRequirement{
|
|
Key: k,
|
|
Operator: metav1.LabelSelectorOpIn,
|
|
Values: []string{
|
|
v,
|
|
}})
|
|
}
|
|
sort.Sort(affinity)
|
|
|
|
return &corev1.Affinity{
|
|
PodAntiAffinity: &corev1.PodAntiAffinity{
|
|
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{{
|
|
LabelSelector: &metav1.LabelSelector{
|
|
MatchExpressions: affinity,
|
|
},
|
|
TopologyKey: "kubernetes.io/hostname",
|
|
}},
|
|
},
|
|
}
|
|
}
|