tailscale/kube/state/state.go
Tom Proctor f421907c38
all-kube: create Tailscale Service for HA kube-apiserver ProxyGroup (#16572)
Adds a new reconciler for ProxyGroups of type kube-apiserver that will
provision a Tailscale Service for each replica to advertise. Adds two
new condition types to the ProxyGroup, TailscaleServiceValid and
TailscaleServiceConfigured, to post updates on the state of that
reconciler in a way that's consistent with the service-pg reconciler.
The created Tailscale Service name is configurable via a new ProxyGroup
field spec.kubeAPISserver.ServiceName, which expects a string of the
form "svc:<dns-label>".

Lots of supporting changes were needed to implement this in a way that's
consistent with other operator workflows, including:

* Pulled containerboot's ensureServicesUnadvertised and certManager into
  kube/ libraries to be shared with k8s-proxy. Use those in k8s-proxy to
  aid Service cert sharing between replicas and graceful Service shutdown.
* For certManager, add an initial wait to the cert loop to wait until
  the domain appears in the devices's netmap to avoid a guaranteed error
  on the first issue attempt when it's quick to start.
* Made several methods in ingress-for-pg.go and svc-for-pg.go into
  functions to share with the new reconciler
* Added a Resource struct to the owner refs stored in Tailscale Service
  annotations to be able to distinguish between Ingress- and ProxyGroup-
  based Services that need cleaning up in the Tailscale API.
* Added a ListVIPServices method to the internal tailscale client to aid
  cleaning up orphaned Services
* Support for reading config from a kube Secret, and partial support for
  config reloading, to prevent us having to force Pod restarts when
  config changes.
* Fixed up the zap logger so it's possible to set debug log level.

Updates #13358

Change-Id: Ia9607441157dd91fb9b6ecbc318eecbef446e116
Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
2025-07-21 11:03:21 +01:00

108 lines
3.6 KiB
Go

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
// Package state updates state keys for tailnet client devices managed by the
// operator. These keys are used to signal readiness, metadata, and current
// configuration state to the operator. Client packages deployed by the operator
// include containerboot, tsrecorder, and k8s-proxy, but currently containerboot
// has its own implementation to manage the same keys.
package state
import (
"context"
"encoding/json"
"fmt"
"tailscale.com/ipn"
"tailscale.com/kube/kubetypes"
klc "tailscale.com/kube/localclient"
"tailscale.com/tailcfg"
"tailscale.com/util/deephash"
)
const (
keyPodUID = ipn.StateKey(kubetypes.KeyPodUID)
keyCapVer = ipn.StateKey(kubetypes.KeyCapVer)
keyDeviceID = ipn.StateKey(kubetypes.KeyDeviceID)
keyDeviceIPs = ipn.StateKey(kubetypes.KeyDeviceIPs)
keyDeviceFQDN = ipn.StateKey(kubetypes.KeyDeviceFQDN)
)
// SetInitialKeys sets Pod UID and cap ver and clears tailnet device state
// keys to help stop the operator using stale tailnet device state.
func SetInitialKeys(store ipn.StateStore, podUID string) error {
// Clear device state keys first so the operator knows if the pod UID
// matches, the other values are definitely not stale.
for _, key := range []ipn.StateKey{keyDeviceID, keyDeviceFQDN, keyDeviceIPs} {
if _, err := store.ReadState(key); err == nil {
if err := store.WriteState(key, nil); err != nil {
return fmt.Errorf("error writing %q to state store: %w", key, err)
}
}
}
if err := store.WriteState(keyPodUID, []byte(podUID)); err != nil {
return fmt.Errorf("error writing pod UID to state store: %w", err)
}
if err := store.WriteState(keyCapVer, fmt.Appendf(nil, "%d", tailcfg.CurrentCapabilityVersion)); err != nil {
return fmt.Errorf("error writing capability version to state store: %w", err)
}
return nil
}
// KeepKeysUpdated sets state store keys consistent with containerboot to
// signal proxy readiness to the operator. It runs until its context is
// cancelled or it hits an error. The passed in next function is expected to be
// from a local.IPNBusWatcher that is at least subscribed to
// ipn.NotifyInitialNetMap.
func KeepKeysUpdated(ctx context.Context, store ipn.StateStore, lc klc.LocalClient) error {
w, err := lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap)
if err != nil {
return fmt.Errorf("error watching IPN bus: %w", err)
}
defer w.Close()
var currentDeviceID, currentDeviceIPs, currentDeviceFQDN deephash.Sum
for {
n, err := w.Next() // Blocks on a streaming LocalAPI HTTP call.
if err != nil {
if err == ctx.Err() {
return nil
}
return err
}
if n.NetMap == nil {
continue
}
if deviceID := n.NetMap.SelfNode.StableID(); deephash.Update(&currentDeviceID, &deviceID) {
if err := store.WriteState(keyDeviceID, []byte(deviceID)); err != nil {
return fmt.Errorf("failed to store device ID in state: %w", err)
}
}
if fqdn := n.NetMap.SelfNode.Name(); deephash.Update(&currentDeviceFQDN, &fqdn) {
if err := store.WriteState(keyDeviceFQDN, []byte(fqdn)); err != nil {
return fmt.Errorf("failed to store device FQDN in state: %w", err)
}
}
if addrs := n.NetMap.SelfNode.Addresses(); deephash.Update(&currentDeviceIPs, &addrs) {
var deviceIPs []string
for _, addr := range addrs.AsSlice() {
deviceIPs = append(deviceIPs, addr.Addr().String())
}
deviceIPsValue, err := json.Marshal(deviceIPs)
if err != nil {
return err
}
if err := store.WriteState(keyDeviceIPs, deviceIPsValue); err != nil {
return fmt.Errorf("failed to store device IPs in state: %w", err)
}
}
}
}