cmd/{k8s-operator,k8s-proxy}: add kube-apiserver ProxyGroup type

Adds a new k8s-proxy command to convert operator's in-process proxy to
a separately deployable type of ProxyGroup: kube-apiserver. k8s-proxy
reads in a new config file written by the operator, modelled on tailscaled's
conffile but with some modifications to ensure multiple versions of the
config can co-exist within a file. This should make it much easier to
support reading that config file from a Kube Secret with a stable file name.

To avoid needing to give the operator ClusterRole{,Binding} permissions,
the helm chart now optionally deploys a new static ServiceAccount for
the API Server proxy to use if in auth mode.

Proxies deployed by kube-apiserver ProxyGroups currently work the same as
the operator's in-process proxy. They do not yet leverage Tailscale Services
for presenting a single HA DNS name.

Updates #13358

Change-Id: Ib6ead69b2173c5e1929f3c13fb48a9a5362195d8
Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
This commit is contained in:
Tom Proctor 2025-06-26 09:07:43 +01:00
parent 079134d3c0
commit 0e605c53c8
30 changed files with 1417 additions and 300 deletions

View File

@ -92,38 +92,38 @@ pushspk: spk ## Push and install synology package on ${SYNO_HOST} host
scp tailscale.spk root@${SYNO_HOST}:
ssh root@${SYNO_HOST} /usr/syno/bin/synopkg install tailscale.spk
publishdevimage: ## Build and publish tailscale image to location specified by ${REPO}
@test -n "${REPO}" || (echo "REPO=... required; e.g. REPO=ghcr.io/${USER}/tailscale" && exit 1)
@test "${REPO}" != "tailscale/tailscale" || (echo "REPO=... must not be tailscale/tailscale" && exit 1)
@test "${REPO}" != "ghcr.io/tailscale/tailscale" || (echo "REPO=... must not be ghcr.io/tailscale/tailscale" && exit 1)
@test "${REPO}" != "tailscale/k8s-operator" || (echo "REPO=... must not be tailscale/k8s-operator" && exit 1)
@test "${REPO}" != "ghcr.io/tailscale/k8s-operator" || (echo "REPO=... must not be ghcr.io/tailscale/k8s-operator" && exit 1)
.PHONY: check-image-repo
check-image-repo:
@if [ -z "$(REPO)" ]; then \
echo "REPO=... required; e.g. REPO=ghcr.io/$$USER/tailscale" >&2; \
exit 1; \
fi
@for repo in tailscale/tailscale ghcr.io/tailscale/tailscale \
tailscale/k8s-operator ghcr.io/tailscale/k8s-operator \
tailscale/k8s-nameserver ghcr.io/tailscale/k8s-nameserver \
tailscale/tsidp ghcr.io/tailscale/tsidp \
tailscale/k8s-proxy ghcr.io/tailscale/k8s-proxy; do \
if [ "$(REPO)" = "$$repo" ]; then \
echo "REPO=... must not be $$repo" >&2; \
exit 1; \
fi; \
done
publishdevimage: check-image-repo ## Build and publish tailscale image to location specified by ${REPO}
TAGS="${TAGS}" REPOS=${REPO} PLATFORM=${PLATFORM} PUSH=true TARGET=client ./build_docker.sh
publishdevoperator: ## Build and publish k8s-operator image to location specified by ${REPO}
@test -n "${REPO}" || (echo "REPO=... required; e.g. REPO=ghcr.io/${USER}/tailscale" && exit 1)
@test "${REPO}" != "tailscale/tailscale" || (echo "REPO=... must not be tailscale/tailscale" && exit 1)
@test "${REPO}" != "ghcr.io/tailscale/tailscale" || (echo "REPO=... must not be ghcr.io/tailscale/tailscale" && exit 1)
@test "${REPO}" != "tailscale/k8s-operator" || (echo "REPO=... must not be tailscale/k8s-operator" && exit 1)
@test "${REPO}" != "ghcr.io/tailscale/k8s-operator" || (echo "REPO=... must not be ghcr.io/tailscale/k8s-operator" && exit 1)
publishdevoperator: check-image-repo ## Build and publish k8s-operator image to location specified by ${REPO}
TAGS="${TAGS}" REPOS=${REPO} PLATFORM=${PLATFORM} PUSH=true TARGET=k8s-operator ./build_docker.sh
publishdevnameserver: ## Build and publish k8s-nameserver image to location specified by ${REPO}
@test -n "${REPO}" || (echo "REPO=... required; e.g. REPO=ghcr.io/${USER}/tailscale" && exit 1)
@test "${REPO}" != "tailscale/tailscale" || (echo "REPO=... must not be tailscale/tailscale" && exit 1)
@test "${REPO}" != "ghcr.io/tailscale/tailscale" || (echo "REPO=... must not be ghcr.io/tailscale/tailscale" && exit 1)
@test "${REPO}" != "tailscale/k8s-nameserver" || (echo "REPO=... must not be tailscale/k8s-nameserver" && exit 1)
@test "${REPO}" != "ghcr.io/tailscale/k8s-nameserver" || (echo "REPO=... must not be ghcr.io/tailscale/k8s-nameserver" && exit 1)
publishdevnameserver: check-image-repo ## Build and publish k8s-nameserver image to location specified by ${REPO}
TAGS="${TAGS}" REPOS=${REPO} PLATFORM=${PLATFORM} PUSH=true TARGET=k8s-nameserver ./build_docker.sh
publishdevtsidp: ## Build and publish tsidp image to location specified by ${REPO}
@test -n "${REPO}" || (echo "REPO=... required; e.g. REPO=ghcr.io/${USER}/tailscale" && exit 1)
@test "${REPO}" != "tailscale/tailscale" || (echo "REPO=... must not be tailscale/tailscale" && exit 1)
@test "${REPO}" != "ghcr.io/tailscale/tailscale" || (echo "REPO=... must not be ghcr.io/tailscale/tailscale" && exit 1)
@test "${REPO}" != "tailscale/tsidp" || (echo "REPO=... must not be tailscale/tsidp" && exit 1)
@test "${REPO}" != "ghcr.io/tailscale/tsidp" || (echo "REPO=... must not be ghcr.io/tailscale/tsidp" && exit 1)
publishdevtsidp: check-image-repo ## Build and publish tsidp image to location specified by ${REPO}
TAGS="${TAGS}" REPOS=${REPO} PLATFORM=${PLATFORM} PUSH=true TARGET=tsidp ./build_docker.sh
publishdevproxy: check-image-repo ## Build and publish k8s-proxy image to location specified by ${REPO}
TAGS="${TAGS}" REPOS=${REPO} PLATFORM=${PLATFORM} PUSH=true TARGET=k8s-proxy ./build_docker.sh
.PHONY: sshintegrationtest
sshintegrationtest: ## Run the SSH integration tests in various Docker containers
@GOOS=linux GOARCH=amd64 ./tool/go test -tags integrationtest -c ./ssh/tailssh -o ssh/tailssh/testcontainers/tailssh.test && \

View File

@ -118,6 +118,24 @@ case "$TARGET" in
--annotations="${ANNOTATIONS}" \
/usr/local/bin/tsidp
;;
k8s-proxy)
DEFAULT_REPOS="tailscale/k8s-proxy"
REPOS="${REPOS:-${DEFAULT_REPOS}}"
go run github.com/tailscale/mkctr \
--gopaths="tailscale.com/cmd/k8s-proxy:/usr/local/bin/k8s-proxy" \
--ldflags=" \
-X tailscale.com/version.longStamp=${VERSION_LONG} \
-X tailscale.com/version.shortStamp=${VERSION_SHORT} \
-X tailscale.com/version.gitCommitStamp=${VERSION_GIT_HASH}" \
--base="${BASE}" \
--tags="${TAGS}" \
--gotags="ts_kube,ts_package_container" \
--repos="${REPOS}" \
--push="${PUSH}" \
--target="${PLATFORM}" \
--annotations="${ANNOTATIONS}" \
/usr/local/bin/k8s-proxy
;;
*)
echo "unknown target: $TARGET"
exit 1

View File

@ -200,7 +200,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
github.com/tailscale/goupnp/scpd from github.com/tailscale/goupnp
github.com/tailscale/goupnp/soap from github.com/tailscale/goupnp+
github.com/tailscale/goupnp/ssdp from github.com/tailscale/goupnp
github.com/tailscale/hujson from tailscale.com/ipn/conffile
github.com/tailscale/hujson from tailscale.com/ipn/conffile+
L 💣 github.com/tailscale/netlink from tailscale.com/net/routetable+
L 💣 github.com/tailscale/netlink/nl from github.com/tailscale/netlink
github.com/tailscale/peercred from tailscale.com/ipn/ipnauth
@ -822,6 +822,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
tailscale.com/k8s-operator/sessionrecording/ws from tailscale.com/k8s-operator/sessionrecording
tailscale.com/kube/egressservices from tailscale.com/cmd/k8s-operator
tailscale.com/kube/ingressservices from tailscale.com/cmd/k8s-operator
tailscale.com/kube/k8s-proxy/conf from tailscale.com/cmd/k8s-operator
tailscale.com/kube/kubeapi from tailscale.com/ipn/store/kubestore+
tailscale.com/kube/kubeclient from tailscale.com/ipn/store/kubestore
tailscale.com/kube/kubetypes from tailscale.com/cmd/k8s-operator+

View File

@ -1,7 +1,16 @@
# Copyright (c) Tailscale Inc & AUTHORS
# SPDX-License-Identifier: BSD-3-Clause
{{ if eq .Values.apiServerProxyConfig.mode "true" }}
# If deprecated setting used, enable both legacy and new workflows.
# If new setting used, enable only new workflow.
{{ if or (eq .Values.apiServerProxyConfig.mode "true")
(eq .Values.apiServerProxyConfig.authEnabled "true") }}
apiVersion: v1
kind: ServiceAccount
metadata:
name: kube-apiserver-auth-proxy
namespace: {{ .Release.Namespace }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
@ -16,9 +25,14 @@ kind: ClusterRoleBinding
metadata:
name: tailscale-auth-proxy
subjects:
{{- if eq .Values.apiServerProxyConfig.mode "true" }}
- kind: ServiceAccount
name: operator
namespace: {{ .Release.Namespace }}
{{- end }}
- kind: ServiceAccount
name: kube-apiserver-auth-proxy
namespace: {{ .Release.Namespace }}
roleRef:
kind: ClusterRole
name: tailscale-auth-proxy

View File

@ -111,6 +111,13 @@ proxyConfig:
# Kubernetes API server.
# https://tailscale.com/kb/1437/kubernetes-operator-api-server-proxy
apiServerProxyConfig:
# Set to "true" to enable the ClusterRole permissions required for the API
# server proxy to impersonate groups and users based on tailnet ACL grants.
# Required to deploy ProxyGroups of type "kube-apiserver" in auth mode.
authEnabled: "false" # "true", "false"
# If true or noauth, the operator will run an in-process API server proxy.
# Deprecated: use apiServerProxyConfig.authEnabled instead.
mode: "false" # "true", "false", "noauth"
imagePullSecrets: []

View File

@ -77,6 +77,18 @@ spec:
must not start with a dash and must be between 1 and 62 characters long.
type: string
pattern: ^[a-z0-9][a-z0-9-]{0,61}$
kubeAPIServerConfig:
description: |-
KubeAPIServerConfig contains configuration specific to the kube-apiserver
ProxyGroup type. This field is only used when Type is set to "kube-apiserver".
type: object
properties:
authMode:
description: |-
AuthMode enables auth mode for the API Server proxy. In auth mode,
requests from the tailnet proxied over to the Kubernetes API server
are additionally impersonated using the sender's tailnet identity.
type: boolean
proxyClass:
description: |-
ProxyClass is the name of the ProxyClass custom resource that contains
@ -106,12 +118,13 @@ spec:
pattern: ^tag:[a-zA-Z][a-zA-Z0-9-]*$
type:
description: |-
Type of the ProxyGroup proxies. Supported types are egress and ingress.
Type of the ProxyGroup proxies. Supported types are egress, ingress, and kube-apiserver.
Type is immutable once a ProxyGroup is created.
type: string
enum:
- egress
- ingress
- kube-apiserver
x-kubernetes-validations:
- rule: self == oldSelf
message: ProxyGroup type is immutable

View File

@ -1,6 +1,12 @@
# Copyright (c) Tailscale Inc & AUTHORS
# SPDX-License-Identifier: BSD-3-Clause
apiVersion: v1
kind: ServiceAccount
metadata:
name: kube-apiserver-auth-proxy
namespace: tailscale
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
@ -18,6 +24,9 @@ subjects:
- kind: ServiceAccount
name: operator
namespace: tailscale
- kind: ServiceAccount
name: kube-apiserver-auth-proxy
namespace: tailscale
roleRef:
kind: ClusterRole
name: tailscale-auth-proxy

View File

@ -2904,6 +2904,18 @@ spec:
must not start with a dash and must be between 1 and 62 characters long.
pattern: ^[a-z0-9][a-z0-9-]{0,61}$
type: string
kubeAPIServerConfig:
description: |-
KubeAPIServerConfig contains configuration specific to the kube-apiserver
ProxyGroup type. This field is only used when Type is set to "kube-apiserver".
properties:
authMode:
description: |-
AuthMode enables auth mode for the API Server proxy. In auth mode,
requests from the tailnet proxied over to the Kubernetes API server
are additionally impersonated using the sender's tailnet identity.
type: boolean
type: object
proxyClass:
description: |-
ProxyClass is the name of the ProxyClass custom resource that contains
@ -2933,11 +2945,12 @@ spec:
type: array
type:
description: |-
Type of the ProxyGroup proxies. Supported types are egress and ingress.
Type of the ProxyGroup proxies. Supported types are egress, ingress, and kube-apiserver.
Type is immutable once a ProxyGroup is created.
enum:
- egress
- ingress
- kube-apiserver
type: string
x-kubernetes-validations:
- message: ProxyGroup type is immutable

View File

@ -238,7 +238,7 @@ func (r *HAIngressReconciler) maybeProvision(ctx context.Context, hostname strin
// This checks and ensures that Tailscale Service's owner references are updated
// for this Ingress and errors if that is not possible (i.e. because it
// appears that the Tailscale Service has been created by a non-operator actor).
updatedAnnotations, err := r.ownerAnnotations(existingTSSvc)
updatedAnnotations, err := ownerAnnotations(r.operatorID, existingTSSvc)
if err != nil {
const instr = "To proceed, you can either manually delete the existing Tailscale Service or choose a different MagicDNS name at `.spec.tls.hosts[0] in the Ingress definition"
msg := fmt.Sprintf("error ensuring ownership of Tailscale Service %s: %v. %s", hostname, err, instr)
@ -866,9 +866,9 @@ type OwnerRef struct {
// nil, but does not contain an owner reference we return an error as this likely means
// that the Service was created by somthing other than a Tailscale
// Kubernetes operator.
func (r *HAIngressReconciler) ownerAnnotations(svc *tailscale.VIPService) (map[string]string, error) {
func ownerAnnotations(operatorID string, svc *tailscale.VIPService) (map[string]string, error) {
ref := OwnerRef{
OperatorID: r.operatorID,
OperatorID: operatorID,
}
if svc == nil {
c := ownerAnnotationValue{OwnerRefs: []OwnerRef{ref}}

View File

@ -12,8 +12,10 @@ import (
"maps"
"reflect"
"slices"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
@ -645,6 +647,53 @@ func TestIngressPGReconciler_MultiCluster(t *testing.T) {
}
}
func TestOwnerAnnotations(t *testing.T) {
singleSelfOwner := map[string]string{
ownerAnnotation: `{"ownerRefs":[{"operatorID":"self-id"}]}`,
}
for name, tc := range map[string]struct {
svc *tailscale.VIPService
wantAnnotations map[string]string
wantErr string
}{
"no_svc": {
svc: nil,
wantAnnotations: singleSelfOwner,
},
"empty_svc": {
svc: &tailscale.VIPService{},
wantErr: "likely a resource created by something other than the Tailscale Kubernetes operator",
},
"already_owner": {
svc: &tailscale.VIPService{
Annotations: singleSelfOwner,
},
wantAnnotations: singleSelfOwner,
},
"add_owner": {
svc: &tailscale.VIPService{
Annotations: map[string]string{
ownerAnnotation: `{"ownerRefs":[{"operatorID":"operator-2"}]}`,
},
},
wantAnnotations: map[string]string{
ownerAnnotation: `{"ownerRefs":[{"operatorID":"operator-2"},{"operatorID":"self-id"}]}`,
},
},
} {
t.Run(name, func(t *testing.T) {
got, err := ownerAnnotations("self-id", tc.svc)
if tc.wantErr != "" && !strings.Contains(err.Error(), tc.wantErr) {
t.Errorf("ownerAnnotations() error = %v, wantErr %v", err, tc.wantErr)
}
if diff := cmp.Diff(tc.wantAnnotations, got); diff != "" {
t.Errorf("ownerAnnotations() mismatch (-want +got):\n%s", diff)
}
})
}
}
func populateTLSSecret(ctx context.Context, c client.Client, pgName, domain string) error {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{

View File

@ -77,6 +77,7 @@ func main() {
tsNamespace = defaultEnv("OPERATOR_NAMESPACE", "")
tslogging = defaultEnv("OPERATOR_LOGGING", "info")
image = defaultEnv("PROXY_IMAGE", "tailscale/tailscale:latest")
k8sProxyImage = defaultEnv("K8S_PROXY_IMAGE", "tailscale/k8s-proxy:latest")
priorityClassName = defaultEnv("PROXY_PRIORITY_CLASS_NAME", "")
tags = defaultEnv("PROXY_TAGS", "tag:k8s")
tsFirewallMode = defaultEnv("PROXY_FIREWALL_MODE", "")
@ -109,17 +110,27 @@ func main() {
// The operator can run either as a plain operator or it can
// additionally act as api-server proxy
// https://tailscale.com/kb/1236/kubernetes-operator/?q=kubernetes#accessing-the-kubernetes-control-plane-using-an-api-server-proxy.
mode := apiproxy.ParseAPIProxyMode()
if mode == apiproxy.APIServerProxyModeDisabled {
mode := parseAPIProxyMode()
if mode == apiServerProxyModeDisabled {
hostinfo.SetApp(kubetypes.AppOperator)
} else {
hostinfo.SetApp(kubetypes.AppAPIServerProxy)
hostinfo.SetApp(kubetypes.AppInProcessAPIServerProxy)
}
s, tsc := initTSNet(zlog, loginServer)
defer s.Close()
restConfig := config.GetConfigOrDie()
apiproxy.MaybeLaunchAPIServerProxy(zlog, restConfig, s, mode)
if mode != apiServerProxyModeDisabled {
ap, err := apiproxy.NewAPIServerProxy(zlog, restConfig, s, mode == apiServerProxyModeEnabled)
if err != nil {
zlog.Fatalf("error creating API server proxy: %v", err)
}
go func() {
if err := ap.Run(context.Background()); err != nil {
zlog.Fatalf("error running API server proxy: %v", err)
}
}()
}
rOpts := reconcilerOpts{
log: zlog,
tsServer: s,
@ -127,6 +138,7 @@ func main() {
tailscaleNamespace: tsNamespace,
restConfig: restConfig,
proxyImage: image,
k8sProxyImage: k8sProxyImage,
proxyPriorityClassName: priorityClassName,
proxyActAsDefaultLoadBalancer: isDefaultLoadBalancer,
proxyTags: tags,
@ -411,7 +423,6 @@ func runReconcilers(opts reconcilerOpts) {
Complete(&HAServiceReconciler{
recorder: eventRecorder,
tsClient: opts.tsClient,
tsnetServer: opts.tsServer,
defaultTags: strings.Split(opts.proxyTags, ","),
Client: mgr.GetClient(),
logger: opts.log.Named("service-pg-reconciler"),
@ -641,7 +652,8 @@ func runReconcilers(opts reconcilerOpts) {
tsClient: opts.tsClient,
tsNamespace: opts.tailscaleNamespace,
proxyImage: opts.proxyImage,
tsProxyImage: opts.proxyImage,
k8sProxyImage: opts.k8sProxyImage,
defaultTags: strings.Split(opts.proxyTags, ","),
tsFirewallMode: opts.proxyFirewallMode,
defaultProxyClass: opts.defaultProxyClass,
@ -664,6 +676,7 @@ type reconcilerOpts struct {
tailscaleNamespace string // namespace in which operator resources will be deployed
restConfig *rest.Config // config for connecting to the kube API server
proxyImage string // <proxy-image-repo>:<proxy-image-tag>
k8sProxyImage string // <k8s-proxy-image-repo>:<k8s-proxy-image-tag>
// proxyPriorityClassName isPriorityClass to be set for proxy Pods. This
// is a legacy mechanism for cluster resource configuration options -
// going forward use ProxyClass.

61
cmd/k8s-operator/proxy.go Normal file
View File

@ -0,0 +1,61 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"fmt"
"log"
"os"
)
type apiServerProxyMode int
func (a apiServerProxyMode) String() string {
switch a {
case apiServerProxyModeDisabled:
return "disabled"
case apiServerProxyModeEnabled:
return "auth"
case apiServerProxyModeNoAuth:
return "noauth"
default:
return "unknown"
}
}
const (
apiServerProxyModeDisabled apiServerProxyMode = iota
apiServerProxyModeEnabled
apiServerProxyModeNoAuth
)
func parseAPIProxyMode() apiServerProxyMode {
haveAuthProxyEnv := os.Getenv("AUTH_PROXY") != ""
haveAPIProxyEnv := os.Getenv("APISERVER_PROXY") != ""
switch {
case haveAPIProxyEnv && haveAuthProxyEnv:
log.Fatal("AUTH_PROXY (deprecated) and APISERVER_PROXY are mutually exclusive, please unset AUTH_PROXY")
case haveAuthProxyEnv:
var authProxyEnv = defaultBool("AUTH_PROXY", false) // deprecated
if authProxyEnv {
return apiServerProxyModeEnabled
}
return apiServerProxyModeDisabled
case haveAPIProxyEnv:
var apiProxyEnv = defaultEnv("APISERVER_PROXY", "") // true, false or "noauth"
switch apiProxyEnv {
case "true":
return apiServerProxyModeEnabled
case "false", "":
return apiServerProxyModeDisabled
case "noauth":
return apiServerProxyModeNoAuth
default:
panic(fmt.Sprintf("unknown APISERVER_PROXY value %q", apiProxyEnv))
}
}
return apiServerProxyModeDisabled
}

View File

@ -36,9 +36,11 @@ import (
tsoperator "tailscale.com/k8s-operator"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/egressservices"
"tailscale.com/kube/k8s-proxy/conf"
"tailscale.com/kube/kubetypes"
"tailscale.com/tailcfg"
"tailscale.com/tstime"
"tailscale.com/types/opt"
"tailscale.com/types/ptr"
"tailscale.com/util/clientmetric"
"tailscale.com/util/mak"
@ -49,6 +51,7 @@ const (
reasonProxyGroupCreationFailed = "ProxyGroupCreationFailed"
reasonProxyGroupReady = "ProxyGroupReady"
reasonProxyGroupCreating = "ProxyGroupCreating"
reasonProxyGroupInvalid = "ProxyGroupInvalid"
// Copied from k8s.io/apiserver/pkg/registry/generic/registry/store.go@cccad306d649184bf2a0e319ba830c53f65c445c
optimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
@ -63,12 +66,14 @@ const (
//
// tailcfg.CurrentCapabilityVersion was 106 when the ProxyGroup controller was
// first introduced.
pgMinCapabilityVersion = 106
pgMinCapabilityVersion = 106
kubeAPIServerConfigFile = "config.hujson"
)
var (
gaugeEgressProxyGroupResources = clientmetric.NewGauge(kubetypes.MetricProxyGroupEgressCount)
gaugeIngressProxyGroupResources = clientmetric.NewGauge(kubetypes.MetricProxyGroupIngressCount)
gaugeEgressProxyGroupResources = clientmetric.NewGauge(kubetypes.MetricProxyGroupEgressCount)
gaugeIngressProxyGroupResources = clientmetric.NewGauge(kubetypes.MetricProxyGroupIngressCount)
gaugeAPIServerProxyGroupResources = clientmetric.NewGauge(kubetypes.MetricProxyGroupAPIServerCount)
)
// ProxyGroupReconciler ensures cluster resources for a ProxyGroup definition.
@ -81,15 +86,17 @@ type ProxyGroupReconciler struct {
// User-specified defaults from the helm installation.
tsNamespace string
proxyImage string
tsProxyImage string
k8sProxyImage string
defaultTags []string
tsFirewallMode string
defaultProxyClass string
loginServer string
mu sync.Mutex // protects following
egressProxyGroups set.Slice[types.UID] // for egress proxygroups gauge
ingressProxyGroups set.Slice[types.UID] // for ingress proxygroups gauge
mu sync.Mutex // protects following
egressProxyGroups set.Slice[types.UID] // for egress proxygroups gauge
ingressProxyGroups set.Slice[types.UID] // for ingress proxygroups gauge
apiServerProxyGroups set.Slice[types.UID] // for kube-apiserver proxygroups gauge
}
func (r *ProxyGroupReconciler) logger(name string) *zap.SugaredLogger {
@ -153,6 +160,10 @@ func (r *ProxyGroupReconciler) reconcilePG(ctx context.Context, pg *tsapi.ProxyG
}
}
if err := r.validate(ctx, pg); err != nil {
return r.notReady(reasonProxyGroupInvalid, fmt.Sprintf("invalid ProxyGroup spec: %v", err))
}
proxyClassName := r.defaultProxyClass
if pg.Spec.ProxyClass != "" {
proxyClassName = pg.Spec.ProxyClass
@ -192,6 +203,30 @@ func (r *ProxyGroupReconciler) reconcilePG(ctx context.Context, pg *tsapi.ProxyG
return staticEndpoints, nrr, nil
}
func (r *ProxyGroupReconciler) validate(ctx context.Context, pg *tsapi.ProxyGroup) error {
if isAuthAPIServerProxy(pg) {
// Validate that the static ServiceAccount already exists.
sa := &corev1.ServiceAccount{}
if err := r.Get(ctx, types.NamespacedName{Namespace: r.tsNamespace, Name: authAPIServerProxySAName}, sa); err != nil {
if apierrors.IsNotFound(err) {
return fmt.Errorf("the ServiceAccount %q used for the API server proxy in auth mode does not exist but should have been created during operator installation", authAPIServerProxySAName)
}
return fmt.Errorf("error validating that ServiceAccount %q exists: %w", authAPIServerProxySAName, err)
}
} else {
// Validate that the ServiceAccount we create won't overwrite the static one.
// TODO(tomhjp): This doesn't cover other controllers that could create a
// ServiceAccount. Perhaps should have some guards to ensure that an update
// would never change the ownership of a resource we expect to already be owned.
if pgServiceAccountName(pg) == authAPIServerProxySAName {
return fmt.Errorf("the name of the ProxyGroup %q conflicts with the static ServiceAccount used for the API server proxy in auth mode", pg.Name)
}
}
return nil
}
// validateProxyClassForPG applies custom validation logic for ProxyClass applied to ProxyGroup.
func validateProxyClassForPG(logger *zap.SugaredLogger, pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass) {
if pg.Spec.Type == tsapi.ProxyGroupTypeIngress {
@ -263,14 +298,21 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
return r.notReadyErrf(pg, "error provisioning state Secrets: %w", err)
}
}
sa := pgServiceAccount(pg, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, sa, func(s *corev1.ServiceAccount) {
s.ObjectMeta.Labels = sa.ObjectMeta.Labels
s.ObjectMeta.Annotations = sa.ObjectMeta.Annotations
s.ObjectMeta.OwnerReferences = sa.ObjectMeta.OwnerReferences
}); err != nil {
return r.notReadyErrf(pg, "error provisioning ServiceAccount: %w", err)
// auth mode kube-apiserver ProxyGroups use a statically created
// ServiceAccount to keep ClusterRole creation permissions limited to the
// helm chart installer.
if !isAuthAPIServerProxy(pg) {
sa := pgServiceAccount(pg, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, sa, func(s *corev1.ServiceAccount) {
s.ObjectMeta.Labels = sa.ObjectMeta.Labels
s.ObjectMeta.Annotations = sa.ObjectMeta.Annotations
s.ObjectMeta.OwnerReferences = sa.ObjectMeta.OwnerReferences
}); err != nil {
return r.notReadyErrf(pg, "error provisioning ServiceAccount: %w", err)
}
}
role := pgRole(pg, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, role, func(r *rbacv1.Role) {
r.ObjectMeta.Labels = role.ObjectMeta.Labels
@ -280,6 +322,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
}); err != nil {
return r.notReadyErrf(pg, "error provisioning Role: %w", err)
}
roleBinding := pgRoleBinding(pg, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, roleBinding, func(r *rbacv1.RoleBinding) {
r.ObjectMeta.Labels = roleBinding.ObjectMeta.Labels
@ -290,6 +333,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
}); err != nil {
return r.notReadyErrf(pg, "error provisioning RoleBinding: %w", err)
}
if pg.Spec.Type == tsapi.ProxyGroupTypeEgress {
cm, hp := pgEgressCM(pg, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, cm, func(existing *corev1.ConfigMap) {
@ -300,6 +344,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
return r.notReadyErrf(pg, "error provisioning egress ConfigMap %q: %w", cm.Name, err)
}
}
if pg.Spec.Type == tsapi.ProxyGroupTypeIngress {
cm := pgIngressCM(pg, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, cm, func(existing *corev1.ConfigMap) {
@ -309,7 +354,12 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
return r.notReadyErrf(pg, "error provisioning ingress ConfigMap %q: %w", cm.Name, err)
}
}
ss, err := pgStatefulSet(pg, r.tsNamespace, r.proxyImage, r.tsFirewallMode, tailscaledPort, proxyClass)
defaultImage := r.tsProxyImage
if pg.Spec.Type == tsapi.ProxyGroupTypeKubernetesAPIServer {
defaultImage = r.k8sProxyImage
}
ss, err := pgStatefulSet(pg, r.tsNamespace, defaultImage, r.tsFirewallMode, tailscaledPort, proxyClass)
if err != nil {
return r.notReadyErrf(pg, "error generating StatefulSet spec: %w", err)
}
@ -702,17 +752,57 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
return nil, err
}
configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, endpoints[nodePortSvcName], existingAdvertiseServices, r.loginServer)
if err != nil {
return nil, fmt.Errorf("error creating tailscaled config: %w", err)
}
if pg.Spec.Type == tsapi.ProxyGroupTypeKubernetesAPIServer {
hostname := pgHostname(pg, i)
for cap, cfg := range configs {
cfgJSON, err := json.Marshal(cfg)
if err != nil {
return nil, fmt.Errorf("error marshalling tailscaled config: %w", err)
if authKey == nil && existingCfgSecret != nil {
deviceAuthed := false
for _, d := range pg.Status.Devices {
if d.Hostname == hostname {
deviceAuthed = true
break
}
}
if !deviceAuthed {
existingCfg := conf.ConfigV1Alpha1{}
if err := json.Unmarshal(existingCfgSecret.Data[kubeAPIServerConfigFile], &existingCfg); err != nil {
return nil, fmt.Errorf("error unmarshalling existing config: %w", err)
}
if existingCfg.AuthKey != nil {
authKey = existingCfg.AuthKey
}
}
}
cfg := conf.VersionedConfig{
Version: "v1alpha1",
ConfigV1Alpha1: &conf.ConfigV1Alpha1{
Hostname: ptr.To(hostname),
State: ptr.To(fmt.Sprintf("kube:%s", pgPodName(pg.Name, i))),
App: ptr.To(kubetypes.AppProxyGroupKubeAPIServer),
AuthKey: authKey,
KubeAPIServer: &conf.KubeAPIServer{
AuthMode: opt.NewBool(isAuthAPIServerProxy(pg)),
},
},
}
cfgB, err := json.Marshal(cfg)
if err != nil {
return nil, fmt.Errorf("error marshalling k8s-proxy config: %w", err)
}
mak.Set(&cfgSecret.Data, kubeAPIServerConfigFile, cfgB)
} else {
configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, endpoints[nodePortSvcName], existingAdvertiseServices, r.loginServer)
if err != nil {
return nil, fmt.Errorf("error creating tailscaled config: %w", err)
}
for cap, cfg := range configs {
cfgJSON, err := json.Marshal(cfg)
if err != nil {
return nil, fmt.Errorf("error marshalling tailscaled config: %w", err)
}
mak.Set(&cfgSecret.Data, tsoperator.TailscaledConfigFileName(cap), cfgJSON)
}
mak.Set(&cfgSecret.Data, tsoperator.TailscaledConfigFileName(cap), cfgJSON)
}
if existingCfgSecret != nil {
@ -834,9 +924,12 @@ func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGr
r.egressProxyGroups.Add(pg.UID)
case tsapi.ProxyGroupTypeIngress:
r.ingressProxyGroups.Add(pg.UID)
case tsapi.ProxyGroupTypeKubernetesAPIServer:
r.apiServerProxyGroups.Add(pg.UID)
}
gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len()))
gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len()))
}
// ensureRemovedFromGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource type is updated when the
@ -847,9 +940,12 @@ func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.Pro
r.egressProxyGroups.Remove(pg.UID)
case tsapi.ProxyGroupTypeIngress:
r.ingressProxyGroups.Remove(pg.UID)
case tsapi.ProxyGroupTypeKubernetesAPIServer:
r.apiServerProxyGroups.Remove(pg.UID)
}
gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len()))
gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len()))
}
func pgTailscaledConfig(pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass, idx int32, authKey *string, staticEndpoints []netip.AddrPort, oldAdvertiseServices []string, loginServer string) (tailscaledConfigs, error) {
@ -858,7 +954,7 @@ func pgTailscaledConfig(pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass, idx int32, a
AcceptDNS: "false",
AcceptRoutes: "false", // AcceptRoutes defaults to true
Locked: "false",
Hostname: ptr.To(fmt.Sprintf("%s-%d", pg.Name, idx)),
Hostname: ptr.To(pgHostname(pg, idx)),
AdvertiseServices: oldAdvertiseServices,
AuthKey: authKey,
}

View File

@ -7,6 +7,7 @@ package main
import (
"fmt"
"path/filepath"
"slices"
"strconv"
"strings"
@ -28,6 +29,9 @@ const (
// deletionGracePeriodSeconds is set to 6 minutes to ensure that the pre-stop hook of these proxies have enough chance to terminate gracefully.
deletionGracePeriodSeconds int64 = 360
staticEndpointPortName = "static-endpoint-port"
// authAPIServerProxySAName is the ServiceAccount deployed by the helm chart
// if apiServerProxy.authEnabled is true.
authAPIServerProxySAName = "kube-apiserver-auth-proxy"
)
func pgNodePortServiceName(proxyGroupName string, replica int32) string {
@ -61,6 +65,9 @@ func pgNodePortService(pg *tsapi.ProxyGroup, name string, namespace string) *cor
// Returns the base StatefulSet definition for a ProxyGroup. A ProxyClass may be
// applied over the top after.
func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string, port *uint16, proxyClass *tsapi.ProxyClass) (*appsv1.StatefulSet, error) {
if pg.Spec.Type == tsapi.ProxyGroupTypeKubernetesAPIServer {
return kubeAPIServerStatefulSet(pg, namespace, image)
}
ss := new(appsv1.StatefulSet)
if err := yaml.Unmarshal(proxyYaml, &ss); err != nil {
return nil, fmt.Errorf("failed to unmarshal proxy spec: %w", err)
@ -167,6 +174,7 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string
Value: "$(POD_NAME)",
},
{
// TODO(tomhjp): This is tsrecorder-specific and does nothing. Delete.
Name: "TS_STATE",
Value: "kube:$(POD_NAME)",
},
@ -264,9 +272,124 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string
// gracefully.
ss.Spec.Template.DeletionGracePeriodSeconds = ptr.To(deletionGracePeriodSeconds)
}
return ss, nil
}
func kubeAPIServerStatefulSet(pg *tsapi.ProxyGroup, namespace, image string) (*appsv1.StatefulSet, error) {
sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: pg.Name,
Namespace: namespace,
Labels: pgLabels(pg.Name, nil),
OwnerReferences: pgOwnerReference(pg),
},
Spec: appsv1.StatefulSetSpec{
Replicas: ptr.To(pgReplicas(pg)),
Selector: &metav1.LabelSelector{
MatchLabels: pgLabels(pg.Name, nil),
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: pg.Name,
Namespace: namespace,
Labels: pgLabels(pg.Name, nil),
DeletionGracePeriodSeconds: ptr.To[int64](10),
},
Spec: corev1.PodSpec{
ServiceAccountName: pgServiceAccountName(pg),
Containers: []corev1.Container{
{
Name: "k8s-proxy",
Image: image,
Env: []corev1.EnvVar{
{
// Used as default hostname and in Secret names.
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
// Used by kubeclient to post Events about the Pod's lifecycle.
Name: "POD_UID",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.uid",
},
},
},
{
// Used in an interpolated env var if metrics enabled.
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
{
// Included for completeness with POD_IP and easier backwards compatibility in future.
Name: "POD_IPS",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIPs",
},
},
},
{
Name: "TS_K8S_PROXY_CONFIG",
Value: filepath.Join("/etc/tsconfig/$(POD_NAME)/", kubeAPIServerConfigFile),
},
},
VolumeMounts: func() []corev1.VolumeMount {
var mounts []corev1.VolumeMount
// TODO(tomhjp): Read config directly from the Secret instead.
for i := range pgReplicas(pg) {
mounts = append(mounts, corev1.VolumeMount{
Name: fmt.Sprintf("k8s-proxy-config-%d", i),
ReadOnly: true,
MountPath: fmt.Sprintf("/etc/tsconfig/%s-%d", pg.Name, i),
})
}
return mounts
}(),
Ports: []corev1.ContainerPort{
{
Name: "k8s-proxy",
ContainerPort: 443,
Protocol: corev1.ProtocolTCP,
},
},
},
},
Volumes: func() []corev1.Volume {
var volumes []corev1.Volume
for i := range pgReplicas(pg) {
volumes = append(volumes, corev1.Volume{
Name: fmt.Sprintf("k8s-proxy-config-%d", i),
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: pgConfigSecretName(pg.Name, i),
},
},
})
}
return volumes
}(),
},
},
},
}
return sts, nil
}
func pgServiceAccount(pg *tsapi.ProxyGroup, namespace string) *corev1.ServiceAccount {
return &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
@ -304,9 +427,10 @@ func pgRole(pg *tsapi.ProxyGroup, namespace string) *rbacv1.Role {
},
ResourceNames: func() (secrets []string) {
for i := range pgReplicas(pg) {
podName := pgPodName(pg.Name, i)
secrets = append(secrets,
pgConfigSecretName(pg.Name, i), // Config with auth key.
fmt.Sprintf("%s-%d", pg.Name, i), // State.
pgConfigSecretName(pg.Name, i), // Config with auth key.
podName, // State.
)
}
return secrets
@ -336,7 +460,7 @@ func pgRoleBinding(pg *tsapi.ProxyGroup, namespace string) *rbacv1.RoleBinding {
Subjects: []rbacv1.Subject{
{
Kind: "ServiceAccount",
Name: pg.Name,
Name: pgServiceAccountName(pg),
Namespace: namespace,
},
},
@ -347,6 +471,23 @@ func pgRoleBinding(pg *tsapi.ProxyGroup, namespace string) *rbacv1.RoleBinding {
}
}
// kube-apiserver proxies in auth mode use a static ServiceAccount. Everything
// else uses a per-ProxyGroup ServiceAccount.
func pgServiceAccountName(pg *tsapi.ProxyGroup) string {
if isAuthAPIServerProxy(pg) {
return authAPIServerProxySAName
}
return pg.Name
}
func isAuthAPIServerProxy(pg *tsapi.ProxyGroup) bool {
return pg.Spec.Type == tsapi.ProxyGroupTypeKubernetesAPIServer &&
pg.Spec.KubeAPIServerConfig != nil &&
pg.Spec.KubeAPIServerConfig.AuthMode != nil &&
*pg.Spec.KubeAPIServerConfig.AuthMode
}
func pgStateSecrets(pg *tsapi.ProxyGroup, namespace string) (secrets []*corev1.Secret) {
for i := range pgReplicas(pg) {
secrets = append(secrets, &corev1.Secret{
@ -418,6 +559,18 @@ func pgReplicas(pg *tsapi.ProxyGroup) int32 {
return 2
}
func pgPodName(pgName string, i int32) string {
return fmt.Sprintf("%s-%d", pgName, i)
}
func pgHostname(pg *tsapi.ProxyGroup, i int32) string {
if pg.Spec.HostnamePrefix != "" {
return fmt.Sprintf("%s-%d", pg.Spec.HostnamePrefix, i)
}
return fmt.Sprintf("%s-%d", pg.Name, i)
}
func pgConfigSecretName(pgName string, i int32) string {
return fmt.Sprintf("%s-%d-config", pgName, i)
}

View File

@ -629,7 +629,7 @@ func TestProxyGroupWithStaticEndpoints(t *testing.T) {
reconciler := &ProxyGroupReconciler{
tsNamespace: tsNamespace,
proxyImage: testProxyImage,
tsProxyImage: testProxyImage,
defaultTags: []string{"tag:test-tag"},
tsFirewallMode: "auto",
defaultProxyClass: "default-pc",
@ -772,7 +772,7 @@ func TestProxyGroupWithStaticEndpoints(t *testing.T) {
t.Run("delete_and_cleanup", func(t *testing.T) {
reconciler := &ProxyGroupReconciler{
tsNamespace: tsNamespace,
proxyImage: testProxyImage,
tsProxyImage: testProxyImage,
defaultTags: []string{"tag:test-tag"},
tsFirewallMode: "auto",
defaultProxyClass: "default-pc",
@ -832,7 +832,7 @@ func TestProxyGroup(t *testing.T) {
cl := tstest.NewClock(tstest.ClockOpts{})
reconciler := &ProxyGroupReconciler{
tsNamespace: tsNamespace,
proxyImage: testProxyImage,
tsProxyImage: testProxyImage,
defaultTags: []string{"tag:test-tag"},
tsFirewallMode: "auto",
defaultProxyClass: "default-pc",
@ -1025,12 +1025,12 @@ func TestProxyGroupTypes(t *testing.T) {
zl, _ := zap.NewDevelopment()
reconciler := &ProxyGroupReconciler{
tsNamespace: tsNamespace,
proxyImage: testProxyImage,
Client: fc,
l: zl.Sugar(),
tsClient: &fakeTSClient{},
clock: tstest.NewClock(tstest.ClockOpts{}),
tsNamespace: tsNamespace,
tsProxyImage: testProxyImage,
Client: fc,
l: zl.Sugar(),
tsClient: &fakeTSClient{},
clock: tstest.NewClock(tstest.ClockOpts{}),
}
t.Run("egress_type", func(t *testing.T) {
@ -1047,7 +1047,7 @@ func TestProxyGroupTypes(t *testing.T) {
mustCreate(t, fc, pg)
expectReconciled(t, reconciler, "", pg.Name)
verifyProxyGroupCounts(t, reconciler, 0, 1)
verifyProxyGroupCounts(t, reconciler, 0, 1, 0)
sts := &appsv1.StatefulSet{}
if err := fc.Get(t.Context(), client.ObjectKey{Namespace: tsNamespace, Name: pg.Name}, sts); err != nil {
@ -1161,7 +1161,7 @@ func TestProxyGroupTypes(t *testing.T) {
}
expectReconciled(t, reconciler, "", pg.Name)
verifyProxyGroupCounts(t, reconciler, 1, 2)
verifyProxyGroupCounts(t, reconciler, 1, 2, 0)
sts := &appsv1.StatefulSet{}
if err := fc.Get(t.Context(), client.ObjectKey{Namespace: tsNamespace, Name: pg.Name}, sts); err != nil {
@ -1198,6 +1198,41 @@ func TestProxyGroupTypes(t *testing.T) {
t.Errorf("unexpected volume mounts (-want +got):\n%s", diff)
}
})
t.Run("kubernetes_api_server_type", func(t *testing.T) {
pg := &tsapi.ProxyGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-k8s-apiserver",
UID: "test-k8s-apiserver-uid",
},
Spec: tsapi.ProxyGroupSpec{
Type: tsapi.ProxyGroupTypeKubernetesAPIServer,
Replicas: ptr.To[int32](2),
},
}
if err := fc.Create(t.Context(), pg); err != nil {
t.Fatal(err)
}
expectReconciled(t, reconciler, "", pg.Name)
verifyProxyGroupCounts(t, reconciler, 1, 2, 1)
sts := &appsv1.StatefulSet{}
if err := fc.Get(t.Context(), client.ObjectKey{Namespace: tsNamespace, Name: pg.Name}, sts); err != nil {
t.Fatalf("failed to get StatefulSet: %v", err)
}
// Verify the StatefulSet configuration for KubernetesAPIServer type.
if sts.Spec.Template.Spec.Containers[0].Name != "k8s-proxy" {
t.Errorf("unexpected container name %s, want k8s-proxy", sts.Spec.Template.Spec.Containers[0].Name)
}
if sts.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort != 443 {
t.Errorf("unexpected container port %d, want 443", sts.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort)
}
if sts.Spec.Template.Spec.Containers[0].Ports[0].Name != "k8s-proxy" {
t.Errorf("unexpected port name %s, want k8s-proxy", sts.Spec.Template.Spec.Containers[0].Ports[0].Name)
}
})
}
func TestIngressAdvertiseServicesConfigPreserved(t *testing.T) {
@ -1206,12 +1241,12 @@ func TestIngressAdvertiseServicesConfigPreserved(t *testing.T) {
WithStatusSubresource(&tsapi.ProxyGroup{}).
Build()
reconciler := &ProxyGroupReconciler{
tsNamespace: tsNamespace,
proxyImage: testProxyImage,
Client: fc,
l: zap.Must(zap.NewDevelopment()).Sugar(),
tsClient: &fakeTSClient{},
clock: tstest.NewClock(tstest.ClockOpts{}),
tsNamespace: tsNamespace,
tsProxyImage: testProxyImage,
Client: fc,
l: zap.Must(zap.NewDevelopment()).Sugar(),
tsClient: &fakeTSClient{},
clock: tstest.NewClock(tstest.ClockOpts{}),
}
existingServices := []string{"svc1", "svc2"}
@ -1326,7 +1361,7 @@ func setProxyClassReady(t *testing.T, fc client.Client, cl *tstest.Clock, name s
return pc
}
func verifyProxyGroupCounts(t *testing.T, r *ProxyGroupReconciler, wantIngress, wantEgress int) {
func verifyProxyGroupCounts(t *testing.T, r *ProxyGroupReconciler, wantIngress, wantEgress, wantAPIServer int) {
t.Helper()
if r.ingressProxyGroups.Len() != wantIngress {
t.Errorf("expected %d ingress proxy groups, got %d", wantIngress, r.ingressProxyGroups.Len())
@ -1334,6 +1369,9 @@ func verifyProxyGroupCounts(t *testing.T, r *ProxyGroupReconciler, wantIngress,
if r.egressProxyGroups.Len() != wantEgress {
t.Errorf("expected %d egress proxy groups, got %d", wantEgress, r.egressProxyGroups.Len())
}
if r.apiServerProxyGroups.Len() != wantAPIServer {
t.Errorf("expected %d kube-apiserver proxy groups, got %d", wantAPIServer, r.apiServerProxyGroups.Len())
}
}
func verifyEnvVar(t *testing.T, sts *appsv1.StatefulSet, name, expectedValue string) {
@ -1512,7 +1550,7 @@ func TestProxyGroupLetsEncryptStaging(t *testing.T) {
reconciler := &ProxyGroupReconciler{
tsNamespace: tsNamespace,
proxyImage: testProxyImage,
tsProxyImage: testProxyImage,
defaultTags: []string{"tag:test"},
defaultProxyClass: tt.defaultProxyClass,
Client: fc,

View File

@ -761,7 +761,7 @@ func applyProxyClassToStatefulSet(pc *tsapi.ProxyClass, ss *appsv1.StatefulSet,
}
if pc.Spec.UseLetsEncryptStagingEnvironment && (stsCfg.proxyType == proxyTypeIngressResource || stsCfg.proxyType == string(tsapi.ProxyGroupTypeIngress)) {
for i, c := range ss.Spec.Template.Spec.Containers {
if c.Name == "tailscale" {
if isMainContainer(&c) {
ss.Spec.Template.Spec.Containers[i].Env = append(ss.Spec.Template.Spec.Containers[i].Env, corev1.EnvVar{
Name: "TS_DEBUG_ACME_DIRECTORY_URL",
Value: letsEncryptStagingEndpoint,
@ -829,7 +829,7 @@ func applyProxyClassToStatefulSet(pc *tsapi.ProxyClass, ss *appsv1.StatefulSet,
return base
}
for i, c := range ss.Spec.Template.Spec.Containers {
if c.Name == "tailscale" {
if isMainContainer(&c) {
ss.Spec.Template.Spec.Containers[i] = updateContainer(wantsPod.TailscaleContainer, ss.Spec.Template.Spec.Containers[i])
break
}
@ -847,7 +847,7 @@ func applyProxyClassToStatefulSet(pc *tsapi.ProxyClass, ss *appsv1.StatefulSet,
func enableEndpoints(ss *appsv1.StatefulSet, metrics, debug bool) {
for i, c := range ss.Spec.Template.Spec.Containers {
if c.Name == "tailscale" {
if isMainContainer(&c) {
if debug {
ss.Spec.Template.Spec.Containers[i].Env = append(ss.Spec.Template.Spec.Containers[i].Env,
// Serve tailscaled's debug metrics on on
@ -902,6 +902,10 @@ func enableEndpoints(ss *appsv1.StatefulSet, metrics, debug bool) {
}
}
func isMainContainer(c *corev1.Container) bool {
return c.Name == "tailscale" || c.Name == "k8s-proxy"
}
// tailscaledConfig takes a proxy config, a newly generated auth key if generated and a Secret with the previous proxy
// state and auth key and returns tailscaled config files for currently supported proxy versions.
func tailscaledConfig(stsC *tailscaleSTSConfig, newAuthkey string, oldSecret *corev1.Secret) (tailscaledConfigs, error) {

View File

@ -60,7 +60,6 @@ type HAServiceReconciler struct {
recorder record.EventRecorder
logger *zap.SugaredLogger
tsClient tsClient
tsnetServer tsnetServer
tsNamespace string
lc localClient
defaultTags []string
@ -221,7 +220,7 @@ func (r *HAServiceReconciler) maybeProvision(ctx context.Context, hostname strin
// This checks and ensures that Tailscale Service's owner references are updated
// for this Service and errors if that is not possible (i.e. because it
// appears that the Tailscale Service has been created by a non-operator actor).
updatedAnnotations, err := r.ownerAnnotations(existingTSSvc)
updatedAnnotations, err := ownerAnnotations(r.operatorID, existingTSSvc)
if err != nil {
instr := fmt.Sprintf("To proceed, you can either manually delete the existing Tailscale Service or choose a different hostname with the '%s' annotaion", AnnotationHostname)
msg := fmt.Sprintf("error ensuring ownership of Tailscale Service %s: %v. %s", hostname, err, instr)
@ -395,7 +394,7 @@ func (r *HAServiceReconciler) maybeCleanup(ctx context.Context, hostname string,
serviceName := tailcfg.ServiceName("svc:" + hostname)
// 1. Clean up the Tailscale Service.
svcChanged, err = r.cleanupTailscaleService(ctx, serviceName, logger)
svcChanged, err = cleanupTailscaleService(ctx, r.tsClient, serviceName, r.operatorID, logger)
if err != nil {
return false, fmt.Errorf("error deleting Tailscale Service: %w", err)
}
@ -456,7 +455,7 @@ func (r *HAServiceReconciler) maybeCleanupProxyGroup(ctx context.Context, proxyG
return false, fmt.Errorf("failed to update tailscaled config services: %w", err)
}
svcsChanged, err = r.cleanupTailscaleService(ctx, tailcfg.ServiceName(tsSvcName), logger)
svcsChanged, err = cleanupTailscaleService(ctx, r.tsClient, tailcfg.ServiceName(tsSvcName), r.operatorID, logger)
if err != nil {
return false, fmt.Errorf("deleting Tailscale Service %q: %w", tsSvcName, err)
}
@ -529,8 +528,8 @@ func (r *HAServiceReconciler) tailnetCertDomain(ctx context.Context) (string, er
// If a Tailscale Service is found, but contains other owner references, only removes this operator's owner reference.
// If a Tailscale Service by the given name is not found or does not contain this operator's owner reference, do nothing.
// It returns true if an existing Tailscale Service was updated to remove owner reference, as well as any error that occurred.
func (r *HAServiceReconciler) cleanupTailscaleService(ctx context.Context, name tailcfg.ServiceName, logger *zap.SugaredLogger) (updated bool, err error) {
svc, err := r.tsClient.GetVIPService(ctx, name)
func cleanupTailscaleService(ctx context.Context, tsClient tsClient, name tailcfg.ServiceName, operatorID string, logger *zap.SugaredLogger) (updated bool, err error) {
svc, err := tsClient.GetVIPService(ctx, name)
if isErrorFeatureFlagNotEnabled(err) {
msg := fmt.Sprintf("Unable to proceed with cleanup: %s.", msgFeatureFlagNotEnabled)
logger.Warn(msg)
@ -563,14 +562,14 @@ func (r *HAServiceReconciler) cleanupTailscaleService(ctx context.Context, name
// cluster before deleting the Ingress. Perhaps the comparison could be
// 'if or.OperatorID == r.operatorID || or.ingressUID == r.ingressUID'.
ix := slices.IndexFunc(o.OwnerRefs, func(or OwnerRef) bool {
return or.OperatorID == r.operatorID
return or.OperatorID == operatorID
})
if ix == -1 {
return false, nil
}
if len(o.OwnerRefs) == 1 {
logger.Infof("Deleting Tailscale Service %q", name)
return false, r.tsClient.DeleteVIPService(ctx, name)
return false, tsClient.DeleteVIPService(ctx, name)
}
o.OwnerRefs = slices.Delete(o.OwnerRefs, ix, ix+1)
logger.Infof("Updating Tailscale Service %q", name)
@ -579,7 +578,7 @@ func (r *HAServiceReconciler) cleanupTailscaleService(ctx context.Context, name
return false, fmt.Errorf("error marshalling updated Tailscale Service owner reference: %w", err)
}
svc.Annotations[ownerAnnotation] = string(json)
return true, r.tsClient.CreateOrUpdateVIPService(ctx, svc)
return true, tsClient.CreateOrUpdateVIPService(ctx, svc)
}
func (a *HAServiceReconciler) backendRoutesSetup(ctx context.Context, serviceName, replicaName, pgName string, wantsCfg *ingressservices.Config, logger *zap.SugaredLogger) (bool, error) {
@ -742,49 +741,6 @@ func (a *HAServiceReconciler) numberPodsAdvertising(ctx context.Context, pgName
return count, nil
}
// ownerAnnotations returns the updated annotations required to ensure this
// instance of the operator is included as an owner. If the Tailscale Service is not
// nil, but does not contain an owner we return an error as this likely means
// that the Tailscale Service was created by something other than a Tailscale
// Kubernetes operator.
func (r *HAServiceReconciler) ownerAnnotations(svc *tailscale.VIPService) (map[string]string, error) {
ref := OwnerRef{
OperatorID: r.operatorID,
}
if svc == nil {
c := ownerAnnotationValue{OwnerRefs: []OwnerRef{ref}}
json, err := json.Marshal(c)
if err != nil {
return nil, fmt.Errorf("[unexpected] unable to marshal Tailscale Service owner annotation contents: %w, please report this", err)
}
return map[string]string{
ownerAnnotation: string(json),
}, nil
}
o, err := parseOwnerAnnotation(svc)
if err != nil {
return nil, err
}
if o == nil || len(o.OwnerRefs) == 0 {
return nil, fmt.Errorf("Tailscale Service %s exists, but does not contain owner annotation with owner references; not proceeding as this is likely a resource created by something other than the Tailscale Kubernetes operator", svc.Name)
}
if slices.Contains(o.OwnerRefs, ref) { // up to date
return svc.Annotations, nil
}
o.OwnerRefs = append(o.OwnerRefs, ref)
json, err := json.Marshal(o)
if err != nil {
return nil, fmt.Errorf("error marshalling updated owner references: %w", err)
}
newAnnots := make(map[string]string, len(svc.Annotations)+1)
for k, v := range svc.Annotations {
newAnnots[k] = v
}
newAnnots[ownerAnnotation] = string(json)
return newAnnots, nil
}
// dnsNameForService returns the DNS name for the given Tailscale Service name.
func (r *HAServiceReconciler) dnsNameForService(ctx context.Context, svc tailcfg.ServiceName) (string, error) {
s := svc.WithoutPrefix()

View File

@ -187,7 +187,6 @@ func setupServiceTest(t *testing.T) (*HAServiceReconciler, *corev1.Secret, clien
if err := fc.Status().Update(context.Background(), pg); err != nil {
t.Fatal(err)
}
fakeTsnetServer := &fakeTSNetServer{certDomains: []string{"foo.com"}}
ft := &fakeTSClient{}
zl, err := zap.NewDevelopment()
@ -210,7 +209,6 @@ func setupServiceTest(t *testing.T) (*HAServiceReconciler, *corev1.Secret, clien
clock: cl,
defaultTags: []string{"tag:k8s"},
tsNamespace: "operator-ns",
tsnetServer: fakeTsnetServer,
logger: zl.Sugar(),
recorder: record.NewFakeRecorder(10),
lc: lc,

201
cmd/k8s-proxy/k8s-proxy.go Normal file
View File

@ -0,0 +1,201 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
// k8s-proxy proxies between tailnet and Kubernetes cluster traffic.
// Currently, it only supports proxying tailnet clients to the Kubernetes API
// server.
package main
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"tailscale.com/hostinfo"
"tailscale.com/ipn"
"tailscale.com/ipn/store"
apiproxy "tailscale.com/k8s-operator/api-proxy"
"tailscale.com/kube/k8s-proxy/conf"
"tailscale.com/kube/state"
"tailscale.com/syncs"
"tailscale.com/tsnet"
)
func main() {
logger := zap.Must(zap.NewProduction()).Sugar()
defer logger.Sync()
if err := run(logger); err != nil {
logger.Fatal(err.Error())
}
}
func run(logger *zap.SugaredLogger) error {
var (
configFile = os.Getenv("TS_K8S_PROXY_CONFIG")
podUID = os.Getenv("POD_UID")
)
if configFile == "" {
return errors.New("TS_K8S_PROXY_CONFIG unset")
}
// ctx to live for the lifetime of the process.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// TODO(tomhjp): Support reloading config.
// TODO(tomhjp): Support reading config from a Secret.
cfg, err := conf.Load(configFile)
if err != nil {
return fmt.Errorf("error loading config file %q: %w", configFile, err)
}
if cfg.Parsed.LogLevel != nil {
level, err := zapcore.ParseLevel(*cfg.Parsed.LogLevel)
if err != nil {
return fmt.Errorf("error parsing log level %q: %w", *cfg.Parsed.LogLevel, err)
}
logger = logger.WithOptions(zap.IncreaseLevel(level))
}
if cfg.Parsed.App != nil {
hostinfo.SetApp(*cfg.Parsed.App)
}
st, err := getStateStore(cfg.Parsed.State, logger)
if err != nil {
return err
}
// If Pod UID unset, assume we're running outside of a cluster/not managed
// by the operator, so no need to set additional state keys.
if podUID != "" {
if err := state.SetInitialKeys(st, podUID); err != nil {
return fmt.Errorf("error setting initial state: %w", err)
}
}
var authKey string
if cfg.Parsed.AuthKey != nil {
authKey = *cfg.Parsed.AuthKey
}
ts := &tsnet.Server{
Logf: logger.Named("tsnet").Debugf,
UserLogf: logger.Named("tsnet").Infof,
Store: st,
AuthKey: authKey,
}
if cfg.Parsed.Hostname != nil {
ts.Hostname = *cfg.Parsed.Hostname
}
// Make sure we crash loop if Up doesn't complete in reasonable time.
upCtx, upCancel := context.WithTimeout(ctx, time.Minute)
defer upCancel()
if _, err := ts.Up(upCtx); err != nil {
return fmt.Errorf("error starting tailscale server: %w", err)
}
defer ts.Close()
lc, err := ts.LocalClient()
if err != nil {
return fmt.Errorf("error getting local client: %w", err)
}
w, err := lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap)
if err != nil {
return fmt.Errorf("error watching IPN bus: %w", err)
}
defer w.Close()
errs := make(chan error)
wg := syncs.WaitGroup{}
if podUID != "" {
wg.Go(func() {
err := state.KeepKeysUpdated(st, w.Next)
if err != nil && err != ctx.Err() {
errs <- fmt.Errorf("error keeping state keys updated: %w", err)
}
})
}
restConfig, err := getRestConfig()
if err != nil {
return fmt.Errorf("error getting rest config: %w", err)
}
authMode := true
if cfg.Parsed.KubeAPIServer != nil {
v, ok := cfg.Parsed.KubeAPIServer.AuthMode.Get()
if ok {
authMode = v
}
}
ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, authMode)
if err != nil {
return fmt.Errorf("error creating api server proxy: %w", err)
}
// TODO(tomhjp): Work out whether we should use TS_CERT_SHARE_MODE or not,
// and possibly issue certs upfront here before serving.
wg.Go(func() {
if err := ap.Run(ctx); err != nil {
errs <- fmt.Errorf("error running api server proxy: %w", err)
}
})
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
select {
case err = <-errs:
logger.Errorf("Shutting down due to error: %v", err)
case s := <-sig:
logger.Infof("Received %s, shutting down", s)
}
cancel()
wg.Wait()
return err
}
func getStateStore(path *string, logger *zap.SugaredLogger) (ipn.StateStore, error) {
p := "mem:"
if path != nil {
p = *path
} else {
logger.Warn("No state Secret provided; using in-memory store, which will lose state on restart")
}
st, err := store.New(logger.Errorf, p)
if err != nil {
return nil, fmt.Errorf("error creating state store: %w", err)
}
return st, nil
}
func getRestConfig() (*rest.Config, error) {
restConfig, err := rest.InClusterConfig()
if err == nil {
return restConfig, nil
}
inClusterErr := fmt.Errorf("could not use in-cluster config: %w", err)
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, nil)
restConfig, err = clientConfig.ClientConfig()
if err == nil {
return restConfig, nil
}
return nil, errors.Join(inClusterErr, fmt.Errorf("could not use kubeconfig: %w", err))
}

View File

@ -1,29 +0,0 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package apiproxy
import (
"os"
"tailscale.com/types/opt"
)
func defaultBool(envName string, defVal bool) bool {
vs := os.Getenv(envName)
if vs == "" {
return defVal
}
v, _ := opt.Bool(vs).Get()
return v
}
func defaultEnv(envName, defVal string) string {
v := os.Getenv(envName)
if v == "" {
return defVal
}
return v
}

View File

@ -6,17 +6,17 @@
package apiproxy
import (
"context"
"crypto/tls"
"errors"
"fmt"
"log"
"net/http"
"net/http/httputil"
"net/netip"
"net/url"
"os"
"strings"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
@ -37,123 +37,49 @@ var (
whoIsKey = ctxkey.New("", (*apitype.WhoIsResponse)(nil))
)
type APIServerProxyMode int
func (a APIServerProxyMode) String() string {
switch a {
case APIServerProxyModeDisabled:
return "disabled"
case APIServerProxyModeEnabled:
return "auth"
case APIServerProxyModeNoAuth:
return "noauth"
default:
return "unknown"
}
}
const (
APIServerProxyModeDisabled APIServerProxyMode = iota
APIServerProxyModeEnabled
APIServerProxyModeNoAuth
)
func ParseAPIProxyMode() APIServerProxyMode {
haveAuthProxyEnv := os.Getenv("AUTH_PROXY") != ""
haveAPIProxyEnv := os.Getenv("APISERVER_PROXY") != ""
switch {
case haveAPIProxyEnv && haveAuthProxyEnv:
log.Fatal("AUTH_PROXY and APISERVER_PROXY are mutually exclusive")
case haveAuthProxyEnv:
var authProxyEnv = defaultBool("AUTH_PROXY", false) // deprecated
if authProxyEnv {
return APIServerProxyModeEnabled
}
return APIServerProxyModeDisabled
case haveAPIProxyEnv:
var apiProxyEnv = defaultEnv("APISERVER_PROXY", "") // true, false or "noauth"
switch apiProxyEnv {
case "true":
return APIServerProxyModeEnabled
case "false", "":
return APIServerProxyModeDisabled
case "noauth":
return APIServerProxyModeNoAuth
default:
panic(fmt.Sprintf("unknown APISERVER_PROXY value %q", apiProxyEnv))
}
}
return APIServerProxyModeDisabled
}
// maybeLaunchAPIServerProxy launches the auth proxy, which is a small HTTP server
// that authenticates requests using the Tailscale LocalAPI and then proxies
// them to the kube-apiserver.
func MaybeLaunchAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config, s *tsnet.Server, mode APIServerProxyMode) {
if mode == APIServerProxyModeDisabled {
return
}
startlog := zlog.Named("launchAPIProxy")
if mode == APIServerProxyModeNoAuth {
// NewAPIServerProxy creates a new APIServerProxy that's ready to start once Run
// is called. No network traffic will flow until Run is called.
//
// authMode controls how the proxy behaves:
// - true: the proxy is started and requests are impersonated using the
// caller's Tailscale identity and the rules defined in the tailnet ACLs.
// - false: the proxy is started and requests are passed through to the
// Kubernetes API without any auth modifications.
func NewAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config, ts *tsnet.Server, authMode bool) (*APIServerProxy, error) {
if !authMode {
restConfig = rest.AnonymousClientConfig(restConfig)
}
cfg, err := restConfig.TransportConfig()
if err != nil {
startlog.Fatalf("could not get rest.TransportConfig(): %v", err)
return nil, fmt.Errorf("could not get rest.TransportConfig(): %w", err)
}
// Kubernetes uses SPDY for exec and port-forward, however SPDY is
// incompatible with HTTP/2; so disable HTTP/2 in the proxy.
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.TLSClientConfig, err = transport.TLSConfigFor(cfg)
if err != nil {
startlog.Fatalf("could not get transport.TLSConfigFor(): %v", err)
return nil, fmt.Errorf("could not get transport.TLSConfigFor(): %w", err)
}
tr.TLSNextProto = make(map[string]func(authority string, c *tls.Conn) http.RoundTripper)
rt, err := transport.HTTPWrappersForConfig(cfg, tr)
if err != nil {
startlog.Fatalf("could not get rest.TransportConfig(): %v", err)
return nil, fmt.Errorf("could not get rest.TransportConfig(): %w", err)
}
go runAPIServerProxy(s, rt, zlog.Named("apiserver-proxy"), mode, restConfig.Host)
}
// runAPIServerProxy runs an HTTP server that authenticates requests using the
// Tailscale LocalAPI and then proxies them to the Kubernetes API.
// It listens on :443 and uses the Tailscale HTTPS certificate.
// s will be started if it is not already running.
// rt is used to proxy requests to the Kubernetes API.
//
// mode controls how the proxy behaves:
// - apiserverProxyModeDisabled: the proxy is not started.
// - apiserverProxyModeEnabled: the proxy is started and requests are impersonated using the
// caller's identity from the Tailscale LocalAPI.
// - apiserverProxyModeNoAuth: the proxy is started and requests are not impersonated and
// are passed through to the Kubernetes API.
//
// It never returns.
func runAPIServerProxy(ts *tsnet.Server, rt http.RoundTripper, log *zap.SugaredLogger, mode APIServerProxyMode, host string) {
if mode == APIServerProxyModeDisabled {
return
}
ln, err := ts.Listen("tcp", ":443")
u, err := url.Parse(restConfig.Host)
if err != nil {
log.Fatalf("could not listen on :443: %v", err)
}
u, err := url.Parse(host)
if err != nil {
log.Fatalf("runAPIServerProxy: failed to parse URL %v", err)
return nil, fmt.Errorf("failed to parse URL %w", err)
}
lc, err := ts.LocalClient()
if err != nil {
log.Fatalf("could not get local client: %v", err)
return nil, fmt.Errorf("could not get local client: %w", err)
}
ap := &apiserverProxy{
log: log,
ap := &APIServerProxy{
log: zlog,
lc: lc,
mode: mode,
authMode: authMode,
upstreamURL: u,
ts: ts,
}
@ -164,41 +90,69 @@ func runAPIServerProxy(ts *tsnet.Server, rt http.RoundTripper, log *zap.SugaredL
Transport: rt,
}
return ap, nil
}
// Run starts the HTTP server that authenticates requests using the
// Tailscale LocalAPI and then proxies them to the Kubernetes API.
// It listens on :443 and uses the Tailscale HTTPS certificate.
//
// It return when ctx is cancelled or ServeTLS fails.
func (ap *APIServerProxy) Run(ctx context.Context) error {
ln, err := ap.ts.Listen("tcp", ":443")
if err != nil {
return fmt.Errorf("could not listen on :443: %v", err)
}
mux := http.NewServeMux()
mux.HandleFunc("/", ap.serveDefault)
mux.HandleFunc("POST /api/v1/namespaces/{namespace}/pods/{pod}/exec", ap.serveExecSPDY)
mux.HandleFunc("GET /api/v1/namespaces/{namespace}/pods/{pod}/exec", ap.serveExecWS)
hs := &http.Server{
ap.hs = &http.Server{
// Kubernetes uses SPDY for exec and port-forward, however SPDY is
// incompatible with HTTP/2; so disable HTTP/2 in the proxy.
TLSConfig: &tls.Config{
GetCertificate: lc.GetCertificate,
GetCertificate: ap.lc.GetCertificate,
NextProtos: []string{"http/1.1"},
},
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
Handler: mux,
}
log.Infof("API server proxy in %q mode is listening on %s", mode, ln.Addr())
if err := hs.ServeTLS(ln, "", ""); err != nil {
log.Fatalf("runAPIServerProxy: failed to serve %v", err)
errs := make(chan error)
go func() {
ap.log.Infof("API server proxy is listening on %s with auth mode: %v", ln.Addr(), ap.authMode)
if err := ap.hs.ServeTLS(ln, "", ""); err != nil && err != http.ErrServerClosed {
errs <- fmt.Errorf("failed to serve: %w", err)
}
}()
select {
case <-ctx.Done():
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
return ap.hs.Shutdown(shutdownCtx)
case err := <-errs:
return err
}
}
// apiserverProxy is an [net/http.Handler] that authenticates requests using the Tailscale
// APIServerProxy is an [net/http.Handler] that authenticates requests using the Tailscale
// LocalAPI and then proxies them to the Kubernetes API.
type apiserverProxy struct {
type APIServerProxy struct {
log *zap.SugaredLogger
lc *local.Client
rp *httputil.ReverseProxy
mode APIServerProxyMode
authMode bool
ts *tsnet.Server
hs *http.Server
upstreamURL *url.URL
}
// serveDefault is the default handler for Kubernetes API server requests.
func (ap *apiserverProxy) serveDefault(w http.ResponseWriter, r *http.Request) {
func (ap *APIServerProxy) serveDefault(w http.ResponseWriter, r *http.Request) {
who, err := ap.whoIs(r)
if err != nil {
ap.authError(w, err)
@ -210,17 +164,17 @@ func (ap *apiserverProxy) serveDefault(w http.ResponseWriter, r *http.Request) {
// serveExecSPDY serves 'kubectl exec' requests for sessions streamed over SPDY,
// optionally configuring the kubectl exec sessions to be recorded.
func (ap *apiserverProxy) serveExecSPDY(w http.ResponseWriter, r *http.Request) {
func (ap *APIServerProxy) serveExecSPDY(w http.ResponseWriter, r *http.Request) {
ap.execForProto(w, r, ksr.SPDYProtocol)
}
// serveExecWS serves 'kubectl exec' requests for sessions streamed over WebSocket,
// optionally configuring the kubectl exec sessions to be recorded.
func (ap *apiserverProxy) serveExecWS(w http.ResponseWriter, r *http.Request) {
func (ap *APIServerProxy) serveExecWS(w http.ResponseWriter, r *http.Request) {
ap.execForProto(w, r, ksr.WSProtocol)
}
func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, proto ksr.Protocol) {
func (ap *APIServerProxy) execForProto(w http.ResponseWriter, r *http.Request, proto ksr.Protocol) {
const (
podNameKey = "pod"
namespaceNameKey = "namespace"
@ -282,10 +236,10 @@ func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, p
ap.rp.ServeHTTP(h, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
}
func (h *apiserverProxy) addImpersonationHeadersAsRequired(r *http.Request) {
r.URL.Scheme = h.upstreamURL.Scheme
r.URL.Host = h.upstreamURL.Host
if h.mode == APIServerProxyModeNoAuth {
func (ap *APIServerProxy) addImpersonationHeadersAsRequired(r *http.Request) {
r.URL.Scheme = ap.upstreamURL.Scheme
r.URL.Host = ap.upstreamURL.Host
if !ap.authMode {
// If we are not providing authentication, then we are just
// proxying to the Kubernetes API, so we don't need to do
// anything else.
@ -310,16 +264,16 @@ func (h *apiserverProxy) addImpersonationHeadersAsRequired(r *http.Request) {
}
// Now add the impersonation headers that we want.
if err := addImpersonationHeaders(r, h.log); err != nil {
log.Print("failed to add impersonation headers: ", err.Error())
if err := addImpersonationHeaders(r, ap.log); err != nil {
ap.log.Errorf("failed to add impersonation headers: %v", err)
}
}
func (ap *apiserverProxy) whoIs(r *http.Request) (*apitype.WhoIsResponse, error) {
func (ap *APIServerProxy) whoIs(r *http.Request) (*apitype.WhoIsResponse, error) {
return ap.lc.WhoIs(r.Context(), r.RemoteAddr)
}
func (ap *apiserverProxy) authError(w http.ResponseWriter, err error) {
func (ap *APIServerProxy) authError(w http.ResponseWriter, err error) {
ap.log.Errorf("failed to authenticate caller: %v", err)
http.Error(w, "failed to authenticate caller", http.StatusInternalServerError)
}

View File

@ -313,6 +313,22 @@ _Appears in:_
#### KubeAPIServerConfig
KubeAPIServerConfig contains configuration specific to the kube-apiserver ProxyGroup type.
_Appears in:_
- [ProxyGroupSpec](#proxygroupspec)
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `authMode` _boolean_ | AuthMode enables auth mode for the API Server proxy. In auth mode,<br />requests from the tailnet proxied over to the Kubernetes API server<br />are additionally impersonated using the sender's tailnet identity. | | |
#### LabelValue
_Underlying type:_ _string_
@ -638,11 +654,12 @@ _Appears in:_
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `type` _[ProxyGroupType](#proxygrouptype)_ | Type of the ProxyGroup proxies. Supported types are egress and ingress.<br />Type is immutable once a ProxyGroup is created. | | Enum: [egress ingress] <br />Type: string <br /> |
| `type` _[ProxyGroupType](#proxygrouptype)_ | Type of the ProxyGroup proxies. Supported types are egress, ingress, and kube-apiserver.<br />Type is immutable once a ProxyGroup is created. | | Enum: [egress ingress kube-apiserver] <br />Type: string <br /> |
| `tags` _[Tags](#tags)_ | Tags that the Tailscale devices will be tagged with. Defaults to [tag:k8s].<br />If you specify custom tags here, make sure you also make the operator<br />an owner of these tags.<br />See https://tailscale.com/kb/1236/kubernetes-operator/#setting-up-the-kubernetes-operator.<br />Tags cannot be changed once a ProxyGroup device has been created.<br />Tag values must be in form ^tag:[a-zA-Z][a-zA-Z0-9-]*$. | | Pattern: `^tag:[a-zA-Z][a-zA-Z0-9-]*$` <br />Type: string <br /> |
| `replicas` _integer_ | Replicas specifies how many replicas to create the StatefulSet with.<br />Defaults to 2. | | Minimum: 0 <br /> |
| `hostnamePrefix` _[HostnamePrefix](#hostnameprefix)_ | HostnamePrefix is the hostname prefix to use for tailnet devices created<br />by the ProxyGroup. Each device will have the integer number from its<br />StatefulSet pod appended to this prefix to form the full hostname.<br />HostnamePrefix can contain lower case letters, numbers and dashes, it<br />must not start with a dash and must be between 1 and 62 characters long. | | Pattern: `^[a-z0-9][a-z0-9-]{0,61}$` <br />Type: string <br /> |
| `proxyClass` _string_ | ProxyClass is the name of the ProxyClass custom resource that contains<br />configuration options that should be applied to the resources created<br />for this ProxyGroup. If unset, and there is no default ProxyClass<br />configured, the operator will create resources with the default<br />configuration. | | |
| `kubeAPIServerConfig` _[KubeAPIServerConfig](#kubeapiserverconfig)_ | KubeAPIServerConfig contains configuration specific to the kube-apiserver<br />ProxyGroup type. This field is only used when Type is set to "kube-apiserver". | | |
#### ProxyGroupStatus
@ -669,7 +686,7 @@ _Underlying type:_ _string_
_Validation:_
- Enum: [egress ingress]
- Enum: [egress ingress kube-apiserver]
- Type: string
_Appears in:_

View File

@ -226,4 +226,6 @@ const (
IngressSvcValid ConditionType = `TailscaleIngressSvcValid`
IngressSvcConfigured ConditionType = `TailscaleIngressSvcConfigured`
APIServerProxyReady ConditionType = `APIServerProxyReady`
)

View File

@ -49,7 +49,7 @@ type ProxyGroupList struct {
}
type ProxyGroupSpec struct {
// Type of the ProxyGroup proxies. Supported types are egress and ingress.
// Type of the ProxyGroup proxies. Supported types are egress, ingress, and kube-apiserver.
// Type is immutable once a ProxyGroup is created.
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="ProxyGroup type is immutable"
Type ProxyGroupType `json:"type"`
@ -84,6 +84,11 @@ type ProxyGroupSpec struct {
// configuration.
// +optional
ProxyClass string `json:"proxyClass,omitempty"`
// KubeAPIServerConfig contains configuration specific to the kube-apiserver
// ProxyGroup type. This field is only used when Type is set to "kube-apiserver".
// +optional
KubeAPIServerConfig *KubeAPIServerConfig `json:"kubeAPIServerConfig,omitempty"`
}
type ProxyGroupStatus struct {
@ -122,14 +127,24 @@ type TailnetDevice struct {
}
// +kubebuilder:validation:Type=string
// +kubebuilder:validation:Enum=egress;ingress
// +kubebuilder:validation:Enum=egress;ingress;kube-apiserver
type ProxyGroupType string
const (
ProxyGroupTypeEgress ProxyGroupType = "egress"
ProxyGroupTypeIngress ProxyGroupType = "ingress"
ProxyGroupTypeEgress ProxyGroupType = "egress"
ProxyGroupTypeIngress ProxyGroupType = "ingress"
ProxyGroupTypeKubernetesAPIServer ProxyGroupType = "kube-apiserver"
)
// +kubebuilder:validation:Type=string
// +kubebuilder:validation:Pattern=`^[a-z0-9][a-z0-9-]{0,61}$`
type HostnamePrefix string
// KubeAPIServerConfig contains configuration specific to the kube-apiserver ProxyGroup type.
type KubeAPIServerConfig struct {
// AuthMode enables auth mode for the API Server proxy. In auth mode,
// requests from the tailnet proxied over to the Kubernetes API server
// are additionally impersonated using the sender's tailnet identity.
// +optional
AuthMode *bool `json:"authMode,omitempty"`
}

View File

@ -316,6 +316,26 @@ func (in *Env) DeepCopy() *Env {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *KubeAPIServerConfig) DeepCopyInto(out *KubeAPIServerConfig) {
*out = *in
if in.AuthMode != nil {
in, out := &in.AuthMode, &out.AuthMode
*out = new(bool)
**out = **in
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KubeAPIServerConfig.
func (in *KubeAPIServerConfig) DeepCopy() *KubeAPIServerConfig {
if in == nil {
return nil
}
out := new(KubeAPIServerConfig)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in Labels) DeepCopyInto(out *Labels) {
{
@ -731,6 +751,11 @@ func (in *ProxyGroupSpec) DeepCopyInto(out *ProxyGroupSpec) {
*out = new(int32)
**out = **in
}
if in.KubeAPIServerConfig != nil {
in, out := &in.KubeAPIServerConfig, &out.KubeAPIServerConfig
*out = new(KubeAPIServerConfig)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ProxyGroupSpec.

101
kube/k8s-proxy/conf/conf.go Normal file
View File

@ -0,0 +1,101 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
// Package conf contains code to load, manipulate, and access config file
// settings for k8s-proxy.
package conf
import (
"encoding/json"
"fmt"
"os"
"github.com/tailscale/hujson"
"tailscale.com/types/opt"
)
const v1Alpha1 = "v1alpha1"
// Config describes a config file.
type Config struct {
Path string // disk path of HuJSON
Raw []byte // raw bytes from disk, in HuJSON form
Std []byte // standardized JSON form
Version string // "v1alpha1"
// Parsed is the parsed config, converted from its on-disk version to the
// latest known format.
Parsed ConfigV1Alpha1
}
// VersionedConfig allows specifying config at the root of the object, or in
// a versioned sub-object.
// e.g. {"version": "v1alpha1", "authKey": "abc123"}
// or {"version": "v1beta1", "a-beta-config": "a-beta-value", "v1alpha1": {"authKey": "abc123"}}
type VersionedConfig struct {
Version string `json:",omitempty"` // "v1alpha1"
// Latest version of the config.
*ConfigV1Alpha1
// Backwards compatibility version(s) of the config. Fields and sub-fields
// from here should only be added to, never changed in place.
V1Alpha1 *ConfigV1Alpha1 `json:",omitempty"`
// V1Beta1 *ConfigV1Beta1 `json:",omitempty"` // Not yet used.
}
type ConfigV1Alpha1 struct {
AuthKey *string `json:",omitempty"` // Tailscale auth key to use.
Hostname *string `json:",omitempty"` // Tailscale device hostname.
State *string `json:",omitempty"` // Path to the Tailscale state.
LogLevel *string `json:",omitempty"` // "debug", "info". Defaults to "info".
App *string `json:",omitempty"` // e.g. kubetypes.AppProxyGroupKubeAPIServer
KubeAPIServer *KubeAPIServer `json:",omitempty"` // Config specific to the API Server proxy.
}
type KubeAPIServer struct {
AuthMode opt.Bool `json:",omitempty"`
}
// Load reads and parses the config file at the provided path on disk.
func Load(path string) (c Config, err error) {
c.Path = path
c.Raw, err = os.ReadFile(path)
if err != nil {
return c, fmt.Errorf("error reading config file %q: %w", path, err)
}
c.Std, err = hujson.Standardize(c.Raw)
if err != nil {
return c, fmt.Errorf("error parsing config file %q HuJSON/JSON: %w", path, err)
}
var ver VersionedConfig
if err := json.Unmarshal(c.Std, &ver); err != nil {
return c, fmt.Errorf("error parsing config file %q: %w", path, err)
}
rootV1Alpha1 := (ver.Version == v1Alpha1)
backCompatV1Alpha1 := (ver.V1Alpha1 != nil)
switch {
case ver.Version == "":
return c, fmt.Errorf("error parsing config file %q: no \"version\" field provided", path)
case rootV1Alpha1 && backCompatV1Alpha1:
// Exactly one of these should be set.
return c, fmt.Errorf("error parsing config file %q: both root and v1alpha1 config provided", path)
case rootV1Alpha1 != backCompatV1Alpha1:
c.Version = v1Alpha1
switch {
case rootV1Alpha1 && ver.ConfigV1Alpha1 != nil:
c.Parsed = *ver.ConfigV1Alpha1
case backCompatV1Alpha1:
c.Parsed = *ver.V1Alpha1
default:
c.Parsed = ConfigV1Alpha1{}
}
default:
return c, fmt.Errorf("error parsing config file %q: unsupported \"version\" value %q; want \"%s\"", path, ver.Version, v1Alpha1)
}
return c, nil
}

View File

@ -0,0 +1,86 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package conf
import (
"os"
"path/filepath"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"tailscale.com/types/ptr"
)
// Test that the config file can be at the root of the object, or in a versioned sub-object.
// or {"version": "v1beta1", "a-beta-config": "a-beta-value", "v1alpha1": {"authKey": "abc123"}}
func TestVersionedConfig(t *testing.T) {
testCases := map[string]struct {
inputConfig string
expectedConfig ConfigV1Alpha1
expectedError string
}{
"root_config_v1alpha1": {
inputConfig: `{"version": "v1alpha1", "authKey": "abc123"}`,
expectedConfig: ConfigV1Alpha1{AuthKey: ptr.To("abc123")},
},
"backwards_compat_v1alpha1_config": {
// Client doesn't know about v1beta1, so it should read in v1alpha1.
inputConfig: `{"version": "v1beta1", "beta-key": "beta-value", "authKey": "def456", "v1alpha1": {"authKey": "abc123"}}`,
expectedConfig: ConfigV1Alpha1{AuthKey: ptr.To("abc123")},
},
"unknown_key_allowed": {
// Adding new keys to the config doesn't require a version bump.
inputConfig: `{"version": "v1alpha1", "unknown-key": "unknown-value", "authKey": "abc123"}`,
expectedConfig: ConfigV1Alpha1{AuthKey: ptr.To("abc123")},
},
"version_only_no_authkey": {
inputConfig: `{"version": "v1alpha1"}`,
expectedConfig: ConfigV1Alpha1{},
},
"both_config_v1alpha1": {
inputConfig: `{"version": "v1alpha1", "authKey": "abc123", "v1alpha1": {"authKey": "def456"}}`,
expectedError: "both root and v1alpha1 config provided",
},
"empty_config": {
inputConfig: `{}`,
expectedError: `no "version" field provided`,
},
"v1beta1_without_backwards_compat": {
inputConfig: `{"version": "v1beta1", "beta-key": "beta-value", "authKey": "def456"}`,
expectedError: `unsupported "version" value "v1beta1"; want "v1alpha1"`,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "config.json")
if err := os.WriteFile(path, []byte(tc.inputConfig), 0644); err != nil {
t.Fatalf("failed to write config file: %v", err)
}
cfg, err := Load(path)
switch {
case tc.expectedError == "" && err != nil:
t.Fatalf("unexpected error: %v", err)
case tc.expectedError != "":
if err == nil {
t.Fatalf("expected error %q, got nil", tc.expectedError)
} else if !strings.Contains(err.Error(), tc.expectedError) {
t.Fatalf("expected error %q, got %q", tc.expectedError, err.Error())
}
return
}
if cfg.Version != "v1alpha1" {
t.Fatalf("expected version %q, got %q", "v1alpha1", cfg.Version)
}
// Diff actual vs expected config.
if diff := cmp.Diff(cfg.Parsed, tc.expectedConfig); diff != "" {
t.Fatalf("Unexpected parsed config (-got +want):\n%s", diff)
}
})
}
}

View File

@ -5,14 +5,15 @@ package kubetypes
const (
// Hostinfo App values for the Tailscale Kubernetes Operator components.
AppOperator = "k8s-operator"
AppAPIServerProxy = "k8s-operator-proxy"
AppIngressProxy = "k8s-operator-ingress-proxy"
AppIngressResource = "k8s-operator-ingress-resource"
AppEgressProxy = "k8s-operator-egress-proxy"
AppConnector = "k8s-operator-connector-resource"
AppProxyGroupEgress = "k8s-operator-proxygroup-egress"
AppProxyGroupIngress = "k8s-operator-proxygroup-ingress"
AppOperator = "k8s-operator"
AppInProcessAPIServerProxy = "k8s-operator-proxy"
AppIngressProxy = "k8s-operator-ingress-proxy"
AppIngressResource = "k8s-operator-ingress-resource"
AppEgressProxy = "k8s-operator-egress-proxy"
AppConnector = "k8s-operator-connector-resource"
AppProxyGroupEgress = "k8s-operator-proxygroup-egress"
AppProxyGroupIngress = "k8s-operator-proxygroup-ingress"
AppProxyGroupKubeAPIServer = "k8s-operator-proxygroup-kube-apiserver"
// Clientmetrics for Tailscale Kubernetes Operator components
MetricIngressProxyCount = "k8s_ingress_proxies" // L3
@ -29,6 +30,7 @@ const (
MetricEgressServiceCount = "k8s_egress_service_resources"
MetricProxyGroupEgressCount = "k8s_proxygroup_egress_resources"
MetricProxyGroupIngressCount = "k8s_proxygroup_ingress_resources"
MetricProxyGroupAPIServerCount = "k8s_proxygroup_kube_apiserver_resources"
// Keys that containerboot writes to state file that can be used to determine its state.
// fields set in Tailscale state Secret. These are mostly used by the Tailscale Kubernetes operator to determine

97
kube/state/state.go Normal file
View File

@ -0,0 +1,97 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
// Package state updates state keys for tailnet client devices managed by the
// operator. These keys are used to signal readiness, metadata, and current
// configuration state to the operator. Client packages deployed by the operator
// include containerboot, tsrecorder, and k8s-proxy, but currently containerboot
// has its own implementation to manage the same keys.
package state
import (
"encoding/json"
"fmt"
"tailscale.com/ipn"
"tailscale.com/kube/kubetypes"
"tailscale.com/tailcfg"
"tailscale.com/util/deephash"
)
const (
keyPodUID = ipn.StateKey(kubetypes.KeyPodUID)
keyCapVer = ipn.StateKey(kubetypes.KeyCapVer)
keyDeviceID = ipn.StateKey(kubetypes.KeyDeviceID)
keyDeviceIPs = ipn.StateKey(kubetypes.KeyDeviceIPs)
keyDeviceFQDN = ipn.StateKey(kubetypes.KeyDeviceFQDN)
)
// SetInitialKeys sets Pod UID and cap ver and clears tailnet device state
// keys to help stop the operator using stale tailnet device state.
func SetInitialKeys(store ipn.StateStore, podUID string) error {
// Clear device state keys first so the operator knows if the pod UID
// matches, the other values are definitely not stale.
for _, key := range []ipn.StateKey{keyDeviceID, keyDeviceFQDN, keyDeviceIPs} {
if _, err := store.ReadState(key); err == nil {
if err := store.WriteState(key, nil); err != nil {
return fmt.Errorf("error writing %q to state store: %w", key, err)
}
}
}
if err := store.WriteState(keyPodUID, []byte(podUID)); err != nil {
return fmt.Errorf("error writing pod UID to state store: %w", err)
}
if err := store.WriteState(keyCapVer, fmt.Appendf(nil, "%d", tailcfg.CurrentCapabilityVersion)); err != nil {
return fmt.Errorf("error writing capability version to state store: %w", err)
}
return nil
}
// KeepKeysUpdated sets state store keys consistent with containerboot to
// signal proxy readiness to the operator. It runs until its context is
// cancelled or it hits an error. The passed in next function is expected to be
// from a local.IPNBusWatcher that is at least subscribed to
// ipn.NotifyInitialNetMap.
func KeepKeysUpdated(store ipn.StateStore, next func() (ipn.Notify, error)) error {
var currentDeviceID, currentDeviceIPs, currentDeviceFQDN deephash.Sum
for {
n, err := next() // Blocks on a streaming LocalAPI HTTP call.
if err != nil {
return err
}
if n.NetMap == nil {
continue
}
if deviceID := n.NetMap.SelfNode.StableID(); deephash.Update(&currentDeviceID, &deviceID) {
if err := store.WriteState(keyDeviceID, []byte(deviceID)); err != nil {
return fmt.Errorf("failed to store device ID in state: %w", err)
}
}
if fqdn := n.NetMap.SelfNode.Name(); deephash.Update(&currentDeviceFQDN, &fqdn) {
if err := store.WriteState(keyDeviceFQDN, []byte(fqdn)); err != nil {
return fmt.Errorf("failed to store device FQDN in state: %w", err)
}
}
if addrs := n.NetMap.SelfNode.Addresses(); deephash.Update(&currentDeviceIPs, &addrs) {
var deviceIPs []string
for _, addr := range addrs.AsSlice() {
deviceIPs = append(deviceIPs, addr.Addr().String())
}
deviceIPsValue, err := json.Marshal(deviceIPs)
if err != nil {
return err
}
if err := store.WriteState(keyDeviceIPs, deviceIPsValue); err != nil {
return fmt.Errorf("failed to store device IPs in state: %w", err)
}
}
}
}

203
kube/state/state_test.go Normal file
View File

@ -0,0 +1,203 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package state
import (
"bytes"
"fmt"
"net/netip"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"tailscale.com/ipn"
"tailscale.com/ipn/store"
"tailscale.com/tailcfg"
"tailscale.com/types/logger"
"tailscale.com/types/netmap"
)
func TestSetInitialStateKeys(t *testing.T) {
var (
podUID = []byte("test-pod-uid")
expectedCapVer = fmt.Appendf(nil, "%d", tailcfg.CurrentCapabilityVersion)
)
for name, tc := range map[string]struct {
initial map[ipn.StateKey][]byte
expected map[ipn.StateKey][]byte
}{
"empty_initial": {
initial: map[ipn.StateKey][]byte{},
expected: map[ipn.StateKey][]byte{
keyPodUID: podUID,
keyCapVer: expectedCapVer,
},
},
"existing_pod_uid_and_capver": {
initial: map[ipn.StateKey][]byte{
keyPodUID: podUID,
keyCapVer: expectedCapVer,
},
expected: map[ipn.StateKey][]byte{
keyPodUID: podUID,
keyCapVer: expectedCapVer,
},
},
"all_keys_preexisting": {
initial: map[ipn.StateKey][]byte{
keyPodUID: podUID,
keyCapVer: expectedCapVer,
keyDeviceID: []byte("existing-device-id"),
keyDeviceFQDN: []byte("existing-device-fqdn"),
keyDeviceIPs: []byte(`["1.2.3.4"]`),
},
expected: map[ipn.StateKey][]byte{
keyPodUID: podUID,
keyCapVer: expectedCapVer,
keyDeviceID: nil,
keyDeviceFQDN: nil,
keyDeviceIPs: nil,
},
},
} {
t.Run(name, func(t *testing.T) {
store, err := store.New(logger.Discard, "mem:")
if err != nil {
t.Fatalf("error creating in-memory store: %v", err)
}
for key, value := range tc.initial {
if err := store.WriteState(key, value); err != nil {
t.Fatalf("error writing initial state key %q: %v", key, err)
}
}
if err := SetInitialKeys(store, string(podUID)); err != nil {
t.Fatalf("setInitialStateKeys failed: %v", err)
}
actual := make(map[ipn.StateKey][]byte)
for expectedKey, expectedValue := range tc.expected {
actualValue, err := store.ReadState(expectedKey)
if err != nil {
t.Errorf("error reading state key %q: %v", expectedKey, err)
continue
}
actual[expectedKey] = actualValue
if !bytes.Equal(actualValue, expectedValue) {
t.Errorf("state key %q mismatch: expected %q, got %q", expectedKey, expectedValue, actualValue)
}
}
if diff := cmp.Diff(actual, tc.expected); diff != "" {
t.Errorf("state keys mismatch (-got +want):\n%s", diff)
}
})
}
}
func TestKeepStateKeysUpdated(t *testing.T) {
store, err := store.New(logger.Discard, "mem:")
if err != nil {
t.Fatalf("error creating in-memory store: %v", err)
}
nextWaiting := make(chan struct{})
go func() {
<-nextWaiting // Acknowledge the initial signal.
}()
notifyCh := make(chan ipn.Notify)
next := func() (ipn.Notify, error) {
nextWaiting <- struct{}{} // Send signal to test that state is consistent.
return <-notifyCh, nil // Wait for test input.
}
errs := make(chan error, 1)
go func() {
err := KeepKeysUpdated(store, next)
if err != nil {
errs <- fmt.Errorf("keepStateKeysUpdated returned with error: %w", err)
}
}()
for _, tc := range []struct {
name string
notify ipn.Notify
expected map[ipn.StateKey][]byte
}{
{
name: "initial_not_authed",
notify: ipn.Notify{},
expected: map[ipn.StateKey][]byte{
keyDeviceID: nil,
keyDeviceFQDN: nil,
keyDeviceIPs: nil,
},
},
{
name: "authed",
notify: ipn.Notify{
NetMap: &netmap.NetworkMap{
SelfNode: (&tailcfg.Node{
StableID: "TESTCTRL00000001",
Name: "test-node.test.ts.net",
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32"), netip.MustParsePrefix("fd7a:115c:a1e0:ab12:4843:cd96:0:1/128")},
}).View(),
},
},
expected: map[ipn.StateKey][]byte{
keyDeviceID: []byte("TESTCTRL00000001"),
keyDeviceFQDN: []byte("test-node.test.ts.net"),
keyDeviceIPs: []byte(`["100.64.0.1","fd7a:115c:a1e0:ab12:4843:cd96:0:1"]`),
},
},
{
name: "updated_fields",
notify: ipn.Notify{
NetMap: &netmap.NetworkMap{
SelfNode: (&tailcfg.Node{
StableID: "TESTCTRL00000001",
Name: "updated.test.ts.net",
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.250/32")},
}).View(),
},
},
expected: map[ipn.StateKey][]byte{
keyDeviceID: []byte("TESTCTRL00000001"),
keyDeviceFQDN: []byte("updated.test.ts.net"),
keyDeviceIPs: []byte(`["100.64.0.250"]`),
},
},
} {
t.Run(tc.name, func(t *testing.T) {
// Send test input.
select {
case notifyCh <- tc.notify:
case <-errs:
t.Fatal("keepStateKeysUpdated returned before test input")
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for next() to be called again")
}
// Wait for next() to be called again so we know the goroutine has
// processed the event.
select {
case <-nextWaiting:
case <-errs:
t.Fatal("keepStateKeysUpdated returned before test input")
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for next() to be called again")
}
for key, value := range tc.expected {
got, _ := store.ReadState(key)
if !bytes.Equal(got, value) {
t.Errorf("state key %q mismatch: expected %q, got %q", key, value, got)
}
}
})
}
}