egress HA: WIP

This commit is contained in:
Irbe Krumina 2024-08-29 13:25:13 +03:00
parent b78df4d48a
commit c154f3676c
26 changed files with 1632 additions and 489 deletions

View File

@ -34,6 +34,9 @@ depaware: ## Run depaware checks
tailscale.com/cmd/k8s-operator \ tailscale.com/cmd/k8s-operator \
tailscale.com/cmd/stund tailscale.com/cmd/stund
publishfwegress: ## Build and publish k8s-fwegress image to location specified by ${REPO}
TAGS="${TAGS}" REPOS=${REPO} PLATFORM=${PLATFORM} PUSH=true TARGET=k8s-fwegress ./build_docker.sh
buildwindows: ## Build tailscale CLI for windows/amd64 buildwindows: ## Build tailscale CLI for windows/amd64
GOOS=windows GOARCH=amd64 ./tool/go install tailscale.com/cmd/tailscale tailscale.com/cmd/tailscaled GOOS=windows GOARCH=amd64 ./tool/go install tailscale.com/cmd/tailscale tailscale.com/cmd/tailscaled

View File

@ -77,6 +77,22 @@ case "$TARGET" in
--target="${PLATFORM}" \ --target="${PLATFORM}" \
/usr/local/bin/k8s-nameserver /usr/local/bin/k8s-nameserver
;; ;;
k8s-fwegress)
DEFAULT_REPOS="tailscale/k8s-fwegress"
REPOS="${REPOS:-${DEFAULT_REPOS}}"
go run github.com/tailscale/mkctr \
--gopaths="tailscale.com/cmd/k8s-fwegress:/usr/local/bin/k8s-fwegress" \
--ldflags=" \
-X tailscale.com/version.longStamp=${VERSION_LONG} \
-X tailscale.com/version.shortStamp=${VERSION_SHORT} \
-X tailscale.com/version.gitCommitStamp=${VERSION_GIT_HASH}" \
--base="${BASE}" \
--tags="${TAGS}" \
--repos="${REPOS}" \
--push="${PUSH}" \
--target="${PLATFORM}" \
/usr/local/bin/k8s-fwegress
;;
*) *)
echo "unknown target: $TARGET" echo "unknown target: $TARGET"
exit 1 exit 1

View File

@ -81,6 +81,10 @@
// cluster using the same hostname (in this case, the MagicDNS name of the ingress proxy) // cluster using the same hostname (in this case, the MagicDNS name of the ingress proxy)
// as a non-cluster workload on tailnet. // as a non-cluster workload on tailnet.
// This is only meant to be configured by the Kubernetes operator. // This is only meant to be configured by the Kubernetes operator.
// - TS_EGRESS_SERVICES_CONFIG_PATH: can be set to a path to config for how to route HA egress services.
// If set, containerboot will monitor the filepath, update firewall rules on changes and update
// 'egressServicesStatus' in the ConfigMap.
// - TS_EGRESS_SERVICES_CONFIGMAP_NAME: name of the ConfigMap in which containerboot
// should record egress services status. Must be set together with TS_EGRESS_SERVICES_CONFIG_PATH.
// //
// When running on Kubernetes, containerboot defaults to storing state in the // When running on Kubernetes, containerboot defaults to storing state in the
// "tailscale" kube secret. To store state on local disk instead, set // "tailscale" kube secret. To store state on local disk instead, set
@ -123,6 +127,7 @@ import (
"tailscale.com/ipn" "tailscale.com/ipn"
"tailscale.com/ipn/conffile" "tailscale.com/ipn/conffile"
kubeutils "tailscale.com/k8s-operator" kubeutils "tailscale.com/k8s-operator"
"tailscale.com/kube"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/types/ptr" "tailscale.com/types/ptr"
@ -137,6 +142,121 @@ func newNetfilterRunner(logf logger.Logf) (linuxfw.NetfilterRunner, error) {
return linuxfw.New(logf, "") return linuxfw.New(logf, "")
} }
// ensureEgressServiceConfig watches the egress services config file at path
// and, whenever it changes, installs SNAT/DNAT rules for each configured
// egress service, then records the cluster sources for which rules were set
// up as a "statusHint" JSON map via a patch against the object named cmName.
// It blocks until ctx is cancelled; errors are fatal (prototype-level error
// handling).
// TODO: for now we assume that tailnet IPs of this node won't change.
// TODO: for now we assume an IPv4 cluster
func ensureEgressServiceConfig(ctx context.Context, path, cmName string, nfr linuxfw.NetfilterRunner, tsIPs []netip.Prefix) {
	log.Printf("ensureEgressServiceConfig")
	// Prefer filesystem notifications; fall back to 5s polling if a watcher
	// cannot be created.
	var tickChan <-chan time.Time
	var eventChan <-chan fsnotify.Event
	if w, err := fsnotify.NewWatcher(); err != nil {
		log.Printf("ensureEgressServiceConfig: failed to create fsnotify watcher, timer-only mode: %v", err)
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		tickChan = ticker.C
	} else {
		defer w.Close()
		// Watch the parent directory: mounted ConfigMap/Secret files are
		// updated via atomic symlink swaps that a file-level watch can miss.
		if err := w.Add(filepath.Dir(path)); err != nil {
			log.Fatalf("ensureEgressServiceConfig: failed to add fsnotify watch: %v", err)
		}
		eventChan = w.Events
	}
	// Pick this node's IPv4 tailnet address to use as the SNAT source.
	// TODO: for now we assume an IPv4 cluster
	var local netip.Addr
	for _, pfx := range tsIPs {
		if !(pfx.IsSingleIP() && pfx.Addr().Is4()) {
			continue
		}
		local = pfx.Addr()
		break
	}
	var prevESConfig *kubeutils.EgressServices
	f := func() {
		svcCfg, err := readEgressServiceConfig(path)
		if err != nil {
			log.Fatalf("error reading egress service config path %s: %v", path, err)
		}
		if prevESConfig != nil && reflect.DeepEqual(prevESConfig, svcCfg) {
			log.Printf("ensureEgressServiceConfig: nothing changed")
			return
		}
		prevESConfig = svcCfg
		// TODO: delete any no-longer needed routes, only apply new routes if they don't already exist
		// for each egress service
		if svcCfg == nil || len(svcCfg.Services) == 0 {
			return
		}
		// statusHint maps egress service name -> cluster sources for which
		// firewall rules have been installed.
		statusHint := make(map[string][]string)
		for svcName, svcConfig := range svcCfg.Services {
			log.Printf("setting up routes for egress service %s", svcName)
			// parse tailnet target
			dst, err := netip.ParseAddr(svcConfig.TailnetTargetIP)
			if err != nil {
				log.Fatalf("error parsing tailnet target IP: %v", err)
			}
			if !dst.Is4() {
				log.Fatalf("dst %v is not IPv4 address", dst)
			}
			statusHint[svcName] = make([]string, 0)
			if err := nfr.AddSNATRuleForDst(local, dst); err != nil {
				log.Fatalf("error adding SNAT rule: %v", err)
			}
			log.Printf("cluster sources: %v", svcConfig.ClusterSources)
			for _, clusterSourceS := range svcConfig.ClusterSources {
				// Skip empty entries but keep processing the remaining
				// sources (previously this `break` silently dropped all
				// sources after an empty one).
				if clusterSourceS == "" {
					continue
				}
				clusterSource, err := netip.ParseAddr(clusterSourceS)
				if err != nil {
					log.Fatalf("error parsing cluster source: %q", clusterSourceS)
				}
				if !clusterSource.Is4() {
					log.Fatalf("cluster source %v is not an IPv4 address", clusterSource)
				}
				if err := nfr.AddDNATRuleWithSrc(clusterSource, dst); err != nil {
					log.Fatalf("error adding DNAT rule with src %v dst %v: %v", clusterSource, dst, err)
				}
				statusHint[svcName] = append(statusHint[svcName], clusterSourceS)
			}
		}
		statusHintBs, err := json.Marshal(statusHint)
		if err != nil {
			log.Fatalf("error marshalling status hint: %v", err)
		}
		// NOTE(review): this patches the proxy state Secret via kc, although
		// cmName and the log message suggest a ConfigMap — confirm which
		// object is intended.
		jsonPatches := []kube.JSONPatch{{Op: "replace", Value: statusHintBs, Path: "/data/statusHint"}}
		if err := kc.JSONPatchSecret(ctx, cmName, jsonPatches); err != nil {
			log.Fatalf("error updating configmap %s: %v", cmName, err)
		}
	}
	f()
	for {
		select {
		case <-ctx.Done():
			// Stop watching when containerboot shuts down.
			return
		case <-tickChan:
			// Previously the tick case was empty, so timer-only mode never
			// re-read the config.
			f()
		case <-eventChan:
			log.Printf("egress service config update event")
			f()
		}
	}
}
// readEgressServiceConfig loads and parses the egress services configuration
// from the file at path. An empty path yields (nil, nil).
func readEgressServiceConfig(path string) (*kubeutils.EgressServices, error) {
	if path == "" {
		return nil, nil
	}
	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	cfg := new(kubeutils.EgressServices)
	if err := json.Unmarshal(raw, cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}
func main() { func main() {
log.SetPrefix("boot: ") log.SetPrefix("boot: ")
tailscale.I_Acknowledge_This_API_Is_Unstable = true tailscale.I_Acknowledge_This_API_Is_Unstable = true
@ -166,6 +286,8 @@ func main() {
PodIP: defaultEnv("POD_IP", ""), PodIP: defaultEnv("POD_IP", ""),
EnableForwardingOptimizations: defaultBool("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS", false), EnableForwardingOptimizations: defaultBool("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS", false),
HealthCheckAddrPort: defaultEnv("TS_HEALTHCHECK_ADDR_PORT", ""), HealthCheckAddrPort: defaultEnv("TS_HEALTHCHECK_ADDR_PORT", ""),
EgressServicesConfigPath: defaultEnv("TS_EGRESS_SERVICES_CONFIG_PATH", ""),
EgressServicesConfigMapName: defaultEnv("TS_EGRESS_SERVICES_CONFIGMAP_NAME", ""),
} }
if err := cfg.validate(); err != nil { if err := cfg.validate(); err != nil {
@ -585,6 +707,7 @@ runLoop:
} }
} }
if !startupTasksDone { if !startupTasksDone {
log.Printf("not startup tasks done")
// For containerboot instances that act as TCP // For containerboot instances that act as TCP
// proxies (proxying traffic to an endpoint // proxies (proxying traffic to an endpoint
// passed via one of the env vars that // passed via one of the env vars that
@ -600,6 +723,10 @@ runLoop:
// post-auth configuration is done. // post-auth configuration is done.
log.Println("Startup complete, waiting for shutdown signal") log.Println("Startup complete, waiting for shutdown signal")
startupTasksDone = true startupTasksDone = true
if cfg.EgressServicesConfigMapName != "" && cfg.EgressServicesConfigPath != "" {
log.Println("about to ensure service config")
go ensureEgressServiceConfig(ctx, cfg.EgressServicesConfigPath, cfg.EgressServicesConfigMapName, nfr, addrs)
}
// Wait on tailscaled process. It won't // Wait on tailscaled process. It won't
// be cleaned up by default when the // be cleaned up by default when the
@ -1172,6 +1299,8 @@ type settings struct {
// target. // target.
PodIP string PodIP string
HealthCheckAddrPort string HealthCheckAddrPort string
EgressServicesConfigPath string
EgressServicesConfigMapName string
} }
func (s *settings) validate() error { func (s *settings) validate() error {
@ -1351,7 +1480,7 @@ func isOneStepConfig(cfg *settings) bool {
// as an L3 proxy, proxying to an endpoint provided via one of the config env // as an L3 proxy, proxying to an endpoint provided via one of the config env
// vars. // vars.
func isL3Proxy(cfg *settings) bool { func isL3Proxy(cfg *settings) bool {
return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress return cfg.EgressServicesConfigPath != "" || cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress
} }
// hasKubeStateStore returns true if the state must be stored in a Kubernetes // hasKubeStateStore returns true if the state must be stored in a Kubernetes

View File

@ -0,0 +1,58 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: fwegress
namespace: tailscale
spec:
# TODO: experiment with upgrade strategy
replicas: 2
selector:
matchLabels:
app: fwegress
template:
metadata:
labels:
app: fwegress
spec:
serviceAccount: fwegress
readinessGates:
- conditionType: TailscaleRoutesReady
initContainers:
- name: sysctler
image: tailscale/alpine-base:3.18
securityContext:
privileged: true
command: ["/bin/sh", "-c"]
args: [sysctl -w net.ipv4.ip_forward=1 && if sysctl net.ipv6.conf.all.forwarding; then sysctl -w net.ipv6.conf.all.forwarding=1; fi]
containers:
- image: gcr.io/csi-test-290908/k8s-fwegress:v0.0.20
imagePullPolicy: IfNotPresent
name: fwegress
env:
- name: TS_DEBUG_FIREWALL_MODE
value: "auto"
- name: TS_EGRESS_SVC
value: "kuard"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
securityContext:
capabilities:
add:
- NET_ADMIN
---
apiVersion: v1
kind: Service
metadata:
name: fwegress
namespace: tailscale
spec:
selector:
app: fwegress
clusterIP: None
type: ClusterIP

11
cmd/k8s-fwegress/eps.yaml Normal file
View File

@ -0,0 +1,11 @@
addressType: IPv4
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 10.12.0.27
kind: EndpointSlice
metadata:
labels:
tailscale.com/fwegress-ip: "kuard"
name: fwegress-2
namespace: tailscale

View File

@ -0,0 +1,253 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"context"
"errors"
"fmt"
"net/netip"
"os"
"strings"
"sync"
"github.com/go-logr/zapr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
xslices "golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
logf "sigs.k8s.io/controller-runtime/pkg/log"
kzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/util/linuxfw"
)
// main runs the fwegress controller: it watches the EndpointSlice labelled
// with this Pod's egress service and programs local netfilter rules to
// load-balance traffic to the endpoint addresses.
func main() {
	var opts []kzap.Opts
	opts = append(opts, kzap.UseDevMode(true), kzap.Level(zapcore.DebugLevel))
	zlog := kzap.NewRaw(opts...).Sugar()
	logf.SetLogger(zapr.NewLogger(zlog.Desugar()))
	restConfig := config.GetConfigOrDie()
	// One EndpointSlice marked by egress service name for the Service
	egressSvc := os.Getenv("TS_EGRESS_SVC")
	if egressSvc == "" {
		zlog.Fatalf("empty egress service name")
	}
	podIP := os.Getenv("POD_IP")
	if podIP == "" {
		zlog.Fatalf("empty POD_IP")
	}
	podUID := os.Getenv("POD_UID")
	if podUID == "" {
		zlog.Fatalf("empty Pod UID")
	}
	// Only cache EndpointSlices for this egress service, and only in the
	// tailscale namespace.
	labelReq, err := labels.NewRequirement("tailscale.com/fwegress", selection.Equals, []string{egressSvc})
	if err != nil {
		zlog.Fatalf("error creating a label requirement: %v", err)
	}
	labelFilter := cache.ByObject{
		Label: labels.NewSelector().Add(*labelReq),
	}
	nsFilter := cache.ByObject{
		Namespaces: map[string]cache.Config{"tailscale": {LabelSelector: labelFilter.Label}},
	}
	// Pods are cached unfiltered (by label) but restricted to the tailscale
	// namespace.
	nsFilter1 := cache.ByObject{
		Field: client.InNamespace("tailscale").AsSelector(),
	}
	mgr, err := manager.New(restConfig, manager.Options{Scheme: tsapi.GlobalScheme,
		Cache: cache.Options{
			ByObject: map[client.Object]cache.ByObject{
				&discoveryv1.EndpointSlice{}: nsFilter,
				&corev1.Pod{}:                nsFilter1,
			},
		}})
	if err != nil {
		zlog.Fatalf("could not create manager: %v", err)
	}
	// TODO: does this result in setting up unnecessary default firewall
	// rules for tailscale?
	nfRunner, err := linuxfw.New(zlog.Debugf, "")
	if err != nil {
		zlog.Fatalf("could not create netfilter runner: %v", err)
	}
	podName := os.Getenv("POD_NAME")
	if podName == "" {
		zlog.Fatal("empty Pod name")
	}
	err = builder.
		ControllerManagedBy(mgr).
		For(&discoveryv1.EndpointSlice{}). // label filter
		Complete(&FWEgressReconciler{
			Client:    mgr.GetClient(),
			logger:    zlog.Named("FWEgress-reconciler"),
			nfRunner:  nfRunner,
			podIP:     netip.MustParseAddr(podIP),
			podName:   podName,
			podUID:    podUID,
			state:     &state{routes: make([]netip.Addr, 0)},
			egressSvc: egressSvc,
		})
	if err != nil {
		zlog.Fatalf("error creating FWEgress reconciler: %v", err)
	}
	// Previously written as `if mgr.Start(...); err != nil`, which never
	// assigned err, so manager start failures were silently discarded.
	if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
		zlog.Fatalf("error starting controller manager: %v", err)
	}
}
// FWEgressReconciler reconciles the EndpointSlice for one egress service and
// programs this Pod's netfilter rules to load-balance traffic to the
// endpoint addresses.
type FWEgressReconciler struct {
	client.Client
	// state caches the routes currently programmed into netfilter; see the
	// comment on the state type for why this lives in memory.
	state  *state
	logger *zap.SugaredLogger
	// nfRunner applies iptables/nftables rule changes.
	nfRunner linuxfw.NetfilterRunner
	// podIP is this Pod's IP, used as the DNAT match and SNAT source.
	podIP netip.Addr
	// podName is this Pod's name, used to look up the Pod for status updates.
	podName string
	// podUID is matched against EndpointSlice endpoint hostnames to find
	// this Pod's endpoint.
	podUID string
	// egressSvc is the egress service name this Pod serves (TS_EGRESS_SVC).
	egressSvc string
}
// The operator creates the EndpointSlice as that makes it easier to co-ordinate the IP family thing.
//
// Reconcile reads the EndpointSlice for this Pod's egress service, programs
// DNAT/SNAT rules for the addresses belonging to this Pod's endpoint, and
// reflects success in the Pod's TailscaleRoutesReady status condition.
func (r *FWEgressReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
	r.logger.Debugf("starting reconcile")
	defer r.logger.Debugf("reconcile finished")
	newRoutes := make([]netip.Addr, 0)
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      r.podName,
			Namespace: "tailscale",
		},
	}
	if err := r.Get(ctx, client.ObjectKeyFromObject(pod), pod); err != nil {
		return res, err
	}
	ready := corev1.ConditionFalse
	defer func() {
		r.logger.Debugf("setting new routes %v", newRoutes)
		r.state.set(newRoutes)
		oldPodStatus := pod.Status.DeepCopy()
		podSetTailscaleReady(ready, pod)
		if !apiequality.Semantic.DeepEqual(pod.Status, oldPodStatus) {
			r.logger.Debugf("updating Pod status with new routes %v", newRoutes)
			if updateErr := r.Status().Update(ctx, pod); updateErr != nil {
				// Wrap updateErr (previously the outer err was wrapped,
				// which dropped the actual update failure).
				err = errors.Join(err, fmt.Errorf("error updating Pod status: %w", updateErr))
			}
		}
		// custom pod status condition is only used for readiness check, it is not reliable for internal use because it is possible that container restarted and we lost routes, but pod status is still the same
	}()
	eps := new(discoveryv1.EndpointSlice)
	err = r.Get(ctx, req.NamespacedName, eps)
	if apierrors.IsNotFound(err) {
		r.logger.Debugf("EndpointSlice not found, assuming it was deleted")
		return reconcile.Result{}, nil
	} else if err != nil {
		return reconcile.Result{}, fmt.Errorf("failed to get EndpointSlice: %w", err)
	}
	egressSvc := eps.Labels["tailscale.com/fwegress"]
	if !strings.EqualFold(egressSvc, r.egressSvc) {
		r.logger.Debugf("got EndpointSlice for service %s interested in %s", egressSvc, r.egressSvc)
		return res, nil
	}
	// TODO: this is round robin for iptables, but not nftables- must fix
	// nftables
	addrs := make([]netip.Addr, 0)
	for _, ep := range eps.Endpoints {
		// Endpoints carry the owning fwegress Pod's UID as their hostname;
		// only collect addresses for this Pod's endpoint. Skip (not stop at)
		// endpoints belonging to other Pods or with no hostname set —
		// previously this was a `break` and would also panic on a nil
		// Hostname.
		if ep.Hostname == nil || !strings.EqualFold(*ep.Hostname, r.podUID) {
			continue
		}
		for _, addrS := range ep.Addresses {
			addr, err := netip.ParseAddr(addrS)
			if err != nil {
				return res, fmt.Errorf("error parsing EndpointSlice address %s: %v", addrS, err)
			}
			// duplicates aren't expected
			addrs = append(addrs, addr)
		}
	}
	if !r.state.routesNeedUpdate(addrs) {
		r.logger.Debugf("routes don't need update")
		// Keep the current routes in state (previously state was reset to
		// empty here, forcing a full rules rewrite on the next reconcile).
		newRoutes = addrs
		ready = corev1.ConditionTrue
		return
	}
	r.logger.Debugf("routes need update, new routes are %v", addrs)
	// TODO: also add a mark
	// we could mark packets for this service so don't have to reconfigure as these Pods go up and down
	if err := r.nfRunner.DNATWithLoadBalancer(r.podIP, addrs); err != nil {
		r.logger.Errorf("error updating routes: %v", err)
		return res, fmt.Errorf("error setting up load balancer rules: %v", err)
	}
	for _, addr := range addrs {
		if err := r.nfRunner.AddSNATRuleForDst(r.podIP, addr); err != nil {
			return res, fmt.Errorf("error setting up SNAT rules %w", err)
		}
	}
	newRoutes = addrs
	ready = corev1.ConditionTrue
	return res, nil
}
// state tracks the set of cluster routes that this container currently has
// programmed into netfilter.
//
// we need to store routes internally - they can be lost during container
// restarts and container restarts can happen in a way that cannot be tied to
// resource garbage collection etc
type state struct {
	sync.RWMutex
	routes []netip.Addr
}

// routesNeedUpdate reports whether newRoutes differs from the currently
// recorded routes. Both slices are expected to be sorted.
func (s *state) routesNeedUpdate(newRoutes []netip.Addr) bool {
	s.Lock()
	defer s.Unlock()
	if len(s.routes) != len(newRoutes) {
		return true
	}
	// TODO: bart.Table would be more efficient maybe
	// Routes should be sorted
	for i := range s.routes {
		// netip.Addr is comparable; == is equivalent to Compare() == 0.
		if s.routes[i] != newRoutes[i] {
			return true
		}
	}
	return false
}

// set records routes as the currently programmed set.
func (s *state) set(routes []netip.Addr) {
	s.Lock()
	defer s.Unlock()
	s.routes = routes
}
// podSetTailscaleReady sets (or replaces) the TailscaleRoutesReady condition
// on pod's status with the given status value.
func podSetTailscaleReady(status corev1.ConditionStatus, pod *corev1.Pod) {
	cond := corev1.PodCondition{
		Type:   corev1.PodConditionType("TailscaleRoutesReady"),
		Status: status,
	}
	// Replace an existing condition of the same type in place if present.
	for i := range pod.Status.Conditions {
		if pod.Status.Conditions[i].Type == cond.Type {
			pod.Status.Conditions[i] = cond
			return
		}
	}
	// No existing condition: append a new one.
	pod.Status.Conditions = append(pod.Status.Conditions, cond)
}

View File

@ -156,77 +156,77 @@ func (a *ConnectorReconciler) Reconcile(ctx context.Context, req reconcile.Reque
// maybeProvisionConnector ensures that any new resources required for this // maybeProvisionConnector ensures that any new resources required for this
// Connector instance are deployed to the cluster. // Connector instance are deployed to the cluster.
func (a *ConnectorReconciler) maybeProvisionConnector(ctx context.Context, logger *zap.SugaredLogger, cn *tsapi.Connector) error { func (a *ConnectorReconciler) maybeProvisionConnector(ctx context.Context, logger *zap.SugaredLogger, cn *tsapi.Connector) error {
hostname := cn.Name + "-connector" // hostname := cn.Name + "-connector"
if cn.Spec.Hostname != "" { // if cn.Spec.Hostname != "" {
hostname = string(cn.Spec.Hostname) // hostname = string(cn.Spec.Hostname)
} // }
crl := childResourceLabels(cn.Name, a.tsnamespace, "connector") // crl := childResourceLabels(cn.Name, a.tsnamespace, "connector")
proxyClass := cn.Spec.ProxyClass // proxyClass := cn.Spec.ProxyClass
if proxyClass != "" { // if proxyClass != "" {
if ready, err := proxyClassIsReady(ctx, proxyClass, a.Client); err != nil { // if ready, err := proxyClassIsReady(ctx, proxyClass, a.Client); err != nil {
return fmt.Errorf("error verifying ProxyClass for Connector: %w", err) // return fmt.Errorf("error verifying ProxyClass for Connector: %w", err)
} else if !ready { // } else if !ready {
logger.Infof("ProxyClass %s specified for the Connector, but is not (yet) Ready, waiting..", proxyClass) // logger.Infof("ProxyClass %s specified for the Connector, but is not (yet) Ready, waiting..", proxyClass)
return nil // return nil
} // }
} // }
sts := &tailscaleSTSConfig{ // sts := &tailscaleSTSConfig{
ParentResourceName: cn.Name, // ParentResourceName: cn.Name,
ParentResourceUID: string(cn.UID), // ParentResourceUID: string(cn.UID),
Hostname: hostname, // Hostname: hostname,
ChildResourceLabels: crl, // ChildResourceLabels: crl,
Tags: cn.Spec.Tags.Stringify(), // Tags: cn.Spec.Tags.Stringify(),
Connector: &connector{ // Connector: &connector{
isExitNode: cn.Spec.ExitNode, // isExitNode: cn.Spec.ExitNode,
}, // },
ProxyClassName: proxyClass, // ProxyClassName: proxyClass,
} // }
if cn.Spec.SubnetRouter != nil && len(cn.Spec.SubnetRouter.AdvertiseRoutes) > 0 { // if cn.Spec.SubnetRouter != nil && len(cn.Spec.SubnetRouter.AdvertiseRoutes) > 0 {
sts.Connector.routes = cn.Spec.SubnetRouter.AdvertiseRoutes.Stringify() // sts.Connector.routes = cn.Spec.SubnetRouter.AdvertiseRoutes.Stringify()
} // }
a.mu.Lock() // a.mu.Lock()
if sts.Connector.isExitNode { // if sts.Connector.isExitNode {
a.exitNodes.Add(cn.UID) // a.exitNodes.Add(cn.UID)
} else { // } else {
a.exitNodes.Remove(cn.UID) // a.exitNodes.Remove(cn.UID)
} // }
if sts.Connector.routes != "" { // if sts.Connector.routes != "" {
a.subnetRouters.Add(cn.GetUID()) // a.subnetRouters.Add(cn.GetUID())
} else { // } else {
a.subnetRouters.Remove(cn.GetUID()) // a.subnetRouters.Remove(cn.GetUID())
} // }
a.mu.Unlock() // a.mu.Unlock()
gaugeConnectorSubnetRouterResources.Set(int64(a.subnetRouters.Len())) // gaugeConnectorSubnetRouterResources.Set(int64(a.subnetRouters.Len()))
gaugeConnectorExitNodeResources.Set(int64(a.exitNodes.Len())) // gaugeConnectorExitNodeResources.Set(int64(a.exitNodes.Len()))
var connectors set.Slice[types.UID] // var connectors set.Slice[types.UID]
connectors.AddSlice(a.exitNodes.Slice()) // connectors.AddSlice(a.exitNodes.Slice())
connectors.AddSlice(a.subnetRouters.Slice()) // connectors.AddSlice(a.subnetRouters.Slice())
gaugeConnectorResources.Set(int64(connectors.Len())) // gaugeConnectorResources.Set(int64(connectors.Len()))
_, err := a.ssr.Provision(ctx, logger, sts) // _, err := a.ssr.Provision(ctx, logger, sts)
if err != nil { // if err != nil {
return err // return err
} // }
_, tsHost, ips, err := a.ssr.DeviceInfo(ctx, crl) // _, tsHost, ips, err := a.ssr.DeviceInfo(ctx, crl)
if err != nil { // if err != nil {
return err // return err
} // }
if tsHost == "" { // if tsHost == "" {
logger.Debugf("no Tailscale hostname known yet, waiting for connector pod to finish auth") // logger.Debugf("no Tailscale hostname known yet, waiting for connector pod to finish auth")
// No hostname yet. Wait for the connector pod to auth. // // No hostname yet. Wait for the connector pod to auth.
cn.Status.TailnetIPs = nil // cn.Status.TailnetIPs = nil
cn.Status.Hostname = "" // cn.Status.Hostname = ""
return nil // return nil
} // }
cn.Status.TailnetIPs = ips // cn.Status.TailnetIPs = ips
cn.Status.Hostname = tsHost // cn.Status.Hostname = tsHost
return nil return nil
} }

View File

@ -0,0 +1,45 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: fwegress
namespace: tailscale
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: fwegress
namespace: tailscale
rules:
- apiGroups:
- "discovery.k8s.io"
resources:
- endpointslices
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods
- pods/status
verbs:
- get
- list
- watch
- update
- patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: fwegress
namespace: tailscale
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: fwegress
subjects:
- kind: ServiceAccount
name: fwegress
namespace: tailscale

View File

@ -48,14 +48,14 @@ metadata:
namespace: {{ .Release.Namespace }} namespace: {{ .Release.Namespace }}
rules: rules:
- apiGroups: [""] - apiGroups: [""]
resources: ["secrets", "serviceaccounts", "configmaps"] resources: ["secrets", "serviceaccounts", "configmaps", "pods"]
verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] verbs: ["create","delete","deletecollection","get","list","patch","update","watch"]
- apiGroups: ["apps"] - apiGroups: ["apps"]
resources: ["statefulsets", "deployments"] resources: ["statefulsets", "deployments"]
verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] verbs: ["create","delete","deletecollection","get","list","patch","update","watch"]
- apiGroups: ["discovery.k8s.io"] - apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"] resources: ["endpointslices"]
verbs: ["get", "list", "watch"] verbs: ["get", "list", "watch", "patch", "update", "create"]
--- ---
apiVersion: rbac.authorization.k8s.io/v1 apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding kind: RoleBinding

View File

@ -16,6 +16,9 @@ rules:
- apiGroups: [""] - apiGroups: [""]
resources: ["secrets"] resources: ["secrets"]
verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] verbs: ["create","delete","deletecollection","get","list","patch","update","watch"]
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["create","delete","deletecollection","get","list","patch","update","watch"]
--- ---
apiVersion: rbac.authorization.k8s.io/v1 apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding kind: RoleBinding

View File

@ -0,0 +1,5 @@
addressType: IPv4
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
namespace: tailscale

View File

@ -0,0 +1,51 @@
# This file is not a complete manifest, it's a skeleton that the operator embeds
# at build time and then uses to construct Tailscale proxy pods.
apiVersion: apps/v1
kind: StatefulSet
metadata:
namespace: tailscale
spec:
replicas: 2 # hardcoded for this prototype
template:
metadata:
deletionGracePeriodSeconds: 10
spec:
serviceAccountName: fwegress
readinessGates:
- conditionType: TailscaleRoutesReady
initContainers:
- name: sysctler
image: tailscale/alpine-base:3.18
securityContext:
privileged: true
command: ["/bin/sh", "-c"]
args: [sysctl -w net.ipv4.ip_forward=1 && if sysctl net.ipv6.conf.all.forwarding; then sysctl -w net.ipv6.conf.all.forwarding=1; fi]
resources:
requests:
cpu: 1m
memory: 1Mi
containers:
- name: tailscale
image: gcr.io/csi-test-290908/k8s-fwegress:v0.0.22
imagePullPolicy: IfNotPresent
env:
- name: TS_DEBUG_FIREWALL_MODE
value: "auto"
- name: TS_EGRESS_SVC
value: "kuard"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: POD_UID
valueFrom:
fieldRef:
fieldPath: metadata.uid
securityContext:
capabilities:
add:
- NET_ADMIN

View File

@ -0,0 +1,155 @@
//go:build !plan9
package main
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// reconciles egress proxy state secret.
//
// EgressSvcConfigReconciler watches proxy state Secrets in the tailscale
// namespace, reads the "statusHint" field written by the proxy's
// containerboot, and mirrors successfully-routed cluster sources into the
// corresponding egress service EndpointSlice.
type EgressSvcConfigReconciler struct {
	client.Client
	logger *zap.SugaredLogger
}
// For this prototype only (make it determine what has changed more intelligently later):
// on any state Secret change:
// - read the service status hints
// - for each status hint:
//   - get all Pods for that Service
//   - get EndpointSlice for that Service
//   - if the status hint suggests that this proxy routes from fwegress Pods to backend, ensure endpoint addresses contain it
//   - apply any changes to EndpointSlice
//
// - TODO: add finalizer to the proxy Secret to ensure cleanup when proxy deleted
func (er *EgressSvcConfigReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
	logger := er.logger.With("secret-ns", req.Namespace, "secret-name", req.Name)
	logger.Debugf("starting reconcile")
	defer logger.Debugf("reconcile finished")
	// request should be for a state Secret
	s := new(corev1.Secret)
	err = er.Get(ctx, req.NamespacedName, s)
	if apierrors.IsNotFound(err) {
		// Request object not found, could have been deleted after reconcile request.
		logger.Debugf("service not found, assuming it was deleted")
		return reconcile.Result{}, nil
	} else if err != nil {
		return reconcile.Result{}, fmt.Errorf("failed to get Secret: %w", err)
	}
	if !s.DeletionTimestamp.IsZero() {
		logger.Debugf("Secret is being deleted")
		return
	}
	// For now - we reconcile all Secrets in tailscale namespace and ignore those without statusHint
	// get status hint
	if _, ok := s.Data["statusHint"]; !ok {
		logger.Debugf("secret does not have a statusHint field, ignore %+#v", s.Data)
		return
	}
	statusHint := make(map[string][]string)
	if err := json.Unmarshal(s.Data["statusHint"], &statusHint); err != nil {
		return res, fmt.Errorf("error unmarshalling status hint: %w\n status hint is %q", err, s.Data["statusHint"])
	}
	if len(statusHint) == 0 {
		logger.Debugf("no status hint")
		return
	}
	// get the associated Pod; proxy state Secrets share the proxy Pod's name.
	proxyPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:      s.Name,
		Namespace: "tailscale",
	}}
	err = er.Get(ctx, client.ObjectKeyFromObject(proxyPod), proxyPod)
	if apierrors.IsNotFound(err) {
		logger.Debugf("Pod %s does not yet exist", s.Name)
		return
	}
	if err != nil {
		return res, fmt.Errorf("error retrieving Pod %s: %w", s.Name, err)
	}
	// for each of the services in status hint
	for svcName, clusterSources := range statusHint {
		eps := &discoveryv1.EndpointSlice{
			ObjectMeta: metav1.ObjectMeta{
				Name:      svcName,
				Namespace: "tailscale",
			},
		}
		err := er.Get(ctx, client.ObjectKeyFromObject(eps), eps)
		if apierrors.IsNotFound(err) {
			logger.Debugf("EndpointSlice %s not found", svcName)
			return res, nil
		}
		if err != nil {
			return res, fmt.Errorf("error retrieving EndpointSlice %s: %w", svcName, err)
		}
		fweEgressPods := &corev1.PodList{}
		if err := er.List(ctx, fweEgressPods, client.InNamespace("tailscale"), client.MatchingLabels(map[string]string{"app": svcName})); err != nil {
			return res, fmt.Errorf("error listing fwegress Pods for %s", svcName)
		}
		if len(fweEgressPods.Items) == 0 {
			logger.Debugf("no fwegress pods for %s yet", svcName)
		}
		oldEps := eps.DeepCopy()
		for _, pod := range fweEgressPods.Items {
			podIP := pod.Status.PodIP
			podUID := pod.UID
			// Find the endpoint for this fwegress Pod; endpoint hostnames
			// carry the Pod's UID. Index into the slice so that the append
			// below mutates the EndpointSlice itself — previously the code
			// took the address of the range-variable copy, so added
			// addresses were lost and the Update below never fired.
			var ep *discoveryv1.Endpoint
			for i := range eps.Endpoints {
				hostname := eps.Endpoints[i].Hostname
				if hostname != nil && strings.EqualFold(string(podUID), *hostname) {
					ep = &eps.Endpoints[i]
					break
				}
			}
			// The remaining checks previously used `break`, aborting the
			// whole Pod loop; `continue` moves on to the next fwegress Pod.
			if ep == nil {
				logger.Debugf("no endpoint created for Pod with uid %s yet", podUID)
				continue
			}
			hasIP := false
			for _, ip := range clusterSources {
				if strings.EqualFold(ip, podIP) {
					hasIP = true
					break
				}
			}
			if !hasIP {
				logger.Debugf("proxy has NOT set up route from Pod %s, do nothing", podUID)
				continue
			}
			logger.Debugf("proxy has set up route from Pod %s, ensuring this is refected in EndpointSlice", podUID)
			hasProxyPodIP := false
			for _, addr := range ep.Addresses {
				if strings.EqualFold(addr, proxyPod.Status.PodIP) {
					hasProxyPodIP = true
					break
				}
			}
			if hasProxyPodIP {
				logger.Debugf("proxy IP %s already present in EndpointSlice endpoint for Pod %s", proxyPod.Status.PodIP, podUID)
				continue
			}
			logger.Debugf("proxy IP %s not yet present in EndpointSlice endpoint for Pod %s", proxyPod.Status.PodIP, podUID)
			ep.Addresses = append(ep.Addresses, proxyPod.Status.PodIP)
		}
		if !reflect.DeepEqual(oldEps, eps) {
			if err := er.Update(ctx, eps); err != nil {
				return res, fmt.Errorf("error updating EndpointSlice")
			}
		}
	}
	return
}

View File

@ -0,0 +1,225 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"context"
_ "embed"
"encoding/json"
"fmt"
"os"
"strconv"
"sync"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/yaml"
kube "tailscale.com/k8s-operator"
"tailscale.com/types/ptr"
"tailscale.com/util/mak"
)
// This reconciler reconciles Tailscale egress Services configured to be exposed
// on a pre-existing ProxyGroup. For each it:
// - sets up a forwarding StatefulSet
// - updates egress service config for the ProxyGroup with a mapping of
// forwarding StatefulSet Pod IPs to tailnet target IP.
type EgressHAReconciler struct {
	client.Client
	// ssr provisions the proxy group StatefulSet and its supporting resources.
	ssr *tailscaleSTSReconciler
	// logger is the base logger; Reconcile derives per-request loggers from it.
	logger *zap.SugaredLogger
	mu sync.Mutex // protects following
	// Temporary for this prototype - the amount of replicas for a StatefulSet.
	// Holds the replica count most recently provisioned for the proxy group,
	// so repeated reconciles can skip re-provisioning.
	tempCurrentProxyGroupReplicas int
}
// Reconcile reconciles a Tailscale egress Service that declares a tailnet
// target IP via the tailscale.com/tailnet-ip annotation: it records the
// target in the proxy group's egress services ConfigMap and ensures a
// forwarding headless Service and StatefulSet exist for it.
func (er *EgressHAReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
	logger := er.logger.With("service-ns", req.Namespace, "service-name", req.Name)
	logger.Debugf("starting reconcile")
	defer logger.Debugf("reconcile finished")
	svc := new(corev1.Service)
	err = er.Get(ctx, req.NamespacedName, svc)
	if apierrors.IsNotFound(err) {
		// Request object not found, could have been deleted after reconcile request.
		logger.Debugf("service not found, assuming it was deleted")
		return reconcile.Result{}, nil
	} else if err != nil {
		return reconcile.Result{}, fmt.Errorf("failed to get svc: %w", err)
	}
	// For this prototype only IP target is supported.
	tsIP := tailnetTargetAnnotation(svc)
	if tsIP == "" {
		return res, nil
	}
	// For this prototype only.
	// Otherwise services will need tailscale.com/proxy-group label
	// ensure a proxy group fronted with a headless Service (?)
	if err := er.tempCreateProxyGroup(ctx); err != nil {
		return res, fmt.Errorf("error ensuring proxy group: %v", err)
	}
	egressSvcName := fmt.Sprintf("%s-%s", svc.Name, svc.Namespace)
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "egress-proxies",
			Namespace: "tailscale",
		},
	}
	if err := er.Get(ctx, client.ObjectKeyFromObject(cm), cm); err != nil {
		return res, fmt.Errorf("error getting egress-proxies cm: %w", err)
	}
	svcCfg := &kube.EgressServices{}
	if err := json.Unmarshal([]byte(cm.Data["services"]), svcCfg); err != nil {
		return res, fmt.Errorf("error unmarshalling config: %w", err)
	}
	// Record (or refresh) the tailnet target for this egress service.
	// Previously the entry was only written when absent, so a changed
	// tailnet-ip annotation was never propagated; and the ConfigMap was
	// updated unconditionally, producing a no-op API write per reconcile.
	if existing, ok := svcCfg.Services[egressSvcName]; !ok || existing.TailnetTargetIP != tsIP {
		existing.TailnetTargetIP = tsIP
		mak.Set(&svcCfg.Services, egressSvcName, existing)
		bs, err := json.Marshal(svcCfg)
		if err != nil {
			return res, fmt.Errorf("error marshalling service config: %w", err)
		}
		// mak.Set guards against cm.Data being nil.
		mak.Set(&cm.Data, "services", string(bs))
		if err := er.Update(ctx, cm); err != nil {
			return res, fmt.Errorf("error updating configmap: %w", err)
		}
	}
	if _, err := er.fwegressHeadlessSvc(ctx, egressSvcName); err != nil {
		return res, fmt.Errorf("error reconciling headless svc: %w", err)
	}
	if _, err := er.fwegressSTS(ctx, egressSvcName); err != nil {
		return res, fmt.Errorf("error reconciling StatefulSet: %w", err)
	}
	return res, nil
}
// fwegressDeploy is the embedded manifest used as the template for
// fwegress forwarding StatefulSets.
//go:embed deploy/manifests/fwegress.yaml
var fwegressDeploy []byte
// fwegressSTS ensures a forwarding StatefulSet named after the egress
// service exists, built from the embedded fwegress manifest template.
func (er *EgressHAReconciler) fwegressSTS(ctx context.Context, name string) (*appsv1.StatefulSet, error) {
	sts := new(appsv1.StatefulSet)
	if err := yaml.Unmarshal(fwegressDeploy, &sts); err != nil {
		return nil, fmt.Errorf("failed to unmarshal fwegress STS: %w", err)
	}
	sts.ObjectMeta.Name = name
	sts.Spec.Selector = &metav1.LabelSelector{
		MatchLabels: map[string]string{"app": name},
	}
	tmpl := &sts.Spec.Template
	tmpl.Labels = map[string]string{
		"app":                    name,
		"name":                   name,
		"tailscale.com/fwegress": name,
	}
	return createOrUpdate(ctx, er.Client, "tailscale", sts, func(s *appsv1.StatefulSet) { s.Spec = sts.Spec })
}
// fwegressHeadlessSvc ensures that a headless Service selecting the
// fwegress Pods for the named egress service exists in the tailscale
// namespace.
func (er *EgressHAReconciler) fwegressHeadlessSvc(ctx context.Context, name string) (*corev1.Service, error) {
	want := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: "tailscale",
			Labels:    map[string]string{"app": name},
		},
		Spec: corev1.ServiceSpec{
			ClusterIP:      "None",
			Selector:       map[string]string{"app": name},
			IPFamilyPolicy: ptr.To(corev1.IPFamilyPolicyPreferDualStack),
		},
	}
	return createOrUpdate(ctx, er.Client, "tailscale", want, func(svc *corev1.Service) { svc.Spec = want.Spec })
}
// fwegressEPS creates (if needed) and returns the EndpointSlice for the
// named egress service. It never updates an existing EndpointSlice: the
// fwegress Pod reconciler owns its contents.
func (er *EgressHAReconciler) fwegressEPS(ctx context.Context, name string) (*discoveryv1.EndpointSlice, error) {
	eps := &discoveryv1.EndpointSlice{
		AddressType: discoveryv1.AddressTypeIPv4, // for this prototype
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: "tailscale",
			Labels:    map[string]string{"tailscale.com/fwegress": name},
		},
	}
	// only create if not exists as the other reconciler will be updating this
	err := er.Get(ctx, client.ObjectKeyFromObject(eps), eps)
	if apierrors.IsNotFound(err) {
		if err := er.Create(ctx, eps); err != nil {
			return nil, fmt.Errorf("error creating Endpointslice: %w", err)
		}
		// Return here: previously execution fell through to the err != nil
		// check below with the NotFound error still set, so a successful
		// create was reported as a failure.
		return eps, nil
	}
	if err != nil {
		return nil, fmt.Errorf("error getting EndpointSlice: %w", err)
	}
	return eps, nil
}
// tempCreateProxyGroup ensures that the egress proxy group StatefulSet
// exists with the desired number of replicas (REPLICAS envvar, default 3).
// Temporary for this prototype only.
func (er *EgressHAReconciler) tempCreateProxyGroup(ctx context.Context) error {
	er.mu.Lock()
	defer er.mu.Unlock()
	replicas := defaultIntEnv("REPLICAS", 3)
	if replicas == er.tempCurrentProxyGroupReplicas {
		// Nothing to do. Previously this branch fell through and
		// re-provisioned the proxy group on every reconcile.
		er.logger.Debugf("Proxy group with %d replicas already exists", replicas)
		return nil
	}
	er.logger.Debugf("Wants a proxy group with %d replicas, currently has %d replicas, updating", replicas, er.tempCurrentProxyGroupReplicas)
	conf := &tailscaleSTSConfig{
		name:     "egress-proxies",
		replicas: int32(replicas),
	}
	if err := er.createConfigMap(ctx, "egress-proxies"); err != nil {
		return fmt.Errorf("error creating ConfigMap: %w", err)
	}
	if _, err := er.ssr.Provision(ctx, er.logger, conf); err != nil {
		return fmt.Errorf("error provision proxy group: %w", err)
	}
	er.tempCurrentProxyGroupReplicas = replicas
	return nil
}
// createConfigMap ensures that a ConfigMap holding the (initially empty)
// egress services config for the named proxy group exists.
// create if not exists only, no update as another reconciler updates spec
// TODO: SSA
func (er *EgressHAReconciler) createConfigMap(ctx context.Context, name string) error {
	cfg := kube.EgressServices{
		Version: "v1alpha1",
	}
	cfgBS, err := json.Marshal(cfg)
	if err != nil {
		return fmt.Errorf("error marshalling config: %w", err)
	}
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: "tailscale",
			Labels:    map[string]string{"tailscale.com/proxy-group": name},
		},
		Data: map[string]string{"services": string(cfgBS)},
	}
	err = er.Get(ctx, client.ObjectKeyFromObject(cm), cm)
	if apierrors.IsNotFound(err) {
		return er.Create(ctx, cm)
	}
	if err != nil {
		// This is a Get failure, not a Create failure; the previous
		// message wrongly said "error creating ConfigMap".
		return fmt.Errorf("error getting ConfigMap: %w", err)
	}
	return nil
}
// defaultIntEnv returns the integer value of the given envvar name, or
// defVal if the envvar is unset or does not parse as an integer.
func defaultIntEnv(name string, defVal int) int {
	v := os.Getenv(name)
	i, err := strconv.Atoi(v)
	if err != nil {
		return defVal
	}
	return i
}

View File

@ -0,0 +1,127 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
kube "tailscale.com/k8s-operator"
"tailscale.com/types/ptr"
)
// FWEgressReconciler reconciles fwegress (forwarding egress) Pods: for each
// Pod it registers an endpoint (keyed by Pod UID) in the egress service's
// EndpointSlice and records the Pod IP as a cluster source in the proxy
// group's egress services config.
type FWEgressReconciler struct {
	client.Client
	// logger is the base logger; Reconcile derives per-request loggers from it.
	logger *zap.SugaredLogger
}
// Reconcile reconciles a single fwegress Pod: it ensures an endpoint keyed
// by the Pod's UID exists in the egress service's EndpointSlice and that the
// Pod IP is recorded as a cluster source in the egress services ConfigMap.
func (er *FWEgressReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
	logger := er.logger.With("service-ns", req.Namespace, "service-name", req.Name)
	logger.Debugf("starting reconcile")
	defer logger.Debugf("reconcile finished")
	p := new(corev1.Pod)
	err = er.Get(ctx, req.NamespacedName, p)
	if apierrors.IsNotFound(err) {
		// Request object not found, could have been deleted after reconcile request.
		logger.Debugf("Pod not found, assuming it was deleted")
		return reconcile.Result{}, nil
	} else if err != nil {
		return reconcile.Result{}, fmt.Errorf("failed to get pod: %w", err)
	}
	if !p.DeletionTimestamp.IsZero() {
		logger.Debugf("Pod is being deleted")
		return
	}
	egressSvcName := p.Labels["tailscale.com/fwegress"]
	if egressSvcName == "" {
		// Return here: previously execution continued and tried to look up
		// an EndpointSlice with an empty name.
		logger.Debugf("[unexpected] Pod is not for egress service")
		return res, nil
	}
	eps, err := er.fwegressEPS(ctx, egressSvcName)
	if err != nil {
		return res, fmt.Errorf("error ensuring EndpointSlice: %w", err)
	}
	// Ensure an endpoint keyed by this Pod's UID exists. Addresses are
	// filled in by the egress HA reconciler once proxies have set up routes.
	oldEndpoints := eps.DeepCopy()
	found := false
	for _, e := range eps.Endpoints {
		// Hostname is a *string; guard against nil before dereferencing.
		if e.Hostname != nil && strings.EqualFold(*e.Hostname, string(p.UID)) {
			found = true
			break
		}
	}
	if !found {
		eps.Endpoints = append(eps.Endpoints, discoveryv1.Endpoint{
			Hostname: ptr.To(string(p.UID)),
		})
	}
	if !reflect.DeepEqual(oldEndpoints, eps) {
		if err := er.Update(ctx, eps); err != nil {
			return res, fmt.Errorf("error updating EndpointSlice: %w", err)
		}
	}
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "egress-proxies",
			Namespace: "tailscale",
		},
	}
	if err := er.Get(ctx, client.ObjectKeyFromObject(cm), cm); err != nil {
		return res, fmt.Errorf("error getting egress-proxies cm: %w", err)
	}
	svcCfg := &kube.EgressServices{}
	if err := json.Unmarshal([]byte(cm.Data["services"]), svcCfg); err != nil {
		return res, fmt.Errorf("error unmarshalling config: %w", err)
	}
	found = false
	egSvc := svcCfg.Services[egressSvcName]
	for _, ip := range egSvc.ClusterSources {
		if strings.EqualFold(ip, p.Status.PodIP) {
			found = true
			break
		}
	}
	if !found {
		egSvc.ClusterSources = append(egSvc.ClusterSources, p.Status.PodIP)
		if svcCfg.Services == nil {
			// The initial config written by createConfigMap has no
			// services; writing to a nil map would panic.
			svcCfg.Services = make(map[string]kube.EgressService)
		}
		svcCfg.Services[egressSvcName] = egSvc
		// Re-marshal the modified config into the ConfigMap data before
		// updating: previously the in-memory change was never serialized,
		// so the new cluster source was silently dropped.
		bs, err := json.Marshal(svcCfg)
		if err != nil {
			return res, fmt.Errorf("error marshalling config: %w", err)
		}
		cm.Data["services"] = string(bs)
		if err := er.Update(ctx, cm); err != nil {
			return res, fmt.Errorf("error updating ConfigMap: %w", err)
		}
	}
	return res, nil
}
// fwegressEPS creates (if needed) and returns the EndpointSlice for the
// named egress service.
func (er *FWEgressReconciler) fwegressEPS(ctx context.Context, name string) (*discoveryv1.EndpointSlice, error) {
	eps := &discoveryv1.EndpointSlice{
		AddressType: discoveryv1.AddressTypeIPv4, // for this prototype
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: "tailscale",
			Labels:    map[string]string{"tailscale.com/fwegress": name},
		},
	}
	// only create if not exists as the other reconciler will be updating this
	err := er.Get(ctx, client.ObjectKeyFromObject(eps), eps)
	if apierrors.IsNotFound(err) {
		if err := er.Create(ctx, eps); err != nil {
			return nil, fmt.Errorf("error creating Endpointslice: %w", err)
		}
		// Return here: previously execution fell through to the err != nil
		// check below with the NotFound error still set, so a successful
		// create was reported as a failure.
		return eps, nil
	}
	if err != nil {
		return nil, fmt.Errorf("error getting EndpointSlice: %w", err)
	}
	return eps, nil
}

View File

@ -9,7 +9,6 @@ import (
"context" "context"
"fmt" "fmt"
"slices" "slices"
"strings"
"sync" "sync"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -249,63 +248,63 @@ func (a *IngressReconciler) maybeProvision(ctx context.Context, logger *zap.Suga
return nil return nil
} }
crl := childResourceLabels(ing.Name, ing.Namespace, "ingress") // crl := childResourceLabels(ing.Name, ing.Namespace, "ingress")
var tags []string // var tags []string
if tstr, ok := ing.Annotations[AnnotationTags]; ok { // if tstr, ok := ing.Annotations[AnnotationTags]; ok {
tags = strings.Split(tstr, ",") // tags = strings.Split(tstr, ",")
} // }
hostname := ing.Namespace + "-" + ing.Name + "-ingress" // hostname := ing.Namespace + "-" + ing.Name + "-ingress"
if tlsHost != "" { // if tlsHost != "" {
hostname, _, _ = strings.Cut(tlsHost, ".") // hostname, _, _ = strings.Cut(tlsHost, ".")
} // }
sts := &tailscaleSTSConfig{ // sts := &tailscaleSTSConfig{
Hostname: hostname, // Hostname: hostname,
ParentResourceName: ing.Name, // ParentResourceName: ing.Name,
ParentResourceUID: string(ing.UID), // ParentResourceUID: string(ing.UID),
ServeConfig: sc, // ServeConfig: sc,
Tags: tags, // Tags: tags,
ChildResourceLabels: crl, // ChildResourceLabels: crl,
ProxyClassName: proxyClass, // ProxyClassName: proxyClass,
} // }
if val := ing.GetAnnotations()[AnnotationExperimentalForwardClusterTrafficViaL7IngresProxy]; val == "true" { // if val := ing.GetAnnotations()[AnnotationExperimentalForwardClusterTrafficViaL7IngresProxy]; val == "true" {
sts.ForwardClusterTrafficViaL7IngressProxy = true // sts.ForwardClusterTrafficViaL7IngressProxy = true
} // }
if _, err := a.ssr.Provision(ctx, logger, sts); err != nil { // if _, err := a.ssr.Provision(ctx, logger, sts); err != nil {
return fmt.Errorf("failed to provision: %w", err) // return fmt.Errorf("failed to provision: %w", err)
} // }
_, tsHost, _, err := a.ssr.DeviceInfo(ctx, crl) // _, tsHost, _, err := a.ssr.DeviceInfo(ctx, crl)
if err != nil { // if err != nil {
return fmt.Errorf("failed to get device ID: %w", err) // return fmt.Errorf("failed to get device ID: %w", err)
} // }
if tsHost == "" { // if tsHost == "" {
logger.Debugf("no Tailscale hostname known yet, waiting for proxy pod to finish auth") // logger.Debugf("no Tailscale hostname known yet, waiting for proxy pod to finish auth")
// No hostname yet. Wait for the proxy pod to auth. // // No hostname yet. Wait for the proxy pod to auth.
ing.Status.LoadBalancer.Ingress = nil // ing.Status.LoadBalancer.Ingress = nil
if err := a.Status().Update(ctx, ing); err != nil { // if err := a.Status().Update(ctx, ing); err != nil {
return fmt.Errorf("failed to update ingress status: %w", err) // return fmt.Errorf("failed to update ingress status: %w", err)
} // }
return nil // return nil
} // }
logger.Debugf("setting ingress hostname to %q", tsHost) // logger.Debugf("setting ingress hostname to %q", tsHost)
ing.Status.LoadBalancer.Ingress = []networkingv1.IngressLoadBalancerIngress{ // ing.Status.LoadBalancer.Ingress = []networkingv1.IngressLoadBalancerIngress{
{ // {
Hostname: tsHost, // Hostname: tsHost,
Ports: []networkingv1.IngressPortStatus{ // Ports: []networkingv1.IngressPortStatus{
{ // {
Protocol: "TCP", // Protocol: "TCP",
Port: 443, // Port: 443,
}, // },
}, // },
}, // },
} // }
if err := a.Status().Update(ctx, ing); err != nil { // if err := a.Status().Update(ctx, ing); err != nil {
return fmt.Errorf("failed to update ingress status: %w", err) // return fmt.Errorf("failed to update ingress status: %w", err)
} // }
return nil return nil
} }

View File

@ -40,7 +40,6 @@ import (
"tailscale.com/ipn/store/kubestore" "tailscale.com/ipn/store/kubestore"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1" tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/tsnet" "tailscale.com/tsnet"
"tailscale.com/tstime"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/version" "tailscale.com/version"
) )
@ -240,6 +239,7 @@ func runReconcilers(opts reconcilerOpts) {
&appsv1.StatefulSet{}: nsFilter, &appsv1.StatefulSet{}: nsFilter,
&appsv1.Deployment{}: nsFilter, &appsv1.Deployment{}: nsFilter,
&discoveryv1.EndpointSlice{}: nsFilter, &discoveryv1.EndpointSlice{}: nsFilter,
&corev1.Pod{}: nsFilter,
}, },
}, },
Scheme: tsapi.GlobalScheme, Scheme: tsapi.GlobalScheme,
@ -249,13 +249,13 @@ func runReconcilers(opts reconcilerOpts) {
startlog.Fatalf("could not create manager: %v", err) startlog.Fatalf("could not create manager: %v", err)
} }
svcFilter := handler.EnqueueRequestsFromMapFunc(serviceHandler) // svcFilter := handler.EnqueueRequestsFromMapFunc(serviceHandler)
svcChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("svc")) // svcChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("svc"))
// If a ProxyClass changes, enqueue all Services labeled with that // // If a ProxyClass changes, enqueue all Services labeled with that
// ProxyClass's name. // // ProxyClass's name.
proxyClassFilterForSvc := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForSvc(mgr.GetClient(), startlog)) // proxyClassFilterForSvc := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForSvc(mgr.GetClient(), startlog))
eventRecorder := mgr.GetEventRecorderFor("tailscale-operator") // eventRecorder := mgr.GetEventRecorderFor("tailscale-operator")
ssr := &tailscaleSTSReconciler{ ssr := &tailscaleSTSReconciler{
Client: mgr.GetClient(), Client: mgr.GetClient(),
tsnetServer: opts.tsServer, tsnetServer: opts.tsServer,
@ -268,125 +268,37 @@ func runReconcilers(opts reconcilerOpts) {
} }
err = builder. err = builder.
ControllerManagedBy(mgr). ControllerManagedBy(mgr).
Named("service-reconciler"). Named("ha-egress-svc-reconciler").
Watches(&corev1.Service{}, svcFilter). Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(egressHAServiceFilter)).
Watches(&appsv1.StatefulSet{}, svcChildFilter). Complete(&EgressHAReconciler{
Watches(&corev1.Secret{}, svcChildFilter).
Watches(&tsapi.ProxyClass{}, proxyClassFilterForSvc).
Complete(&ServiceReconciler{
ssr: ssr,
Client: mgr.GetClient(), Client: mgr.GetClient(),
logger: opts.log.Named("service-reconciler"), ssr: ssr,
isDefaultLoadBalancer: opts.proxyActAsDefaultLoadBalancer, logger: opts.log.Named("egress-ha-svc-reconciler"),
recorder: eventRecorder,
tsNamespace: opts.tailscaleNamespace,
clock: tstime.DefaultClock{},
proxyDefaultClass: opts.proxyDefaultClass,
}) })
if err != nil { if err != nil {
startlog.Fatalf("could not create service reconciler: %v", err) startlog.Fatalf("could not create egress-ha service reconciler: %v", err)
} }
ingressChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("ingress"))
// If a ProxyClassChanges, enqueue all Ingresses labeled with that
// ProxyClass's name.
proxyClassFilterForIngress := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForIngress(mgr.GetClient(), startlog))
// Enque Ingress if a managed Service or backend Service associated with a tailscale Ingress changes.
svcHandlerForIngress := handler.EnqueueRequestsFromMapFunc(serviceHandlerForIngress(mgr.GetClient(), startlog))
err = builder. err = builder.
ControllerManagedBy(mgr). ControllerManagedBy(mgr).
For(&networkingv1.Ingress{}). Named("egress-ha-state-reconciler").
Watches(&appsv1.StatefulSet{}, ingressChildFilter). For(&corev1.Secret{}).
Watches(&corev1.Secret{}, ingressChildFilter). Complete(&EgressSvcConfigReconciler{
Watches(&corev1.Service{}, svcHandlerForIngress).
Watches(&tsapi.ProxyClass{}, proxyClassFilterForIngress).
Complete(&IngressReconciler{
ssr: ssr,
recorder: eventRecorder,
Client: mgr.GetClient(), Client: mgr.GetClient(),
logger: opts.log.Named("ingress-reconciler"), logger: opts.log.Named("egress-ha-state-reconciler"),
proxyDefaultClass: opts.proxyDefaultClass,
}) })
if err != nil { if err != nil {
startlog.Fatalf("could not create ingress reconciler: %v", err) startlog.Fatalf("could not create egress-ha-state service reconciler: %v", err)
} }
err = builder.
connectorFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("connector")) ControllerManagedBy(mgr).
// If a ProxyClassChanges, enqueue all Connectors that have Named("egress-fw-pods-reconciler").
// .spec.proxyClass set to the name of this ProxyClass. Watches(&corev1.Pod{}, handler.EnqueueRequestsFromMapFunc(fwegressPodFilter)).
proxyClassFilterForConnector := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForConnector(mgr.GetClient(), startlog)) Complete(&FWEgressReconciler{
err = builder.ControllerManagedBy(mgr).
For(&tsapi.Connector{}).
Watches(&appsv1.StatefulSet{}, connectorFilter).
Watches(&corev1.Secret{}, connectorFilter).
Watches(&tsapi.ProxyClass{}, proxyClassFilterForConnector).
Complete(&ConnectorReconciler{
ssr: ssr,
recorder: eventRecorder,
Client: mgr.GetClient(), Client: mgr.GetClient(),
logger: opts.log.Named("connector-reconciler"), logger: opts.log.Named("fwegress-pod-reconciler"),
clock: tstime.DefaultClock{},
}) })
if err != nil { if err != nil {
startlog.Fatalf("could not create connector reconciler: %v", err) startlog.Fatalf("could not create fwegress-pod reconciler: %v", err)
}
// TODO (irbekrm): switch to metadata-only watches for resources whose
// spec we don't need to inspect to reduce memory consumption.
// https://github.com/kubernetes-sigs/controller-runtime/issues/1159
nameserverFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("nameserver"))
err = builder.ControllerManagedBy(mgr).
For(&tsapi.DNSConfig{}).
Watches(&appsv1.Deployment{}, nameserverFilter).
Watches(&corev1.ConfigMap{}, nameserverFilter).
Watches(&corev1.Service{}, nameserverFilter).
Watches(&corev1.ServiceAccount{}, nameserverFilter).
Complete(&NameserverReconciler{
recorder: eventRecorder,
tsNamespace: opts.tailscaleNamespace,
Client: mgr.GetClient(),
logger: opts.log.Named("nameserver-reconciler"),
clock: tstime.DefaultClock{},
})
if err != nil {
startlog.Fatalf("could not create nameserver reconciler: %v", err)
}
err = builder.ControllerManagedBy(mgr).
For(&tsapi.ProxyClass{}).
Complete(&ProxyClassReconciler{
Client: mgr.GetClient(),
recorder: eventRecorder,
logger: opts.log.Named("proxyclass-reconciler"),
clock: tstime.DefaultClock{},
})
if err != nil {
startlog.Fatal("could not create proxyclass reconciler: %v", err)
}
logger := startlog.Named("dns-records-reconciler-event-handlers")
// On EndpointSlice events, if it is an EndpointSlice for an
// ingress/egress proxy headless Service, reconcile the headless
// Service.
dnsRREpsOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerEndpointSliceHandler)
// On DNSConfig changes, reconcile all headless Services for
// ingress/egress proxies in operator namespace.
dnsRRDNSConfigOpts := handler.EnqueueRequestsFromMapFunc(enqueueAllIngressEgressProxySvcsInNS(opts.tailscaleNamespace, mgr.GetClient(), logger))
// On Service events, if it is an ingress/egress proxy headless Service, reconcile it.
dnsRRServiceOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerServiceHandler)
// On Ingress events, if it is a tailscale Ingress or if tailscale is the default ingress controller, reconcile the proxy
// headless Service.
dnsRRIngressOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerIngressHandler(opts.tailscaleNamespace, opts.proxyActAsDefaultLoadBalancer, mgr.GetClient(), logger))
err = builder.ControllerManagedBy(mgr).
Named("dns-records-reconciler").
Watches(&corev1.Service{}, dnsRRServiceOpts).
Watches(&networkingv1.Ingress{}, dnsRRIngressOpts).
Watches(&discoveryv1.EndpointSlice{}, dnsRREpsOpts).
Watches(&tsapi.DNSConfig{}, dnsRRDNSConfigOpts).
Complete(&dnsRecordsReconciler{
Client: mgr.GetClient(),
tsNamespace: opts.tailscaleNamespace,
logger: opts.log.Named("dns-records-reconciler"),
isDefaultLoadBalancer: opts.proxyActAsDefaultLoadBalancer,
})
if err != nil {
startlog.Fatalf("could not create DNS records reconciler: %v", err)
} }
startlog.Infof("Startup complete, operator running, version: %s", version.Long()) startlog.Infof("Startup complete, operator running, version: %s", version.Long())
if err := mgr.Start(signals.SetupSignalHandler()); err != nil { if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
@ -659,6 +571,25 @@ func serviceHandlerForIngress(cl client.Client, logger *zap.SugaredLogger) handl
} }
} }
func egressHAServiceFilter(_ context.Context, o client.Object) []reconcile.Request {
if o.GetAnnotations()["tailscale.com/tailnet-ip"] != "" {
return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(o)}}
}
return nil
}
func fwegressPodFilter(_ context.Context, o client.Object) []reconcile.Request {
if o.GetLabels()["tailscale.com/fwegress"] != "" {
return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(o)}}
}
return nil
}
func egressProxyStateSecretFilter(_ context.Context, o client.Object) []reconcile.Request {
if o.GetLabels()["tailscale.com/proxy-group-egress"] != "" {
return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(o)}}
}
return nil
}
func serviceHandler(_ context.Context, o client.Object) []reconcile.Request { func serviceHandler(_ context.Context, o client.Object) []reconcile.Request {
if isManagedByType(o, "svc") { if isManagedByType(o, "svc") {
// If this is a Service managed by a Service we want to enqueue its parent // If this is a Service managed by a Service we want to enqueue its parent

View File

@ -31,7 +31,6 @@ import (
"tailscale.com/ipn" "tailscale.com/ipn"
tsoperator "tailscale.com/k8s-operator" tsoperator "tailscale.com/k8s-operator"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1" tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/net/netutil"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/opt" "tailscale.com/types/opt"
"tailscale.com/types/ptr" "tailscale.com/types/ptr"
@ -101,27 +100,29 @@ var (
) )
type tailscaleSTSConfig struct { type tailscaleSTSConfig struct {
ParentResourceName string // ParentResourceName string
ParentResourceUID string // ParentResourceUID string
ChildResourceLabels map[string]string // ChildResourceLabels map[string]string
ServeConfig *ipn.ServeConfig // if serve config is set, this is a proxy for Ingress // ServeConfig *ipn.ServeConfig // if serve config is set, this is a proxy for Ingress
ClusterTargetIP string // ingress target IP // ClusterTargetIP string // ingress target IP
ClusterTargetDNSName string // ingress target DNS name // ClusterTargetDNSName string // ingress target DNS name
// If set to true, operator should configure containerboot to forward // // If set to true, operator should configure containerboot to forward
// cluster traffic via the proxy set up for Kubernetes Ingress. // // cluster traffic via the proxy set up for Kubernetes Ingress.
ForwardClusterTrafficViaL7IngressProxy bool // ForwardClusterTrafficViaL7IngressProxy bool
TailnetTargetIP string // egress target IP // TailnetTargetIP string // egress target IP
TailnetTargetFQDN string // egress target FQDN // TailnetTargetFQDN string // egress target FQDN
replicas int32
name string
Hostname string // Hostname string
Tags []string // if empty, use defaultTags Tags []string // if empty, use defaultTags
// Connector specifies a configuration of a Connector instance if that's // Connector specifies a configuration of a Connector instance if that's
// what this StatefulSet should be created for. // what this StatefulSet should be created for.
Connector *connector // Connector *connector
ProxyClassName string // name of ProxyClass if one needs to be applied to the proxy ProxyClassName string // name of ProxyClass if one needs to be applied to the proxy
@ -282,17 +283,16 @@ func statefulSetNameBase(parent string) string {
} }
func (a *tailscaleSTSReconciler) reconcileHeadlessService(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig) (*corev1.Service, error) { func (a *tailscaleSTSReconciler) reconcileHeadlessService(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig) (*corev1.Service, error) {
nameBase := statefulSetNameBase(sts.ParentResourceName)
hsvc := &corev1.Service{ hsvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
GenerateName: nameBase, Name: sts.name,
Namespace: a.operatorNamespace, Namespace: a.operatorNamespace,
Labels: sts.ChildResourceLabels, Labels: map[string]string{"app": sts.name},
}, },
Spec: corev1.ServiceSpec{ Spec: corev1.ServiceSpec{
ClusterIP: "None", ClusterIP: "None",
Selector: map[string]string{ Selector: map[string]string{
"app": sts.ParentResourceUID, "app": sts.name,
}, },
IPFamilyPolicy: ptr.To(corev1.IPFamilyPolicyPreferDualStack), IPFamilyPolicy: ptr.To(corev1.IPFamilyPolicyPreferDualStack),
}, },
@ -307,9 +307,9 @@ func (a *tailscaleSTSReconciler) createOrGetSecret(ctx context.Context, logger *
// Hardcode a -0 suffix so that in future, if we support // Hardcode a -0 suffix so that in future, if we support
// multiple StatefulSet replicas, we can provision -N for // multiple StatefulSet replicas, we can provision -N for
// those. // those.
Name: hsvc.Name + "-0", Name: hsvc.Name,
Namespace: a.operatorNamespace, Namespace: a.operatorNamespace,
Labels: stsC.ChildResourceLabels, Labels: map[string]string{"name": stsC.name},
}, },
} }
var orig *corev1.Secret // unmodified copy of secret var orig *corev1.Secret // unmodified copy of secret
@ -325,7 +325,7 @@ func (a *tailscaleSTSReconciler) createOrGetSecret(ctx context.Context, logger *
// Initially it contains only tailscaled config, but when the // Initially it contains only tailscaled config, but when the
// proxy starts, it will also store there the state, certs and // proxy starts, it will also store there the state, certs and
// ACME account key. // ACME account key.
sts, err := getSingleObject[appsv1.StatefulSet](ctx, a.Client, a.operatorNamespace, stsC.ChildResourceLabels) sts, err := getSingleObject[appsv1.StatefulSet](ctx, a.Client, a.operatorNamespace, map[string]string{"name": stsC.name})
if err != nil { if err != nil {
return "", "", nil, err return "", "", nil, err
} }
@ -371,13 +371,13 @@ func (a *tailscaleSTSReconciler) createOrGetSecret(ctx context.Context, logger *
} }
} }
if stsC.ServeConfig != nil { // if stsC.ServeConfig != nil {
j, err := json.Marshal(stsC.ServeConfig) // j, err := json.Marshal(stsC.ServeConfig)
if err != nil { // if err != nil {
return "", "", nil, err // return "", "", nil, err
} // }
mak.Set(&secret.StringData, "serve-config", string(j)) // mak.Set(&secret.StringData, "serve-config", string(j))
} // }
if orig != nil { if orig != nil {
logger.Debugf("patching the existing proxy Secret with tailscaled config %s", sanitizeConfigBytes(latestConfig)) logger.Debugf("patching the existing proxy Secret with tailscaled config %s", sanitizeConfigBytes(latestConfig))
@ -445,7 +445,7 @@ func (a *tailscaleSTSReconciler) newAuthKey(ctx context.Context, tags []string)
caps := tailscale.KeyCapabilities{ caps := tailscale.KeyCapabilities{
Devices: tailscale.KeyDeviceCapabilities{ Devices: tailscale.KeyDeviceCapabilities{
Create: tailscale.KeyDeviceCreateCapabilities{ Create: tailscale.KeyDeviceCreateCapabilities{
Reusable: false, Reusable: true,
Preauthorized: true, Preauthorized: true,
Tags: tags, Tags: tags,
}, },
@ -467,11 +467,11 @@ var userspaceProxyYaml []byte
func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig, headlessSvc *corev1.Service, proxySecret, tsConfigHash string, configs map[tailcfg.CapabilityVersion]ipn.ConfigVAlpha) (*appsv1.StatefulSet, error) { func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig, headlessSvc *corev1.Service, proxySecret, tsConfigHash string, configs map[tailcfg.CapabilityVersion]ipn.ConfigVAlpha) (*appsv1.StatefulSet, error) {
ss := new(appsv1.StatefulSet) ss := new(appsv1.StatefulSet)
if sts.ServeConfig != nil && sts.ForwardClusterTrafficViaL7IngressProxy != true { // If forwarding cluster traffic via is required we need non-userspace + NET_ADMIN + forwarding // if sts.ServeConfig != nil && sts.ForwardClusterTrafficViaL7IngressProxy != true { // If forwarding cluster traffic via is required we need non-userspace + NET_ADMIN + forwarding
if err := yaml.Unmarshal(userspaceProxyYaml, &ss); err != nil { // if err := yaml.Unmarshal(userspaceProxyYaml, &ss); err != nil {
return nil, fmt.Errorf("failed to unmarshal userspace proxy spec: %v", err) // return nil, fmt.Errorf("failed to unmarshal userspace proxy spec: %v", err)
} // }
} else { // } else {
if err := yaml.Unmarshal(proxyYaml, &ss); err != nil { if err := yaml.Unmarshal(proxyYaml, &ss); err != nil {
return nil, fmt.Errorf("failed to unmarshal proxy spec: %w", err) return nil, fmt.Errorf("failed to unmarshal proxy spec: %w", err)
} }
@ -482,7 +482,9 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
break break
} }
} }
} // }
ss.Spec.Replicas = ptr.To(sts.replicas)
pod := &ss.Spec.Template pod := &ss.Spec.Template
container := &pod.Spec.Containers[0] container := &pod.Spec.Containers[0]
container.Image = a.proxyImage container.Image = a.proxyImage
@ -490,25 +492,29 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
Name: headlessSvc.Name, Name: headlessSvc.Name,
Namespace: a.operatorNamespace, Namespace: a.operatorNamespace,
} }
for key, val := range sts.ChildResourceLabels { mak.Set(&ss.ObjectMeta.Labels, "tailscale.com/proxy-group", "egress-ha")
mak.Set(&ss.ObjectMeta.Labels, key, val)
}
ss.Spec.ServiceName = headlessSvc.Name ss.Spec.ServiceName = headlessSvc.Name
ss.Spec.Selector = &metav1.LabelSelector{ ss.Spec.Selector = &metav1.LabelSelector{
MatchLabels: map[string]string{ MatchLabels: map[string]string{
"app": sts.ParentResourceUID, "app": sts.name,
}, },
} }
mak.Set(&pod.Labels, "app", sts.ParentResourceUID) mak.Set(&pod.Labels, "app", sts.name)
for key, val := range sts.ChildResourceLabels { mak.Set(&pod.Labels, "name", sts.name)
pod.Labels[key] = val // sync StatefulSet labels to Pod to make it easier for users to select the Pod mak.Set(&pod.Labels, "tailscale.com/proxy-group", "egress-ha")
} // for key, val := range sts.ChildResourceLabels {
// pod.Labels[key] = val // sync StatefulSet labels to Pod to make it easier for users to select the Pod
// }
// Generic containerboot configuration options. // Generic containerboot configuration options.
container.Env = append(container.Env, container.Env = append(container.Env,
corev1.EnvVar{ corev1.EnvVar{
Name: "TS_KUBE_SECRET", Name: "TS_KUBE_SECRET",
Value: proxySecret, ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
}, },
corev1.EnvVar{ corev1.EnvVar{
// Old tailscaled config key is still used for backwards compatibility. // Old tailscaled config key is still used for backwards compatibility.
@ -520,16 +526,29 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
Name: "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR", Name: "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR",
Value: "/etc/tsconfig", Value: "/etc/tsconfig",
}, },
corev1.EnvVar{
Name: "TS_EGRESS_SERVICES_CONFIGMAP_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
corev1.EnvVar{
Name: "TS_EGRESS_SERVICES_CONFIG_PATH",
Value: "/etc/egress-proxies/services",
},
) )
if sts.ForwardClusterTrafficViaL7IngressProxy { // if sts.ForwardClusterTrafficViaL7IngressProxy {
container.Env = append(container.Env, corev1.EnvVar{ // container.Env = append(container.Env, corev1.EnvVar{
Name: "EXPERIMENTAL_ALLOW_PROXYING_CLUSTER_TRAFFIC_VIA_INGRESS", // Name: "EXPERIMENTAL_ALLOW_PROXYING_CLUSTER_TRAFFIC_VIA_INGRESS",
Value: "true", // Value: "true",
}) // })
} // }
// Configure containeboot to run tailscaled with a configfile read from the state Secret. // Configure containeboot to run tailscaled with a configfile read from the state Secret.
mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetConfigFileHash, tsConfigHash) mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetConfigFileHash, tsConfigHash)
// Tailscaled config
configVolume := corev1.Volume{ configVolume := corev1.Volume{
Name: "tailscaledconfig", Name: "tailscaledconfig",
VolumeSource: corev1.VolumeSource{ VolumeSource: corev1.VolumeSource{
@ -545,6 +564,22 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
MountPath: "/etc/tsconfig", MountPath: "/etc/tsconfig",
}) })
// Egress services config
egressSvcCfg := corev1.Volume{
Name: "egress-proxies",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{Name: "egress-proxies"},
},
},
}
pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, egressSvcCfg)
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: "egress-proxies",
ReadOnly: true,
MountPath: "/etc/egress-proxies",
})
if a.tsFirewallMode != "" { if a.tsFirewallMode != "" {
container.Env = append(container.Env, corev1.EnvVar{ container.Env = append(container.Env, corev1.EnvVar{
Name: "TS_DEBUG_FIREWALL_MODE", Name: "TS_DEBUG_FIREWALL_MODE",
@ -554,50 +589,50 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
pod.Spec.PriorityClassName = a.proxyPriorityClassName pod.Spec.PriorityClassName = a.proxyPriorityClassName
// Ingress/egress proxy configuration options. // Ingress/egress proxy configuration options.
if sts.ClusterTargetIP != "" { // if sts.ClusterTargetIP != "" {
container.Env = append(container.Env, corev1.EnvVar{ // container.Env = append(container.Env, corev1.EnvVar{
Name: "TS_DEST_IP", // Name: "TS_DEST_IP",
Value: sts.ClusterTargetIP, // Value: sts.ClusterTargetIP,
}) // })
mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetClusterIP, sts.ClusterTargetIP) // mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetClusterIP, sts.ClusterTargetIP)
} else if sts.ClusterTargetDNSName != "" { // } else if sts.ClusterTargetDNSName != "" {
container.Env = append(container.Env, corev1.EnvVar{ // container.Env = append(container.Env, corev1.EnvVar{
Name: "TS_EXPERIMENTAL_DEST_DNS_NAME", // Name: "TS_EXPERIMENTAL_DEST_DNS_NAME",
Value: sts.ClusterTargetDNSName, // Value: sts.ClusterTargetDNSName,
}) // })
mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetClusterDNSName, sts.ClusterTargetDNSName) // mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetClusterDNSName, sts.ClusterTargetDNSName)
} else if sts.TailnetTargetIP != "" { // } else if sts.TailnetTargetIP != "" {
container.Env = append(container.Env, corev1.EnvVar{ // container.Env = append(container.Env, corev1.EnvVar{
Name: "TS_TAILNET_TARGET_IP", // Name: "TS_TAILNET_TARGET_IP",
Value: sts.TailnetTargetIP, // Value: sts.TailnetTargetIP,
}) // })
mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetTailnetTargetIP, sts.TailnetTargetIP) // mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetTailnetTargetIP, sts.TailnetTargetIP)
} else if sts.TailnetTargetFQDN != "" { // } else if sts.TailnetTargetFQDN != "" {
container.Env = append(container.Env, corev1.EnvVar{ // container.Env = append(container.Env, corev1.EnvVar{
Name: "TS_TAILNET_TARGET_FQDN", // Name: "TS_TAILNET_TARGET_FQDN",
Value: sts.TailnetTargetFQDN, // Value: sts.TailnetTargetFQDN,
}) // })
mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetTailnetTargetFQDN, sts.TailnetTargetFQDN) // mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetTailnetTargetFQDN, sts.TailnetTargetFQDN)
} else if sts.ServeConfig != nil { // } else if sts.ServeConfig != nil {
container.Env = append(container.Env, corev1.EnvVar{ // container.Env = append(container.Env, corev1.EnvVar{
Name: "TS_SERVE_CONFIG", // Name: "TS_SERVE_CONFIG",
Value: "/etc/tailscaled/serve-config", // Value: "/etc/tailscaled/serve-config",
}) // })
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{ // container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: "serve-config", // Name: "serve-config",
ReadOnly: true, // ReadOnly: true,
MountPath: "/etc/tailscaled", // MountPath: "/etc/tailscaled",
}) // })
pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, corev1.Volume{ // pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, corev1.Volume{
Name: "serve-config", // Name: "serve-config",
VolumeSource: corev1.VolumeSource{ // VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{ // Secret: &corev1.SecretVolumeSource{
SecretName: proxySecret, // SecretName: proxySecret,
Items: []corev1.KeyToPath{{Key: "serve-config", Path: "serve-config"}}, // Items: []corev1.KeyToPath{{Key: "serve-config", Path: "serve-config"}},
}, // },
}, // },
}) // })
} // }
logger.Debugf("reconciling statefulset %s/%s", ss.GetNamespace(), ss.GetName()) logger.Debugf("reconciling statefulset %s/%s", ss.GetNamespace(), ss.GetName())
if sts.ProxyClassName != "" { if sts.ProxyClassName != "" {
logger.Debugf("configuring proxy resources with ProxyClass %s", sts.ProxyClassName) logger.Debugf("configuring proxy resources with ProxyClass %s", sts.ProxyClassName)
@ -636,22 +671,22 @@ func applyProxyClassToStatefulSet(pc *tsapi.ProxyClass, ss *appsv1.StatefulSet,
if pc == nil || ss == nil { if pc == nil || ss == nil {
return ss return ss
} }
if pc.Spec.Metrics != nil && pc.Spec.Metrics.Enable { // if pc.Spec.Metrics != nil && pc.Spec.Metrics.Enable {
if stsCfg.TailnetTargetFQDN == "" && stsCfg.TailnetTargetIP == "" && !stsCfg.ForwardClusterTrafficViaL7IngressProxy { // if stsCfg.TailnetTargetFQDN == "" && stsCfg.TailnetTargetIP == "" && !stsCfg.ForwardClusterTrafficViaL7IngressProxy {
enableMetrics(ss, pc) // enableMetrics(ss, pc)
} else if stsCfg.ForwardClusterTrafficViaL7IngressProxy { // } else if stsCfg.ForwardClusterTrafficViaL7IngressProxy {
// TODO (irbekrm): fix this // // TODO (irbekrm): fix this
// For Ingress proxies that have been configured with // // For Ingress proxies that have been configured with
// tailscale.com/experimental-forward-cluster-traffic-via-ingress // // tailscale.com/experimental-forward-cluster-traffic-via-ingress
// annotation, all cluster traffic is forwarded to the // // annotation, all cluster traffic is forwarded to the
// Ingress backend(s). // // Ingress backend(s).
logger.Info("ProxyClass specifies that metrics should be enabled, but this is currently not supported for Ingress proxies that accept cluster traffic.") // logger.Info("ProxyClass specifies that metrics should be enabled, but this is currently not supported for Ingress proxies that accept cluster traffic.")
} else { // } else {
// TODO (irbekrm): fix this // // TODO (irbekrm): fix this
// For egress proxies, currently all cluster traffic is forwarded to the tailnet target. // // For egress proxies, currently all cluster traffic is forwarded to the tailnet target.
logger.Info("ProxyClass specifies that metrics should be enabled, but this is currently not supported for Ingress proxies that accept cluster traffic.") // logger.Info("ProxyClass specifies that metrics should be enabled, but this is currently not supported for Ingress proxies that accept cluster traffic.")
} // }
} // }
if pc.Spec.StatefulSet == nil { if pc.Spec.StatefulSet == nil {
return ss return ss
@ -764,23 +799,23 @@ func tailscaledConfig(stsC *tailscaleSTSConfig, newAuthkey string, oldSecret *co
AcceptDNS: "false", AcceptDNS: "false",
AcceptRoutes: "false", // AcceptRoutes defaults to true AcceptRoutes: "false", // AcceptRoutes defaults to true
Locked: "false", Locked: "false",
Hostname: &stsC.Hostname, Hostname: &stsC.name,
NoStatefulFiltering: "false", NoStatefulFiltering: "false",
} }
// For egress proxies only, we need to ensure that stateful filtering is // For egress proxies only, we need to ensure that stateful filtering is
// not in place so that traffic from cluster can be forwarded via // not in place so that traffic from cluster can be forwarded via
// Tailscale IPs. // Tailscale IPs.
if stsC.TailnetTargetFQDN != "" || stsC.TailnetTargetIP != "" { // if stsC.TailnetTargetFQDN != "" || stsC.TailnetTargetIP != "" {
conf.NoStatefulFiltering = "true" // conf.NoStatefulFiltering = "true"
} // }
if stsC.Connector != nil { // if stsC.Connector != nil {
routes, err := netutil.CalcAdvertiseRoutes(stsC.Connector.routes, stsC.Connector.isExitNode) // routes, err := netutil.CalcAdvertiseRoutes(stsC.Connector.routes, stsC.Connector.isExitNode)
if err != nil { // if err != nil {
return nil, fmt.Errorf("error calculating routes: %w", err) // return nil, fmt.Errorf("error calculating routes: %w", err)
} // }
conf.AdvertiseRoutes = routes // conf.AdvertiseRoutes = routes
} // }
if shouldAcceptRoutes(stsC.ProxyClass) { if shouldAcceptRoutes(stsC.ProxyClass) {
conf.AcceptRoutes = "true" conf.AcceptRoutes = "true"
} }

View File

@ -9,7 +9,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"net/netip"
"slices" "slices"
"strings" "strings"
"sync" "sync"
@ -237,109 +236,109 @@ func (a *ServiceReconciler) maybeProvision(ctx context.Context, logger *zap.Suga
return errMsg return errMsg
} }
} }
crl := childResourceLabels(svc.Name, svc.Namespace, "svc") // crl := childResourceLabels(svc.Name, svc.Namespace, "svc")
var tags []string // var tags []string
if tstr, ok := svc.Annotations[AnnotationTags]; ok { // if tstr, ok := svc.Annotations[AnnotationTags]; ok {
tags = strings.Split(tstr, ",") // tags = strings.Split(tstr, ",")
} // }
sts := &tailscaleSTSConfig{ // sts := &tailscaleSTSConfig{
ParentResourceName: svc.Name, // ParentResourceName: svc.Name,
ParentResourceUID: string(svc.UID), // ParentResourceUID: string(svc.UID),
Hostname: nameForService(svc), // Hostname: nameForService(svc),
Tags: tags, // Tags: tags,
ChildResourceLabels: crl, // ChildResourceLabels: crl,
ProxyClassName: proxyClass, // ProxyClassName: proxyClass,
} // }
a.mu.Lock() // a.mu.Lock()
if a.shouldExposeClusterIP(svc) { // if a.shouldExposeClusterIP(svc) {
sts.ClusterTargetIP = svc.Spec.ClusterIP // sts.ClusterTargetIP = svc.Spec.ClusterIP
a.managedIngressProxies.Add(svc.UID) // a.managedIngressProxies.Add(svc.UID)
gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len())) // gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len()))
} else if a.shouldExposeDNSName(svc) { // } else if a.shouldExposeDNSName(svc) {
sts.ClusterTargetDNSName = svc.Spec.ExternalName // sts.ClusterTargetDNSName = svc.Spec.ExternalName
a.managedIngressProxies.Add(svc.UID) // a.managedIngressProxies.Add(svc.UID)
gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len())) // gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len()))
} else if ip := tailnetTargetAnnotation(svc); ip != "" { // } else if ip := tailnetTargetAnnotation(svc); ip != "" {
sts.TailnetTargetIP = ip // sts.TailnetTargetIP = ip
a.managedEgressProxies.Add(svc.UID) // a.managedEgressProxies.Add(svc.UID)
gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len())) // gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len()))
} else if fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]; fqdn != "" { // } else if fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]; fqdn != "" {
fqdn := svc.Annotations[AnnotationTailnetTargetFQDN] // fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]
if !strings.HasSuffix(fqdn, ".") { // if !strings.HasSuffix(fqdn, ".") {
fqdn = fqdn + "." // fqdn = fqdn + "."
} // }
sts.TailnetTargetFQDN = fqdn // sts.TailnetTargetFQDN = fqdn
a.managedEgressProxies.Add(svc.UID) // a.managedEgressProxies.Add(svc.UID)
gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len())) // gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len()))
} // }
a.mu.Unlock() // a.mu.Unlock()
var hsvc *corev1.Service // var hsvc *corev1.Service
if hsvc, err = a.ssr.Provision(ctx, logger, sts); err != nil { // if hsvc, err = a.ssr.Provision(ctx, logger, sts); err != nil {
errMsg := fmt.Errorf("failed to provision: %w", err) // errMsg := fmt.Errorf("failed to provision: %w", err)
tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, errMsg.Error(), a.clock, logger) // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, errMsg.Error(), a.clock, logger)
return errMsg // return errMsg
} // }
if sts.TailnetTargetIP != "" || sts.TailnetTargetFQDN != "" { // if an egress proxy // if sts.TailnetTargetIP != "" || sts.TailnetTargetFQDN != "" { // if an egress proxy
clusterDomain := retrieveClusterDomain(a.tsNamespace, logger) // clusterDomain := retrieveClusterDomain(a.tsNamespace, logger)
headlessSvcName := hsvc.Name + "." + hsvc.Namespace + ".svc." + clusterDomain // headlessSvcName := hsvc.Name + "." + hsvc.Namespace + ".svc." + clusterDomain
if svc.Spec.ExternalName != headlessSvcName || svc.Spec.Type != corev1.ServiceTypeExternalName { // if svc.Spec.ExternalName != headlessSvcName || svc.Spec.Type != corev1.ServiceTypeExternalName {
svc.Spec.ExternalName = headlessSvcName // svc.Spec.ExternalName = headlessSvcName
svc.Spec.Selector = nil // svc.Spec.Selector = nil
svc.Spec.Type = corev1.ServiceTypeExternalName // svc.Spec.Type = corev1.ServiceTypeExternalName
if err := a.Update(ctx, svc); err != nil { // if err := a.Update(ctx, svc); err != nil {
errMsg := fmt.Errorf("failed to update service: %w", err) // errMsg := fmt.Errorf("failed to update service: %w", err)
tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, errMsg.Error(), a.clock, logger) // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, errMsg.Error(), a.clock, logger)
return errMsg // return errMsg
} // }
} // }
tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, reasonProxyCreated, reasonProxyCreated, a.clock, logger) // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, reasonProxyCreated, reasonProxyCreated, a.clock, logger)
return nil // return nil
} // }
if !isTailscaleLoadBalancerService(svc, a.isDefaultLoadBalancer) { // if !isTailscaleLoadBalancerService(svc, a.isDefaultLoadBalancer) {
logger.Debugf("service is not a LoadBalancer, so not updating ingress") // logger.Debugf("service is not a LoadBalancer, so not updating ingress")
tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, reasonProxyCreated, reasonProxyCreated, a.clock, logger) // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, reasonProxyCreated, reasonProxyCreated, a.clock, logger)
return nil // return nil
} // }
_, tsHost, tsIPs, err := a.ssr.DeviceInfo(ctx, crl) // _, tsHost, tsIPs, err := a.ssr.DeviceInfo(ctx, crl)
if err != nil { // if err != nil {
return fmt.Errorf("failed to get device ID: %w", err) // return fmt.Errorf("failed to get device ID: %w", err)
} // }
if tsHost == "" { // if tsHost == "" {
msg := "no Tailscale hostname known yet, waiting for proxy pod to finish auth" // msg := "no Tailscale hostname known yet, waiting for proxy pod to finish auth"
logger.Debug(msg) // logger.Debug(msg)
// No hostname yet. Wait for the proxy pod to auth. // // No hostname yet. Wait for the proxy pod to auth.
svc.Status.LoadBalancer.Ingress = nil // svc.Status.LoadBalancer.Ingress = nil
tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyPending, msg, a.clock, logger) // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyPending, msg, a.clock, logger)
return nil // return nil
} // }
logger.Debugf("setting Service LoadBalancer status to %q, %s", tsHost, strings.Join(tsIPs, ", ")) // logger.Debugf("setting Service LoadBalancer status to %q, %s", tsHost, strings.Join(tsIPs, ", "))
ingress := []corev1.LoadBalancerIngress{ // ingress := []corev1.LoadBalancerIngress{
{Hostname: tsHost}, // {Hostname: tsHost},
} // }
clusterIPAddr, err := netip.ParseAddr(svc.Spec.ClusterIP) // clusterIPAddr, err := netip.ParseAddr(svc.Spec.ClusterIP)
if err != nil { // if err != nil {
msg := fmt.Sprintf("failed to parse cluster IP: %v", err) // msg := fmt.Sprintf("failed to parse cluster IP: %v", err)
tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, msg, a.clock, logger) // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionFalse, reasonProxyFailed, msg, a.clock, logger)
return errors.New(msg) // return errors.New(msg)
} // }
for _, ip := range tsIPs { // for _, ip := range tsIPs {
addr, err := netip.ParseAddr(ip) // addr, err := netip.ParseAddr(ip)
if err != nil { // if err != nil {
continue // continue
} // }
if addr.Is4() == clusterIPAddr.Is4() { // only add addresses of the same family // if addr.Is4() == clusterIPAddr.Is4() { // only add addresses of the same family
ingress = append(ingress, corev1.LoadBalancerIngress{IP: ip}) // ingress = append(ingress, corev1.LoadBalancerIngress{IP: ip})
} // }
} // }
svc.Status.LoadBalancer.Ingress = ingress // svc.Status.LoadBalancer.Ingress = ingress
tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, reasonProxyCreated, reasonProxyCreated, a.clock, logger) // tsoperator.SetServiceCondition(svc, tsapi.ProxyReady, metav1.ConditionTrue, reasonProxyCreated, reasonProxyCreated, a.clock, logger)
return nil return nil
} }

View File

@ -58,11 +58,8 @@ func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
secret, err := s.client.GetSecret(ctx, s.secretName) secret, err := s.client.GetSecret(ctx, s.secretName)
if err != nil { if err != nil {
if st, ok := err.(*kube.Status); ok && st.Code == 404 {
return nil, ipn.ErrStateNotExist return nil, ipn.ErrStateNotExist
} }
return nil, err
}
b, ok := secret.Data[sanitizeKey(id)] b, ok := secret.Data[sanitizeKey(id)]
if !ok { if !ok {
return nil, ipn.ErrStateNotExist return nil, ipn.ErrStateNotExist
@ -88,7 +85,6 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) error {
secret, err := s.client.GetSecret(ctx, s.secretName) secret, err := s.client.GetSecret(ctx, s.secretName)
if err != nil { if err != nil {
if kube.IsNotFoundErr(err) {
return s.client.CreateSecret(ctx, &kube.Secret{ return s.client.CreateSecret(ctx, &kube.Secret{
TypeMeta: kube.TypeMeta{ TypeMeta: kube.TypeMeta{
APIVersion: "v1", APIVersion: "v1",
@ -102,8 +98,6 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) error {
}, },
}) })
} }
return err
}
if s.canPatch { if s.canPatch {
if len(secret.Data) == 0 { // if user has pre-created a blank Secret if len(secret.Data) == 0 { // if user has pre-created a blank Secret
m := []kube.JSONPatch{ m := []kube.JSONPatch{

View File

@ -0,0 +1,20 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
// Package kube contains types and utilities for the Tailscale Kubernetes Operator.
package kube
// EgressServiceAlphaV is the version of the egress services configuration
// schema written to and read from the egress services ConfigMap.
const EgressServiceAlphaV = "v1alpha1"

// EgressServices is the egress services configuration document shared
// between the operator and the egress proxies (mounted from a ConfigMap —
// see TS_EGRESS_SERVICES_CONFIGMAP_NAME / TS_EGRESS_SERVICES_CONFIG_PATH).
type EgressServices struct {
	// Version is the schema version; currently always EgressServiceAlphaV.
	Version string `json:"version"`
	// Services maps an egress service name to its configuration.
	Services map[string]EgressService `json:"services"`
}

// EgressService describes one egress service: which cluster sources should
// have their traffic forwarded, and to which tailnet target IP.
type EgressService struct {
	// ClusterSources are the cluster source IPs whose traffic is forwarded
	// to the tailnet target (fwegress pod IPs).
	ClusterSources []string `json:"clusterSources"`
	// TailnetTargetIP is the tailnet IP address that traffic from
	// ClusterSources is DNATed to.
	TailnetTargetIP string `json:"tailnetTargetIP"`
}

View File

@ -146,6 +146,13 @@ type Secret struct {
// +optional // +optional
Data map[string][]byte `json:"data,omitempty"` Data map[string][]byte `json:"data,omitempty"`
} }
// ConfigMap is a lightweight representation of a Kubernetes ConfigMap,
// mirroring only the fields this package needs (like Secret above).
type ConfigMap struct {
	TypeMeta `json:",inline"`
	ObjectMeta `json:"metadata"`

	// Data holds the ConfigMap's configuration data. In the Kubernetes API
	// the values under "data" are plain UTF-8 strings, so this must be
	// map[string]string: a map[string][]byte would make encoding/json
	// expect base64-encoded values and fail to decode real API objects.
	// +optional
	Data map[string]string `json:"data,omitempty"`
	// BinaryData holds binary configuration data; the Kubernetes API
	// transports these values base64-encoded, matching []byte's JSON form.
	// +optional
	BinaryData map[string][]byte `json:"binaryData,omitempty"`
}
// Status is a return value for calls that don't return other objects. // Status is a return value for calls that don't return other objects.
type Status struct { type Status struct {

View File

@ -20,6 +20,7 @@ import (
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"time" "time"
@ -55,6 +56,7 @@ type Client interface {
CreateSecret(context.Context, *Secret) error CreateSecret(context.Context, *Secret) error
StrategicMergePatchSecret(context.Context, string, *Secret, string) error StrategicMergePatchSecret(context.Context, string, *Secret, string) error
JSONPatchSecret(context.Context, string, []JSONPatch) error JSONPatchSecret(context.Context, string, []JSONPatch) error
JSONPatchConfigMap(context.Context, string, []JSONPatch) error
CheckSecretPermissions(context.Context, string) (bool, bool, error) CheckSecretPermissions(context.Context, string) (bool, bool, error)
SetDialer(dialer func(context.Context, string, string) (net.Conn, error)) SetDialer(dialer func(context.Context, string, string) (net.Conn, error))
SetURL(string) SetURL(string)
@ -138,6 +140,10 @@ func (c *client) secretURL(name string) string {
return fmt.Sprintf("%s/api/v1/namespaces/%s/secrets/%s", c.url, c.ns, name) return fmt.Sprintf("%s/api/v1/namespaces/%s/secrets/%s", c.url, c.ns, name)
} }
// configMapURL returns the Kubernetes API endpoint for the named ConfigMap
// in the client's configured namespace.
func (c *client) configMapURL(name string) string {
	return c.url + "/api/v1/namespaces/" + c.ns + "/configmaps/" + name
}
func getError(resp *http.Response) error { func getError(resp *http.Response) error {
if resp.StatusCode == 200 || resp.StatusCode == 201 { if resp.StatusCode == 200 || resp.StatusCode == 201 {
// These are the only success codes returned by the Kubernetes API. // These are the only success codes returned by the Kubernetes API.
@ -167,21 +173,21 @@ func setHeader(key, value string) func(*http.Request) {
func (c *client) doRequest(ctx context.Context, method, url string, in, out any, opts ...func(*http.Request)) error { func (c *client) doRequest(ctx context.Context, method, url string, in, out any, opts ...func(*http.Request)) error {
req, err := c.newRequest(ctx, method, url, in) req, err := c.newRequest(ctx, method, url, in)
if err != nil { if err != nil {
return err return fmt.Errorf("error making new request url %s in %v out %v method %v: %v", url, in, out, method, err)
} }
for _, opt := range opts { for _, opt := range opts {
opt(req) opt(req)
} }
resp, err := c.client.Do(req) resp, err := c.client.Do(req)
if err != nil { if err != nil {
return err return fmt.Errorf("error sending request: %+#v: %v", req, err)
} }
defer resp.Body.Close() defer resp.Body.Close()
if err := getError(resp); err != nil { if err := getError(resp); err != nil {
if st, ok := err.(*Status); ok && st.Code == 401 { if st, ok := err.(*Status); ok && st.Code == 401 {
c.expireToken() c.expireToken()
} }
return err return fmt.Errorf("error in response: url %s in %s req \n%+#v\n %v", url, in, req, err)
} }
if out != nil { if out != nil {
return json.NewDecoder(resp.Body).Decode(out) return json.NewDecoder(resp.Body).Decode(out)
@ -253,13 +259,24 @@ type JSONPatch struct {
// It currently (2023-03-02) only supports "add" and "remove" operations. // It currently (2023-03-02) only supports "add" and "remove" operations.
func (c *client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error { func (c *client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error {
for _, p := range patch { for _, p := range patch {
if p.Op != "remove" && p.Op != "add" { if p.Op != "remove" && p.Op != "add" && p.Op != "replace" {
panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op)) panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op))
} }
} }
return c.doRequest(ctx, "PATCH", c.secretURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json")) return c.doRequest(ctx, "PATCH", c.secretURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json"))
} }
// JSONPatchConfigMap updates a ConfigMap in the Kubernetes API using a JSON
// patch (RFC 6902).
// It currently only supports "replace" operations and panics on any other
// operation, since passing one would indicate a programming error.
func (c *client) JSONPatchConfigMap(ctx context.Context, name string, patch []JSONPatch) error {
	for _, p := range patch {
		if p.Op != "replace" {
			panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op))
		}
	}
	return c.doRequest(ctx, "PATCH", c.configMapURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json"))
}
// StrategicMergePatchSecret updates a secret in the Kubernetes API using a // StrategicMergePatchSecret updates a secret in the Kubernetes API using a
// strategic merge patch. // strategic merge patch.
// If a fieldManager is provided, it will be used to track the patch. // If a fieldManager is provided, it will be used to track the patch.
@ -343,5 +360,8 @@ func IsNotFoundErr(err error) bool {
if st, ok := err.(*Status); ok && st.Code == 404 { if st, ok := err.(*Status); ok && st.Code == 404 {
return true return true
} }
if strings.Contains(err.Error(), "not found") {
return true
}
return false return false
} }

View File

@ -30,5 +30,8 @@ func (fc *FakeClient) StrategicMergePatchSecret(context.Context, string, *Secret
func (fc *FakeClient) JSONPatchSecret(context.Context, string, []JSONPatch) error { func (fc *FakeClient) JSONPatchSecret(context.Context, string, []JSONPatch) error {
return nil return nil
} }
// JSONPatchConfigMap is a no-op test implementation of
// Client.JSONPatchConfigMap; it always reports success.
func (fc *FakeClient) JSONPatchConfigMap(context.Context, string, []JSONPatch) error {
	return nil
}
func (fc *FakeClient) UpdateSecret(context.Context, *Secret) error { return nil } func (fc *FakeClient) UpdateSecret(context.Context, *Secret) error { return nil }
func (fc *FakeClient) CreateSecret(context.Context, *Secret) error { return nil } func (fc *FakeClient) CreateSecret(context.Context, *Secret) error { return nil }

View File

@ -371,6 +371,11 @@ func (i *iptablesRunner) AddDNATRule(origDst, dst netip.Addr) error {
return table.Insert("nat", "PREROUTING", 1, "--destination", origDst.String(), "-j", "DNAT", "--to-destination", dst.String()) return table.Insert("nat", "PREROUTING", 1, "--destination", origDst.String(), "-j", "DNAT", "--to-destination", dst.String())
} }
// AddDNATRuleWithSrc adds a rule to the nat/PREROUTING chain that DNATs all
// traffic originating from src to dst. Unlike AddDNATRule it matches on the
// packet's source address rather than its original destination, so every
// packet from src gets rewritten to dst regardless of where it was headed.
func (i *iptablesRunner) AddDNATRuleWithSrc(src, dst netip.Addr) error {
	table := i.getIPTByAddr(dst)
	return table.Insert("nat", "PREROUTING", 1, "--source", src.String(), "-j", "DNAT", "--to-destination", dst.String())
}
func (i *iptablesRunner) AddSNATRuleForDst(src, dst netip.Addr) error { func (i *iptablesRunner) AddSNATRuleForDst(src, dst netip.Addr) error {
table := i.getIPTByAddr(dst) table := i.getIPTByAddr(dst)
return table.Insert("nat", "POSTROUTING", 1, "--destination", dst.String(), "-j", "SNAT", "--to-source", src.String()) return table.Insert("nat", "POSTROUTING", 1, "--destination", dst.String(), "-j", "SNAT", "--to-source", src.String())

View File

@ -102,6 +102,52 @@ func (n *nftablesRunner) ensurePreroutingChain(dst netip.Addr) (*nftables.Table,
return nat, preroutingCh, nil return nat, preroutingCh, nil
} }
// AddDNATRuleWithSrc adds a rule to the nat/PREROUTING chain that DNATs all
// traffic whose source address is src to dst. Unlike AddDNATRule it matches
// on the packet's source address rather than its original destination.
func (n *nftablesRunner) AddDNATRuleWithSrc(src, dst netip.Addr) error {
	nat, preroutingCh, err := n.ensurePreroutingChain(dst)
	if err != nil {
		return err
	}
	// Offset and length of the source address field in the IP header:
	// IPv4 carries the source address at bytes 12-15, IPv6 at bytes 8-23.
	// (The previous offset of 8 for IPv4 pointed at the TTL/protocol/
	// checksum fields, so the rule would never match the source address.)
	var saddrOffset, saddrLen, fam uint32
	if src.Is4() {
		saddrOffset = 12
		saddrLen = 4
		fam = unix.NFPROTO_IPV4
	} else {
		saddrOffset = 8
		saddrLen = 16
		fam = unix.NFPROTO_IPV6
	}
	dnatRule := &nftables.Rule{
		Table: nat,
		Chain: preroutingCh,
		Exprs: []expr.Any{
			// Load the packet's source address into register 1...
			&expr.Payload{
				DestRegister: 1,
				Base:         expr.PayloadBaseNetworkHeader,
				Offset:       saddrOffset,
				Len:          saddrLen,
			},
			// ...and only continue if it equals src.
			&expr.Cmp{
				Op:       expr.CmpOpEq,
				Register: 1,
				Data:     src.AsSlice(),
			},
			// Load dst into register 1 and use it as the new destination.
			&expr.Immediate{
				Register: 1,
				Data:     dst.AsSlice(),
			},
			&expr.NAT{
				Type:       expr.NATTypeDestNAT,
				Family:     fam,
				RegAddrMin: 1,
			},
		},
	}
	n.conn.InsertRule(dnatRule)
	return n.conn.Flush()
}
func (n *nftablesRunner) AddDNATRule(origDst netip.Addr, dst netip.Addr) error { func (n *nftablesRunner) AddDNATRule(origDst netip.Addr, dst netip.Addr) error {
nat, preroutingCh, err := n.ensurePreroutingChain(dst) nat, preroutingCh, err := n.ensurePreroutingChain(dst)
if err != nil { if err != nil {
@ -563,6 +609,9 @@ type NetfilterRunner interface {
// the Tailscale interface, as used in the Kubernetes egress proxies. // the Tailscale interface, as used in the Kubernetes egress proxies.
AddSNATRuleForDst(src, dst netip.Addr) error AddSNATRuleForDst(src, dst netip.Addr) error
// DNAT traffic from src to dst
AddDNATRuleWithSrc(src, dst netip.Addr) error
// DNATNonTailscaleTraffic adds a rule to the nat/PREROUTING chain to DNAT // DNATNonTailscaleTraffic adds a rule to the nat/PREROUTING chain to DNAT
// all traffic inbound from any interface except exemptInterface to dst. // all traffic inbound from any interface except exemptInterface to dst.
// This is used to forward traffic destined for the local machine over // This is used to forward traffic destined for the local machine over