cmd/containerboot,kube/kubetypes: refactor containerboot kube client

So that the kube client is no longer a global var.

Updates tailscale/tailscale#12079

Signed-off-by: Irbe Krumina <irbe@tailscale.com>
This commit is contained in:
Irbe Krumina 2024-11-20 17:17:34 +00:00
parent 60bf1e168e
commit 5eca369530
5 changed files with 87 additions and 60 deletions

View File

@ -9,30 +9,56 @@
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"net/netip"
"os"
"tailscale.com/kube/kubeapi"
"tailscale.com/kube/kubeclient"
"tailscale.com/kube/kubetypes"
"tailscale.com/tailcfg"
)
// kubeClient is a wrapper around Tailscale's internal kube client that knows how to talk to the kube API server. We use
// this rather than any of the upstream Kubernetes client libaries to avoid extra imports.
type kubeClient struct {
kubeclient.Client
stateSecret string
}
func newKubeClient(root string, stateSecret string) (*kubeClient, error) {
if root != "/" {
// If we are running in a test, we need to set the root path to the fake
// service account directory.
kubeclient.SetRootPathForTesting(root)
}
var err error
kc, err := kubeclient.New("tailscale-container")
if err != nil {
return nil, fmt.Errorf("Error creating kube client: %w", err)
}
if (root != "/") || os.Getenv("TS_KUBERNETES_READ_API_SERVER_ADDRESS_FROM_ENV") == "true" {
// Derive the API server address from the environment variables
// Used to set http server in tests, or optionally enabled by flag
kc.SetURL(fmt.Sprintf("https://%s:%s", os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS")))
}
return &kubeClient{Client: kc, stateSecret: stateSecret}, nil
}
// storeDeviceID writes deviceID to 'device_id' data field of the named
// Kubernetes Secret.
func storeDeviceID(ctx context.Context, secretName string, deviceID tailcfg.StableNodeID) error {
func (kc *kubeClient) storeDeviceID(ctx context.Context, deviceID tailcfg.StableNodeID) error {
s := &kubeapi.Secret{
Data: map[string][]byte{
"device_id": []byte(deviceID),
kubetypes.KeyDeviceID: []byte(deviceID),
},
}
return kc.StrategicMergePatchSecret(ctx, secretName, s, "tailscale-container")
return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, "tailscale-container")
}
// storeDeviceEndpoints writes device's tailnet IPs and MagicDNS name to fields
// 'device_ips', 'device_fqdn' of the named Kubernetes Secret.
func storeDeviceEndpoints(ctx context.Context, secretName string, fqdn string, addresses []netip.Prefix) error {
func (kc *kubeClient) storeDeviceEndpoints(ctx context.Context, fqdn string, addresses []netip.Prefix) error {
var ips []string
for _, addr := range addresses {
ips = append(ips, addr.Addr().String())
@ -44,8 +70,8 @@ func storeDeviceEndpoints(ctx context.Context, secretName string, fqdn string, a
s := &kubeapi.Secret{
Data: map[string][]byte{
"device_fqdn": []byte(fqdn),
"device_ips": deviceIPs,
kubetypes.KeyDeviceFQDN: []byte(fqdn),
kubetypes.KeyDeviceIPs: deviceIPs,
},
}
return kc.StrategicMergePatchSecret(ctx, secretName, s, "tailscale-container")
@ -53,7 +79,7 @@ func storeDeviceEndpoints(ctx context.Context, secretName string, fqdn string, a
// deleteAuthKey deletes the 'authkey' field of the given kube
// secret. No-op if there is no authkey in the secret.
func deleteAuthKey(ctx context.Context, secretName string) error {
func (kc *kubeClient) deleteAuthKey(ctx context.Context) error {
// m is a JSON Patch data structure, see https://jsonpatch.com/ or RFC 6902.
m := []kubeclient.JSONPatch{
{
@ -61,7 +87,7 @@ func deleteAuthKey(ctx context.Context, secretName string) error {
Path: "/data/authkey",
},
}
if err := kc.JSONPatchResource(ctx, secretName, kubeclient.TypeSecrets, m); err != nil {
if err := kc.JSONPatchResource(ctx, kc.stateSecret, kubeclient.TypeSecrets, m); err != nil {
if s, ok := err.(*kubeapi.Status); ok && s.Code == http.StatusUnprocessableEntity {
// This is kubernetes-ese for "the field you asked to
// delete already doesn't exist", aka no-op.
@ -74,35 +100,15 @@ func deleteAuthKey(ctx context.Context, secretName string) error {
// storeCapVer stores the current capability version of tailscale and, if provided, UID of the Pod in the tailscale
// state Secret. This can be used to observe the current capability version of tailscaled running in this container.
func storeCapVer(ctx context.Context, secretName string, podUID string) error {
func (kc *kubeClient) storeCapVer(ctx context.Context, podUID string) error {
capVerS := fmt.Sprintf("%d", tailcfg.CurrentCapabilityVersion)
if podUID != "" {
capVerS += fmt.Sprintf(":%s", podUID)
}
s := &kubeapi.Secret{
Data: map[string][]byte{
"tailscale_capver": []byte(capVerS),
kubetypes.KeyCapVer: []byte(capVerS),
},
}
return kc.StrategicMergePatchSecret(ctx, secretName, s, "tailscale-container")
}
var kc kubeclient.Client
func initKubeClient(root string) {
if root != "/" {
// If we are running in a test, we need to set the root path to the fake
// service account directory.
kubeclient.SetRootPathForTesting(root)
}
var err error
kc, err = kubeclient.New("tailscale-container")
if err != nil {
log.Fatalf("Error creating kube client: %v", err)
}
if (root != "/") || os.Getenv("TS_KUBERNETES_READ_API_SERVER_ADDRESS_FROM_ENV") == "true" {
// Derive the API server address from the environment variables
// Used to set http server in tests, or optionally enabled by flag
kc.SetURL(fmt.Sprintf("https://%s:%s", os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS")))
}
return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, "tailscale-container")
}

View File

@ -21,7 +21,7 @@ func TestSetupKube(t *testing.T) {
cfg *settings
wantErr bool
wantCfg *settings
kc kubeclient.Client
kc *kubeClient
}{
{
name: "TS_AUTHKEY set, state Secret exists",
@ -29,14 +29,14 @@ func TestSetupKube(t *testing.T) {
AuthKey: "foo",
KubeSecret: "foo",
},
kc: &kubeclient.FakeClient{
kc: &kubeClient{stateSecret: "foo", Client: &kubeclient.FakeClient{
CheckSecretPermissionsImpl: func(context.Context, string) (bool, bool, error) {
return false, false, nil
},
GetSecretImpl: func(context.Context, string) (*kubeapi.Secret, error) {
return nil, nil
},
},
}},
wantCfg: &settings{
AuthKey: "foo",
KubeSecret: "foo",
@ -48,14 +48,14 @@ func TestSetupKube(t *testing.T) {
AuthKey: "foo",
KubeSecret: "foo",
},
kc: &kubeclient.FakeClient{
kc: &kubeClient{stateSecret: "foo", Client: &kubeclient.FakeClient{
CheckSecretPermissionsImpl: func(context.Context, string) (bool, bool, error) {
return false, true, nil
},
GetSecretImpl: func(context.Context, string) (*kubeapi.Secret, error) {
return nil, &kubeapi.Status{Code: 404}
},
},
}},
wantCfg: &settings{
AuthKey: "foo",
KubeSecret: "foo",
@ -67,14 +67,14 @@ func TestSetupKube(t *testing.T) {
AuthKey: "foo",
KubeSecret: "foo",
},
kc: &kubeclient.FakeClient{
kc: &kubeClient{stateSecret: "foo", Client: &kubeclient.FakeClient{
CheckSecretPermissionsImpl: func(context.Context, string) (bool, bool, error) {
return false, false, nil
},
GetSecretImpl: func(context.Context, string) (*kubeapi.Secret, error) {
return nil, &kubeapi.Status{Code: 404}
},
},
}},
wantCfg: &settings{
AuthKey: "foo",
KubeSecret: "foo",
@ -87,14 +87,14 @@ func TestSetupKube(t *testing.T) {
AuthKey: "foo",
KubeSecret: "foo",
},
kc: &kubeclient.FakeClient{
kc: &kubeClient{stateSecret: "foo", Client: &kubeclient.FakeClient{
CheckSecretPermissionsImpl: func(context.Context, string) (bool, bool, error) {
return false, false, nil
},
GetSecretImpl: func(context.Context, string) (*kubeapi.Secret, error) {
return nil, &kubeapi.Status{Code: 403}
},
},
}},
wantCfg: &settings{
AuthKey: "foo",
KubeSecret: "foo",
@ -111,11 +111,11 @@ func TestSetupKube(t *testing.T) {
AuthKey: "foo",
KubeSecret: "foo",
},
kc: &kubeclient.FakeClient{
kc: &kubeClient{stateSecret: "foo", Client: &kubeclient.FakeClient{
CheckSecretPermissionsImpl: func(context.Context, string) (bool, bool, error) {
return false, false, errors.New("broken")
},
},
}},
wantErr: true,
},
{
@ -127,14 +127,14 @@ func TestSetupKube(t *testing.T) {
wantCfg: &settings{
KubeSecret: "foo",
},
kc: &kubeclient.FakeClient{
kc: &kubeClient{stateSecret: "foo", Client: &kubeclient.FakeClient{
CheckSecretPermissionsImpl: func(context.Context, string) (bool, bool, error) {
return false, true, nil
},
GetSecretImpl: func(context.Context, string) (*kubeapi.Secret, error) {
return nil, &kubeapi.Status{Code: 404}
},
},
}},
},
{
// Interactive login using URL in Pod logs
@ -145,28 +145,28 @@ func TestSetupKube(t *testing.T) {
wantCfg: &settings{
KubeSecret: "foo",
},
kc: &kubeclient.FakeClient{
kc: &kubeClient{stateSecret: "foo", Client: &kubeclient.FakeClient{
CheckSecretPermissionsImpl: func(context.Context, string) (bool, bool, error) {
return false, false, nil
},
GetSecretImpl: func(context.Context, string) (*kubeapi.Secret, error) {
return &kubeapi.Secret{}, nil
},
},
}},
},
{
name: "TS_AUTHKEY not set, state Secret contains auth key, we do not have RBAC to patch it",
cfg: &settings{
KubeSecret: "foo",
},
kc: &kubeclient.FakeClient{
kc: &kubeClient{stateSecret: "foo", Client: &kubeclient.FakeClient{
CheckSecretPermissionsImpl: func(context.Context, string) (bool, bool, error) {
return false, false, nil
},
GetSecretImpl: func(context.Context, string) (*kubeapi.Secret, error) {
return &kubeapi.Secret{Data: map[string][]byte{"authkey": []byte("foo")}}, nil
},
},
}},
wantCfg: &settings{
KubeSecret: "foo",
},
@ -177,14 +177,14 @@ func TestSetupKube(t *testing.T) {
cfg: &settings{
KubeSecret: "foo",
},
kc: &kubeclient.FakeClient{
kc: &kubeClient{stateSecret: "foo", Client: &kubeclient.FakeClient{
CheckSecretPermissionsImpl: func(context.Context, string) (bool, bool, error) {
return true, false, nil
},
GetSecretImpl: func(context.Context, string) (*kubeapi.Secret, error) {
return &kubeapi.Secret{Data: map[string][]byte{"authkey": []byte("foo")}}, nil
},
},
}},
wantCfg: &settings{
KubeSecret: "foo",
AuthKey: "foo",
@ -194,9 +194,9 @@ func TestSetupKube(t *testing.T) {
}
for _, tt := range tests {
kc = tt.kc
kc := tt.kc
t.Run(tt.name, func(t *testing.T) {
if err := tt.cfg.setupKube(context.Background()); (err != nil) != tt.wantErr {
if err := tt.cfg.setupKube(context.Background(), kc); (err != nil) != tt.wantErr {
t.Errorf("settings.setupKube() error = %v, wantErr %v", err, tt.wantErr)
}
if diff := cmp.Diff(*tt.cfg, *tt.wantCfg); diff != "" {

View File

@ -114,6 +114,7 @@
"tailscale.com/client/tailscale"
"tailscale.com/ipn"
kubeutils "tailscale.com/k8s-operator"
"tailscale.com/kube/kubetypes"
"tailscale.com/tailcfg"
"tailscale.com/types/logger"
"tailscale.com/types/ptr"
@ -160,9 +161,13 @@ func main() {
bootCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var kc *kubeClient
if cfg.InKubernetes {
initKubeClient(cfg.Root)
if err := cfg.setupKube(bootCtx); err != nil {
kc, err = newKubeClient(cfg.Root, cfg.KubeSecret)
if err != nil {
log.Fatalf("error initializing kube client: %v", err)
}
if err := cfg.setupKube(bootCtx, kc); err != nil {
log.Fatalf("error setting up for running on Kubernetes: %v", err)
}
}
@ -297,13 +302,13 @@ func main() {
// authkey is no longer needed. We don't strictly need to
// wipe it, but it's good hygiene.
log.Printf("Deleting authkey from kube secret")
if err := deleteAuthKey(ctx, cfg.KubeSecret); err != nil {
if err := kc.deleteAuthKey(ctx); err != nil {
log.Fatalf("deleting authkey from kube secret: %v", err)
}
}
if hasKubeStateStore(cfg) {
if err := storeCapVer(ctx, cfg.KubeSecret, cfg.PodUID); err != nil {
if err := kc.storeCapVer(ctx, cfg.PodUID); err != nil {
log.Fatalf("storing capability version: %v", err)
}
}
@ -433,7 +438,7 @@ func main() {
// fails.
deviceID := n.NetMap.SelfNode.StableID()
if hasKubeStateStore(cfg) && deephash.Update(&currentDeviceID, &deviceID) {
if err := storeDeviceID(ctx, cfg.KubeSecret, n.NetMap.SelfNode.StableID()); err != nil {
if err := kc.storeDeviceID(ctx, n.NetMap.SelfNode.StableID()); err != nil {
log.Fatalf("storing device ID in Kubernetes Secret: %v", err)
}
}
@ -549,7 +554,7 @@ func main() {
// TODO (irbekrm): instead of using the IP and FQDN, have some other mechanism for the proxy signal that it is 'Ready'.
deviceEndpoints := []any{n.NetMap.SelfNode.Name(), n.NetMap.SelfNode.Addresses()}
if hasKubeStateStore(cfg) && deephash.Update(&currentDeviceEndpoints, &deviceEndpoints) {
if err := storeDeviceEndpoints(ctx, cfg.KubeSecret, n.NetMap.SelfNode.Name(), n.NetMap.SelfNode.Addresses().AsSlice()); err != nil {
if err := kc.storeDeviceEndpoints(ctx, n.NetMap.SelfNode.Name(), n.NetMap.SelfNode.Addresses().AsSlice()); err != nil {
log.Fatalf("storing device IPs and FQDN in Kubernetes Secret: %v", err)
}
}

View File

@ -183,7 +183,7 @@ func (s *settings) validate() error {
// setupKube is responsible for doing any necessary configuration and checks to
// ensure that tailscale state storage and authentication mechanism will work on
// Kubernetes.
func (cfg *settings) setupKube(ctx context.Context) error {
func (cfg *settings) setupKube(ctx context.Context, kc *kubeClient) error {
if cfg.KubeSecret == "" {
return nil
}

View File

@ -27,4 +27,20 @@
MetricEgressServiceCount = "k8s_egress_service_resources"
MetricProxyGroupEgressCount = "k8s_proxygroup_egress_resources"
MetricProxyGroupIngressCount = "k8s_proxygroup_ingress_resources"
// TODO: rename this file, this is not a metric
// Keys that containerboot writes to state file that can be used to determine its state.
// fields set in Tailscale state Secret. These are mostly used by the Tailscale Kubernetes operator to determine
// the state of this tailscale device.
KeyDeviceID string = "device_id" // node stable ID of the device
KeyDeviceFQDN string = "device_fqdn" // device's tailnet hostname
KeyDeviceIPs string = "device_ips" // device's tailnet IPs
// KeyCapVer contains Tailscale capability version of a proxy in form <capver>[:<pod-uid>]. This is used by the
// Kubernetes operator to determine the capability version of a deployed proxy.
KeyCapVer string = "tailscale_capver"
// KeyHTTPSEndpoint is a name of a field that can be set to the value of any HTTPS endpoint currently exposed by
// this device to the tailnet. This is used by the Kubernetes operator Ingress proxy to communicate to the operator
// that cluster workloads behind the Ingress can now be accessed via the given DNS name over HTTPS.
KeyHTTPSEndpoint string = "https_endpoint"
ValueNoHTTPS string = "no-https"
)