egress HA via portmapping prototype
Signed-off-by: Irbe Krumina <irbe@tailscale.com>
This commit is contained in: parent a2c42d3cd4, commit 37851142af
@ -81,6 +81,7 @@
|
||||
// cluster using the same hostname (in this case, the MagicDNS name of the ingress proxy)
|
||||
// as a non-cluster workload on tailnet.
|
||||
// This is only meant to be configured by the Kubernetes operator.
|
||||
// - TS_EGRESS_SERVICES_PATH: path to a mounted JSON-formatted egress services config file
|
||||
//
|
||||
// When running on Kubernetes, containerboot defaults to storing state in the
|
||||
// "tailscale" kube secret. To store state on local disk instead, set
|
||||
@ -123,11 +124,13 @@
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/ipn/conffile"
|
||||
kubeutils "tailscale.com/k8s-operator"
|
||||
"tailscale.com/kube"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/types/ptr"
|
||||
"tailscale.com/util/deephash"
|
||||
"tailscale.com/util/linuxfw"
|
||||
"tailscale.com/util/mak"
|
||||
)
|
||||
|
||||
func newNetfilterRunner(logf logger.Logf) (linuxfw.NetfilterRunner, error) {
|
||||
@ -166,6 +169,7 @@ func main() {
|
||||
PodIP: defaultEnv("POD_IP", ""),
|
||||
EnableForwardingOptimizations: defaultBool("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS", false),
|
||||
HealthCheckAddrPort: defaultEnv("TS_HEALTHCHECK_ADDR_PORT", ""),
|
||||
EgressServicesPath: defaultEnv("TS_EGRESS_SERVICES_PATH", ""),
|
||||
}
|
||||
|
||||
if err := cfg.validate(); err != nil {
|
||||
@ -576,7 +580,6 @@ func main() {
|
||||
log.Fatalf("storing device IPs and FQDN in Kubernetes Secret: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.HealthCheckAddrPort != "" {
|
||||
h.Lock()
|
||||
h.hasAddrs = len(addrs) != 0
|
||||
@ -601,6 +604,11 @@ func main() {
|
||||
log.Println("Startup complete, waiting for shutdown signal")
|
||||
startupTasksDone = true
|
||||
|
||||
if cfg.EgressServicesPath != "" {
|
||||
log.Printf("ensure egress services are configured")
|
||||
go ensureEgressServicePortMap(ctx, cfg.EgressServicesPath, nfr, addrs, cfg.PodIP, cfg.KubeSecret)
|
||||
}
|
||||
|
||||
// Wait on tailscaled process. It won't
|
||||
// be cleaned up by default when the
|
||||
// container exits as it is not PID1.
|
||||
@ -1172,6 +1180,183 @@ type settings struct {
|
||||
// target.
|
||||
PodIP string
|
||||
HealthCheckAddrPort string
|
||||
EgressServicesPath string
|
||||
}
|
||||
|
||||
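// ensureEgressServicePortMap watches the mounted egress services config file
// (falling back to polling on a timer if an fsnotify watch cannot be set up)
// and, for each configured service, sets up SNAT to the proxy's tailnet
// address and per-port DNAT rules towards the tailnet target. The applied
// configuration is recorded under the "egress-services" key of the proxy's
// state Secret so that the operator can tell when this Pod is ready to route
// traffic for a service.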
func ensureEgressServicePortMap(ctx context.Context, path string, nfr linuxfw.NetfilterRunner, addrs []netip.Prefix, podIPS, stateSecretName string) {
|
||||
if path == "" {
|
||||
panic("egress services config path is empty")
|
||||
}
|
||||
// TODO: also reconfigure if tailnet IPs have changed
|
||||
var tickChan <-chan time.Time
|
||||
var eventChan <-chan fsnotify.Event
|
||||
if w, err := fsnotify.NewWatcher(); err != nil {
|
||||
log.Printf("failed to create fsnotify watcher, timer-only mode: %v", err)
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
tickChan = ticker.C
|
||||
} else {
|
||||
defer w.Close()
|
||||
if err := w.Add(filepath.Dir(path)); err != nil {
|
||||
log.Fatalf("failed to add fsnotify watch: %v", err)
|
||||
}
|
||||
eventChan = w.Events
|
||||
}
|
||||
f := func() {
|
||||
log.Printf("running egress service reconfigure")
|
||||
cfg, err := readEgressSvcConfig(path)
|
||||
if err != nil {
|
||||
log.Fatalf("error reading egress svc config: %v", err)
|
||||
}
|
||||
if cfg == nil || len(*cfg) == 0 {
|
||||
log.Printf("no services configured yet")
|
||||
return
|
||||
}
|
||||
// get the config from state secret
|
||||
stateS, err := kc.GetSecret(ctx, stateSecretName)
|
||||
if err != nil {
|
||||
log.Fatalf("error retrieving state secret: %v", err)
|
||||
}
|
||||
prevCfg := &kubeutils.EgressServicesStatus{}
|
||||
prevCfgS, ok := stateS.Data["egress-services"]
|
||||
if ok {
|
||||
if err := json.Unmarshal([]byte(prevCfgS), prevCfg); err != nil {
|
||||
log.Fatalf("error unmarshalling previous config: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
hasChanges := false
|
||||
for svcName, svc := range *cfg {
|
||||
// produce wanted config
|
||||
if egressSvcUpToDate(prevCfg, svcName, svc, podIPS) {
|
||||
log.Printf("%s up to date", svcName)
|
||||
continue
|
||||
}
|
||||
log.Printf("svc %s changes detected", svcName)
|
||||
hasChanges = true
|
||||
// only IP is supported for this prototype
|
||||
tailnetTarget, err := netip.ParseAddr(svc.TailnetTarget.IP)
|
||||
if err != nil {
|
||||
log.Fatalf("error parsing tailnet ip: %v", err)
|
||||
}
|
||||
var local netip.Addr
|
||||
for _, pfx := range addrs {
|
||||
if !pfx.IsSingleIP() {
|
||||
continue
|
||||
}
|
||||
if pfx.Addr().Is4() != tailnetTarget.Is4() {
|
||||
continue
|
||||
}
|
||||
local = pfx.Addr()
|
||||
break
|
||||
}
|
||||
if !local.IsValid() {
|
||||
// TODO: watch tailnet IPs and retry when a new one gets allocated
|
||||
log.Fatalf("no valid local IP: %v", local)
|
||||
}
|
||||
// add snat
|
||||
if err := nfr.AddSNATRuleForDst(local, tailnetTarget); err != nil {
|
||||
log.Fatalf("error setting up SNAT: %v", err)
|
||||
}
|
||||
podIP, err := netip.ParseAddr(podIPS)
|
||||
if err != nil {
|
||||
log.Fatalf("provided Pod IP %s cannot be parsed as IP: %v", podIPS, err)
|
||||
}
|
||||
for _, mapping := range svc.Ports {
|
||||
p, err := proto(mapping.Protocol)
|
||||
if err != nil {
|
||||
log.Fatalf("unable to parse protocol: %v", err)
|
||||
}
|
||||
if err := nfr.AddDNATRuleForSrcPrt(podIP, tailnetTarget, mapping.Dst, mapping.Src, p); err != nil {
|
||||
log.Fatalf("error setting up DNAT rule for tailnet target %v port map %v:%v : %v", tailnetTarget, mapping.Src, mapping.Dst, err)
|
||||
}
|
||||
}
|
||||
svcCfg := kubeutils.EgressServiceStatus{
|
||||
TailnetTarget: svc.TailnetTarget,
|
||||
Ports: svc.Ports,
|
||||
PodIP: podIPS,
|
||||
}
|
||||
mak.Set(prevCfg, svcName, svcCfg)
|
||||
}
|
||||
log.Printf("state Secret has changes: %t", hasChanges)
|
||||
if hasChanges {
|
||||
bs, err := json.Marshal(prevCfg)
|
||||
if err != nil {
|
||||
log.Fatalf("error marshalling service config: %v", err)
|
||||
}
|
||||
stateS.Data["egress-services"] = bs
|
||||
patch := kube.JSONPatch{
|
||||
Op: "replace",
|
||||
Path: "/data/egress-services",
|
||||
Value: bs,
|
||||
}
|
||||
if err := kc.JSONPatchSecret(ctx, stateSecretName, []kube.JSONPatch{patch}); err != nil {
|
||||
log.Fatalf("error patching state Secret: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
f()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-tickChan:
|
||||
case <-eventChan:
|
||||
f()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func proto(s string) (uint8, error) {
|
||||
switch strings.ToLower(s) {
|
||||
case "tcp":
|
||||
return 6, nil
|
||||
case "udp":
|
||||
return 17, nil
|
||||
default:
|
||||
return 0, fmt.Errorf("unrecognized protocol: %s", s)
|
||||
}
|
||||
}
|
||||
|
||||
func egressSvcUpToDate(prevCfg *kubeutils.EgressServicesStatus, svcName string, svcCfg kubeutils.EgressService, podIP string) bool {
|
||||
if prevCfg == nil {
|
||||
return false
|
||||
}
|
||||
prev, ok := (*prevCfg)[svcName]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if !strings.EqualFold(prev.TailnetTarget.IP, svcCfg.TailnetTarget.IP) {
|
||||
return false
|
||||
}
|
||||
if !reflect.DeepEqual(prev.Ports, svcCfg.Ports) {
|
||||
return false
|
||||
}
|
||||
if !strings.EqualFold(podIP, prev.PodIP) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func readEgressSvcConfig(path string) (*kubeutils.EgressServices, error) {
|
||||
if path == "" {
|
||||
return nil, nil
|
||||
}
|
||||
j, err := os.ReadFile(path)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(j) == 0 || string(j) == "" {
|
||||
return nil, nil
|
||||
}
|
||||
var cfg *kubeutils.EgressServices
|
||||
if err := json.Unmarshal(j, &cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (s *settings) validate() error {
|
||||
@ -1351,7 +1536,7 @@ func isOneStepConfig(cfg *settings) bool {
|
||||
// as an L3 proxy, proxying to an endpoint provided via one of the config env
|
||||
// vars.
|
||||
func isL3Proxy(cfg *settings) bool {
|
||||
return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress
|
||||
return cfg.EgressServicesPath != "" || cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress
|
||||
}
|
||||
|
||||
// hasKubeStateStore returns true if the state must be stored in a Kubernetes
|
||||
|
@ -48,14 +48,14 @@ metadata:
|
||||
namespace: {{ .Release.Namespace }}
|
||||
rules:
|
||||
- apiGroups: [""]
|
||||
resources: ["secrets", "serviceaccounts", "configmaps"]
|
||||
resources: ["secrets", "serviceaccounts", "configmaps", "pods"]
|
||||
verbs: ["create","delete","deletecollection","get","list","patch","update","watch"]
|
||||
- apiGroups: ["apps"]
|
||||
resources: ["statefulsets", "deployments"]
|
||||
verbs: ["create","delete","deletecollection","get","list","patch","update","watch"]
|
||||
- apiGroups: ["discovery.k8s.io"]
|
||||
resources: ["endpointslices"]
|
||||
verbs: ["get", "list", "watch"]
|
||||
verbs: ["get", "list", "watch", "create", "update", "patch"]
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: RoleBinding
|
||||
|
@ -2506,6 +2506,7 @@ rules:
|
||||
- secrets
|
||||
- serviceaccounts
|
||||
- configmaps
|
||||
- pods
|
||||
verbs:
|
||||
- create
|
||||
- delete
|
||||
@ -2537,6 +2538,9 @@ rules:
|
||||
- get
|
||||
- list
|
||||
- watch
|
||||
- create
|
||||
- update
|
||||
- patch
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: Role
|
||||
|
214 cmd/k8s-operator/egress-ha.md Normal file
@ -0,0 +1,214 @@
|
||||
# This is a Kubernetes Operator egress HA prototype based on portmapping
|
||||
|
||||
It contains:
|
||||
- containerboot/netfilter runner changes to parse the egress services config and set up portmapping-based DNAT (see the config sketch below)
|
||||
- two new operator reconcile loops to parse HA egress resources
|
||||
- static manifests that imitate having a ProxyGroup deployed to cluster
|
||||
- some other changes, additional RBAC, etc.
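
A minimal sketch of the egress services config that gets mounted to the proxies (at the path set via `TS_EGRESS_SERVICES_PATH`), using the `kuard-egress` example from further below; the service key, IP and ports are illustrative:

```
{
  "kuard-egress-default": {
    "tailnetTarget": { "ip": "100.64.1.230" },
    "ports": [
      { "protocol": "TCP", "src": 80, "dst": 2688 }
    ]
  }
}
```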
|
||||
|
||||
## To try this out
|
||||
|
||||
### Setup
|
||||
|
||||
(The setup steps use images built from this branch, available from a public GCR that I own.)
|
||||
|
||||
- set up a cluster WITHOUT the operator
|
||||
|
||||
From this branch:
|
||||
- `$ kubectl apply -f cmd/k8s-operator/crds`
|
||||
- install operator:
|
||||
```
|
||||
$ helm upgrade --install operator ./cmd/k8s-operator/deploy/chart/ --set operatorConfig.image.repo=gcr.io/csi-test-290908/operator --set operatorConfig.image.tag=v0.0.14egresshapm -n tailscale --set oauth.clientId=<oauth-client-id> --set oauth.clientSecret=<oauth-client-secret> --set operatorConfig.logging=debug --create-namespace --set operatorConfig.image.pullPolicy=IfNotPresent
|
||||
```
|
||||
- apply static manifests that imitate having a ProxyGroup:
|
||||
|
||||
Create a REUSABLE Tailscale auth key and update ./cmd/k8s-operator/egress-ha.yaml with it.
|
||||
|
||||
Run:
|
||||
|
||||
```
|
||||
$ kubectl apply -f ./cmd/k8s-operator/egress-ha.yaml
|
||||
```
|
||||
- observe that the 'proxy group' `Pods` have come up:
|
||||
```
|
||||
$ kubectl get pods -n tailscale
|
||||
NAME READY STATUS RESTARTS AGE
|
||||
egress-proxies-0 1/1 Running 0 6m23s
|
||||
egress-proxies-1 1/1 Running 0 6m22s
|
||||
egress-proxies-2 1/1 Running 0 6m21s
|
||||
...
|
||||
```
|
||||
|
||||
### Test it out
|
||||
|
||||
- ensure you have some service on your tailnet that you can access via egress
|
||||
|
||||
#### Expose a tailnet service(s) on the ProxyGroup proxies
|
||||
|
||||
- Apply some egress `Services` with a `tailscale.com/proxy-group` label and a `tailscale.com/tailnet-ip` annotation pointing at the tailnet service, for example:
|
||||
|
||||
```
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
annotations:
|
||||
tailscale.com/tailnet-ip: 100.64.1.230
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
name: kuard-egress
|
||||
spec:
|
||||
externalName: placeholder
|
||||
type: ExternalName
|
||||
ports:
|
||||
- port: 80
|
||||
protocol: TCP
|
||||
name: http
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
annotations:
|
||||
tailscale.com/tailnet-ip: 100.64.1.196
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
name: dns-server
|
||||
spec:
|
||||
externalName: placeholder
|
||||
type: ExternalName
|
||||
ports:
|
||||
- port: 53
|
||||
protocol: UDP
|
||||
name: udp
|
||||
- port: 53
|
||||
protocol: TCP
|
||||
name: tcp
|
||||
```
|
||||
|
||||
- Note: it will take a little while for the mounted ConfigMap to be updated.
To follow along, you can check whether the mounted config has been updated:
|
||||
```
|
||||
$ kubectl exec -it egress-proxies-0 -n tailscale -- cat /etc/egress-services/cfg
|
||||
|
||||
```
|
||||
... as well as check the proxy logs:
|
||||
```
|
||||
$ kubectl logs egress-proxies-0 -n tailscale
|
||||
...
|
||||
boot: 2024/09/04 07:35:48 running egress service reconfigure
|
||||
boot: 2024/09/04 07:35:48 svc dns-server-default changes detected
|
||||
boot: 2024/09/04 07:35:48 svc kuard-egress-default changes detected
|
||||
...
|
||||
|
||||
```
|
||||
|
||||
- Once the config has been updated, test that any cluster workload can access the egress service(s)
|
||||
via the ExternalName Service(s):
|
||||
|
||||
```
|
||||
$ kubectl exec -it proxy -- sh
|
||||
/ # curl -vv kuard-egress
|
||||
* Host kuard-egress:80 was resolved.
|
||||
...
|
||||
/ # dig @dns-server <some-dns-name>
|
||||
|
||||
; <<>> DiG 9.18.24 <<>> @dns-server <some-dns-name>
|
||||
; (1 server found)
|
||||
...
|
||||
```
|
||||
|
||||
- Verify that the EndpointSlice created for each egress service contains all ProxyGroup Pod IPs:
|
||||
|
||||
```
|
||||
$ kubectl get po -n tailscale -owide
|
||||
NAME READY STATUS RESTARTS AGE IP
|
||||
egress-proxies-0 1/1 Running 0 31m 10.80.0.51
|
||||
egress-proxies-1 1/1 Running 0 31m 10.80.2.54
|
||||
egress-proxies-2 1/1 Running 0 31m 10.80.0.52
|
||||
...
|
||||
$ kubectl get endpointslice -n tailscale
|
||||
NAME ADDRESSTYPE PORTS ENDPOINTS AGE
|
||||
dns-server-default IPv4 3160,2181 10.80.0.52,10.80.0.51,10.80.2.54 30m
|
||||
kuard-egress-default IPv4 2688 10.80.0.51,10.80.2.54,10.80.0.52 30m
|
||||
...
|
||||
```
|
||||
|
||||
#### Add another Pod to 'proxy group'
|
||||
|
||||
Scale replicas 3 -> 4:
|
||||
|
||||
- `$ kubectl scale sts/egress-proxies -n tailscale --replicas=4`
|
||||
|
||||
This change should be processed a lot faster, as the new proxy reads its config on start.
|
||||
|
||||
- Once the additional `Pod` is up, observe that its IP address has been added to the EndpointSlice:
|
||||
|
||||
```
|
||||
$ kubectl get po -n tailscale -owide
|
||||
NAME READY STATUS RESTARTS AGE IP
|
||||
egress-proxies-0 1/1 Running 0 41m 10.80.0.51
|
||||
egress-proxies-1 1/1 Running 0 41m 10.80.2.54
|
||||
egress-proxies-2 1/1 Running 0 41m 10.80.0.52
|
||||
egress-proxies-3 1/1 Running 0 69s 10.80.2.56
|
||||
...
|
||||
$ kubectl get endpointslice -n tailscale
|
||||
NAME ADDRESSTYPE PORTS ENDPOINTS AGE
|
||||
dns-server-default IPv4 3160,2181 10.80.2.56,10.80.0.51,10.80.2.54 + 1 more... 40m
|
||||
kuard-egress-default IPv4 2688 10.80.0.51,10.80.2.54,10.80.0.52 + 1 more... 40m
|
||||
```
|
||||
|
||||
- You can also test that the new `Pod` knows how to route the traffic.
|
||||
|
||||
Find the `Pod`'s target port from the ExternalName Service that you created:
|
||||
|
||||
```
|
||||
$ kubectl get svc kuard-egress -oyaml
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
annotations:
|
||||
tailscale.com/tailnet-ip: 100.64.1.230
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
name: kuard-egress
|
||||
namespace: default
|
||||
spec:
|
||||
externalName: kuard-egress-default.tailscale.svc.cluster.local
|
||||
ports:
|
||||
- name: http
|
||||
port: 80
|
||||
protocol: TCP
|
||||
targetPort: 2688
|
||||
type: ExternalName
|
||||
```
|
||||
Try to route to the tailnet service using the new `Pod`'s IP:
|
||||
|
||||
```
|
||||
$ kubectl exec -it proxy -- sh
|
||||
/ # curl -vv 10.80.2.56:2688
|
||||
* Trying 10.80.2.56:2688...
|
||||
* Connected to 10.80.2.56 (10.80.2.56) port 2688
|
||||
...
|
||||
```
|
||||
|
||||
#### Remove a Pod from the 'proxy group'
|
||||
|
||||
Scale replicas 4 -> 3:
|
||||
|
||||
- `$ kubectl scale sts/egress-proxies -n tailscale --replicas=3`
|
||||
|
||||
This change should get processed fairly fast.
|
||||
|
||||
- Observe that once the `Pod` is gone, its IP address is removed from the `EndpointSlice`(s):
|
||||
|
||||
```
|
||||
$ kubectl get po -n tailscale -owide
|
||||
NAME READY STATUS RESTARTS AGE IP
|
||||
egress-proxies-0 1/1 Running 0 49m 10.80.0.51
|
||||
egress-proxies-1 1/1 Running 0 49m 10.80.2.54
|
||||
egress-proxies-2 1/1 Running 0 49m 10.80.0.52
|
||||
...
|
||||
$ kubectl get endpointslice -n tailscale
|
||||
NAME ADDRESSTYPE PORTS ENDPOINTS AGE
|
||||
dns-server-default IPv4 3160,2181 10.80.0.51,10.80.2.54,10.80.0.52 48m
|
||||
kuard-egress-default IPv4 2688 10.80.0.51,10.80.2.54,10.80.0.52 48m
|
||||
```
|
161 cmd/k8s-operator/egress-ha.yaml Normal file
@ -0,0 +1,161 @@
|
||||
# This yaml contains roughly what the operator would create for a ProxyGroup resource like:
|
||||
# apiVersion: tailscale.com/v1alpha1
|
||||
# kind: ProxyGroup
|
||||
# metadata:
|
||||
# name: egress-proxies
|
||||
# spec:
|
||||
# replicas: 3
|
||||
# type: egress
|
||||
#
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: ServiceAccount
|
||||
metadata:
|
||||
name: egress-proxies
|
||||
namespace: tailscale
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: Role
|
||||
metadata:
|
||||
name: egress-proxies
|
||||
namespace: tailscale
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
rules:
|
||||
- apiGroups:
|
||||
- ""
|
||||
resources:
|
||||
- secrets
|
||||
verbs:
|
||||
- create
|
||||
- delete
|
||||
- deletecollection
|
||||
- get
|
||||
- list
|
||||
- patch
|
||||
- update
|
||||
- watch
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: RoleBinding
|
||||
metadata:
|
||||
name: egress-proxies
|
||||
namespace: tailscale
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
roleRef:
|
||||
apiGroup: rbac.authorization.k8s.io
|
||||
kind: Role
|
||||
name: egress-proxies
|
||||
subjects:
|
||||
- kind: ServiceAccount
|
||||
name: egress-proxies
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: StatefulSet
|
||||
metadata:
|
||||
name: egress-proxies
|
||||
namespace: tailscale
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
app: egress-proxies
|
||||
spec:
|
||||
replicas: 3
|
||||
selector:
|
||||
matchLabels:
|
||||
app: egress-proxies
|
||||
serviceName: ""
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: egress-proxies
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
spec:
|
||||
containers:
|
||||
- env:
|
||||
- name: POD_IP
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: status.podIP
|
||||
- name: TS_USERSPACE
|
||||
value: "false"
|
||||
- name: TS_KUBE_SECRET
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: metadata.name
|
||||
- name: TS_AUTH_ONCE
|
||||
value: "true"
|
||||
- name: TS_AUTHKEY
|
||||
value: <insert auth key>
|
||||
- name: TS_HOSTNAME
|
||||
value: egress-proxies
|
||||
- name: TS_DEBUG_FIREWALL_MODE
|
||||
value: iptables
|
||||
- name: TS_EGRESS_SERVICES_PATH
|
||||
value: /etc/egress-services/cfg
|
||||
image: gcr.io/csi-test-290908/proxy:v0.0.8egresshapm
|
||||
imagePullPolicy: IfNotPresent
|
||||
name: tailscale
|
||||
securityContext:
|
||||
capabilities:
|
||||
add:
|
||||
- NET_ADMIN
|
||||
volumeMounts:
|
||||
- mountPath: /etc/egress-services
|
||||
name: egress-services
|
||||
dnsPolicy: ClusterFirst
|
||||
restartPolicy: Always
|
||||
schedulerName: default-scheduler
|
||||
serviceAccount: egress-proxies
|
||||
serviceAccountName: egress-proxies
|
||||
volumes:
|
||||
- configMap:
|
||||
defaultMode: 420
|
||||
name: egress-proxies-egress-services
|
||||
name: egress-services
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: egress-proxies-egress-services
|
||||
namespace: tailscale
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
---
|
||||
kind: Secret
|
||||
apiVersion: v1
|
||||
metadata:
|
||||
name: egress-proxies-0
|
||||
namespace: tailscale
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
type: Opaque
|
||||
---
|
||||
kind: Secret
|
||||
apiVersion: v1
|
||||
metadata:
|
||||
name: egress-proxies-1
|
||||
namespace: tailscale
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
type: Opaque
|
||||
---
|
||||
kind: Secret
|
||||
apiVersion: v1
|
||||
metadata:
|
||||
name: egress-proxies-2
|
||||
namespace: tailscale
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
type: Opaque
|
||||
---
|
||||
kind: Secret
|
||||
apiVersion: v1
|
||||
metadata:
|
||||
name: egress-proxies-3
|
||||
namespace: tailscale
|
||||
labels:
|
||||
tailscale.com/proxy-group: egress-proxies
|
||||
type: Opaque
|
185 cmd/k8s-operator/egressha-eps.go Normal file
@ -0,0 +1,185 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"go.uber.org/zap"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
kube "tailscale.com/k8s-operator"
|
||||
"tailscale.com/types/ptr"
|
||||
)
|
||||
|
||||
type egressHAEndpointSliceReconciler struct {
|
||||
client.Client
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
// Reconcile ensures that the EndpointSlice for an egress service only contains
// the IP addresses of those ProxyGroup Pods that are ready to route traffic to
// the service's tailnet target, as reported via the Pods' state Secrets. Once
// at least one endpoint is ready, it also points the user's ExternalName
// Service at the ClusterIP Service.
|
||||
func (ehr *egressHAEndpointSliceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
|
||||
logger := ehr.logger.With("Service", req.NamespacedName)
|
||||
logger.Debugf("starting reconcile")
|
||||
defer logger.Debugf("reconcile finished")
|
||||
|
||||
eps := new(discoveryv1.EndpointSlice)
|
||||
err = ehr.Get(ctx, req.NamespacedName, eps)
|
||||
if apierrors.IsNotFound(err) {
|
||||
logger.Debugf("EndpointSlice not found")
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
if err != nil {
|
||||
return reconcile.Result{}, fmt.Errorf("failed to get EndpointSlice: %w", err)
|
||||
}
|
||||
if !eps.DeletionTimestamp.IsZero() {
|
||||
logger.Debugf("EnpointSlice is being deleted")
|
||||
return res, nil
|
||||
}
|
||||
oldEps := eps.DeepCopy()
|
||||
proxyGroupName := eps.Labels["tailscale.com/proxy-group"]
|
||||
egressServiceName := eps.Labels["tailscale.com/egress-service"]
|
||||
|
||||
cm := &corev1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-egress-services", proxyGroupName),
|
||||
Namespace: "tailscale",
|
||||
},
|
||||
}
|
||||
err = ehr.Get(ctx, client.ObjectKeyFromObject(cm), cm)
|
||||
if apierrors.IsNotFound(err) {
|
||||
logger.Debugf("ConfigMap %s not found", cm.Name)
|
||||
return res, nil
|
||||
}
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("error retrieving ConfigMap %s: %w", cm.Name, err)
|
||||
}
|
||||
wantsCfgBS, ok := cm.BinaryData["cfg"]
|
||||
if !ok {
|
||||
// nothing here
|
||||
logger.Debugf("egress-services config is empty")
|
||||
return res, nil
|
||||
}
|
||||
wantsCfg := &kube.EgressServices{}
|
||||
if err := json.Unmarshal(wantsCfgBS, wantsCfg); err != nil {
|
||||
return res, fmt.Errorf("error unmarshalling egress services config: %w", err)
|
||||
}
|
||||
wantsEgressCfg, ok := (*wantsCfg)[egressServiceName]
|
||||
if !ok {
|
||||
logger.Debugf("egress services config does not contain config for %s", egressServiceName)
|
||||
return res, nil
|
||||
}
|
||||
// get all proxy pods
|
||||
podList := &corev1.PodList{}
|
||||
if err := ehr.List(ctx, podList, client.MatchingLabels(map[string]string{"tailscale.com/proxy-group": proxyGroupName})); err != nil {
|
||||
return res, fmt.Errorf("error listing Pods for %s ProxyGroup: %w", proxyGroupName, err)
|
||||
}
|
||||
if len(podList.Items) == 0 {
|
||||
logger.Debugf("no Pods")
|
||||
return res, nil
|
||||
}
|
||||
// also remove any leftover ones
|
||||
// for each pod
|
||||
newEndpoints := make([]discoveryv1.Endpoint, 0)
|
||||
for _, pod := range podList.Items {
|
||||
if !pod.DeletionTimestamp.IsZero() {
|
||||
logger.Debugf("Pod %s is being deleted, ignore", pod.Name)
|
||||
continue
|
||||
}
|
||||
// TODO: maybe some more Pod readiness checks
|
||||
podIP := pod.Status.PodIP
|
||||
// get the associated state Secret
|
||||
secret := &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: pod.Name,
|
||||
Namespace: pod.Namespace,
|
||||
},
|
||||
}
|
||||
err := ehr.Get(ctx, client.ObjectKeyFromObject(secret), secret)
|
||||
if apierrors.IsNotFound(err) {
|
||||
logger.Debugf("state Secret %s not yet exists", secret.Name)
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("error getting state Secret %s: %w", secret.Name, err)
|
||||
}
|
||||
svcStatusBS := secret.Data["egress-services"]
|
||||
if len(svcStatusBS) == 0 {
|
||||
// nothing ready here
|
||||
logger.Debugf("state Secret %s does not yet have egress services status", secret.Name)
|
||||
continue
|
||||
}
|
||||
svcStatus := &kube.EgressServicesStatus{}
|
||||
if err := json.Unmarshal(svcStatusBS, svcStatus); err != nil {
|
||||
return res, fmt.Errorf("error unmarshalling service status: %v", err)
|
||||
}
|
||||
thisSvcStatus, ok := (*svcStatus)[egressServiceName]
|
||||
if !ok {
|
||||
logger.Debugf("state Secret %s does not yet have status for egress service %s", secret.Name, egressServiceName)
|
||||
continue
|
||||
}
|
||||
if !strings.EqualFold(podIP, thisSvcStatus.PodIP) {
|
||||
logger.Debugf("got Pod IP %s, want Pod IP %s, not yet ready", thisSvcStatus.PodIP, podIP)
|
||||
continue
|
||||
}
|
||||
if !strings.EqualFold(wantsEgressCfg.TailnetTarget.IP, thisSvcStatus.TailnetTarget.IP) {
|
||||
logger.Debugf("got tailnet target IP %s, want %s, not yet ready", thisSvcStatus.TailnetTarget.IP, wantsEgressCfg.TailnetTarget.IP)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(wantsEgressCfg.Ports, thisSvcStatus.Ports) {
|
||||
logger.Debugf("got ports %+#v, wants ports %+#v", thisSvcStatus.Ports, wantsEgressCfg.Ports)
|
||||
continue
|
||||
}
|
||||
// appears like the proxy's firewall should be ready to route traffic for this egress service
|
||||
newEndpoints = append(newEndpoints, discoveryv1.Endpoint{
|
||||
Hostname: (*string)(&pod.UID),
|
||||
Addresses: []string{podIP},
|
||||
Conditions: discoveryv1.EndpointConditions{
|
||||
Ready: ptr.To(true),
|
||||
Serving: ptr.To(true),
|
||||
Terminating: ptr.To(false),
|
||||
},
|
||||
})
|
||||
}
|
||||
eps.Endpoints = newEndpoints
|
||||
if !reflect.DeepEqual(eps, oldEps) {
|
||||
if err := ehr.Update(ctx, eps); err != nil {
|
||||
return res, fmt.Errorf("error updating EndpointSlice: %v", err)
|
||||
}
|
||||
}
|
||||
// TODO: or maybe do this elsewhere
|
||||
if len(eps.Endpoints) > 0 {
|
||||
extSvcName := eps.Labels["tailscale.com/external-service-name"]
|
||||
extSvcNamespace := eps.Labels["tailscale.com/external-service-namespace"]
|
||||
svc := &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: extSvcName,
|
||||
Namespace: extSvcNamespace,
|
||||
},
|
||||
}
|
||||
if err := ehr.Get(ctx, client.ObjectKeyFromObject(svc), svc); err != nil {
|
||||
// unexpected
|
||||
return res, fmt.Errorf("error getting ExternalName Service %s/%s: %w", extSvcName, extSvcNamespace, err)
|
||||
}
|
||||
clusterSvcFQDN := fmt.Sprintf("%s.tailscale.svc.cluster.local", egressServiceName)
|
||||
if !strings.EqualFold(svc.Spec.ExternalName, clusterSvcFQDN) {
|
||||
svc.Spec.ExternalName = clusterSvcFQDN
|
||||
if err := ehr.Update(ctx, svc); err != nil {
|
||||
return res, fmt.Errorf("error updating ExternalName service %s/%s: %w", extSvcName, extSvcNamespace, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return res, nil
|
||||
}
|
209 cmd/k8s-operator/egressha.go Normal file
@ -0,0 +1,209 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"reflect"
|
||||
|
||||
"go.uber.org/zap"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
kube "tailscale.com/k8s-operator"
|
||||
"tailscale.com/util/mak"
|
||||
)
|
||||
|
||||
// Reconciles Services with tailscale.com/tailnet-ip annotation and
|
||||
// tailscale.com/proxy-group label.
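// For each such Service it allocates target ports where needed, records the
// desired portmappings in the ProxyGroup's egress services ConfigMap, and
// ensures an EndpointSlice and a ClusterIP Service for the egress service in
// the tailscale namespace.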
|
||||
type egressHAServiceReconciler struct {
|
||||
client.Client
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func (ehr *egressHAServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
|
||||
logger := ehr.logger.With("Service", req.NamespacedName)
|
||||
logger.Debugf("starting reconcile")
|
||||
defer logger.Debugf("reconcile finished")
|
||||
|
||||
svc := new(corev1.Service)
|
||||
err = ehr.Get(ctx, req.NamespacedName, svc)
|
||||
if apierrors.IsNotFound(err) {
|
||||
logger.Debugf("Service not found")
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
if err != nil {
|
||||
return reconcile.Result{}, fmt.Errorf("failed to get Service: %w", err)
|
||||
}
|
||||
if !svc.DeletionTimestamp.IsZero() {
|
||||
logger.Debugf("Service is being deleted")
|
||||
// TODO: cleanup
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// TODO: probably will have to switch to an annotation, as otherwise it's too confusing
|
||||
proxyGroupName := svc.Labels["tailscale.com/proxy-group"]
|
||||
if proxyGroupName == "" {
|
||||
logger.Debugf("not reconciling Service without tailscale.com/proxy-group label")
|
||||
return res, nil
|
||||
}
|
||||
// TODO: also validate that the ProxyGroup is for egress service type
|
||||
|
||||
tailnetIP := svc.Annotations["tailscale.com/tailnet-ip"]
|
||||
if tailnetIP == "" {
|
||||
logger.Debugf("not reconciling Service without tailscale.com/tailnet-ip annotation")
|
||||
return res, nil
|
||||
}
|
||||
// get the egress services config for these proxies
|
||||
cm := &corev1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-egress-services", proxyGroupName),
|
||||
Namespace: "tailscale", // hardcoded for this prototype
|
||||
},
|
||||
}
|
||||
err = ehr.Get(ctx, client.ObjectKeyFromObject(cm), cm)
|
||||
if apierrors.IsNotFound(err) {
|
||||
logger.Debugf("egress services ConfigMap for %s not yet created, waiting", proxyGroupName)
|
||||
return res, nil
|
||||
}
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("error retrieving egress service config map for %s", proxyGroupName)
|
||||
}
|
||||
oldCM := cm.DeepCopy()
|
||||
config := &kube.EgressServices{}
|
||||
if len(cm.BinaryData["cfg"]) != 0 {
|
||||
if err := json.Unmarshal(cm.BinaryData["cfg"], config); err != nil {
|
||||
return res, fmt.Errorf("error unmarshaling egress services config %v: %v", cm.BinaryData["cfg"], err)
|
||||
}
|
||||
}
|
||||
|
||||
svcConfig := kube.EgressService{
|
||||
TailnetTarget: kube.TailnetTarget{
|
||||
IP: tailnetIP,
|
||||
},
|
||||
Ports: []kube.PortMap{},
|
||||
}
|
||||
|
||||
oldSvcSpec := svc.DeepCopy()
|
||||
// TODO: only do this stuff if needed
|
||||
svcList := &corev1.ServiceList{}
|
||||
if err := ehr.List(ctx, svcList, client.MatchingLabels(map[string]string{"tailscale.com/proxy-group": proxyGroupName})); err != nil {
|
||||
return res, fmt.Errorf("error listing Services: %v", err)
|
||||
}
|
||||
usedPorts := sets.NewInt32()
|
||||
for _, s := range svcList.Items {
|
||||
for _, p := range s.Spec.Ports {
|
||||
usedPorts.Insert(p.Port)
|
||||
}
|
||||
}
|
||||
// loop over ports, for each port that does not yet have a target port set, allocate one
|
||||
epsPorts := []discoveryv1.EndpointPort{}
|
||||
for i, portmap := range svc.Spec.Ports {
|
||||
if portmap.TargetPort.String() == "" || portmap.TargetPort.IntVal == portmap.Port {
|
||||
logger.Debugf("need to allocate target port for port %d", portmap.Port)
|
||||
// TODO: this is why tailscale.com/proxy-group has to be a label- but we can instead add markers in cache and make it an annotation
|
||||
// get a random port
|
||||
foundFreePort := false
|
||||
var suggestPort int32 = 0
|
||||
for !foundFreePort {
|
||||
suggestPort = rand.Int32N(4000) + 1 // don't want 0, otherwise doesn't matter, we're root in the container and this is not going to be a sidecar
|
||||
if !usedPorts.Has(suggestPort) {
|
||||
foundFreePort = true
|
||||
}
|
||||
}
|
||||
svc.Spec.Ports[i].TargetPort = intstr.FromInt32(suggestPort)
|
||||
}
|
||||
svcConfig.Ports = append(svcConfig.Ports, kube.PortMap{Src: uint16(portmap.Port), Protocol: string(portmap.Protocol), Dst: uint16(svc.Spec.Ports[i].TargetPort.IntVal)})
|
||||
epsPorts = append(epsPorts, discoveryv1.EndpointPort{Protocol: &portmap.Protocol, Port: &svc.Spec.Ports[i].TargetPort.IntVal, Name: &svc.Spec.Ports[i].Name})
|
||||
}
|
||||
if !reflect.DeepEqual(oldSvcSpec, svc.Spec) {
|
||||
// update ports only
|
||||
if _, err := createOrUpdate(ctx, ehr.Client, svc.Namespace, svc, func(s *corev1.Service) { s.Spec.Ports = svc.Spec.Ports }); err != nil {
|
||||
return res, fmt.Errorf("error updating Service: %v", err)
|
||||
}
|
||||
} else {
|
||||
logger.Debugf("update to service not needed")
|
||||
}
|
||||
// update configmap
|
||||
egressSvcName := fmt.Sprintf("%s-%s", svc.Name, svc.Namespace) // TODO: or hostname
|
||||
mak.Set(config, egressSvcName, svcConfig)
|
||||
bs, err := json.Marshal(config)
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("error updating service config: %v", err)
|
||||
}
|
||||
mak.Set(&cm.BinaryData, "cfg", bs)
|
||||
if !reflect.DeepEqual(cm, oldCM) {
|
||||
if err := ehr.Update(ctx, cm); err != nil {
|
||||
return res, fmt.Errorf("error updating ConfigMap: %v", err)
|
||||
}
|
||||
}
|
||||
logger.Debugf("updating EndpointSlice, line 151")
|
||||
// ensure EndpointSlice
|
||||
// TODO: ports?
|
||||
eps := &discoveryv1.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: egressSvcName,
|
||||
Namespace: "tailscale",
|
||||
Labels: map[string]string{
|
||||
"tailscale.com/egress-service": egressSvcName,
|
||||
"tailscale.com/proxy-group": proxyGroupName,
|
||||
"tailscale.com/external-service-name": svc.Name,
|
||||
"tailscale.com/external-service-namespace": svc.Namespace,
|
||||
"kubernetes.io/service-name": egressSvcName,
|
||||
},
|
||||
},
|
||||
AddressType: "IPv4",
|
||||
Ports: epsPorts,
|
||||
}
|
||||
err = ehr.Get(ctx, client.ObjectKeyFromObject(eps), &discoveryv1.EndpointSlice{})
|
||||
if apierrors.IsNotFound(err) {
|
||||
logger.Debugf("creating EndpointSlice")
|
||||
if err := ehr.Create(ctx, eps); err != nil {
|
||||
logger.Debugf("error creating EndpointSlice: %v", err)
|
||||
return res, fmt.Errorf("error creating EndpointSlice: %v", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return res, fmt.Errorf("error retrieving EnpointSlice %s: %w", eps.Name, err)
|
||||
}
|
||||
// TODO: deal with port update
|
||||
logger.Debugf("updating ClusterIP Service, line 174")
|
||||
|
||||
// TODO: will need to generate a different name for the ClusterIP
|
||||
// service as else this will prevent from creating egresses in ts
|
||||
// namespace. ensure ClusterIP Service
|
||||
clusterIPSvc := &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: egressSvcName,
|
||||
Namespace: "tailscale",
|
||||
Labels: map[string]string{"tailscale.com/egress-service": egressSvcName,
|
||||
"tailscale.com/proxy-group": proxyGroupName,
|
||||
"tailscale.com/external-service-name": svc.Name,
|
||||
"tailscale.com/external-service-namespace": svc.Namespace,
|
||||
},
|
||||
},
|
||||
Spec: corev1.ServiceSpec{Ports: svc.Spec.Ports, Type: corev1.ServiceTypeClusterIP},
|
||||
}
|
||||
// TODO: deal with ports update
|
||||
err = ehr.Client.Get(ctx, client.ObjectKeyFromObject(clusterIPSvc), &corev1.Service{})
|
||||
if apierrors.IsNotFound(err) {
|
||||
logger.Debugf("creating ClusterIP Service")
|
||||
if err := ehr.Create(ctx, clusterIPSvc); err != nil {
|
||||
logger.Debugf("error creating ClusterIP Service: %v", err)
|
||||
return res, fmt.Errorf("error creating ClusterIP Service: %v", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return res, fmt.Errorf("error retrieving ClusterIP Service: %v", err)
|
||||
}
|
||||
return res, nil
|
||||
}
|
@ -40,7 +40,6 @@
|
||||
"tailscale.com/ipn/store/kubestore"
|
||||
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
|
||||
"tailscale.com/tsnet"
|
||||
"tailscale.com/tstime"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/version"
|
||||
)
|
||||
@ -235,6 +234,7 @@ func runReconcilers(opts reconcilerOpts) {
|
||||
Cache: cache.Options{
|
||||
ByObject: map[client.Object]cache.ByObject{
|
||||
&corev1.Secret{}: nsFilter,
|
||||
&corev1.Pod{}: nsFilter,
|
||||
&corev1.ServiceAccount{}: nsFilter,
|
||||
&corev1.ConfigMap{}: nsFilter,
|
||||
&appsv1.StatefulSet{}: nsFilter,
|
||||
@ -249,145 +249,35 @@ func runReconcilers(opts reconcilerOpts) {
|
||||
startlog.Fatalf("could not create manager: %v", err)
|
||||
}
|
||||
|
||||
svcFilter := handler.EnqueueRequestsFromMapFunc(serviceHandler)
|
||||
svcChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("svc"))
|
||||
// If a ProxyClass changes, enqueue all Services labeled with that
|
||||
// ProxyClass's name.
|
||||
proxyClassFilterForSvc := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForSvc(mgr.GetClient(), startlog))
|
||||
|
||||
eventRecorder := mgr.GetEventRecorderFor("tailscale-operator")
|
||||
ssr := &tailscaleSTSReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
tsnetServer: opts.tsServer,
|
||||
tsClient: opts.tsClient,
|
||||
defaultTags: strings.Split(opts.proxyTags, ","),
|
||||
operatorNamespace: opts.tailscaleNamespace,
|
||||
proxyImage: opts.proxyImage,
|
||||
proxyPriorityClassName: opts.proxyPriorityClassName,
|
||||
tsFirewallMode: opts.proxyFirewallMode,
|
||||
}
|
||||
svcFilter := handler.EnqueueRequestsFromMapFunc(egressHAServiceHandler)
|
||||
err = builder.
|
||||
ControllerManagedBy(mgr).
|
||||
Named("service-reconciler").
|
||||
Named("egress-ha-service-reconciler").
|
||||
Watches(&corev1.Service{}, svcFilter).
|
||||
Watches(&appsv1.StatefulSet{}, svcChildFilter).
|
||||
Watches(&corev1.Secret{}, svcChildFilter).
|
||||
Watches(&tsapi.ProxyClass{}, proxyClassFilterForSvc).
|
||||
Complete(&ServiceReconciler{
|
||||
ssr: ssr,
|
||||
Client: mgr.GetClient(),
|
||||
logger: opts.log.Named("service-reconciler"),
|
||||
isDefaultLoadBalancer: opts.proxyActAsDefaultLoadBalancer,
|
||||
recorder: eventRecorder,
|
||||
tsNamespace: opts.tailscaleNamespace,
|
||||
clock: tstime.DefaultClock{},
|
||||
proxyDefaultClass: opts.proxyDefaultClass,
|
||||
Complete(&egressHAServiceReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
logger: opts.log.Named("egress-ha-service-reconciler"),
|
||||
})
|
||||
if err != nil {
|
||||
startlog.Fatalf("could not create service reconciler: %v", err)
|
||||
}
|
||||
ingressChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("ingress"))
|
||||
// If a ProxyClass changes, enqueue all Ingresses labeled with that
|
||||
// ProxyClass's name.
|
||||
proxyClassFilterForIngress := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForIngress(mgr.GetClient(), startlog))
|
||||
// Enqueue Ingress if a managed Service or backend Service associated with a tailscale Ingress changes.
|
||||
svcHandlerForIngress := handler.EnqueueRequestsFromMapFunc(serviceHandlerForIngress(mgr.GetClient(), startlog))
|
||||
err = builder.
|
||||
ControllerManagedBy(mgr).
|
||||
For(&networkingv1.Ingress{}).
|
||||
Watches(&appsv1.StatefulSet{}, ingressChildFilter).
|
||||
Watches(&corev1.Secret{}, ingressChildFilter).
|
||||
Watches(&corev1.Service{}, svcHandlerForIngress).
|
||||
Watches(&tsapi.ProxyClass{}, proxyClassFilterForIngress).
|
||||
Complete(&IngressReconciler{
|
||||
ssr: ssr,
|
||||
recorder: eventRecorder,
|
||||
Client: mgr.GetClient(),
|
||||
logger: opts.log.Named("ingress-reconciler"),
|
||||
proxyDefaultClass: opts.proxyDefaultClass,
|
||||
})
|
||||
if err != nil {
|
||||
startlog.Fatalf("could not create ingress reconciler: %v", err)
|
||||
startlog.Fatalf("could not create egress-ha-service reconciler: %v", err)
|
||||
}
|
||||
|
||||
connectorFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("connector"))
|
||||
// If a ProxyClass changes, enqueue all Connectors that have
|
||||
// .spec.proxyClass set to the name of this ProxyClass.
|
||||
proxyClassFilterForConnector := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForConnector(mgr.GetClient(), startlog))
|
||||
err = builder.ControllerManagedBy(mgr).
|
||||
For(&tsapi.Connector{}).
|
||||
Watches(&appsv1.StatefulSet{}, connectorFilter).
|
||||
Watches(&corev1.Secret{}, connectorFilter).
|
||||
Watches(&tsapi.ProxyClass{}, proxyClassFilterForConnector).
|
||||
Complete(&ConnectorReconciler{
|
||||
ssr: ssr,
|
||||
recorder: eventRecorder,
|
||||
Client: mgr.GetClient(),
|
||||
logger: opts.log.Named("connector-reconciler"),
|
||||
clock: tstime.DefaultClock{},
|
||||
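// The egress HA EndpointSlice reconciler watches EndpointSlices created for
// egress services, as well as ProxyGroup Pods and their state Secrets, since a
// Pod's readiness to route traffic for an egress service is surfaced via its
// state Secret.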
epsFilter := handler.EnqueueRequestsFromMapFunc(egressHAEPSHandler)
|
||||
assocResourceFilter := handler.EnqueueRequestsFromMapFunc(serviceHandlerForEgressProxyGroupPods(mgr.GetClient(), opts.log))
|
||||
err = builder.
|
||||
ControllerManagedBy(mgr).
|
||||
Named("egress-ha-eps-reconciler").
|
||||
Watches(&discoveryv1.EndpointSlice{}, epsFilter).
|
||||
Watches(&corev1.Pod{}, assocResourceFilter).
|
||||
Watches(&corev1.Secret{}, assocResourceFilter).
|
||||
Complete(&egressHAEndpointSliceReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
logger: opts.log.Named("egress-ha-eps-reconciler"),
|
||||
})
|
||||
if err != nil {
|
||||
startlog.Fatalf("could not create connector reconciler: %v", err)
|
||||
}
|
||||
// TODO (irbekrm): switch to metadata-only watches for resources whose
|
||||
// spec we don't need to inspect to reduce memory consumption.
|
||||
// https://github.com/kubernetes-sigs/controller-runtime/issues/1159
|
||||
nameserverFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("nameserver"))
|
||||
err = builder.ControllerManagedBy(mgr).
|
||||
For(&tsapi.DNSConfig{}).
|
||||
Watches(&appsv1.Deployment{}, nameserverFilter).
|
||||
Watches(&corev1.ConfigMap{}, nameserverFilter).
|
||||
Watches(&corev1.Service{}, nameserverFilter).
|
||||
Watches(&corev1.ServiceAccount{}, nameserverFilter).
|
||||
Complete(&NameserverReconciler{
|
||||
recorder: eventRecorder,
|
||||
tsNamespace: opts.tailscaleNamespace,
|
||||
Client: mgr.GetClient(),
|
||||
logger: opts.log.Named("nameserver-reconciler"),
|
||||
clock: tstime.DefaultClock{},
|
||||
})
|
||||
if err != nil {
|
||||
startlog.Fatalf("could not create nameserver reconciler: %v", err)
|
||||
}
|
||||
err = builder.ControllerManagedBy(mgr).
|
||||
For(&tsapi.ProxyClass{}).
|
||||
Complete(&ProxyClassReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
recorder: eventRecorder,
|
||||
logger: opts.log.Named("proxyclass-reconciler"),
|
||||
clock: tstime.DefaultClock{},
|
||||
})
|
||||
if err != nil {
|
||||
startlog.Fatal("could not create proxyclass reconciler: %v", err)
|
||||
}
|
||||
logger := startlog.Named("dns-records-reconciler-event-handlers")
|
||||
// On EndpointSlice events, if it is an EndpointSlice for an
|
||||
// ingress/egress proxy headless Service, reconcile the headless
|
||||
// Service.
|
||||
dnsRREpsOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerEndpointSliceHandler)
|
||||
// On DNSConfig changes, reconcile all headless Services for
|
||||
// ingress/egress proxies in operator namespace.
|
||||
dnsRRDNSConfigOpts := handler.EnqueueRequestsFromMapFunc(enqueueAllIngressEgressProxySvcsInNS(opts.tailscaleNamespace, mgr.GetClient(), logger))
|
||||
// On Service events, if it is an ingress/egress proxy headless Service, reconcile it.
|
||||
dnsRRServiceOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerServiceHandler)
|
||||
// On Ingress events, if it is a tailscale Ingress or if tailscale is the default ingress controller, reconcile the proxy
|
||||
// headless Service.
|
||||
dnsRRIngressOpts := handler.EnqueueRequestsFromMapFunc(dnsRecordsReconcilerIngressHandler(opts.tailscaleNamespace, opts.proxyActAsDefaultLoadBalancer, mgr.GetClient(), logger))
|
||||
err = builder.ControllerManagedBy(mgr).
|
||||
Named("dns-records-reconciler").
|
||||
Watches(&corev1.Service{}, dnsRRServiceOpts).
|
||||
Watches(&networkingv1.Ingress{}, dnsRRIngressOpts).
|
||||
Watches(&discoveryv1.EndpointSlice{}, dnsRREpsOpts).
|
||||
Watches(&tsapi.DNSConfig{}, dnsRRDNSConfigOpts).
|
||||
Complete(&dnsRecordsReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
tsNamespace: opts.tailscaleNamespace,
|
||||
logger: opts.log.Named("dns-records-reconciler"),
|
||||
isDefaultLoadBalancer: opts.proxyActAsDefaultLoadBalancer,
|
||||
})
|
||||
if err != nil {
|
||||
startlog.Fatalf("could not create DNS records reconciler: %v", err)
|
||||
startlog.Fatalf("could not create egress-ha-endpointslice reconciler: %v", err)
|
||||
}
|
||||
|
||||
startlog.Infof("Startup complete, operator running, version: %s", version.Long())
|
||||
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
|
||||
startlog.Fatalf("could not start manager: %v", err)
|
||||
@ -658,6 +548,65 @@ func serviceHandlerForIngress(cl client.Client, logger *zap.SugaredLogger) handl
|
||||
return reqs
|
||||
}
|
||||
}
|
||||
func egressHAServiceHandler(_ context.Context, o client.Object) []reconcile.Request {
|
||||
_, ok := o.GetLabels()["tailscale.com/proxy-group"]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
_, ok = o.GetAnnotations()["tailscale.com/tailnet-ip"]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
// Enqueue the Service for reconciliation.
|
||||
return []reconcile.Request{
|
||||
{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: o.GetNamespace(),
|
||||
Name: o.GetName(),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
func egressHAEPSHandler(_ context.Context, o client.Object) []reconcile.Request {
|
||||
_, ok := o.GetLabels()["tailscale.com/egress-service"]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
// Enqueue the EndpointSlice for reconciliation.
|
||||
return []reconcile.Request{
|
||||
{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: o.GetNamespace(),
|
||||
Name: o.GetName(),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// On egress ProxyGroup Pod events, reconcile all EndpointSlices for egress services exposed on that ProxyGroup
|
||||
func serviceHandlerForEgressProxyGroupPods(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc {
|
||||
return func(_ context.Context, o client.Object) []reconcile.Request {
|
||||
// TODO: type: egress
|
||||
pg, ok := o.GetLabels()["tailscale.com/proxy-group"]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
epsList := discoveryv1.EndpointSliceList{}
|
||||
if err := cl.List(context.Background(), &epsList, client.MatchingLabels(map[string]string{"tailscale.com/proxy-group": pg})); err != nil {
|
||||
logger.Debugf("error listing endpointslices: %v", err)
|
||||
}
|
||||
reqs := make([]reconcile.Request, 0)
|
||||
for _, ep := range epsList.Items {
|
||||
reqs = append(reqs, reconcile.Request{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: ep.Namespace,
|
||||
Name: ep.Name,
|
||||
},
|
||||
})
|
||||
}
|
||||
return reqs
|
||||
}
|
||||
}
|
||||
|
||||
func serviceHandler(_ context.Context, o client.Object) []reconcile.Request {
|
||||
if isManagedByType(o, "svc") {
|
||||
|
@ -53,7 +53,7 @@ func (s *Store) String() string { return "kube.Store" }
|
||||
|
||||
// ReadState implements the StateStore interface.
|
||||
func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
secret, err := s.client.GetSecret(ctx, s.secretName)
|
||||
@ -83,7 +83,7 @@ func sanitizeKey(k ipn.StateKey) string {
|
||||
|
||||
// WriteState implements the StateStore interface.
|
||||
func (s *Store) WriteState(id ipn.StateKey, bs []byte) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
secret, err := s.client.GetSecret(ctx, s.secretName)
|
||||
|
33 k8s-operator/egressservices.go Normal file
@ -0,0 +1,33 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
package kube
|
||||
|
||||
// TODO: figure out how to build a mechanism to dynamically update iptables/nftables rules
|
||||
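// EgressServices maps the generated name of an egress service (e.g.
// "<svc-name>-<svc-namespace>") to its desired portmapping configuration. The
// operator writes this config to a ConfigMap that is mounted to the ProxyGroup
// Pods at the path set via TS_EGRESS_SERVICES_PATH.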
type EgressServices map[string]EgressService
|
||||
|
||||
type EgressService struct {
|
||||
TailnetTarget TailnetTarget `json:"tailnetTarget"`
|
||||
Ports []PortMap `json:"ports"`
|
||||
}
|
||||
|
||||
type TailnetTarget struct {
|
||||
IP string `json:"ip,omitempty"`
|
||||
FQDN string `json:"fqdn,omitempty"`
|
||||
}
|
||||
|
||||
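// PortMap describes a single portmapping for an egress service: traffic that
// arrives on the proxy Pod's Dst port gets DNAT-ed to the Src port on the
// tailnet target, for the given Protocol ("TCP" or "UDP").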
type PortMap struct {
|
||||
Protocol string `json:"protocol"`
|
||||
Src uint16 `json:"src"`
|
||||
Dst uint16 `json:"dst"`
|
||||
}
|
||||
|
||||
type EgressServicesStatus map[string]EgressServiceStatus
|
||||
|
||||
type EgressServiceStatus struct {
|
||||
PodIP string `json:"podIP"`
|
||||
TailnetTarget TailnetTarget `json:"tailnetTarget"`
|
||||
Ports []PortMap `json:"ports"`
|
||||
}
|
@ -253,7 +253,7 @@ type JSONPatch struct {
|
||||
// It currently supports "add", "remove" and "replace" operations.
|
||||
func (c *client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error {
|
||||
for _, p := range patch {
|
||||
if p.Op != "remove" && p.Op != "add" {
|
||||
if p.Op != "remove" && p.Op != "add" && p.Op != "replace" {
|
||||
panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op))
|
||||
}
|
||||
}
|
||||
|
@ -371,6 +371,28 @@ func (i *iptablesRunner) AddDNATRule(origDst, dst netip.Addr) error {
|
||||
return table.Insert("nat", "PREROUTING", 1, "--destination", origDst.String(), "-j", "DNAT", "--to-destination", dst.String())
|
||||
}
|
||||
|
||||
// For this prototype only - for the real implementation we will probably split
// this into custom chains to make it more readable and easier to update.
|
||||
func (i *iptablesRunner) AddDNATRuleForSrcPrt(origDst, dst netip.Addr, srcPrt, dstPrt uint16, proto uint8) error {
|
||||
table := i.getIPTByAddr(dst)
|
||||
protoS, err := protoName(proto)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return table.Insert("nat", "PREROUTING", 1, "-p", protoS, "--dport", fmt.Sprintf("%d", srcPrt), "--destination", origDst.String(), "-j", "DNAT", "--to-destination", fmt.Sprintf("%v:%v", dst, dstPrt))
|
||||
}
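// For illustration, with the README's kuard-egress example (tailnet target
// 100.64.1.230, Service port 80 exposed on Pod port 2688), the resulting rule
// is roughly equivalent to:
//
//	iptables -t nat -I PREROUTING 1 -p tcp --dport 2688 --destination <pod-ip> -j DNAT --to-destination 100.64.1.230:80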
|
||||
|
||||
func protoName(proto uint8) (string, error) {
|
||||
switch proto {
|
||||
case 6:
|
||||
return "tcp", nil
|
||||
case 17:
|
||||
return "udp", nil
|
||||
default:
|
||||
return "", fmt.Errorf("unrecognized protocol code: %d", proto)
|
||||
}
|
||||
}
|
||||
|
||||
func (i *iptablesRunner) AddSNATRuleForDst(src, dst netip.Addr) error {
|
||||
table := i.getIPTByAddr(dst)
|
||||
return table.Insert("nat", "POSTROUTING", 1, "--destination", dst.String(), "-j", "SNAT", "--to-source", src.String())
|
||||
|
@ -16,6 +16,7 @@
|
||||
"strings"
|
||||
|
||||
"github.com/google/nftables"
|
||||
"github.com/google/nftables/binaryutil"
|
||||
"github.com/google/nftables/expr"
|
||||
"golang.org/x/sys/unix"
|
||||
"tailscale.com/net/tsaddr"
|
||||
@ -101,6 +102,72 @@ func (n *nftablesRunner) ensurePreroutingChain(dst netip.Addr) (*nftables.Table,
|
||||
}
|
||||
return nat, preroutingCh, nil
|
||||
}
|
||||
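// AddDNATRuleForSrcPrt adds a nat/PREROUTING rule that matches packets whose
// destination address is origDst, whose L4 protocol is proto and whose
// destination port is origPrt, and DNATs them to dst:dstPrt.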
func (n *nftablesRunner) AddDNATRuleForSrcPrt(origDst netip.Addr, dst netip.Addr, origPrt, dstPrt uint16, proto uint8) error {
|
||||
nat, preroutingCh, err := n.ensurePreroutingChain(dst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var daddrOffset, fam, dadderLen uint32
|
||||
if origDst.Is4() {
|
||||
daddrOffset = 16
|
||||
dadderLen = 4
|
||||
fam = unix.NFPROTO_IPV4
|
||||
} else {
|
||||
daddrOffset = 24
|
||||
dadderLen = 16
|
||||
fam = unix.NFPROTO_IPV6
|
||||
}
|
||||
dnatRule := &nftables.Rule{
|
||||
Table: nat,
|
||||
Chain: preroutingCh,
|
||||
Exprs: []expr.Any{
|
||||
&expr.Payload{
|
||||
DestRegister: 1,
|
||||
Base: expr.PayloadBaseNetworkHeader,
|
||||
Offset: daddrOffset,
|
||||
Len: dadderLen,
|
||||
},
|
||||
&expr.Cmp{
|
||||
Op: expr.CmpOpEq,
|
||||
Register: 1,
|
||||
Data: origDst.AsSlice(),
|
||||
},
|
||||
&expr.Meta{Key: expr.MetaKeyL4PROTO, Register: 1},
|
||||
&expr.Cmp{
|
||||
Op: expr.CmpOpEq,
|
||||
Register: 1,
|
||||
Data: []byte{proto},
|
||||
},
|
||||
&expr.Payload{
|
||||
DestRegister: 1,
|
||||
Base: expr.PayloadBaseTransportHeader,
|
||||
Offset: 2,
|
||||
Len: 2,
|
||||
},
|
||||
&expr.Cmp{
|
||||
Op: expr.CmpOpEq,
|
||||
Register: 1,
|
||||
Data: binaryutil.BigEndian.PutUint16(origPrt),
|
||||
},
|
||||
&expr.Immediate{
|
||||
Register: 1,
|
||||
Data: dst.AsSlice(),
|
||||
},
|
||||
&expr.Immediate{
|
||||
Register: 2,
|
||||
Data: binaryutil.BigEndian.PutUint16(dstPrt),
|
||||
},
|
||||
&expr.NAT{
|
||||
Type: expr.NATTypeDestNAT,
|
||||
Family: fam,
|
||||
RegAddrMin: 1,
|
||||
RegProtoMin: 2,
|
||||
},
|
||||
},
|
||||
}
|
||||
n.conn.InsertRule(dnatRule)
|
||||
return n.conn.Flush()
|
||||
}
|
||||
|
||||
func (n *nftablesRunner) AddDNATRule(origDst netip.Addr, dst netip.Addr) error {
|
||||
nat, preroutingCh, err := n.ensurePreroutingChain(dst)
|
||||
@ -581,6 +648,8 @@ type NetfilterRunner interface {
|
||||
// DelMagicsockPortRule removes the rule created by AddMagicsockPortRule,
|
||||
// if it exists.
|
||||
DelMagicsockPortRule(port uint16, network string) error
|
||||
// AddDNATRuleForSrcPrt adds a rule to DNAT traffic destined for origDst:originPrt to dst:dstPrt for the given protocol.
|
||||
AddDNATRuleForSrcPrt(origDst, dst netip.Addr, originPrt, dstPrt uint16, proto uint8) error
|
||||
}
|
||||
|
||||
// New creates a NetfilterRunner, auto-detecting whether to use
|
||||
|