all-kube: create Tailscale Service for HA kube-apiserver ProxyGroup

Adds a new reconciler for ProxyGroups of type kube-apiserver that will
provision a Tailscale Service for each replica to advertise. Adds two
new condition types to the ProxyGroup, TailscaleServiceValid and
TailscaleServiceConfigured, to post updates on the state of that
reconciler in a way that's consistent with the service-pg reconciler.
The created Tailscale Service name is configurable via a new ProxyGroup
field spec.kubeAPISserver.ServiceName, which expects a string of the
form "svc:<dns-label>".

Lots of supporting changes were needed to implement this in a way that's
consistent with other operator workflows, including:

* Pulled containerboot's ensureServicesUnadvertised and certManager into
  kube/ libraries to be shared with k8s-proxy. Use those in k8s-proxy to
  aid Service cert sharing between replicas and graceful Service shutdown.
* For certManager, add an initial wait to the cert loop to wait until
  the domain appears in the devices's netmap to avoid a guaranteed error
  on the first issue attempt when it's quick to start.
* Made several methods in ingress-for-pg.go and svc-for-pg.go into
  functions to share with the new reconciler
* Added a Resource struct to the owner refs stored in Tailscale Service
  annotations to be able to distinguish between Ingress- and ProxyGroup-
  based Services that need cleaning up in the Tailscale API.
* Added a ListVIPServices method to the internal tailscale client to aid
  cleaning up orphaned Services
* Support for reading config from a kube Secret, and partial support for
  config reloading, to prevent us having to force Pod restarts when
  config changes.
* Fixed up the zap logger so it's possible to set debug log level.

Updates #13358

Change-Id: Ia9607441157dd91fb9b6ecbc318eecbef446e116
Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
This commit is contained in:
Tom Proctor
2025-07-14 23:21:38 +01:00
parent b63f8a457d
commit 68652fcd27
37 changed files with 1848 additions and 327 deletions

View File

@@ -122,6 +122,7 @@ import (
"tailscale.com/ipn"
kubeutils "tailscale.com/k8s-operator"
"tailscale.com/kube/kubetypes"
"tailscale.com/kube/services"
"tailscale.com/tailcfg"
"tailscale.com/types/logger"
"tailscale.com/types/ptr"
@@ -210,7 +211,7 @@ func run() error {
ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
defer cancel()
if err := ensureServicesNotAdvertised(ctx, client); err != nil {
if err := services.EnsureServicesNotAdvertised(ctx, client, log.Printf); err != nil {
log.Printf("Error ensuring services are not advertised: %v", err)
}

View File

@@ -19,7 +19,9 @@ import (
"github.com/fsnotify/fsnotify"
"tailscale.com/client/local"
"tailscale.com/ipn"
"tailscale.com/kube/certs"
"tailscale.com/kube/kubetypes"
klc "tailscale.com/kube/localclient"
"tailscale.com/types/netmap"
)
@@ -52,11 +54,9 @@ func watchServeConfigChanges(ctx context.Context, cdChanged <-chan bool, certDom
var certDomain string
var prevServeConfig *ipn.ServeConfig
var cm certManager
var cm *certs.CertManager
if cfg.CertShareMode == "rw" {
cm = certManager{
lc: lc,
}
cm = certs.NewCertManager(klc.New(lc), log.Printf)
}
for {
select {
@@ -93,7 +93,7 @@ func watchServeConfigChanges(ctx context.Context, cdChanged <-chan bool, certDom
if cfg.CertShareMode != "rw" {
continue
}
if err := cm.ensureCertLoops(ctx, sc); err != nil {
if err := cm.EnsureCertLoops(ctx, sc); err != nil {
log.Fatalf("serve proxy: error ensuring cert loops: %v", err)
}
}

View File

@@ -0,0 +1,471 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"maps"
"slices"
"strings"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"tailscale.com/internal/client/tailscale"
tsoperator "tailscale.com/k8s-operator"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/k8s-proxy/conf"
"tailscale.com/kube/kubetypes"
"tailscale.com/tailcfg"
"tailscale.com/tstime"
)
const (
proxyPGFinalizerName = "tailscale.com/kube-apiserver-finalizer"
// Reasons for PGTailscaleServiceValid condition.
reasonPGTailscaleServiceInvalid = "TailscaleServiceInvalid"
reasonPGTailscaleServiceValid = "TailscaleServiceValid"
// Reasons for PGTailscaleServiceConfigured condition.
reasonPGTailscaleServiceConfigured = "TailscaleServiceConfigured"
reasonPGTailscaleServiceNoBackends = "TailscaleServiceNoBackends"
)
// APIServerProxyServiceReconciler reconciles the Tailscale Services required for an
// HA deployment of the API Server Proxy.
type APIServerProxyServiceReconciler struct {
client.Client
recorder record.EventRecorder
logger *zap.SugaredLogger
tsClient tsClient
tsNamespace string
lc localClient
defaultTags []string
operatorID string // stableID of the operator's Tailscale device
clock tstime.Clock
}
// Reconcile is the entry point for the controller.
func (r *APIServerProxyServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
logger := r.logger.With("ProxyGroup", req.Name)
logger.Debugf("starting reconcile")
defer logger.Debugf("reconcile finished")
pg := new(tsapi.ProxyGroup)
err = r.Get(ctx, req.NamespacedName, pg)
if apierrors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
logger.Debugf("ProxyGroup not found, assuming it was deleted")
return res, nil
} else if err != nil {
return res, fmt.Errorf("failed to get ProxyGroup: %w", err)
}
serviceName := serviceNameForAPIServerProxy(pg)
logger = logger.With("Tailscale Service", serviceName)
if markedForDeletion(pg) {
logger.Debugf("ProxyGroup is being deleted, ensuring any created resources are cleaned up")
return res, r.maybeCleanup(ctx, serviceName, pg, logger)
}
err = r.maybeProvision(ctx, serviceName, pg, logger)
if err != nil {
if strings.Contains(err.Error(), optimisticLockErrorMsg) {
logger.Infof("optimistic lock error, retrying: %s", err)
} else {
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}
// maybeProvision ensures that a Tailscale Service for this ProxyGroup exists
// and is up to date.
//
// Returns true if the operation resulted in a Tailscale Service update.
func (r *APIServerProxyServiceReconciler) maybeProvision(ctx context.Context, serviceName tailcfg.ServiceName, pg *tsapi.ProxyGroup, logger *zap.SugaredLogger) (err error) {
oldPGStatus := pg.Status.DeepCopy()
defer func() {
podsAdvertising, podsErr := numberPodsAdvertising(ctx, r.Client, r.tsNamespace, pg.Name, serviceName)
if podsErr != nil {
err = errors.Join(err, fmt.Errorf("failed to get number of advertised Pods: %w", podsErr))
// Continue, updating the status with the best available information.
}
// Update the ProxyGroup status with the Tailscale Service information
// Update the condition based on how many pods are advertising the service
conditionStatus := metav1.ConditionFalse
conditionReason := reasonPGTailscaleServiceNoBackends
conditionMessage := fmt.Sprintf("%d/%d proxy backends ready and advertising", podsAdvertising, pgReplicas(pg))
if podsAdvertising > 0 {
// At least one pod is advertising the service, consider it configured
conditionStatus = metav1.ConditionTrue
conditionReason = reasonPGTailscaleServiceConfigured
}
tsoperator.SetProxyGroupCondition(pg, tsapi.PGTailscaleServiceConfigured, conditionStatus, conditionReason, conditionMessage, pg.Generation, r.clock, logger)
if !apiequality.Semantic.DeepEqual(oldPGStatus, &pg.Status) {
// An error encountered here should get returned by the Reconcile function.
err = errors.Join(err, r.Client.Status().Update(ctx, pg))
}
}()
if !tsoperator.ProxyGroupAvailable(pg) {
return nil
}
if !slices.Contains(pg.Finalizers, proxyPGFinalizerName) {
// This log line is printed exactly once during initial provisioning,
// because once the finalizer is in place this block gets skipped. So,
// this is a nice place to tell the operator that the high level,
// multi-reconcile operation is underway.
logger.Info("provisioning Tailscale Service for ProxyGroup")
pg.Finalizers = append(pg.Finalizers, proxyPGFinalizerName)
if err := r.Update(ctx, pg); err != nil {
return fmt.Errorf("failed to add finalizer: %w", err)
}
}
// 1. Check there isn't a Tailscale Service with the same hostname
// already created and not owned by this ProxyGroup.
existingTSSvc, err := r.tsClient.GetVIPService(ctx, serviceName)
if isErrorFeatureFlagNotEnabled(err) {
logger.Warn(msgFeatureFlagNotEnabled)
r.recorder.Event(pg, corev1.EventTypeWarning, warningTailscaleServiceFeatureFlagNotEnabled, msgFeatureFlagNotEnabled)
tsoperator.SetProxyGroupCondition(pg, tsapi.PGTailscaleServiceValid, metav1.ConditionFalse, reasonPGTailscaleServiceInvalid, msgFeatureFlagNotEnabled, pg.Generation, r.clock, logger)
return nil
}
if err != nil && !isErrorTailscaleServiceNotFound(err) {
return fmt.Errorf("error getting Tailscale Service %q: %w", serviceName, err)
}
updatedAnnotations, err := exclusiveOwnerAnnotations(pg, r.operatorID, existingTSSvc)
if err != nil {
const instr = "To proceed, you can either manually delete the existing Tailscale Service or choose a different Service name in the ProxyGroup's spec.kubeAPIServer.serviceName field"
msg := fmt.Sprintf("error ensuring exclusive ownership of Tailscale Service %s: %v. %s", serviceName, err, instr)
logger.Warn(msg)
r.recorder.Event(pg, corev1.EventTypeWarning, "InvalidTailscaleService", msg)
tsoperator.SetProxyGroupCondition(pg, tsapi.PGTailscaleServiceValid, metav1.ConditionFalse, reasonPGTailscaleServiceInvalid, msg, pg.Generation, r.clock, logger)
return nil
}
// After getting this far, we know the Tailscale Service is valid.
tsoperator.SetProxyGroupCondition(pg, tsapi.PGTailscaleServiceValid, metav1.ConditionTrue, reasonPGTailscaleServiceValid, reasonPGTailscaleServiceValid, pg.Generation, r.clock, logger)
// Service tags are limited to matching the ProxyGroup's tags until we have
// support for querying peer caps for a Service-bound request.
serviceTags := r.defaultTags
if len(pg.Spec.Tags) > 0 {
serviceTags = pg.Spec.Tags.Stringify()
}
tsSvc := &tailscale.VIPService{
Name: serviceName,
Tags: serviceTags,
Ports: []string{"tcp:443"},
Comment: managedTSServiceComment,
Annotations: updatedAnnotations,
}
if existingTSSvc != nil {
tsSvc.Addrs = existingTSSvc.Addrs
}
// 2. Ensure the Tailscale Service exists and is up to date.
if existingTSSvc == nil ||
!slices.Equal(tsSvc.Tags, existingTSSvc.Tags) ||
!ownersAreSetAndEqual(tsSvc, existingTSSvc) ||
!slices.Equal(tsSvc.Ports, existingTSSvc.Ports) {
logger.Infof("Ensuring Tailscale Service exists and is up to date")
if err := r.tsClient.CreateOrUpdateVIPService(ctx, tsSvc); err != nil {
return fmt.Errorf("error creating Tailscale Service: %w", err)
}
}
// 3. Ensure that TLS Secret and RBAC exists.
tcd, err := tailnetCertDomain(ctx, r.lc)
if err != nil {
return fmt.Errorf("error determining DNS name base: %w", err)
}
dnsName := serviceName.WithoutPrefix() + "." + tcd
if err = r.ensureCertResources(ctx, pg, dnsName); err != nil {
return fmt.Errorf("error ensuring cert resources: %w", err)
}
// 4. Configure the Pods to advertise the Tailscale Service.
if err = r.maybeAdvertiseServices(ctx, pg, serviceName, logger); err != nil {
return fmt.Errorf("error updating advertised Tailscale Services: %w", err)
}
// 5. Clean up any stale Tailscale Services from previous resource versions.
if err = r.maybeDeleteStaleServices(ctx, pg, logger); err != nil {
return fmt.Errorf("failed to delete old Tailscale Services: %w", err)
}
return nil
}
// maybeCleanup ensures that any resources, such as a Tailscale Service created for this Service, are cleaned up when the
// Service is being deleted or is unexposed. The cleanup is safe for a multi-cluster setup- the Tailscale Service is only
// deleted if it does not contain any other owner references. If it does the cleanup only removes the owner reference
// corresponding to this Service.
func (r *APIServerProxyServiceReconciler) maybeCleanup(ctx context.Context, serviceName tailcfg.ServiceName, pg *tsapi.ProxyGroup, logger *zap.SugaredLogger) (err error) {
ix := slices.Index(pg.Finalizers, proxyPGFinalizerName)
if ix < 0 {
logger.Debugf("no finalizer, nothing to do")
return nil
}
logger.Infof("Ensuring that Service %q is cleaned up", serviceName)
defer func() {
if err == nil {
err = r.deleteFinalizer(ctx, pg, logger)
}
}()
if _, err = cleanupTailscaleService(ctx, r.tsClient, serviceName, r.operatorID, logger); err != nil {
return fmt.Errorf("error deleting Tailscale Service: %w", err)
}
if err = cleanupCertResources(ctx, r.Client, r.lc, r.tsNamespace, pg.Name, serviceName); err != nil {
return fmt.Errorf("failed to clean up cert resources: %w", err)
}
return nil
}
// maybeDeleteStaleServices deletes Services that have previously been created for
// this ProxyGroup but are no longer needed.
func (r *APIServerProxyServiceReconciler) maybeDeleteStaleServices(ctx context.Context, pg *tsapi.ProxyGroup, logger *zap.SugaredLogger) error {
serviceName := serviceNameForAPIServerProxy(pg)
svcs, err := r.tsClient.ListVIPServices(ctx)
if err != nil {
return fmt.Errorf("error listing Tailscale Services: %w", err)
}
for _, svc := range svcs.VIPServices {
if svc.Name == serviceName {
continue
}
owners, err := parseOwnerAnnotation(&svc)
if err != nil {
logger.Warnf("error parsing owner annotation for Tailscale Service %s: %v", svc.Name, err)
continue
}
if owners == nil || len(owners.OwnerRefs) != 1 || owners.OwnerRefs[0].OperatorID != r.operatorID {
continue
}
owner := owners.OwnerRefs[0]
if owner.Resource.Kind != "ProxyGroup" || owner.Resource.UID != string(pg.UID) {
continue
}
logger.Infof("Deleting Tailscale Service %s owned by ProxyGroup %s", svc.Name, pg.Name)
if err := r.tsClient.DeleteVIPService(ctx, svc.Name); err != nil && !isErrorTailscaleServiceNotFound(err) {
return fmt.Errorf("error deleting Tailscale Service %s: %w", svc.Name, err)
}
}
return nil
}
func (r *APIServerProxyServiceReconciler) deleteFinalizer(ctx context.Context, pg *tsapi.ProxyGroup, logger *zap.SugaredLogger) error {
pg.Finalizers = slices.DeleteFunc(pg.Finalizers, func(f string) bool {
return f == proxyPGFinalizerName
})
logger.Debugf("ensure %q finalizer is removed", proxyPGFinalizerName)
if err := r.Update(ctx, pg); err != nil {
return fmt.Errorf("failed to remove finalizer %q: %w", proxyPGFinalizerName, err)
}
return nil
}
func (r *APIServerProxyServiceReconciler) ensureCertResources(ctx context.Context, pg *tsapi.ProxyGroup, domain string) error {
secret := certSecret(pg.Name, r.tsNamespace, domain, pg)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, secret, func(s *corev1.Secret) {
s.Labels = secret.Labels
}); err != nil {
return fmt.Errorf("failed to create or update Secret %s: %w", secret.Name, err)
}
role := certSecretRole(pg.Name, r.tsNamespace, domain)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, role, func(r *rbacv1.Role) {
r.Labels = role.Labels
r.Rules = role.Rules
}); err != nil {
return fmt.Errorf("failed to create or update Role %s: %w", role.Name, err)
}
rolebinding := certSecretRoleBinding(pg, r.tsNamespace, domain)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, rolebinding, func(rb *rbacv1.RoleBinding) {
rb.Labels = rolebinding.Labels
rb.Subjects = rolebinding.Subjects
rb.RoleRef = rolebinding.RoleRef
}); err != nil {
return fmt.Errorf("failed to create or update RoleBinding %s: %w", rolebinding.Name, err)
}
return nil
}
func (r *APIServerProxyServiceReconciler) maybeAdvertiseServices(ctx context.Context, pg *tsapi.ProxyGroup, serviceName tailcfg.ServiceName, logger *zap.SugaredLogger) error {
// Get all config Secrets for this ProxyGroup
cfgSecrets := &corev1.SecretList{}
if err := r.List(ctx, cfgSecrets, client.InNamespace(r.tsNamespace), client.MatchingLabels(pgSecretLabels(pg.Name, kubetypes.LabelSecretTypeConfig))); err != nil {
return fmt.Errorf("failed to list config Secrets: %w", err)
}
// Only advertise a Tailscale Service once the TLS certs required for
// serving it are available.
shouldBeAdvertised, err := hasCerts(ctx, r.Client, r.lc, r.tsNamespace, serviceName)
if err != nil {
return fmt.Errorf("error checking TLS credentials provisioned for Tailscale Service %q: %w", serviceName, err)
}
for _, s := range cfgSecrets.Items {
if len(s.Data[kubetypes.KubeAPIServerConfigFile]) == 0 {
continue
}
// Parse the existing config.
cfg, err := conf.Load(s.Data[kubetypes.KubeAPIServerConfigFile])
if err != nil {
return fmt.Errorf("error loading config from Secret %q: %w", s.Name, err)
}
if cfg.Parsed.APIServerProxy == nil {
return fmt.Errorf("[unexpected] config Secret %q does not contain APIServerProxy config", s.Name)
}
existingCfgSecret := s.DeepCopy()
var updated bool
if cfg.Parsed.APIServerProxy.ServiceName == nil || *cfg.Parsed.APIServerProxy.ServiceName != serviceName {
cfg.Parsed.APIServerProxy.ServiceName = &serviceName
updated = true
}
// Update the services to advertise if required.
idx := slices.Index(cfg.Parsed.AdvertiseServices, serviceName.String())
isAdvertised := idx >= 0
switch {
case isAdvertised == shouldBeAdvertised:
// Already up to date.
case isAdvertised:
// Needs to be removed.
cfg.Parsed.AdvertiseServices = slices.Delete(cfg.Parsed.AdvertiseServices, idx, idx+1)
updated = true
case shouldBeAdvertised:
// Needs to be added.
cfg.Parsed.AdvertiseServices = append(cfg.Parsed.AdvertiseServices, serviceName.String())
updated = true
}
if !updated {
continue
}
// Update the config Secret.
cfgB, err := json.Marshal(conf.VersionedConfig{
Version: "v1alpha1",
ConfigV1Alpha1: &cfg.Parsed,
})
if err != nil {
return err
}
s.Data[kubetypes.KubeAPIServerConfigFile] = cfgB
if !apiequality.Semantic.DeepEqual(existingCfgSecret, s) {
logger.Debugf("Updating the Tailscale Services in ProxyGroup config Secret %s", s.Name)
if err := r.Update(ctx, &s); err != nil {
return err
}
}
}
return nil
}
func serviceNameForAPIServerProxy(pg *tsapi.ProxyGroup) tailcfg.ServiceName {
if pg.Spec.KubeAPIServer != nil && pg.Spec.KubeAPIServer.ServiceName != "" {
return tailcfg.ServiceName(pg.Spec.KubeAPIServer.ServiceName)
}
return tailcfg.ServiceName("svc:" + pg.Name)
}
// exclusiveOwnerAnnotations returns the updated annotations required to ensure this
// instance of the operator is the exclusive owner. If the Tailscale Service is not
// nil, but does not contain an owner reference we return an error as this likely means
// that the Service was created by somthing other than a Tailscale Kubernetes operator.
// We also error if it is already owned by another operator instance, as we do not
// want to load balance a kube-apiserver ProxyGroup across multiple clusters.
func exclusiveOwnerAnnotations(pg *tsapi.ProxyGroup, operatorID string, svc *tailscale.VIPService) (map[string]string, error) {
ref := OwnerRef{
OperatorID: operatorID,
Resource: &Resource{
Kind: "ProxyGroup",
Name: pg.Name,
UID: string(pg.UID),
},
}
if svc == nil {
c := ownerAnnotationValue{OwnerRefs: []OwnerRef{ref}}
json, err := json.Marshal(c)
if err != nil {
return nil, fmt.Errorf("[unexpected] unable to marshal Tailscale Service's owner annotation contents: %w, please report this", err)
}
return map[string]string{
ownerAnnotation: string(json),
}, nil
}
o, err := parseOwnerAnnotation(svc)
if err != nil {
return nil, err
}
if o == nil || len(o.OwnerRefs) == 0 {
return nil, fmt.Errorf("Tailscale Service %s exists, but does not contain owner annotation with owner references; not proceeding as this is likely a resource created by something other than the Tailscale Kubernetes operator", svc.Name)
}
if len(o.OwnerRefs) > 1 || o.OwnerRefs[0].OperatorID != operatorID {
return nil, fmt.Errorf("Tailscale Service %s is already owned by other operator(s) and cannot be shared across multiple clusters; configure a difference Service name to continue", svc.Name)
}
if o.OwnerRefs[0].Resource == nil {
return nil, fmt.Errorf("Tailscale Service %s exists, but does not reference an owning resource; not proceeding as this is likely a Service already owned by an Ingress", svc.Name)
}
if o.OwnerRefs[0].Resource.Kind != "ProxyGroup" || o.OwnerRefs[0].Resource.UID != string(pg.UID) {
return nil, fmt.Errorf("Tailscale Service %s is already owned by another resource: %#v; configure a difference Service name to continue", svc.Name, o.OwnerRefs[0].Resource)
}
if o.OwnerRefs[0].Resource.Name != pg.Name {
// ProxyGroup name can be updated in place.
o.OwnerRefs[0].Resource.Name = pg.Name
}
oBytes, err := json.Marshal(o)
if err != nil {
return nil, err
}
newAnnots := make(map[string]string, len(svc.Annotations)+1)
maps.Copy(newAnnots, svc.Annotations)
newAnnots[ownerAnnotation] = string(oBytes)
return newAnnots, nil
}

View File

@@ -0,0 +1,108 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package main
import (
"strings"
"testing"
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"tailscale.com/internal/client/tailscale"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
)
func TestExclusiveOwnerAnnotations(t *testing.T) {
pg := &tsapi.ProxyGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "pg1",
UID: "pg1-uid",
},
}
const (
selfOperatorID = "self-id"
pg1Owner = `{"ownerRefs":[{"operatorID":"self-id","resource":{"kind":"ProxyGroup","name":"pg1","uid":"pg1-uid"}}]}`
)
for name, tc := range map[string]struct {
svc *tailscale.VIPService
wantErr string
}{
"no_svc": {
svc: nil,
},
"empty_svc": {
svc: &tailscale.VIPService{},
wantErr: "likely a resource created by something other than the Tailscale Kubernetes operator",
},
"already_owner": {
svc: &tailscale.VIPService{
Annotations: map[string]string{
ownerAnnotation: pg1Owner,
},
},
},
"already_owner_name_updated": {
svc: &tailscale.VIPService{
Annotations: map[string]string{
ownerAnnotation: `{"ownerRefs":[{"operatorID":"self-id","resource":{"kind":"ProxyGroup","name":"old-pg1-name","uid":"pg1-uid"}}]}`,
},
},
},
"preserves_existing_annotations": {
svc: &tailscale.VIPService{
Annotations: map[string]string{
"existing": "annotation",
ownerAnnotation: pg1Owner,
},
},
},
"owned_by_another_operator": {
svc: &tailscale.VIPService{
Annotations: map[string]string{
ownerAnnotation: `{"ownerRefs":[{"operatorID":"operator-2"}]}`,
},
},
wantErr: "already owned by other operator(s)",
},
"owned_by_an_ingress": {
svc: &tailscale.VIPService{
Annotations: map[string]string{
ownerAnnotation: `{"ownerRefs":[{"operatorID":"self-id"}]}`, // Ingress doesn't set Resource field (yet).
},
},
wantErr: "does not reference an owning resource",
},
"owned_by_another_pg": {
svc: &tailscale.VIPService{
Annotations: map[string]string{
ownerAnnotation: `{"ownerRefs":[{"operatorID":"self-id","resource":{"kind":"ProxyGroup","name":"pg2","uid":"pg2-uid"}}]}`,
},
},
wantErr: "already owned by another resource",
},
} {
t.Run(name, func(t *testing.T) {
got, err := exclusiveOwnerAnnotations(pg, "self-id", tc.svc)
if tc.wantErr != "" {
if !strings.Contains(err.Error(), tc.wantErr) {
t.Errorf("exclusiveOwnerAnnotations() error = %v, wantErr %v", err, tc.wantErr)
}
} else if diff := cmp.Diff(pg1Owner, got[ownerAnnotation]); diff != "" {
t.Errorf("exclusiveOwnerAnnotations() mismatch (-want +got):\n%s", diff)
}
if tc.svc == nil {
return // Don't check annotations being preserved.
}
for k, v := range tc.svc.Annotations {
if k == ownerAnnotation {
continue
}
if got[k] != v {
t.Errorf("exclusiveOwnerAnnotations() did not preserve annotation %q: got %q, want %q", k, got[k], v)
}
}
})
}
}

View File

@@ -93,6 +93,14 @@ spec:
enum:
- auth
- noauth
serviceName:
description: |-
ServiceName is the name of the Tailscale Service to create. Must have a
prefix of "svc:" and the remaining characters must be a valid DNS label
no longer than 63 characters. If not specified, a name will be generated
based on the ProxyGroup name.
type: string
pattern: ^svc:[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$
proxyClass:
description: |-
ProxyClass is the name of the ProxyClass custom resource that contains

View File

@@ -2939,6 +2939,14 @@ spec:
- auth
- noauth
type: string
serviceName:
description: |-
ServiceName is the name of the Tailscale Service to create. Must have a
prefix of "svc:" and the remaining characters must be a valid DNS label
no longer than 63 characters. If not specified, a name will be generated
based on the ProxyGroup name.
pattern: ^svc:[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$
type: string
type: object
proxyClass:
description: |-

View File

@@ -20,6 +20,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/egressservices"
"tailscale.com/kube/kubetypes"
"tailscale.com/tstest"
"tailscale.com/util/mak"
)
@@ -200,7 +201,7 @@ func podAndSecretForProxyGroup(pg string) (*corev1.Pod, *corev1.Secret) {
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-0", pg),
Namespace: "operator-ns",
Labels: pgSecretLabels(pg, "state"),
Labels: pgSecretLabels(pg, kubetypes.LabelSecretTypeState),
},
}
return p, s

View File

@@ -248,7 +248,7 @@ func (r *HAIngressReconciler) maybeProvision(ctx context.Context, hostname strin
return false, nil
}
// 3. Ensure that TLS Secret and RBAC exists
tcd, err := r.tailnetCertDomain(ctx)
tcd, err := tailnetCertDomain(ctx, r.lc)
if err != nil {
return false, fmt.Errorf("error determining DNS name base: %w", err)
}
@@ -358,7 +358,7 @@ func (r *HAIngressReconciler) maybeProvision(ctx context.Context, hostname strin
}
// 6. Update Ingress status if ProxyGroup Pods are ready.
count, err := r.numberPodsAdvertising(ctx, pg.Name, serviceName)
count, err := numberPodsAdvertising(ctx, r.Client, r.tsNamespace, pg.Name, serviceName)
if err != nil {
return false, fmt.Errorf("failed to check if any Pods are configured: %w", err)
}
@@ -370,7 +370,7 @@ func (r *HAIngressReconciler) maybeProvision(ctx context.Context, hostname strin
ing.Status.LoadBalancer.Ingress = nil
default:
var ports []networkingv1.IngressPortStatus
hasCerts, err := r.hasCerts(ctx, serviceName)
hasCerts, err := hasCerts(ctx, r.Client, r.lc, r.tsNamespace, serviceName)
if err != nil {
return false, fmt.Errorf("error checking TLS credentials provisioned for Ingress: %w", err)
}
@@ -481,7 +481,7 @@ func (r *HAIngressReconciler) maybeCleanupProxyGroup(ctx context.Context, proxyG
delete(cfg.Services, tsSvcName)
serveConfigChanged = true
}
if err := r.cleanupCertResources(ctx, proxyGroupName, tsSvcName); err != nil {
if err := cleanupCertResources(ctx, r.Client, r.lc, r.tsNamespace, proxyGroupName, tsSvcName); err != nil {
return false, fmt.Errorf("failed to clean up cert resources: %w", err)
}
}
@@ -557,7 +557,7 @@ func (r *HAIngressReconciler) maybeCleanup(ctx context.Context, hostname string,
}
// 3. Clean up any cluster resources
if err := r.cleanupCertResources(ctx, pg, serviceName); err != nil {
if err := cleanupCertResources(ctx, r.Client, r.lc, r.tsNamespace, pg, serviceName); err != nil {
return false, fmt.Errorf("failed to clean up cert resources: %w", err)
}
@@ -634,8 +634,8 @@ type localClient interface {
}
// tailnetCertDomain returns the base domain (TCD) of the current tailnet.
func (r *HAIngressReconciler) tailnetCertDomain(ctx context.Context) (string, error) {
st, err := r.lc.StatusWithoutPeers(ctx)
func tailnetCertDomain(ctx context.Context, lc localClient) (string, error) {
st, err := lc.StatusWithoutPeers(ctx)
if err != nil {
return "", fmt.Errorf("error getting tailscale status: %w", err)
}
@@ -761,7 +761,7 @@ const (
func (a *HAIngressReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Context, pgName string, serviceName tailcfg.ServiceName, mode serviceAdvertisementMode, logger *zap.SugaredLogger) (err error) {
// Get all config Secrets for this ProxyGroup.
secrets := &corev1.SecretList{}
if err := a.List(ctx, secrets, client.InNamespace(a.tsNamespace), client.MatchingLabels(pgSecretLabels(pgName, "config"))); err != nil {
if err := a.List(ctx, secrets, client.InNamespace(a.tsNamespace), client.MatchingLabels(pgSecretLabels(pgName, kubetypes.LabelSecretTypeConfig))); err != nil {
return fmt.Errorf("failed to list config Secrets: %w", err)
}
@@ -773,7 +773,7 @@ func (a *HAIngressReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Con
// The only exception is Ingresses with an HTTP endpoint enabled - if an
// Ingress has an HTTP endpoint enabled, it will be advertised even if the
// TLS cert is not yet provisioned.
hasCert, err := a.hasCerts(ctx, serviceName)
hasCert, err := hasCerts(ctx, a.Client, a.lc, a.tsNamespace, serviceName)
if err != nil {
return fmt.Errorf("error checking TLS credentials provisioned for service %q: %w", serviceName, err)
}
@@ -822,10 +822,10 @@ func (a *HAIngressReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Con
return nil
}
func (a *HAIngressReconciler) numberPodsAdvertising(ctx context.Context, pgName string, serviceName tailcfg.ServiceName) (int, error) {
func numberPodsAdvertising(ctx context.Context, cl client.Client, tsNamespace, pgName string, serviceName tailcfg.ServiceName) (int, error) {
// Get all state Secrets for this ProxyGroup.
secrets := &corev1.SecretList{}
if err := a.List(ctx, secrets, client.InNamespace(a.tsNamespace), client.MatchingLabels(pgSecretLabels(pgName, "state"))); err != nil {
if err := cl.List(ctx, secrets, client.InNamespace(tsNamespace), client.MatchingLabels(pgSecretLabels(pgName, kubetypes.LabelSecretTypeState))); err != nil {
return 0, fmt.Errorf("failed to list ProxyGroup %q state Secrets: %w", pgName, err)
}
@@ -859,7 +859,14 @@ type ownerAnnotationValue struct {
// Kubernetes operator instance.
type OwnerRef struct {
// OperatorID is the stable ID of the operator's Tailscale device.
OperatorID string `json:"operatorID,omitempty"`
OperatorID string `json:"operatorID,omitempty"`
Resource *Resource `json:"resource,omitempty"` // optional, used to identify the ProxyGroup that owns this Tailscale Service.
}
type Resource struct {
Kind string `json:"kind,omitempty"` // "ProxyGroup"
Name string `json:"name,omitempty"` // Name of the ProxyGroup that owns this Tailscale Service. Informational only.
UID string `json:"uid,omitempty"` // UID of the ProxyGroup that owns this Tailscale Service.
}
// ownerAnnotations returns the updated annotations required to ensure this
@@ -891,6 +898,9 @@ func ownerAnnotations(operatorID string, svc *tailscale.VIPService) (map[string]
if slices.Contains(o.OwnerRefs, ref) { // up to date
return svc.Annotations, nil
}
if o.OwnerRefs[0].Resource != nil {
return nil, fmt.Errorf("Tailscale Service %s is owned by another resource: %#v; cannot be reused for an Ingress", svc.Name, o.OwnerRefs[0].Resource)
}
o.OwnerRefs = append(o.OwnerRefs, ref)
json, err := json.Marshal(o)
if err != nil {
@@ -949,7 +959,7 @@ func (r *HAIngressReconciler) ensureCertResources(ctx context.Context, pg *tsapi
}); err != nil {
return fmt.Errorf("failed to create or update Role %s: %w", role.Name, err)
}
rolebinding := certSecretRoleBinding(pg.Name, r.tsNamespace, domain)
rolebinding := certSecretRoleBinding(pg, r.tsNamespace, domain)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, rolebinding, func(rb *rbacv1.RoleBinding) {
// Labels and subjects might have changed if the Ingress has been updated to use a
// different ProxyGroup.
@@ -963,19 +973,19 @@ func (r *HAIngressReconciler) ensureCertResources(ctx context.Context, pg *tsapi
// cleanupCertResources ensures that the TLS Secret and associated RBAC
// resources that allow proxies to read/write to the Secret are deleted.
func (r *HAIngressReconciler) cleanupCertResources(ctx context.Context, pgName string, name tailcfg.ServiceName) error {
domainName, err := r.dnsNameForService(ctx, tailcfg.ServiceName(name))
func cleanupCertResources(ctx context.Context, cl client.Client, lc localClient, tsNamespace, pgName string, serviceName tailcfg.ServiceName) error {
domainName, err := dnsNameForService(ctx, lc, serviceName)
if err != nil {
return fmt.Errorf("error getting DNS name for Tailscale Service %s: %w", name, err)
return fmt.Errorf("error getting DNS name for Tailscale Service %s: %w", serviceName, err)
}
labels := certResourceLabels(pgName, domainName)
if err := r.DeleteAllOf(ctx, &rbacv1.RoleBinding{}, client.InNamespace(r.tsNamespace), client.MatchingLabels(labels)); err != nil {
if err := cl.DeleteAllOf(ctx, &rbacv1.RoleBinding{}, client.InNamespace(tsNamespace), client.MatchingLabels(labels)); err != nil {
return fmt.Errorf("error deleting RoleBinding for domain name %s: %w", domainName, err)
}
if err := r.DeleteAllOf(ctx, &rbacv1.Role{}, client.InNamespace(r.tsNamespace), client.MatchingLabels(labels)); err != nil {
if err := cl.DeleteAllOf(ctx, &rbacv1.Role{}, client.InNamespace(tsNamespace), client.MatchingLabels(labels)); err != nil {
return fmt.Errorf("error deleting Role for domain name %s: %w", domainName, err)
}
if err := r.DeleteAllOf(ctx, &corev1.Secret{}, client.InNamespace(r.tsNamespace), client.MatchingLabels(labels)); err != nil {
if err := cl.DeleteAllOf(ctx, &corev1.Secret{}, client.InNamespace(tsNamespace), client.MatchingLabels(labels)); err != nil {
return fmt.Errorf("error deleting Secret for domain name %s: %w", domainName, err)
}
return nil
@@ -1018,17 +1028,17 @@ func certSecretRole(pgName, namespace, domain string) *rbacv1.Role {
// certSecretRoleBinding creates a RoleBinding for Role that will allow proxies
// to manage the TLS Secret for the given domain. Domain must be a valid
// Kubernetes resource name.
func certSecretRoleBinding(pgName, namespace, domain string) *rbacv1.RoleBinding {
func certSecretRoleBinding(pg *tsapi.ProxyGroup, namespace, domain string) *rbacv1.RoleBinding {
return &rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: domain,
Namespace: namespace,
Labels: certResourceLabels(pgName, domain),
Labels: certResourceLabels(pg.Name, domain),
},
Subjects: []rbacv1.Subject{
{
Kind: "ServiceAccount",
Name: pgName,
Name: pgServiceAccountName(pg),
Namespace: namespace,
},
},
@@ -1041,14 +1051,17 @@ func certSecretRoleBinding(pgName, namespace, domain string) *rbacv1.RoleBinding
// certSecret creates a Secret that will store the TLS certificate and private
// key for the given domain. Domain must be a valid Kubernetes resource name.
func certSecret(pgName, namespace, domain string, ing *networkingv1.Ingress) *corev1.Secret {
func certSecret(pgName, namespace, domain string, parent client.Object) *corev1.Secret {
labels := certResourceLabels(pgName, domain)
labels[kubetypes.LabelSecretType] = "certs"
labels[kubetypes.LabelSecretType] = kubetypes.LabelSecretTypeCerts
// Labels that let us identify the Ingress resource lets us reconcile
// the Ingress when the TLS Secret is updated (for example, when TLS
// certs have been provisioned).
labels[LabelParentName] = ing.Name
labels[LabelParentNamespace] = ing.Namespace
labels[LabelParentType] = strings.ToLower(parent.GetObjectKind().GroupVersionKind().Kind)
labels[LabelParentName] = parent.GetName()
if ns := parent.GetNamespace(); ns != "" {
labels[LabelParentNamespace] = ns
}
return &corev1.Secret{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
@@ -1076,9 +1089,9 @@ func certResourceLabels(pgName, domain string) map[string]string {
}
// dnsNameForService returns the DNS name for the given Tailscale Service's name.
func (r *HAIngressReconciler) dnsNameForService(ctx context.Context, svc tailcfg.ServiceName) (string, error) {
func dnsNameForService(ctx context.Context, lc localClient, svc tailcfg.ServiceName) (string, error) {
s := svc.WithoutPrefix()
tcd, err := r.tailnetCertDomain(ctx)
tcd, err := tailnetCertDomain(ctx, lc)
if err != nil {
return "", fmt.Errorf("error determining DNS name base: %w", err)
}
@@ -1086,14 +1099,14 @@ func (r *HAIngressReconciler) dnsNameForService(ctx context.Context, svc tailcfg
}
// hasCerts checks if the TLS Secret for the given service has non-zero cert and key data.
func (r *HAIngressReconciler) hasCerts(ctx context.Context, svc tailcfg.ServiceName) (bool, error) {
domain, err := r.dnsNameForService(ctx, svc)
func hasCerts(ctx context.Context, cl client.Client, lc localClient, ns string, svc tailcfg.ServiceName) (bool, error) {
domain, err := dnsNameForService(ctx, lc, svc)
if err != nil {
return false, fmt.Errorf("failed to get DNS name for service: %w", err)
}
secret := &corev1.Secret{}
err = r.Get(ctx, client.ObjectKey{
Namespace: r.tsNamespace,
err = cl.Get(ctx, client.ObjectKey{
Namespace: ns,
Name: domain,
}, secret)
if err != nil {

View File

@@ -75,8 +75,13 @@ func TestIngressPGReconciler(t *testing.T) {
// Verify that Role and RoleBinding have been created for the first Ingress.
// Do not verify the cert Secret as that was already verified implicitly above.
pg := &tsapi.ProxyGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pg",
},
}
expectEqual(t, fc, certSecretRole("test-pg", "operator-ns", "my-svc.ts.net"))
expectEqual(t, fc, certSecretRoleBinding("test-pg", "operator-ns", "my-svc.ts.net"))
expectEqual(t, fc, certSecretRoleBinding(pg, "operator-ns", "my-svc.ts.net"))
mustUpdate(t, fc, "default", "test-ingress", func(ing *networkingv1.Ingress) {
ing.Annotations["tailscale.com/tags"] = "tag:custom,tag:test"
@@ -137,7 +142,7 @@ func TestIngressPGReconciler(t *testing.T) {
// Verify that Role and RoleBinding have been created for the second Ingress.
// Do not verify the cert Secret as that was already verified implicitly above.
expectEqual(t, fc, certSecretRole("test-pg", "operator-ns", "my-other-svc.ts.net"))
expectEqual(t, fc, certSecretRoleBinding("test-pg", "operator-ns", "my-other-svc.ts.net"))
expectEqual(t, fc, certSecretRoleBinding(pg, "operator-ns", "my-other-svc.ts.net"))
// Verify first Ingress is still working
verifyServeConfig(t, fc, "svc:my-svc", false)
@@ -186,7 +191,12 @@ func TestIngressPGReconciler(t *testing.T) {
})
expectReconciled(t, ingPGR, "default", "test-ingress")
expectEqual(t, fc, certSecretRole("test-pg-second", "operator-ns", "my-svc.ts.net"))
expectEqual(t, fc, certSecretRoleBinding("test-pg-second", "operator-ns", "my-svc.ts.net"))
pg = &tsapi.ProxyGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pg-second",
},
}
expectEqual(t, fc, certSecretRoleBinding(pg, "operator-ns", "my-svc.ts.net"))
// Delete the first Ingress and verify cleanup
if err := fc.Delete(context.Background(), ing); err != nil {
@@ -515,7 +525,7 @@ func TestIngressPGReconciler_HTTPEndpoint(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "test-pg-0",
Namespace: "operator-ns",
Labels: pgSecretLabels("test-pg", "state"),
Labels: pgSecretLabels("test-pg", kubetypes.LabelSecretTypeState),
},
Data: map[string][]byte{
"_current-profile": []byte("profile-foo"),
@@ -686,6 +696,14 @@ func TestOwnerAnnotations(t *testing.T) {
ownerAnnotation: `{"ownerRefs":[{"operatorID":"operator-2"},{"operatorID":"self-id"}]}`,
},
},
"owned_by_proxygroup": {
svc: &tailscale.VIPService{
Annotations: map[string]string{
ownerAnnotation: `{"ownerRefs":[{"operatorID":"self-id","resource":{"kind":"ProxyGroup","name":"test-pg","uid":"1234-UID"}}]}`,
},
},
wantErr: "owned by another resource",
},
} {
t.Run(name, func(t *testing.T) {
got, err := ownerAnnotations("self-id", tc.svc)
@@ -708,7 +726,7 @@ func populateTLSSecret(ctx context.Context, c client.Client, pgName, domain stri
kubetypes.LabelManaged: "true",
labelProxyGroup: pgName,
labelDomain: domain,
kubetypes.LabelSecretType: "certs",
kubetypes.LabelSecretType: kubetypes.LabelSecretTypeCerts,
},
},
Type: corev1.SecretTypeTLS,
@@ -806,7 +824,7 @@ func verifyTailscaledConfig(t *testing.T, fc client.Client, pgName string, expec
ObjectMeta: metav1.ObjectMeta{
Name: pgConfigSecretName(pgName, 0),
Namespace: "operator-ns",
Labels: pgSecretLabels(pgName, "config"),
Labels: pgSecretLabels(pgName, kubetypes.LabelSecretTypeConfig),
},
Data: map[string][]byte{
tsoperator.TailscaledConfigFileName(pgMinCapabilityVersion): []byte(fmt.Sprintf(`{"Version":""%s}`, expected)),
@@ -845,7 +863,7 @@ func createPGResources(t *testing.T, fc client.Client, pgName string) {
ObjectMeta: metav1.ObjectMeta{
Name: pgConfigSecretName(pgName, 0),
Namespace: "operator-ns",
Labels: pgSecretLabels(pgName, "config"),
Labels: pgSecretLabels(pgName, kubetypes.LabelSecretTypeConfig),
},
Data: map[string][]byte{
tsoperator.TailscaledConfigFileName(pgMinCapabilityVersion): []byte("{}"),

View File

@@ -123,7 +123,7 @@ func main() {
defer s.Close()
restConfig := config.GetConfigOrDie()
if mode != apiServerProxyModeDisabled {
ap, err := apiproxy.NewAPIServerProxy(zlog, restConfig, s, mode == apiServerProxyModeEnabled)
ap, err := apiproxy.NewAPIServerProxy(zlog, restConfig, s, mode == apiServerProxyModeEnabled, true)
if err != nil {
zlog.Fatalf("error creating API server proxy: %v", err)
}
@@ -633,6 +633,32 @@ func runReconcilers(opts reconcilerOpts) {
startlog.Fatalf("could not create Recorder reconciler: %v", err)
}
// API Server Proxy HA Reconciler.
err = builder.
ControllerManagedBy(mgr).
For(&tsapi.ProxyGroup{}, builder.WithPredicates(
predicate.NewPredicateFuncs(func(obj client.Object) bool {
pg, ok := obj.(*tsapi.ProxyGroup)
return ok && pg.Spec.Type == tsapi.ProxyGroupTypeKubernetesAPIServer
}),
)).
Named("apiserver-proxy-service-reconciler").
Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(kubeAPIServerPGsFromSecret(mgr.GetClient(), startlog))).
Complete(&APIServerProxyServiceReconciler{
Client: mgr.GetClient(),
recorder: eventRecorder,
logger: opts.log.Named("apiserver-proxy-service-reconciler"),
tsClient: opts.tsClient,
tsNamespace: opts.tailscaleNamespace,
lc: lc,
defaultTags: strings.Split(opts.proxyTags, ","),
operatorID: id,
clock: tstime.DefaultClock{},
})
if err != nil {
startlog.Fatalf("could not create API server proxy HA reconciler: %v", err)
}
// ProxyGroup reconciler.
ownedByProxyGroupFilter := handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &tsapi.ProxyGroup{})
proxyClassFilterForProxyGroup := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForProxyGroup(mgr.GetClient(), startlog))
@@ -1214,7 +1240,7 @@ func egressEpsFromPGStateSecrets(cl client.Client, ns string) handler.MapFunc {
if parentType := o.GetLabels()[LabelParentType]; parentType != "proxygroup" {
return nil
}
if secretType := o.GetLabels()[kubetypes.LabelSecretType]; secretType != "state" {
if secretType := o.GetLabels()[kubetypes.LabelSecretType]; secretType != kubetypes.LabelSecretTypeState {
return nil
}
pg, ok := o.GetLabels()[LabelParentName]
@@ -1304,7 +1330,7 @@ func reconcileRequestsForPG(pg string, cl client.Client, ns string) []reconcile.
func isTLSSecret(secret *corev1.Secret) bool {
return secret.Type == corev1.SecretTypeTLS &&
secret.ObjectMeta.Labels[kubetypes.LabelManaged] == "true" &&
secret.ObjectMeta.Labels[kubetypes.LabelSecretType] == "certs" &&
secret.ObjectMeta.Labels[kubetypes.LabelSecretType] == kubetypes.LabelSecretTypeCerts &&
secret.ObjectMeta.Labels[labelDomain] != "" &&
secret.ObjectMeta.Labels[labelProxyGroup] != ""
}
@@ -1312,7 +1338,7 @@ func isTLSSecret(secret *corev1.Secret) bool {
func isPGStateSecret(secret *corev1.Secret) bool {
return secret.ObjectMeta.Labels[kubetypes.LabelManaged] == "true" &&
secret.ObjectMeta.Labels[LabelParentType] == "proxygroup" &&
secret.ObjectMeta.Labels[kubetypes.LabelSecretType] == "state"
secret.ObjectMeta.Labels[kubetypes.LabelSecretType] == kubetypes.LabelSecretTypeState
}
// HAIngressesFromSecret returns a handler that returns reconcile requests for
@@ -1394,6 +1420,42 @@ func HAServicesFromSecret(cl client.Client, logger *zap.SugaredLogger) handler.M
}
}
// kubeAPIServerPGsFromSecret finds ProxyGroups of type "kube-apiserver" that
// need to be reconciled after a ProxyGroup-owned Secret is updated.
func kubeAPIServerPGsFromSecret(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc {
return func(ctx context.Context, o client.Object) []reconcile.Request {
secret, ok := o.(*corev1.Secret)
if !ok {
logger.Infof("[unexpected] Secret handler triggered for an object that is not a Secret")
return nil
}
if secret.ObjectMeta.Labels[kubetypes.LabelManaged] != "true" ||
secret.ObjectMeta.Labels[LabelParentType] != "proxygroup" {
return nil
}
var pg tsapi.ProxyGroup
if err := cl.Get(ctx, types.NamespacedName{Name: secret.ObjectMeta.Labels[LabelParentName]}, &pg); err != nil {
logger.Infof("error getting ProxyGroup %s: %v", secret.ObjectMeta.Labels[LabelParentName], err)
return nil
}
if pg.Spec.Type != tsapi.ProxyGroupTypeKubernetesAPIServer {
return nil
}
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: secret.ObjectMeta.Labels[LabelParentNamespace],
Name: secret.ObjectMeta.Labels[LabelParentName],
},
},
}
}
}
// egressSvcsFromEgressProxyGroup is an event handler for egress ProxyGroups. It returns reconcile requests for all
// user-created ExternalName Services that should be exposed on this ProxyGroup.
func egressSvcsFromEgressProxyGroup(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc {

View File

@@ -68,8 +68,7 @@ const (
//
// tailcfg.CurrentCapabilityVersion was 106 when the ProxyGroup controller was
// first introduced.
pgMinCapabilityVersion = 106
kubeAPIServerConfigFile = "config.hujson"
pgMinCapabilityVersion = 106
)
var (
@@ -714,7 +713,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
ObjectMeta: metav1.ObjectMeta{
Name: pgConfigSecretName(pg.Name, i),
Namespace: r.tsNamespace,
Labels: pgSecretLabels(pg.Name, "config"),
Labels: pgSecretLabels(pg.Name, kubetypes.LabelSecretTypeConfig),
OwnerReferences: pgOwnerReference(pg),
},
}
@@ -775,13 +774,6 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
}
}
// AdvertiseServices config is set by ingress-pg-reconciler, so make sure we
// don't overwrite it if already set.
existingAdvertiseServices, err := extractAdvertiseServicesConfig(existingCfgSecret)
if err != nil {
return nil, err
}
if pg.Spec.Type == tsapi.ProxyGroupTypeKubernetesAPIServer {
hostname := pgHostname(pg, i)
@@ -795,7 +787,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
}
if !deviceAuthed {
existingCfg := conf.ConfigV1Alpha1{}
if err := json.Unmarshal(existingCfgSecret.Data[kubeAPIServerConfigFile], &existingCfg); err != nil {
if err := json.Unmarshal(existingCfgSecret.Data[kubetypes.KubeAPIServerConfigFile], &existingCfg); err != nil {
return nil, fmt.Errorf("error unmarshalling existing config: %w", err)
}
if existingCfg.AuthKey != nil {
@@ -803,19 +795,40 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
}
}
}
cfg := conf.VersionedConfig{
Version: "v1alpha1",
ConfigV1Alpha1: &conf.ConfigV1Alpha1{
Hostname: &hostname,
AuthKey: authKey,
State: ptr.To(fmt.Sprintf("kube:%s", pgPodName(pg.Name, i))),
App: ptr.To(kubetypes.AppProxyGroupKubeAPIServer),
AuthKey: authKey,
KubeAPIServer: &conf.KubeAPIServer{
AuthMode: opt.NewBool(isAuthAPIServerProxy(pg)),
LogLevel: ptr.To(logger.Level().String()),
// Reloadable fields.
Hostname: &hostname,
APIServerProxy: &conf.APIServerProxyConfig{
Enabled: opt.NewBool(true),
AuthMode: opt.NewBool(isAuthAPIServerProxy(pg)),
IssueCerts: opt.NewBool(i == 0),
},
},
}
// Copy over config that the apiserver-proxy-service-reconciler sets.
if existingCfgSecret != nil {
if k8sProxyCfg, ok := cfgSecret.Data[kubetypes.KubeAPIServerConfigFile]; ok {
k8sCfg := &conf.ConfigV1Alpha1{}
if err := json.Unmarshal(k8sProxyCfg, k8sCfg); err != nil {
return nil, fmt.Errorf("failed to unmarshal kube-apiserver config: %w", err)
}
cfg.AdvertiseServices = k8sCfg.AdvertiseServices
if k8sCfg.APIServerProxy != nil {
cfg.APIServerProxy.ServiceName = k8sCfg.APIServerProxy.ServiceName
}
}
}
if r.loginServer != "" {
cfg.ServerURL = &r.loginServer
}
@@ -832,8 +845,15 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
if err != nil {
return nil, fmt.Errorf("error marshalling k8s-proxy config: %w", err)
}
mak.Set(&cfgSecret.Data, kubeAPIServerConfigFile, cfgB)
mak.Set(&cfgSecret.Data, kubetypes.KubeAPIServerConfigFile, cfgB)
} else {
// AdvertiseServices config is set by ingress-pg-reconciler, so make sure we
// don't overwrite it if already set.
existingAdvertiseServices, err := extractAdvertiseServicesConfig(existingCfgSecret)
if err != nil {
return nil, err
}
configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, endpoints[nodePortSvcName], existingAdvertiseServices, r.loginServer)
if err != nil {
return nil, fmt.Errorf("error creating tailscaled config: %w", err)
@@ -1024,16 +1044,16 @@ func extractAdvertiseServicesConfig(cfgSecret *corev1.Secret) ([]string, error)
return nil, nil
}
conf, err := latestConfigFromSecret(cfgSecret)
cfg, err := latestConfigFromSecret(cfgSecret)
if err != nil {
return nil, err
}
if conf == nil {
if cfg == nil {
return nil, nil
}
return conf.AdvertiseServices, nil
return cfg.AdvertiseServices, nil
}
// getNodeMetadata gets metadata for all the pods owned by this ProxyGroup by
@@ -1045,7 +1065,7 @@ func extractAdvertiseServicesConfig(cfgSecret *corev1.Secret) ([]string, error)
func (r *ProxyGroupReconciler) getNodeMetadata(ctx context.Context, pg *tsapi.ProxyGroup) (metadata []nodeMetadata, _ error) {
// List all state Secrets owned by this ProxyGroup.
secrets := &corev1.SecretList{}
if err := r.List(ctx, secrets, client.InNamespace(r.tsNamespace), client.MatchingLabels(pgSecretLabels(pg.Name, "state"))); err != nil {
if err := r.List(ctx, secrets, client.InNamespace(r.tsNamespace), client.MatchingLabels(pgSecretLabels(pg.Name, kubetypes.LabelSecretTypeState))); err != nil {
return nil, fmt.Errorf("failed to list state Secrets: %w", err)
}
for _, secret := range secrets.Items {

View File

@@ -7,7 +7,6 @@ package main
import (
"fmt"
"path/filepath"
"slices"
"strconv"
"strings"
@@ -342,7 +341,7 @@ func kubeAPIServerStatefulSet(pg *tsapi.ProxyGroup, namespace, image string, por
},
{
Name: "TS_K8S_PROXY_CONFIG",
Value: filepath.Join("/etc/tsconfig/$(POD_NAME)/", kubeAPIServerConfigFile),
Value: "kube:$(POD_NAME)-config",
},
}
@@ -355,20 +354,6 @@ func kubeAPIServerStatefulSet(pg *tsapi.ProxyGroup, namespace, image string, por
return envs
}(),
VolumeMounts: func() []corev1.VolumeMount {
var mounts []corev1.VolumeMount
// TODO(tomhjp): Read config directly from the Secret instead.
for i := range pgReplicas(pg) {
mounts = append(mounts, corev1.VolumeMount{
Name: fmt.Sprintf("k8s-proxy-config-%d", i),
ReadOnly: true,
MountPath: fmt.Sprintf("/etc/tsconfig/%s-%d", pg.Name, i),
})
}
return mounts
}(),
Ports: []corev1.ContainerPort{
{
Name: "k8s-proxy",
@@ -378,21 +363,6 @@ func kubeAPIServerStatefulSet(pg *tsapi.ProxyGroup, namespace, image string, por
},
},
},
Volumes: func() []corev1.Volume {
var volumes []corev1.Volume
for i := range pgReplicas(pg) {
volumes = append(volumes, corev1.Volume{
Name: fmt.Sprintf("k8s-proxy-config-%d", i),
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: pgConfigSecretName(pg.Name, i),
},
},
})
}
return volumes
}(),
},
},
},
@@ -426,6 +396,7 @@ func pgRole(pg *tsapi.ProxyGroup, namespace string) *rbacv1.Role {
Resources: []string{"secrets"},
Verbs: []string{
"list",
"watch", // For k8s-proxy.
},
},
{
@@ -508,7 +479,7 @@ func pgStateSecrets(pg *tsapi.ProxyGroup, namespace string) (secrets []*corev1.S
ObjectMeta: metav1.ObjectMeta{
Name: pgStateSecretName(pg.Name, i),
Namespace: namespace,
Labels: pgSecretLabels(pg.Name, "state"),
Labels: pgSecretLabels(pg.Name, kubetypes.LabelSecretTypeState),
OwnerReferences: pgOwnerReference(pg),
},
})

View File

@@ -41,7 +41,7 @@ import (
)
const (
finalizerName = "tailscale.com/service-pg-finalizer"
svcPGFinalizerName = "tailscale.com/service-pg-finalizer"
reasonIngressSvcInvalid = "IngressSvcInvalid"
reasonIngressSvcValid = "IngressSvcValid"
@@ -174,13 +174,13 @@ func (r *HAServiceReconciler) maybeProvision(ctx context.Context, hostname strin
return false, nil
}
if !slices.Contains(svc.Finalizers, finalizerName) {
if !slices.Contains(svc.Finalizers, svcPGFinalizerName) {
// This log line is printed exactly once during initial provisioning,
// because once the finalizer is in place this block gets skipped. So,
// this is a nice place to tell the operator that the high level,
// multi-reconcile operation is underway.
logger.Infof("exposing Service over tailscale")
svc.Finalizers = append(svc.Finalizers, finalizerName)
svc.Finalizers = append(svc.Finalizers, svcPGFinalizerName)
if err := r.Update(ctx, svc); err != nil {
return false, fmt.Errorf("failed to add finalizer: %w", err)
}
@@ -378,7 +378,7 @@ func (r *HAServiceReconciler) maybeProvision(ctx context.Context, hostname strin
// corresponding to this Service.
func (r *HAServiceReconciler) maybeCleanup(ctx context.Context, hostname string, svc *corev1.Service, logger *zap.SugaredLogger) (svcChanged bool, err error) {
logger.Debugf("Ensuring any resources for Service are cleaned up")
ix := slices.Index(svc.Finalizers, finalizerName)
ix := slices.Index(svc.Finalizers, svcPGFinalizerName)
if ix < 0 {
logger.Debugf("no finalizer, nothing to do")
return false, nil
@@ -485,12 +485,12 @@ func (r *HAServiceReconciler) maybeCleanupProxyGroup(ctx context.Context, proxyG
func (r *HAServiceReconciler) deleteFinalizer(ctx context.Context, svc *corev1.Service, logger *zap.SugaredLogger) error {
svc.Finalizers = slices.DeleteFunc(svc.Finalizers, func(f string) bool {
return f == finalizerName
return f == svcPGFinalizerName
})
logger.Debugf("ensure %q finalizer is removed", finalizerName)
logger.Debugf("ensure %q finalizer is removed", svcPGFinalizerName)
if err := r.Update(ctx, svc); err != nil {
return fmt.Errorf("failed to remove finalizer %q: %w", finalizerName, err)
return fmt.Errorf("failed to remove finalizer %q: %w", svcPGFinalizerName, err)
}
r.mu.Lock()
defer r.mu.Unlock()
@@ -653,7 +653,7 @@ func (a *HAServiceReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Con
// Get all config Secrets for this ProxyGroup.
// Get all Pods
secrets := &corev1.SecretList{}
if err := a.List(ctx, secrets, client.InNamespace(a.tsNamespace), client.MatchingLabels(pgSecretLabels(pgName, "config"))); err != nil {
if err := a.List(ctx, secrets, client.InNamespace(a.tsNamespace), client.MatchingLabels(pgSecretLabels(pgName, kubetypes.LabelSecretTypeConfig))); err != nil {
return fmt.Errorf("failed to list config Secrets: %w", err)
}
@@ -720,7 +720,7 @@ func (a *HAServiceReconciler) maybeUpdateAdvertiseServicesConfig(ctx context.Con
func (a *HAServiceReconciler) numberPodsAdvertising(ctx context.Context, pgName string, serviceName tailcfg.ServiceName) (int, error) {
// Get all state Secrets for this ProxyGroup.
secrets := &corev1.SecretList{}
if err := a.List(ctx, secrets, client.InNamespace(a.tsNamespace), client.MatchingLabels(pgSecretLabels(pgName, "state"))); err != nil {
if err := a.List(ctx, secrets, client.InNamespace(a.tsNamespace), client.MatchingLabels(pgSecretLabels(pgName, kubetypes.LabelSecretTypeState))); err != nil {
return 0, fmt.Errorf("failed to list ProxyGroup %q state Secrets: %w", pgName, err)
}

View File

@@ -26,6 +26,7 @@ import (
tsoperator "tailscale.com/k8s-operator"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/ingressservices"
"tailscale.com/kube/kubetypes"
"tailscale.com/tstest"
"tailscale.com/types/ptr"
"tailscale.com/util/mak"
@@ -139,7 +140,7 @@ func setupServiceTest(t *testing.T) (*HAServiceReconciler, *corev1.Secret, clien
ObjectMeta: metav1.ObjectMeta{
Name: pgConfigSecretName("test-pg", 0),
Namespace: "operator-ns",
Labels: pgSecretLabels("test-pg", "config"),
Labels: pgSecretLabels("test-pg", kubetypes.LabelSecretTypeConfig),
},
Data: map[string][]byte{
tsoperator.TailscaledConfigFileName(pgMinCapabilityVersion): []byte(`{"Version":""}`),
@@ -298,12 +299,12 @@ func TestServicePGReconciler_MultiCluster(t *testing.T) {
t.Fatalf("getting Tailscale Service: %v", err)
}
if len(tsSvcs) != 1 {
t.Fatalf("unexpected number of Tailscale Services (%d)", len(tsSvcs))
if len(tsSvcs.VIPServices) != 1 {
t.Fatalf("unexpected number of Tailscale Services (%d)", len(tsSvcs.VIPServices))
}
for name := range tsSvcs {
t.Logf("found Tailscale Service with name %q", name.String())
for _, svc := range tsSvcs.VIPServices {
t.Logf("found Tailscale Service with name %q", svc.Name)
}
}
}
@@ -336,7 +337,7 @@ func TestIgnoreRegularService(t *testing.T) {
tsSvcs, err := ft.ListVIPServices(context.Background())
if err == nil {
if len(tsSvcs) > 0 {
if len(tsSvcs.VIPServices) > 0 {
t.Fatal("unexpected Tailscale Services found")
}
}

View File

@@ -891,13 +891,17 @@ func (c *fakeTSClient) GetVIPService(ctx context.Context, name tailcfg.ServiceNa
return svc, nil
}
func (c *fakeTSClient) ListVIPServices(ctx context.Context) (map[tailcfg.ServiceName]*tailscale.VIPService, error) {
func (c *fakeTSClient) ListVIPServices(ctx context.Context) (*tailscale.VIPServiceList, error) {
c.Lock()
defer c.Unlock()
if c.vipServices == nil {
return nil, &tailscale.ErrResponse{Status: http.StatusNotFound}
}
return c.vipServices, nil
result := &tailscale.VIPServiceList{}
for _, svc := range c.vipServices {
result.VIPServices = append(result.VIPServices, *svc)
}
return result, nil
}
func (c *fakeTSClient) CreateOrUpdateVIPService(ctx context.Context, svc *tailscale.VIPService) error {

View File

@@ -56,6 +56,8 @@ type tsClient interface {
DeleteDevice(ctx context.Context, nodeStableID string) error
// GetVIPService is a method for getting a Tailscale Service. VIPService is the original name for Tailscale Service.
GetVIPService(ctx context.Context, name tailcfg.ServiceName) (*tailscale.VIPService, error)
// ListVIPServices is a method for list all Tailscale Services. VIPService is the original name for Tailscale Service.
ListVIPServices(ctx context.Context) (*tailscale.VIPServiceList, error)
// CreateOrUpdateVIPService is a method for creating or updating a Tailscale Service.
CreateOrUpdateVIPService(ctx context.Context, svc *tailscale.VIPService) error
// DeleteVIPService is a method for deleting a Tailscale Service.

View File

@@ -0,0 +1,243 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
// Package config provides watchers for the various supported ways to load a
// config file for k8s-proxy; currently file or Kubernetes Secret.
package config
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"tailscale.com/kube/k8s-proxy/conf"
"tailscale.com/kube/kubetypes"
"tailscale.com/util/testenv"
)
const namespacePath = "var/run/secrets/kubernetes.io/serviceaccount/namespace"
type configLoader struct {
logger *zap.SugaredLogger
client clientcorev1.CoreV1Interface
root string // Root path, field exists for testing.
cfgChan chan<- *conf.Config
previous []byte
once sync.Once // For use in tests. To close cfgIgnored.
cfgIgnored chan struct{} // For use in tests.
}
func NewConfigLoader(logger *zap.SugaredLogger, client clientcorev1.CoreV1Interface, cfgChan chan<- *conf.Config) *configLoader {
return &configLoader{
logger: logger,
client: client,
root: "/",
cfgChan: cfgChan,
}
}
func (l *configLoader) WatchConfig(ctx context.Context, path string) error {
secretName, isKubeSecret := strings.CutPrefix(path, "kube:")
if isKubeSecret {
if err := l.watchConfigSecretChanges(ctx, secretName); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("error watching config Secret %q: %w", secretName, err)
}
return nil
}
if err := l.watchConfigFileChanges(ctx, path); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("error watching config file %q: %w", path, err)
}
return nil
}
func (l *configLoader) reloadConfig(ctx context.Context, raw []byte) error {
if bytes.Equal(raw, l.previous) {
if l.cfgIgnored != nil && testenv.InTest() {
l.once.Do(func() {
close(l.cfgIgnored)
})
}
return nil
}
cfg, err := conf.Load(raw)
if err != nil {
return fmt.Errorf("error loading config: %w", err)
}
select {
case <-ctx.Done():
return ctx.Err()
case l.cfgChan <- &cfg:
}
l.previous = raw
return nil
}
func (l *configLoader) watchConfigFileChanges(ctx context.Context, path string) error {
var (
tickChan <-chan time.Time
eventChan <-chan fsnotify.Event
errChan <-chan error
)
if w, err := fsnotify.NewWatcher(); err != nil {
// Creating a new fsnotify watcher would fail for example if inotify was not able to create a new file descriptor.
// See https://github.com/tailscale/tailscale/issues/15081
l.logger.Infof("Failed to create fsnotify watcher on config file %q; watching for changes on 5s timer: %v", path, err)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
tickChan = ticker.C
} else {
dir := filepath.Dir(path)
file := filepath.Base(path)
l.logger.Infof("Watching directory %q for changes to config file %q", dir, file)
defer w.Close()
if err := w.Add(dir); err != nil {
return fmt.Errorf("failed to add fsnotify watch: %w", err)
}
eventChan = w.Events
errChan = w.Errors
}
// Read the initial config file, but after the watcher is already set up to
// avoid an unlucky race condition if the config file is edited in between.
b, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("error reading config file %q: %w", path, err)
}
if err := l.reloadConfig(ctx, b); err != nil {
return fmt.Errorf("error loading initial config file %q: %w", path, err)
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case err, ok := <-errChan:
if !ok {
// Watcher was closed.
return nil
}
return fmt.Errorf("watcher error: %w", err)
case <-tickChan:
case ev, ok := <-eventChan:
if !ok {
// Watcher was closed.
return nil
}
if ev.Name != path || ev.Op&fsnotify.Write == 0 {
// Ignore irrelevant events.
continue
}
}
b, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("error reading config file: %w", err)
}
// Writers such as os.WriteFile may truncate the file before writing
// new contents, so it's possible to read an empty file if we read before
// the write has completed.
if len(b) == 0 {
continue
}
if err := l.reloadConfig(ctx, b); err != nil {
return fmt.Errorf("error reloading config file %q: %v", path, err)
}
}
}
func (l *configLoader) watchConfigSecretChanges(ctx context.Context, secretName string) error {
namespace, err := os.ReadFile(filepath.Join(l.root, namespacePath))
if err != nil {
return fmt.Errorf("error reading namespace from %q: %w", namespacePath, err)
}
secrets := l.client.Secrets(string(namespace))
w, err := secrets.Watch(ctx, metav1.ListOptions{
TypeMeta: metav1.TypeMeta{
Kind: "Secret",
APIVersion: "v1",
},
FieldSelector: fmt.Sprintf("metadata.name=%s", secretName),
Watch: true,
})
if err != nil {
return fmt.Errorf("failed to watch config Secret %q: %w", secretName, err)
}
defer w.Stop()
// Get the initial config Secret now we've got the watcher set up.
secret, err := secrets.Get(ctx, secretName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get config Secret %q: %w", secretName, err)
}
if err := l.configFromSecret(ctx, secret); err != nil {
return fmt.Errorf("error loading initial config: %w", err)
}
l.logger.Infof("Watching config Secret %q for changes", secretName)
for {
var secret *corev1.Secret
select {
case <-ctx.Done():
return ctx.Err()
case ev := <-w.ResultChan():
switch ev.Type {
case watch.Added, watch.Modified:
// New config available to load.
var ok bool
secret, ok = ev.Object.(*corev1.Secret)
if !ok {
return fmt.Errorf("unexpected object type %T in watch event for config Secret %q", ev.Object, secretName)
}
if secret == nil || secret.Data == nil {
continue
}
case watch.Deleted, watch.Bookmark:
// Ignore, no action required.
continue
case watch.Error:
return fmt.Errorf("error watching config Secret %q: %v", secretName, ev.Object)
}
}
if err := l.configFromSecret(ctx, secret); err != nil {
return fmt.Errorf("error reloading config Secret %q: %v", secret.Name, err)
}
}
}
func (l *configLoader) configFromSecret(ctx context.Context, s *corev1.Secret) error {
b := s.Data[kubetypes.KubeAPIServerConfigFile]
if len(b) == 0 {
return fmt.Errorf("config Secret %q does not contain expected config in key %q", s.Name, kubetypes.KubeAPIServerConfigFile)
}
if err := l.reloadConfig(ctx, b); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,191 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package config
import (
"context"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"tailscale.com/kube/k8s-proxy/conf"
"tailscale.com/kube/kubetypes"
"tailscale.com/types/ptr"
)
func TestWatchConfig(t *testing.T) {
type phase struct {
config string
cancel bool
expectedConf *conf.ConfigV1Alpha1
expectedErr string
}
// Same set of behaviour tests for each config source.
for _, env := range []string{"file", "kube"} {
t.Run(env, func(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
name string
initialConfig string
phases []phase
}{
{
name: "no_config",
phases: []phase{{
expectedErr: "error loading initial config",
}},
},
{
name: "valid_config",
initialConfig: `{"version": "v1alpha1", "authKey": "abc123"}`,
phases: []phase{{
expectedConf: &conf.ConfigV1Alpha1{
AuthKey: ptr.To("abc123"),
},
}},
},
{
name: "can_cancel",
initialConfig: `{"version": "v1alpha1", "authKey": "abc123"}`,
phases: []phase{
{
expectedConf: &conf.ConfigV1Alpha1{
AuthKey: ptr.To("abc123"),
},
},
{
cancel: true,
},
},
},
{
name: "can_reload",
initialConfig: `{"version": "v1alpha1", "authKey": "abc123"}`,
phases: []phase{
{
expectedConf: &conf.ConfigV1Alpha1{
AuthKey: ptr.To("abc123"),
},
},
{
config: `{"version": "v1alpha1", "authKey": "def456"}`,
expectedConf: &conf.ConfigV1Alpha1{
AuthKey: ptr.To("def456"),
},
},
},
},
{
name: "ignores_events_with_no_changes",
initialConfig: `{"version": "v1alpha1", "authKey": "abc123"}`,
phases: []phase{
{
expectedConf: &conf.ConfigV1Alpha1{
AuthKey: ptr.To("abc123"),
},
},
{
config: `{"version": "v1alpha1", "authKey": "abc123"}`,
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
root := t.TempDir()
cl := fake.NewClientset()
var cfgPath string
var writeFile func(*testing.T, string)
if env == "file" {
cfgPath = filepath.Join(root, kubetypes.KubeAPIServerConfigFile)
writeFile = func(t *testing.T, content string) {
if err := os.WriteFile(cfgPath, []byte(content), 0o644); err != nil {
t.Fatalf("error writing config file %q: %v", cfgPath, err)
}
}
} else {
cfgPath = "kube:config-secret"
nsFilePath := filepath.Join(root, namespacePath)
if err := os.MkdirAll(filepath.Dir(nsFilePath), 0o755); err != nil {
t.Fatalf("error creating namespace directory: %v", err)
}
if err := os.WriteFile(nsFilePath, []byte("default"), 0o644); err != nil {
t.Fatalf("error writing namespace file: %v", err)
}
writeFile = func(t *testing.T, content string) {
s := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "config-secret",
},
Data: map[string][]byte{
kubetypes.KubeAPIServerConfigFile: []byte(content),
},
}
if _, err := cl.CoreV1().Secrets("default").Create(t.Context(), s, metav1.CreateOptions{}); err != nil {
if _, updateErr := cl.CoreV1().Secrets("default").Update(t.Context(), s, metav1.UpdateOptions{}); updateErr != nil {
t.Fatalf("error writing config Secret %q: %v", cfgPath, updateErr)
}
}
}
}
configChan := make(chan *conf.Config)
l := NewConfigLoader(zap.Must(zap.NewDevelopment()).Sugar(), cl.CoreV1(), configChan)
l.root = root
l.cfgIgnored = make(chan struct{})
errs := make(chan error)
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
writeFile(t, tc.initialConfig)
go func() {
errs <- l.WatchConfig(ctx, cfgPath)
}()
for i, p := range tc.phases {
if p.config != "" {
writeFile(t, p.config)
}
if p.cancel {
cancel()
}
select {
case cfg := <-configChan:
if diff := cmp.Diff(*p.expectedConf, cfg.Parsed); diff != "" {
t.Errorf("unexpected config (-want +got):\n%s", diff)
}
case err := <-errs:
if p.cancel {
if err != nil {
t.Fatalf("unexpected error after cancel: %v", err)
}
} else if p.expectedErr == "" {
t.Fatalf("unexpected error: %v", err)
} else if !strings.Contains(err.Error(), p.expectedErr) {
t.Fatalf("expected error to contain %q, got %q", p.expectedErr, err.Error())
}
case <-l.cfgIgnored:
if p.expectedConf != nil {
t.Fatalf("expected config to be reloaded, but got ignored signal")
}
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for expected event in phase: %d", i)
}
}
})
}
})
}
}

View File

@@ -14,6 +14,7 @@ import (
"fmt"
"os"
"os/signal"
"reflect"
"strings"
"syscall"
"time"
@@ -21,20 +22,37 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/strings/slices"
"tailscale.com/client/local"
"tailscale.com/cmd/k8s-proxy/internal/config"
"tailscale.com/hostinfo"
"tailscale.com/ipn"
"tailscale.com/ipn/store"
apiproxy "tailscale.com/k8s-operator/api-proxy"
"tailscale.com/kube/certs"
"tailscale.com/kube/k8s-proxy/conf"
klc "tailscale.com/kube/localclient"
"tailscale.com/kube/services"
"tailscale.com/kube/state"
"tailscale.com/tailcfg"
"tailscale.com/tsnet"
)
func main() {
logger := zap.Must(zap.NewProduction()).Sugar()
encoderCfg := zap.NewProductionEncoderConfig()
encoderCfg.EncodeTime = zapcore.RFC3339TimeEncoder
logger := zap.Must(zap.Config{
Level: zap.NewAtomicLevelAt(zap.DebugLevel),
Encoding: "json",
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
EncoderConfig: encoderCfg,
}.Build()).Sugar()
defer logger.Sync()
if err := run(logger); err != nil {
logger.Fatal(err.Error())
}
@@ -42,18 +60,58 @@ func main() {
func run(logger *zap.SugaredLogger) error {
var (
configFile = os.Getenv("TS_K8S_PROXY_CONFIG")
configPath = os.Getenv("TS_K8S_PROXY_CONFIG")
podUID = os.Getenv("POD_UID")
)
if configFile == "" {
if configPath == "" {
return errors.New("TS_K8S_PROXY_CONFIG unset")
}
// TODO(tomhjp): Support reloading config.
// TODO(tomhjp): Support reading config from a Secret.
cfg, err := conf.Load(configFile)
// serveCtx to live for the lifetime of the process, only gets cancelled
// once the Tailscale Service has been drained
serveCtx, serveCancel := context.WithCancel(context.Background())
defer serveCancel()
// ctx to cancel to start the shutdown process.
ctx, cancel := context.WithCancel(serveCtx)
defer cancel()
sigsChan := make(chan os.Signal, 1)
signal.Notify(sigsChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
select {
case <-ctx.Done():
case s := <-sigsChan:
logger.Infof("Received shutdown signal %s, exiting", s)
cancel()
}
}()
var group *errgroup.Group
group, ctx = errgroup.WithContext(ctx)
restConfig, err := getRestConfig(logger)
if err != nil {
return fmt.Errorf("error loading config file %q: %w", configFile, err)
return fmt.Errorf("error getting rest config: %w", err)
}
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("error creating Kubernetes clientset: %w", err)
}
// Load and watch config.
cfgChan := make(chan *conf.Config)
cfgLoader := config.NewConfigLoader(logger, clientset.CoreV1(), cfgChan)
group.Go(func() error {
return cfgLoader.WatchConfig(ctx, configPath)
})
// Get initial config.
var cfg *conf.Config
select {
case <-ctx.Done():
return group.Wait()
case cfg = <-cfgChan:
}
if cfg.Parsed.LogLevel != nil {
@@ -82,6 +140,14 @@ func run(logger *zap.SugaredLogger) error {
hostinfo.SetApp(*cfg.Parsed.App)
}
// TODO(tomhjp): Pass this setting directly into the store instead of using
// environment variables.
if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true) {
os.Setenv("TS_CERT_SHARE_MODE", "rw")
} else {
os.Setenv("TS_CERT_SHARE_MODE", "ro")
}
st, err := getStateStore(cfg.Parsed.State, logger)
if err != nil {
return err
@@ -115,10 +181,6 @@ func run(logger *zap.SugaredLogger) error {
ts.Hostname = *cfg.Parsed.Hostname
}
// ctx to live for the lifetime of the process.
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
// Make sure we crash loop if Up doesn't complete in reasonable time.
upCtx, upCancel := context.WithTimeout(ctx, time.Minute)
defer upCancel()
@@ -126,9 +188,6 @@ func run(logger *zap.SugaredLogger) error {
return fmt.Errorf("error starting tailscale server: %w", err)
}
defer ts.Close()
group, groupCtx := errgroup.WithContext(ctx)
lc, err := ts.LocalClient()
if err != nil {
return fmt.Errorf("error getting local client: %w", err)
@@ -136,23 +195,13 @@ func run(logger *zap.SugaredLogger) error {
// Setup for updating state keys.
if podUID != "" {
w, err := lc.WatchIPNBus(groupCtx, ipn.NotifyInitialNetMap)
if err != nil {
return fmt.Errorf("error watching IPN bus: %w", err)
}
defer w.Close()
group.Go(func() error {
if err := state.KeepKeysUpdated(st, w.Next); err != nil && err != groupCtx.Err() {
return fmt.Errorf("error keeping state keys updated: %w", err)
}
return nil
return state.KeepKeysUpdated(ctx, st, klc.New(lc))
})
}
if cfg.Parsed.AcceptRoutes != nil {
_, err = lc.EditPrefs(groupCtx, &ipn.MaskedPrefs{
_, err = lc.EditPrefs(ctx, &ipn.MaskedPrefs{
RouteAllSet: true,
Prefs: ipn.Prefs{RouteAll: *cfg.Parsed.AcceptRoutes},
})
@@ -161,34 +210,97 @@ func run(logger *zap.SugaredLogger) error {
}
}
// Setup for the API server proxy.
restConfig, err := getRestConfig(logger)
if err != nil {
return fmt.Errorf("error getting rest config: %w", err)
// TODO(tomhjp): There seems to be a bug that on restart the device does
// not get reassigned it's already working Service IPs unless we clear and
// reset the serve config.
if err := lc.SetServeConfig(ctx, &ipn.ServeConfig{}); err != nil {
return fmt.Errorf("error clearing existing ServeConfig: %w", err)
}
authMode := true
if cfg.Parsed.KubeAPIServer != nil {
v, ok := cfg.Parsed.KubeAPIServer.AuthMode.Get()
if ok {
authMode = v
var cm *certs.CertManager
if shouldIssueCerts(cfg) {
logger.Infof("Will issue TLS certs for Tailscale Service")
cm = certs.NewCertManager(klc.New(lc), logger.Infof)
}
if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
return err
}
if cfg.Parsed.AdvertiseServices != nil {
if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
AdvertiseServicesSet: true,
Prefs: ipn.Prefs{
AdvertiseServices: cfg.Parsed.AdvertiseServices,
},
}); err != nil {
return fmt.Errorf("error setting prefs AdvertiseServices: %w", err)
}
}
ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, authMode)
// Setup for the API server proxy.
authMode := true
if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.AuthMode.EqualBool(false) {
authMode = false
}
ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, authMode, false)
if err != nil {
return fmt.Errorf("error creating api server proxy: %w", err)
}
// TODO(tomhjp): Work out whether we should use TS_CERT_SHARE_MODE or not,
// and possibly issue certs upfront here before serving.
group.Go(func() error {
if err := ap.Run(groupCtx); err != nil {
if err := ap.Run(serveCtx); err != nil {
return fmt.Errorf("error running API server proxy: %w", err)
}
return nil
})
return group.Wait()
for {
select {
case <-ctx.Done():
// Context cancelled, exit.
logger.Info("Context cancelled, exiting")
shutdownCtx, shutdownCancel := context.WithTimeout(serveCtx, 20*time.Second)
unadvertiseErr := services.EnsureServicesNotAdvertised(shutdownCtx, lc, logger.Infof)
shutdownCancel()
serveCancel()
return errors.Join(unadvertiseErr, group.Wait())
case cfg = <-cfgChan:
// Handle config reload.
// TODO(tomhjp): Make auth mode reloadable.
var prefs ipn.MaskedPrefs
cfgLogger := logger
currentPrefs, err := lc.GetPrefs(ctx)
if err != nil {
return fmt.Errorf("error getting current prefs: %w", err)
}
if !slices.Equal(currentPrefs.AdvertiseServices, cfg.Parsed.AdvertiseServices) {
cfgLogger = cfgLogger.With("AdvertiseServices", fmt.Sprintf("%v -> %v", currentPrefs.AdvertiseServices, cfg.Parsed.AdvertiseServices))
prefs.AdvertiseServicesSet = true
prefs.Prefs.AdvertiseServices = cfg.Parsed.AdvertiseServices
}
if cfg.Parsed.Hostname != nil && *cfg.Parsed.Hostname != currentPrefs.Hostname {
cfgLogger = cfgLogger.With("Hostname", fmt.Sprintf("%s -> %s", currentPrefs.Hostname, *cfg.Parsed.Hostname))
prefs.HostnameSet = true
prefs.Hostname = *cfg.Parsed.Hostname
}
if cfg.Parsed.AcceptRoutes != nil && *cfg.Parsed.AcceptRoutes != currentPrefs.RouteAll {
cfgLogger = cfgLogger.With("AcceptRoutes", fmt.Sprintf("%v -> %v", currentPrefs.RouteAll, *cfg.Parsed.AcceptRoutes))
prefs.RouteAllSet = true
prefs.Prefs.RouteAll = *cfg.Parsed.AcceptRoutes
}
if !prefs.IsEmpty() {
if _, err := lc.EditPrefs(ctx, &prefs); err != nil {
return fmt.Errorf("error editing prefs: %w", err)
}
}
if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
return fmt.Errorf("error setting serve config: %w", err)
}
cfgLogger.Infof("Config reloaded")
}
}
}
func getStateStore(path *string, logger *zap.SugaredLogger) (ipn.StateStore, error) {
@@ -226,3 +338,79 @@ func getRestConfig(logger *zap.SugaredLogger) (*rest.Config, error) {
return restConfig, nil
}
func apiServerProxyService(cfg *conf.Config) tailcfg.ServiceName {
if cfg.Parsed.APIServerProxy != nil &&
cfg.Parsed.APIServerProxy.Enabled.EqualBool(true) &&
cfg.Parsed.APIServerProxy.ServiceName != nil &&
*cfg.Parsed.APIServerProxy.ServiceName != "" {
return tailcfg.ServiceName(*cfg.Parsed.APIServerProxy.ServiceName)
}
return ""
}
func shouldIssueCerts(cfg *conf.Config) bool {
return cfg.Parsed.APIServerProxy != nil &&
cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true)
}
// setServeConfig sets up serve config such that it's serving for the passed in
// Tailscale Service, and does nothing if it's already up to date.
func setServeConfig(ctx context.Context, lc *local.Client, cm *certs.CertManager, name tailcfg.ServiceName) error {
existingServeConfig, err := lc.GetServeConfig(ctx)
if err != nil {
return fmt.Errorf("error getting existing serve config: %w", err)
}
// Ensure serve config is cleared if no Tailscale Service.
if name == "" {
if reflect.DeepEqual(*existingServeConfig, ipn.ServeConfig{}) {
// Already up to date.
return nil
}
if cm != nil {
cm.EnsureCertLoops(ctx, &ipn.ServeConfig{})
}
return lc.SetServeConfig(ctx, &ipn.ServeConfig{})
}
status, err := lc.StatusWithoutPeers(ctx)
if err != nil {
return fmt.Errorf("error getting local client status: %w", err)
}
serviceHostPort := ipn.HostPort(fmt.Sprintf("%s.%s:443", name.WithoutPrefix(), status.CurrentTailnet.MagicDNSSuffix))
serveConfig := ipn.ServeConfig{
// Configure for the Service hostname.
Services: map[tailcfg.ServiceName]*ipn.ServiceConfig{
name: {
TCP: map[uint16]*ipn.TCPPortHandler{
443: {
HTTPS: true,
},
},
Web: map[ipn.HostPort]*ipn.WebServerConfig{
serviceHostPort: {
Handlers: map[string]*ipn.HTTPHandler{
"/": {
Proxy: fmt.Sprintf("http://%s:80", strings.TrimSuffix(status.Self.DNSName, ".")),
},
},
},
},
},
},
}
if reflect.DeepEqual(*existingServeConfig, serveConfig) {
// Already up to date.
return nil
}
if cm != nil {
cm.EnsureCertLoops(ctx, &serveConfig)
}
return lc.SetServeConfig(ctx, &serveConfig)
}

View File

@@ -36,6 +36,11 @@ type VIPService struct {
Tags []string `json:"tags,omitempty"`
}
// VIPServiceList represents the JSON response to the list VIP Services API.
type VIPServiceList struct {
VIPServices []VIPService `json:"vipServices"`
}
// GetVIPService retrieves a VIPService by its name. It returns 404 if the VIPService is not found.
func (client *Client) GetVIPService(ctx context.Context, name tailcfg.ServiceName) (*VIPService, error) {
path := client.BuildTailnetURL("vip-services", name.String())
@@ -59,6 +64,29 @@ func (client *Client) GetVIPService(ctx context.Context, name tailcfg.ServiceNam
return svc, nil
}
// ListVIPServices retrieves lists all existing Services.
func (client *Client) ListVIPServices(ctx context.Context) (*VIPServiceList, error) {
path := client.BuildTailnetURL("vip-services")
req, err := http.NewRequestWithContext(ctx, httpm.GET, path, nil)
if err != nil {
return nil, fmt.Errorf("error creating new HTTP request: %w", err)
}
b, resp, err := SendRequest(client, req)
if err != nil {
return nil, fmt.Errorf("error making Tailsale API request: %w", err)
}
// If status code was not successful, return the error.
// TODO: Change the check for the StatusCode to include other 2XX success codes.
if resp.StatusCode != http.StatusOK {
return nil, HandleErrorResponse(b, resp)
}
result := &VIPServiceList{}
if err := json.Unmarshal(b, result); err != nil {
return nil, err
}
return result, nil
}
// CreateOrUpdateVIPService creates or updates a VIPService by its name. Caller must ensure that, if the
// VIPService already exists, the VIPService is fetched first to ensure that any auto-allocated IP addresses are not
// lost during the update. If the VIPService was created without any IP addresses explicitly set (so that they were

View File

@@ -394,8 +394,8 @@ func (s *Store) canPatchSecret(secret string) bool {
// certSecretSelector returns a label selector that can be used to list all
// Secrets that aren't Tailscale state Secrets and contain TLS certificates for
// HTTPS endpoints that this node serves.
// Currently (3/2025) this only applies to the Kubernetes Operator's ingress
// ProxyGroup.
// Currently (7/2025) this only applies to the Kubernetes Operator's ProxyGroup
// when spec.Type is "ingress" or "kube-apiserver".
func (s *Store) certSecretSelector() map[string]string {
if s.podName == "" {
return map[string]string{}
@@ -406,7 +406,7 @@ func (s *Store) certSecretSelector() map[string]string {
}
pgName := s.podName[:p]
return map[string]string{
kubetypes.LabelSecretType: "certs",
kubetypes.LabelSecretType: kubetypes.LabelSecretTypeCerts,
kubetypes.LabelManaged: "true",
"tailscale.com/proxy-group": pgName,
}

View File

@@ -17,6 +17,7 @@ import (
"tailscale.com/ipn/store/mem"
"tailscale.com/kube/kubeapi"
"tailscale.com/kube/kubeclient"
"tailscale.com/kube/kubetypes"
)
func TestWriteState(t *testing.T) {
@@ -516,7 +517,7 @@ func TestNewWithClient(t *testing.T) {
)
certSecretsLabels := map[string]string{
"tailscale.com/secret-type": "certs",
"tailscale.com/secret-type": kubetypes.LabelSecretTypeCerts,
"tailscale.com/managed": "true",
"tailscale.com/proxy-group": "ingress-proxies",
}
@@ -582,7 +583,7 @@ func TestNewWithClient(t *testing.T) {
makeSecret("app2.tailnetxyz.ts.net", certSecretsLabels, "2"),
makeSecret("some-other-secret", nil, "3"),
makeSecret("app3.other-proxies.ts.net", map[string]string{
"tailscale.com/secret-type": "certs",
"tailscale.com/secret-type": kubetypes.LabelSecretTypeCerts,
"tailscale.com/managed": "true",
"tailscale.com/proxy-group": "some-other-proxygroup",
}, "4"),
@@ -606,7 +607,7 @@ func TestNewWithClient(t *testing.T) {
makeSecret("app2.tailnetxyz.ts.net", certSecretsLabels, "2"),
makeSecret("some-other-secret", nil, "3"),
makeSecret("app3.other-proxies.ts.net", map[string]string{
"tailscale.com/secret-type": "certs",
"tailscale.com/secret-type": kubetypes.LabelSecretTypeCerts,
"tailscale.com/managed": "true",
"tailscale.com/proxy-group": "some-other-proxygroup",
}, "4"),

View File

@@ -10,6 +10,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"net"
"net/http"
"net/http/httputil"
"net/netip"
@@ -46,7 +47,7 @@ var (
// caller's Tailscale identity and the rules defined in the tailnet ACLs.
// - false: the proxy is started and requests are passed through to the
// Kubernetes API without any auth modifications.
func NewAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config, ts *tsnet.Server, authMode bool) (*APIServerProxy, error) {
func NewAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config, ts *tsnet.Server, authMode bool, https bool) (*APIServerProxy, error) {
if !authMode {
restConfig = rest.AnonymousClientConfig(restConfig)
}
@@ -85,6 +86,7 @@ func NewAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config, ts *tsn
log: zlog,
lc: lc,
authMode: authMode,
https: https,
upstreamURL: u,
ts: ts,
}
@@ -104,11 +106,6 @@ func NewAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config, ts *tsn
//
// It return when ctx is cancelled or ServeTLS fails.
func (ap *APIServerProxy) Run(ctx context.Context) error {
ln, err := ap.ts.Listen("tcp", ":443")
if err != nil {
return fmt.Errorf("could not listen on :443: %v", err)
}
mux := http.NewServeMux()
mux.HandleFunc("/", ap.serveDefault)
mux.HandleFunc("POST /api/v1/namespaces/{namespace}/pods/{pod}/exec", ap.serveExecSPDY)
@@ -117,32 +114,61 @@ func (ap *APIServerProxy) Run(ctx context.Context) error {
mux.HandleFunc("GET /api/v1/namespaces/{namespace}/pods/{pod}/attach", ap.serveAttachWS)
ap.hs = &http.Server{
Handler: mux,
ErrorLog: zap.NewStdLog(ap.log.Desugar()),
}
mode := "noauth"
if ap.authMode {
mode = "auth"
}
var tsLn net.Listener
var serve func(ln net.Listener) error
if ap.https {
var err error
tsLn, err = ap.ts.Listen("tcp", ":443")
if err != nil {
return fmt.Errorf("could not listen on :443: %w", err)
}
serve = func(ln net.Listener) error {
return ap.hs.ServeTLS(ln, "", "")
}
// Kubernetes uses SPDY for exec and port-forward, however SPDY is
// incompatible with HTTP/2; so disable HTTP/2 in the proxy.
TLSConfig: &tls.Config{
ap.hs.TLSConfig = &tls.Config{
GetCertificate: ap.lc.GetCertificate,
NextProtos: []string{"http/1.1"},
},
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
Handler: mux,
}
ap.hs.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler))
} else {
var err error
tsLn, err = ap.ts.Listen("tcp", ":80")
if err != nil {
return fmt.Errorf("could not listen on :80: %w", err)
}
serve = ap.hs.Serve
}
errs := make(chan error)
go func() {
ap.log.Infof("API server proxy is listening on %s with auth mode: %v", ln.Addr(), ap.authMode)
if err := ap.hs.ServeTLS(ln, "", ""); err != nil && err != http.ErrServerClosed {
errs <- fmt.Errorf("failed to serve: %w", err)
ap.log.Infof("API server proxy in %q mode is listening on %s", mode, tsLn.Addr())
if err := serve(tsLn); err != nil && err != http.ErrServerClosed {
errs <- fmt.Errorf("error serving: %w", err)
}
}()
select {
case <-ctx.Done():
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
return ap.hs.Shutdown(shutdownCtx)
case err := <-errs:
ap.hs.Close()
return err
}
// Graceful shutdown with a timeout of 10s.
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
return ap.hs.Shutdown(shutdownCtx)
}
// APIServerProxy is an [net/http.Handler] that authenticates requests using the Tailscale
@@ -152,7 +178,8 @@ type APIServerProxy struct {
lc *local.Client
rp *httputil.ReverseProxy
authMode bool
authMode bool // Whether to run with impersonation using caller's tailnet identity.
https bool // Whether to serve on https for the device hostname; true for k8s-operator, false for k8s-proxy.
ts *tsnet.Server
hs *http.Server
upstreamURL *url.URL
@@ -181,13 +208,13 @@ func (ap *APIServerProxy) serveExecWS(w http.ResponseWriter, r *http.Request) {
ap.sessionForProto(w, r, ksr.ExecSessionType, ksr.WSProtocol)
}
// serveExecSPDY serves '/attach' requests for sessions streamed over SPDY,
// serveAttachSPDY serves '/attach' requests for sessions streamed over SPDY,
// optionally configuring the kubectl exec sessions to be recorded.
func (ap *APIServerProxy) serveAttachSPDY(w http.ResponseWriter, r *http.Request) {
ap.sessionForProto(w, r, ksr.AttachSessionType, ksr.SPDYProtocol)
}
// serveExecWS serves '/attach' requests for sessions streamed over WebSocket,
// serveAttachWS serves '/attach' requests for sessions streamed over WebSocket,
// optionally configuring the kubectl exec sessions to be recorded.
func (ap *APIServerProxy) serveAttachWS(w http.ResponseWriter, r *http.Request) {
ap.sessionForProto(w, r, ksr.AttachSessionType, ksr.WSProtocol)

View File

@@ -342,6 +342,7 @@ _Appears in:_
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `mode` _[APIServerProxyMode](#apiserverproxymode)_ | Mode to run the API server proxy in. Supported modes are auth and noauth.<br />In auth mode, requests from the tailnet proxied over to the Kubernetes<br />API server are additionally impersonated using the sender's tailnet identity.<br />If not specified, defaults to auth mode. | | Enum: [auth noauth] <br />Type: string <br /> |
| `serviceName` _string_ | ServiceName is the name of the Tailscale Service to create. Must have a<br />prefix of "svc:" and the remaining characters must be a valid DNS label<br />no longer than 63 characters. If not specified, a name will be generated<br />based on the ProxyGroup name. | | Pattern: `^svc:[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$` <br />Type: string <br /> |
#### LabelValue

View File

@@ -226,4 +226,7 @@ const (
IngressSvcValid ConditionType = `TailscaleIngressSvcValid`
IngressSvcConfigured ConditionType = `TailscaleIngressSvcConfigured`
PGTailscaleServiceValid ConditionType = `TailscaleServiceValid` // The Tailscale Service for the ProxyGroup is valid.
PGTailscaleServiceConfigured ConditionType = `TailscaleServiceConfigured` // At least one of the ProxyGroup's Pods is advertising the Tailscale Service.
)

View File

@@ -157,4 +157,13 @@ type KubeAPIServerConfig struct {
// If not specified, defaults to auth mode.
// +optional
Mode *APIServerProxyMode `json:"mode,omitempty"`
// ServiceName is the name of the Tailscale Service to create. Must have a
// prefix of "svc:" and the remaining characters must be a valid DNS label
// no longer than 63 characters. If not specified, a name will be generated
// based on the ProxyGroup name.
// +kubebuilder:validation:Type=string
// +kubebuilder:validation:Pattern=`^svc:[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$`
// +optional
ServiceName string `json:"serviceName,omitempty"`
}

View File

@@ -1,29 +1,32 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package main
// Package certs implements logic to help multiple Kubernetes replicas share TLS
// certs for a common Tailscale Service.
package certs
import (
"context"
"fmt"
"log"
"net"
"slices"
"sync"
"time"
"tailscale.com/ipn"
"tailscale.com/kube/localclient"
"tailscale.com/types/logger"
"tailscale.com/util/goroutines"
"tailscale.com/util/mak"
)
// certManager is responsible for issuing certificates for known domains and for
// CertManager is responsible for issuing certificates for known domains and for
// maintaining a loop that re-attempts issuance daily.
// Currently cert manager logic is only run on ingress ProxyGroup replicas that are responsible for managing certs for
// HA Ingress HTTPS endpoints ('write' replicas).
type certManager struct {
lc localClient
type CertManager struct {
lc localclient.LocalClient
logf logger.Logf
tracker goroutines.Tracker // tracks running goroutines
mu sync.Mutex // guards the following
// certLoops contains a map of DNS names, for which we currently need to
@@ -32,11 +35,18 @@ type certManager struct {
certLoops map[string]context.CancelFunc
}
// ensureCertLoops ensures that, for all currently managed Service HTTPS
func NewCertManager(lc localclient.LocalClient, logf logger.Logf) *CertManager {
return &CertManager{
lc: lc,
logf: logf,
}
}
// EnsureCertLoops ensures that, for all currently managed Service HTTPS
// endpoints, there is a cert loop responsible for issuing and ensuring the
// renewal of the TLS certs.
// ServeConfig must not be nil.
func (cm *certManager) ensureCertLoops(ctx context.Context, sc *ipn.ServeConfig) error {
func (cm *CertManager) EnsureCertLoops(ctx context.Context, sc *ipn.ServeConfig) error {
if sc == nil {
return fmt.Errorf("[unexpected] ensureCertLoops called with nil ServeConfig")
}
@@ -87,12 +97,18 @@ func (cm *certManager) ensureCertLoops(ctx context.Context, sc *ipn.ServeConfig)
// renewed at that point. Renewal here is needed to prevent the shared certs from expiry in edge cases where the 'write'
// replica does not get any HTTPS requests.
// https://letsencrypt.org/docs/integration-guide/#retrying-failures
func (cm *certManager) runCertLoop(ctx context.Context, domain string) {
func (cm *CertManager) runCertLoop(ctx context.Context, domain string) {
const (
normalInterval = 24 * time.Hour // regular renewal check
initialRetry = 1 * time.Minute // initial backoff after a failure
maxRetryInterval = 24 * time.Hour // max backoff period
)
if err := cm.waitForCertDomain(ctx, domain); err != nil {
// Best-effort, log and continue with the issuing loop.
cm.logf("error waiting for cert domain %s: %v", domain, err)
}
timer := time.NewTimer(0) // fire off timer immediately
defer timer.Stop()
retryCount := 0
@@ -101,38 +117,31 @@ func (cm *certManager) runCertLoop(ctx context.Context, domain string) {
case <-ctx.Done():
return
case <-timer.C:
// We call the certificate endpoint, but don't do anything
// with the returned certs here.
// The call to the certificate endpoint will ensure that
// certs are issued/renewed as needed and stored in the
// relevant state store. For example, for HA Ingress
// 'write' replica, the cert and key will be stored in a
// Kubernetes Secret named after the domain for which we
// are issuing.
// Note that renewals triggered by the call to the
// certificates endpoint here and by renewal check
// triggered during a call to node's HTTPS endpoint
// share the same state/renewal lock mechanism, so we
// should not run into redundant issuances during
// concurrent renewal checks.
// TODO(irbekrm): maybe it is worth adding a new
// issuance endpoint that explicitly only triggers
// issuance and stores certs in the relevant store, but
// does not return certs to the caller?
// We call the certificate endpoint, but don't do anything with the
// returned certs here. The call to the certificate endpoint will
// ensure that certs are issued/renewed as needed and stored in the
// relevant state store. For example, for HA Ingress 'write' replica,
// the cert and key will be stored in a Kubernetes Secret named after
// the domain for which we are issuing.
//
// Note that renewals triggered by the call to the certificates
// endpoint here and by renewal check triggered during a call to
// node's HTTPS endpoint share the same state/renewal lock mechanism,
// so we should not run into redundant issuances during concurrent
// renewal checks.
// An issuance holds a shared lock, so we need to avoid
// a situation where other services cannot issue certs
// because a single one is holding the lock.
// An issuance holds a shared lock, so we need to avoid a situation
// where other services cannot issue certs because a single one is
// holding the lock.
ctxT, cancel := context.WithTimeout(ctx, time.Second*300)
defer cancel()
_, _, err := cm.lc.CertPair(ctxT, domain)
cancel()
if err != nil {
log.Printf("error refreshing certificate for %s: %v", domain, err)
cm.logf("error refreshing certificate for %s: %v", domain, err)
}
var nextInterval time.Duration
// TODO(irbekrm): distinguish between LE rate limit
// errors and other error types like transient network
// errors.
// TODO(irbekrm): distinguish between LE rate limit errors and other
// error types like transient network errors.
if err == nil {
retryCount = 0
nextInterval = normalInterval
@@ -147,10 +156,34 @@ func (cm *certManager) runCertLoop(ctx context.Context, domain string) {
backoff = maxRetryInterval
}
nextInterval = backoff
log.Printf("Error refreshing certificate for %s (retry %d): %v. Will retry in %v\n",
cm.logf("Error refreshing certificate for %s (retry %d): %v. Will retry in %v\n",
domain, retryCount, err, nextInterval)
}
timer.Reset(nextInterval)
}
}
}
// waitForCertDomain ensures the requested domain is in the list of allowed
// domains before issuing the cert for the first time.
func (cm *CertManager) waitForCertDomain(ctx context.Context, domain string) error {
w, err := cm.lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap)
if err != nil {
return fmt.Errorf("error watching IPN bus: %w", err)
}
defer w.Close()
for {
n, err := w.Next()
if err != nil {
return err
}
if n.NetMap == nil {
continue
}
if slices.Contains(n.NetMap.DNS.CertDomains, domain) {
return nil
}
}
}

View File

@@ -1,17 +1,18 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package main
package certs
import (
"context"
"log"
"testing"
"time"
"tailscale.com/ipn"
"tailscale.com/kube/localclient"
"tailscale.com/tailcfg"
"tailscale.com/types/netmap"
)
// TestEnsureCertLoops tests that the certManager correctly starts and stops
@@ -161,8 +162,28 @@ func TestEnsureCertLoops(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := &certManager{
lc: &fakeLocalClient{},
notifyChan := make(chan ipn.Notify)
go func() {
for {
notifyChan <- ipn.Notify{
NetMap: &netmap.NetworkMap{
DNS: tailcfg.DNSConfig{
CertDomains: []string{
"my-app.tailnetxyz.ts.net",
"my-other-app.tailnetxyz.ts.net",
},
},
},
}
}
}()
cm := &CertManager{
lc: &localclient.FakeLocalClient{
FakeIPNBusWatcher: localclient.FakeIPNBusWatcher{
NotifyChan: notifyChan,
},
},
logf: log.Printf,
certLoops: make(map[string]context.CancelFunc),
}
@@ -179,7 +200,7 @@ func TestEnsureCertLoops(t *testing.T) {
}
})()
err := cm.ensureCertLoops(ctx, tt.initialConfig)
err := cm.EnsureCertLoops(ctx, tt.initialConfig)
if (err != nil) != tt.wantErr {
t.Fatalf("ensureCertLoops() error = %v", err)
}
@@ -189,7 +210,7 @@ func TestEnsureCertLoops(t *testing.T) {
}
if tt.updatedConfig != nil {
if err := cm.ensureCertLoops(ctx, tt.updatedConfig); err != nil {
if err := cm.EnsureCertLoops(ctx, tt.updatedConfig); err != nil {
t.Fatalf("ensureCertLoops() error on update = %v", err)
}

View File

@@ -9,11 +9,12 @@ package conf
import (
"encoding/json"
"errors"
"fmt"
"net/netip"
"os"
"github.com/tailscale/hujson"
"tailscale.com/tailcfg"
"tailscale.com/types/opt"
)
@@ -21,12 +22,11 @@ const v1Alpha1 = "v1alpha1"
// Config describes a config file.
type Config struct {
Path string // disk path of HuJSON
Raw []byte // raw bytes from disk, in HuJSON form
Raw []byte // raw bytes, in HuJSON form
Std []byte // standardized JSON form
Version string // "v1alpha1"
// Parsed is the parsed config, converted from its on-disk version to the
// Parsed is the parsed config, converted from its raw bytes version to the
// latest known format.
Parsed ConfigV1Alpha1
}
@@ -48,47 +48,49 @@ type VersionedConfig struct {
}
type ConfigV1Alpha1 struct {
AuthKey *string `json:",omitempty"` // Tailscale auth key to use.
Hostname *string `json:",omitempty"` // Tailscale device hostname.
State *string `json:",omitempty"` // Path to the Tailscale state.
LogLevel *string `json:",omitempty"` // "debug", "info". Defaults to "info".
App *string `json:",omitempty"` // e.g. kubetypes.AppProxyGroupKubeAPIServer
KubeAPIServer *KubeAPIServer `json:",omitempty"` // Config specific to the API Server proxy.
ServerURL *string `json:",omitempty"` // URL of the Tailscale coordination server.
AcceptRoutes *bool `json:",omitempty"` // Accepts routes advertised by other Tailscale nodes.
AuthKey *string `json:",omitempty"` // Tailscale auth key to use.
State *string `json:",omitempty"` // Path to the Tailscale state.
LogLevel *string `json:",omitempty"` // "debug", "info". Defaults to "info".
App *string `json:",omitempty"` // e.g. kubetypes.AppProxyGroupKubeAPIServer
ServerURL *string `json:",omitempty"` // URL of the Tailscale coordination server.
// StaticEndpoints are additional, user-defined endpoints that this node
// should advertise amongst its wireguard endpoints.
StaticEndpoints []netip.AddrPort `json:",omitempty"`
// TODO(tomhjp): The remaining fields should all be reloadable during
// runtime, but currently missing most of the APIServerProxy fields.
Hostname *string `json:",omitempty"` // Tailscale device hostname.
AcceptRoutes *bool `json:",omitempty"` // Accepts routes advertised by other Tailscale nodes.
AdvertiseServices []string `json:",omitempty"` // Tailscale Services to advertise.
APIServerProxy *APIServerProxyConfig `json:",omitempty"` // Config specific to the API Server proxy.
}
type KubeAPIServer struct {
AuthMode opt.Bool `json:",omitempty"`
type APIServerProxyConfig struct {
Enabled opt.Bool `json:",omitempty"` // Whether to enable the API Server proxy.
AuthMode opt.Bool `json:",omitempty"` // Run in auth or noauth mode.
ServiceName *tailcfg.ServiceName `json:",omitempty"` // Name of the Tailscale Service to advertise.
IssueCerts opt.Bool `json:",omitempty"` // Whether this replica should issue TLS certs for the Tailscale Service.
}
// Load reads and parses the config file at the provided path on disk.
func Load(path string) (c Config, err error) {
c.Path = path
c.Raw, err = os.ReadFile(path)
if err != nil {
return c, fmt.Errorf("error reading config file %q: %w", path, err)
}
func Load(raw []byte) (c Config, err error) {
c.Raw = raw
c.Std, err = hujson.Standardize(c.Raw)
if err != nil {
return c, fmt.Errorf("error parsing config file %q HuJSON/JSON: %w", path, err)
return c, fmt.Errorf("error parsing config as HuJSON/JSON: %w", err)
}
var ver VersionedConfig
if err := json.Unmarshal(c.Std, &ver); err != nil {
return c, fmt.Errorf("error parsing config file %q: %w", path, err)
return c, fmt.Errorf("error parsing config: %w", err)
}
rootV1Alpha1 := (ver.Version == v1Alpha1)
backCompatV1Alpha1 := (ver.V1Alpha1 != nil)
switch {
case ver.Version == "":
return c, fmt.Errorf("error parsing config file %q: no \"version\" field provided", path)
return c, errors.New("error parsing config: no \"version\" field provided")
case rootV1Alpha1 && backCompatV1Alpha1:
// Exactly one of these should be set.
return c, fmt.Errorf("error parsing config file %q: both root and v1alpha1 config provided", path)
return c, errors.New("error parsing config: both root and v1alpha1 config provided")
case rootV1Alpha1 != backCompatV1Alpha1:
c.Version = v1Alpha1
switch {
@@ -100,7 +102,7 @@ func Load(path string) (c Config, err error) {
c.Parsed = ConfigV1Alpha1{}
}
default:
return c, fmt.Errorf("error parsing config file %q: unsupported \"version\" value %q; want \"%s\"", path, ver.Version, v1Alpha1)
return c, fmt.Errorf("error parsing config: unsupported \"version\" value %q; want \"%s\"", ver.Version, v1Alpha1)
}
return c, nil

View File

@@ -6,8 +6,6 @@
package conf
import (
"os"
"path/filepath"
"strings"
"testing"
@@ -57,12 +55,7 @@ func TestVersionedConfig(t *testing.T) {
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "config.json")
if err := os.WriteFile(path, []byte(tc.inputConfig), 0644); err != nil {
t.Fatalf("failed to write config file: %v", err)
}
cfg, err := Load(path)
cfg, err := Load([]byte(tc.inputConfig))
switch {
case tc.expectedError == "" && err != nil:
t.Fatalf("unexpected error: %v", err)

View File

@@ -54,4 +54,10 @@ const (
LabelManaged = "tailscale.com/managed"
LabelSecretType = "tailscale.com/secret-type" // "config", "state" "certs"
LabelSecretTypeConfig = "config"
LabelSecretTypeState = "state"
LabelSecretTypeCerts = "certs"
KubeAPIServerConfigFile = "config.hujson"
)

View File

@@ -0,0 +1,35 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package localclient
import (
"context"
"fmt"
"tailscale.com/ipn"
)
type FakeLocalClient struct {
FakeIPNBusWatcher
}
func (f *FakeLocalClient) WatchIPNBus(ctx context.Context, mask ipn.NotifyWatchOpt) (IPNBusWatcher, error) {
return &f.FakeIPNBusWatcher, nil
}
func (f *FakeLocalClient) CertPair(ctx context.Context, domain string) ([]byte, []byte, error) {
return nil, nil, fmt.Errorf("CertPair not implemented")
}
type FakeIPNBusWatcher struct {
NotifyChan chan ipn.Notify
}
func (f *FakeIPNBusWatcher) Close() error {
return nil
}
func (f *FakeIPNBusWatcher) Next() (ipn.Notify, error) {
return <-f.NotifyChan, nil
}

View File

@@ -0,0 +1,49 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// Package localclient provides an interface for all the local.Client methods
// kube needs to use, so that we can easily mock it in tests.
package localclient
import (
"context"
"io"
"tailscale.com/client/local"
"tailscale.com/ipn"
)
// LocalClient is roughly a subset of the local.Client struct's methods, used
// for easier testing.
type LocalClient interface {
WatchIPNBus(ctx context.Context, mask ipn.NotifyWatchOpt) (IPNBusWatcher, error)
CertIssuer
}
// IPNBusWatcher is local.IPNBusWatcher's methods restated in an interface to
// allow for easier mocking in tests.
type IPNBusWatcher interface {
io.Closer
Next() (ipn.Notify, error)
}
type CertIssuer interface {
CertPair(context.Context, string) ([]byte, []byte, error)
}
// New returns a LocalClient that wraps the provided local.Client.
func New(lc *local.Client) LocalClient {
return &localClient{lc: lc}
}
type localClient struct {
lc *local.Client
}
func (l *localClient) WatchIPNBus(ctx context.Context, mask ipn.NotifyWatchOpt) (IPNBusWatcher, error) {
return l.lc.WatchIPNBus(ctx, mask)
}
func (l *localClient) CertPair(ctx context.Context, domain string) ([]byte, []byte, error) {
return l.lc.CertPair(ctx, domain)
}

View File

@@ -1,25 +1,25 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package main
// Package services manages graceful shutdown of Tailscale Services advertised
// by Kubernetes clients.
package services
import (
"context"
"fmt"
"log"
"time"
"tailscale.com/client/local"
"tailscale.com/ipn"
"tailscale.com/types/logger"
)
// ensureServicesNotAdvertised is a function that gets called on containerboot
// termination and ensures that any currently advertised VIPServices get
// unadvertised to give clients time to switch to another node before this one
// is shut down.
func ensureServicesNotAdvertised(ctx context.Context, lc *local.Client) error {
// EnsureServicesNotAdvertised is a function that gets called on containerboot
// or k8s-proxy termination and ensures that any currently advertised Services
// get unadvertised to give clients time to switch to another node before this
// one is shut down.
func EnsureServicesNotAdvertised(ctx context.Context, lc *local.Client, logf logger.Logf) error {
prefs, err := lc.GetPrefs(ctx)
if err != nil {
return fmt.Errorf("error getting prefs: %w", err)
@@ -28,7 +28,7 @@ func ensureServicesNotAdvertised(ctx context.Context, lc *local.Client) error {
return nil
}
log.Printf("unadvertising services: %v", prefs.AdvertiseServices)
logf("unadvertising services: %v", prefs.AdvertiseServices)
if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
AdvertiseServicesSet: true,
Prefs: ipn.Prefs{

View File

@@ -11,11 +11,13 @@
package state
import (
"context"
"encoding/json"
"fmt"
"tailscale.com/ipn"
"tailscale.com/kube/kubetypes"
klc "tailscale.com/kube/localclient"
"tailscale.com/tailcfg"
"tailscale.com/util/deephash"
)
@@ -56,12 +58,20 @@ func SetInitialKeys(store ipn.StateStore, podUID string) error {
// cancelled or it hits an error. The passed in next function is expected to be
// from a local.IPNBusWatcher that is at least subscribed to
// ipn.NotifyInitialNetMap.
func KeepKeysUpdated(store ipn.StateStore, next func() (ipn.Notify, error)) error {
var currentDeviceID, currentDeviceIPs, currentDeviceFQDN deephash.Sum
func KeepKeysUpdated(ctx context.Context, store ipn.StateStore, lc klc.LocalClient) error {
w, err := lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap)
if err != nil {
return fmt.Errorf("error watching IPN bus: %w", err)
}
defer w.Close()
var currentDeviceID, currentDeviceIPs, currentDeviceFQDN deephash.Sum
for {
n, err := next() // Blocks on a streaming LocalAPI HTTP call.
n, err := w.Next() // Blocks on a streaming LocalAPI HTTP call.
if err != nil {
if err == ctx.Err() {
return nil
}
return err
}
if n.NetMap == nil {

View File

@@ -15,6 +15,7 @@ import (
"github.com/google/go-cmp/cmp"
"tailscale.com/ipn"
"tailscale.com/ipn/store"
klc "tailscale.com/kube/localclient"
"tailscale.com/tailcfg"
"tailscale.com/types/logger"
"tailscale.com/types/netmap"
@@ -100,24 +101,20 @@ func TestSetInitialStateKeys(t *testing.T) {
}
func TestKeepStateKeysUpdated(t *testing.T) {
store, err := store.New(logger.Discard, "mem:")
if err != nil {
t.Fatalf("error creating in-memory store: %v", err)
store := fakeStore{
writeChan: make(chan string),
}
nextWaiting := make(chan struct{})
go func() {
<-nextWaiting // Acknowledge the initial signal.
}()
notifyCh := make(chan ipn.Notify)
next := func() (ipn.Notify, error) {
nextWaiting <- struct{}{} // Send signal to test that state is consistent.
return <-notifyCh, nil // Wait for test input.
errs := make(chan error)
notifyChan := make(chan ipn.Notify)
lc := &klc.FakeLocalClient{
FakeIPNBusWatcher: klc.FakeIPNBusWatcher{
NotifyChan: notifyChan,
},
}
errs := make(chan error, 1)
go func() {
err := KeepKeysUpdated(store, next)
err := KeepKeysUpdated(t.Context(), store, lc)
if err != nil {
errs <- fmt.Errorf("keepStateKeysUpdated returned with error: %w", err)
}
@@ -126,16 +123,12 @@ func TestKeepStateKeysUpdated(t *testing.T) {
for _, tc := range []struct {
name string
notify ipn.Notify
expected map[ipn.StateKey][]byte
expected []string
}{
{
name: "initial_not_authed",
notify: ipn.Notify{},
expected: map[ipn.StateKey][]byte{
keyDeviceID: nil,
keyDeviceFQDN: nil,
keyDeviceIPs: nil,
},
name: "initial_not_authed",
notify: ipn.Notify{},
expected: nil,
},
{
name: "authed",
@@ -148,10 +141,10 @@ func TestKeepStateKeysUpdated(t *testing.T) {
}).View(),
},
},
expected: map[ipn.StateKey][]byte{
keyDeviceID: []byte("TESTCTRL00000001"),
keyDeviceFQDN: []byte("test-node.test.ts.net"),
keyDeviceIPs: []byte(`["100.64.0.1","fd7a:115c:a1e0:ab12:4843:cd96:0:1"]`),
expected: []string{
fmt.Sprintf("%s=%s", keyDeviceID, "TESTCTRL00000001"),
fmt.Sprintf("%s=%s", keyDeviceFQDN, "test-node.test.ts.net"),
fmt.Sprintf("%s=%s", keyDeviceIPs, `["100.64.0.1","fd7a:115c:a1e0:ab12:4843:cd96:0:1"]`),
},
},
{
@@ -165,39 +158,39 @@ func TestKeepStateKeysUpdated(t *testing.T) {
}).View(),
},
},
expected: map[ipn.StateKey][]byte{
keyDeviceID: []byte("TESTCTRL00000001"),
keyDeviceFQDN: []byte("updated.test.ts.net"),
keyDeviceIPs: []byte(`["100.64.0.250"]`),
expected: []string{
fmt.Sprintf("%s=%s", keyDeviceFQDN, "updated.test.ts.net"),
fmt.Sprintf("%s=%s", keyDeviceIPs, `["100.64.0.250"]`),
},
},
} {
t.Run(tc.name, func(t *testing.T) {
// Send test input.
select {
case notifyCh <- tc.notify:
case <-errs:
t.Fatal("keepStateKeysUpdated returned before test input")
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for next() to be called again")
}
// Wait for next() to be called again so we know the goroutine has
// processed the event.
select {
case <-nextWaiting:
case <-errs:
t.Fatal("keepStateKeysUpdated returned before test input")
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for next() to be called again")
}
for key, value := range tc.expected {
got, _ := store.ReadState(key)
if !bytes.Equal(got, value) {
t.Errorf("state key %q mismatch: expected %q, got %q", key, value, got)
notifyChan <- tc.notify
for _, expected := range tc.expected {
select {
case got := <-store.writeChan:
if got != expected {
t.Errorf("expected %q, got %q", expected, got)
}
case err := <-errs:
t.Fatalf("unexpected error: %v", err)
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for expected write %q", expected)
}
}
})
}
}
type fakeStore struct {
writeChan chan string
}
func (f fakeStore) ReadState(key ipn.StateKey) ([]byte, error) {
return nil, fmt.Errorf("ReadState not implemented")
}
func (f fakeStore) WriteState(key ipn.StateKey, value []byte) error {
f.writeChan <- fmt.Sprintf("%s=%s", key, value)
return nil
}