// Mirror of an upstream repository, synced 2025-03-27 03:22:25 +00:00.
// Original file size: 254 lines, 7.7 KiB.
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
	"context"
	"errors"
	"fmt"
	"net/netip"
	"os"
	"strings"
	"sync"

	xslices "golang.org/x/exp/slices"
	corev1 "k8s.io/api/core/v1"
	discoveryv1 "k8s.io/api/discovery/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	kzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
	tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
)
func main() {
var opts []kzap.Opts
opts = append(opts, kzap.UseDevMode(true), kzap.Level(zapcore.DebugLevel))
zlog := kzap.NewRaw(opts...).Sugar()
restConfig := config.GetConfigOrDie()
// One EndpointSlice marked by egress service name for the Service
egressSvc := os.Getenv("TS_EGRESS_SVC")
if egressSvc == "" {
zlog.Fatalf("empty egress service name")
podIP := os.Getenv("POD_IP")
if podIP == "" {
zlog.Fatalf("empty POD_IP")
podUID := os.Getenv("POD_UID")
if podUID == "" {
zlog.Fatalf("empty Pod UID")
labelReq, err := labels.NewRequirement("tailscale.com/fwegress", selection.Equals, []string{egressSvc})
if err != nil {
zlog.Fatalf("error creating a label requirement: %v", err)
labelFilter := cache.ByObject{
Label: labels.NewSelector().Add(*labelReq),
nsFilter := cache.ByObject{
Namespaces: map[string]cache.Config{"tailscale": {LabelSelector: labelFilter.Label}},
nsFilter1 := cache.ByObject{
Field: client.InNamespace("tailscale").AsSelector(),
mgr, err := manager.New(restConfig, manager.Options{Scheme: tsapi.GlobalScheme,
Cache: cache.Options{
ByObject: map[client.Object]cache.ByObject{
&discoveryv1.EndpointSlice{}: nsFilter,
&corev1.Pod{}: nsFilter1,
if err != nil {
zlog.Fatalf("could not create manager: %v", err)
// TODO: does this result in setting up unnecessary default firewall
// rules for tailscale?
nfRunner, err := linuxfw.New(zlog.Debugf, "")
if err != nil {
zlog.Fatalf("could not create netfilter runner: %v", err)
podName := os.Getenv("POD_NAME")
if podName == "" {
zlog.Fatal("empty Pod name")
err = builder.
For(&discoveryv1.EndpointSlice{}). // label filter
Client: mgr.GetClient(),
logger: zlog.Named("FWEgress-reconciler"),
nfRunner: nfRunner,
podIP: netip.MustParseAddr(podIP),
podName: podName,
podUID: podUID,
state: &state{routes: make([]netip.Addr, 0)},
egressSvc: egressSvc,
if err != nil {
zlog.Fatalf("error creating FWEgress reconciler: %v", err)
if mgr.Start(signals.SetupSignalHandler()); err != nil {
zlog.Fatalf("error starting controller manager: %v", err)
type FWEgressReconciler struct {
state *state
logger *zap.SugaredLogger
nfRunner linuxfw.NetfilterRunner
podIP netip.Addr
podName string
podUID string
egressSvc string
// The operator creates the EndpointSlice as that makes it easier to co-ordinate the IP family thing.
func (r *FWEgressReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
r.logger.Debugf("starting reconcile")
defer r.logger.Debugf("reconcile finished")
newRoutes := make([]netip.Addr, 0)
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: r.podName,
Namespace: "tailscale",
if err := r.Get(ctx, client.ObjectKeyFromObject(pod), pod); err != nil {
return res, err
ready := corev1.ConditionFalse
defer func() {
r.logger.Debugf("setting new routes %v", newRoutes)
oldPodStatus := pod.Status.DeepCopy()
podSetTailscaleReady(ready, pod)
if !apiequality.Semantic.DeepEqual(pod.Status, oldPodStatus) {
r.logger.Debugf("updating Pod status", newRoutes)
if updateErr := r.Status().Update(ctx, pod); updateErr != nil {
err = errors.Join(err, fmt.Errorf("error updating proxy headless Service metadata: %w", err))
// custom pod status condition is only used for readiness check, it is not reliable for internal use because it is possible that container restarted and we lost routes, but pod status is still the same
eps := new(discoveryv1.EndpointSlice)
err = r.Get(ctx, req.NamespacedName, eps)
if apierrors.IsNotFound(err) {
r.logger.Debugf("EndpointSlice not found, assuming it was deleted")
return reconcile.Result{}, nil
} else if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to get EndpointSlice: %w", err)
egressSvc := eps.Labels["tailscale.com/fwegress"]
if !strings.EqualFold(egressSvc, r.egressSvc) {
r.logger.Debugf("got EndpointSlice for service %s interested in %s", egressSvc, r.egressSvc)
return res, nil
// TODO: this is round robin for iptables, but not nftables- must fix
// nftables
addrs := make([]netip.Addr, 0)
for _, ep := range eps.Endpoints {
if !strings.EqualFold(*ep.Hostname, r.podUID) {
r.logger.Debugf("skip endpoint for fwegress Pod %s", *ep.Hostname)
for _, addrS := range ep.Addresses {
addr, err := netip.ParseAddr(addrS)
if err != nil {
return res, fmt.Errorf("error parsing EndpointSlice address %s: %v", addrS, err)
// duplicates aren't expected
addrs = append(addrs, addr)
if !r.state.routesNeedUpdate(addrs) {
r.logger.Debugf("routes don't need update")
ready = corev1.ConditionTrue
r.logger.Debugf("routes need update, new routes are %v", addrs)
// TODO: also add a mark
// we could mark packets for this service so don't have to reconfigure as these Pods go up and down
if err := r.nfRunner.DNATWithLoadBalancer(r.podIP, addrs); err != nil {
r.logger.Errorf("error updating routes: %v", err)
return res, fmt.Errorf("error setting up load balancer rules: %v", err)
for _, addr := range addrs {
if err := r.nfRunner.AddSNATRuleForDst(r.podIP, addr); err != nil {
return res, fmt.Errorf("error setting up SNAT rules %w", err)
newRoutes = addrs
ready = corev1.ConditionTrue
return res, nil
// state records the routes most recently programmed into netfilter.
//
// Routes are tracked in process memory rather than in cluster state. A
// container restart can lose the programmed rules in a way that cannot be
// tied to resource garbage collection, and in-memory state is lost along
// with them. The embedded mutex guards routes.
type state struct {
	sync.Mutex
	routes []netip.Addr
}

// routesNeedUpdate reports whether newRoutes differs from the stored routes.
// The comparison is order-sensitive: the same addresses in a different order
// count as a change.
func (s *state) routesNeedUpdate(newRoutes []netip.Addr) bool {
	// Lock before the deferred Unlock; unlocking an unlocked mutex is fatal.
	s.Lock()
	defer s.Unlock()
	// TODO: bart.Table would be more efficient maybe.
	if len(newRoutes) != len(s.routes) {
		return true
	}
	for i, r := range s.routes {
		if newRoutes[i].Compare(r) != 0 {
			return true
		}
	}
	return false
}

// set replaces the stored routes with a copy of routes. Copying means later
// changes to the caller's slice do not alter the recorded state.
func (s *state) set(routes []netip.Addr) {
	s.Lock()
	defer s.Unlock()
	s.routes = append(make([]netip.Addr, 0, len(routes)), routes...)
}
func podSetTailscaleReady(status corev1.ConditionStatus, pod *corev1.Pod) {
newCondition := corev1.PodCondition{
Type: corev1.PodConditionType("TailscaleRoutesReady"),
Status: status,
idx := xslices.IndexFunc(pod.Status.Conditions, func(cond corev1.PodCondition) bool {
return cond.Type == corev1.PodConditionType("TailscaleRoutesReady")
if idx == -1 {
pod.Status.Conditions = append(pod.Status.Conditions, newCondition)
pod.Status.Conditions[idx] = newCondition