Code review feedback

Signed-off-by: Irbe Krumina <irbe@tailscale.com>
Irbe Krumina 2024-09-27 19:43:46 +01:00
parent 01cea89317
commit c8d3e16e1d
5 changed files with 56 additions and 59 deletions

View File

@@ -158,7 +158,7 @@ func main() {
 		PodIP:                         defaultEnv("POD_IP", ""),
 		EnableForwardingOptimizations: defaultBool("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS", false),
 		HealthCheckAddrPort:           defaultEnv("TS_HEALTHCHECK_ADDR_PORT", ""),
-		EgressServicesPath:            defaultEnv("TS_EGRESS_SERVICES_PATH", ""),
+		EgressSvcsCfgPath:             defaultEnv("TS_EGRESS_SERVICES_CONFIG_PATH", ""),
 	}
 	if err := cfg.validate(); err != nil {
@@ -276,10 +276,8 @@ authLoop:
 			switch *n.State {
 			case ipn.NeedsLogin:
 				if isOneStepConfig(cfg) {
-					// This could happen if this is the
-					// first time tailscaled was run for
-					// this device and the auth key was not
-					// passed via the configfile.
+					// This could happen if this is the first time tailscaled was run for this
+					// device and the auth key was not passed via the configfile.
 					log.Fatalf("invalid state: tailscaled daemon started with a config file, but tailscale is not logged in: ensure you pass a valid auth key in the config file.")
 				}
 				if err := authTailscale(); err != nil {
@@ -377,6 +375,9 @@ authLoop:
 			}
 		})
 	)
+	// egressSvcsErrorChan will get an error sent to it if this containerboot instance is configured to expose 1+
+	// egress services in HA mode and errored.
+	var egressSvcsErrorChan = make(chan error)
 	defer t.Stop()
 	// resetTimer resets timer for when to next attempt to resolve the DNS
 	// name for the proxy configured with TS_EXPERIMENTAL_DEST_DNS_NAME. The
@@ -582,36 +583,27 @@ runLoop:
 				}
 			}
 			if !startupTasksDone {
-				// For containerboot instances that act as TCP
-				// proxies (proxying traffic to an endpoint
-				// passed via one of the env vars that
-				// containerboot reads) and store state in a
-				// Kubernetes Secret, we consider startup tasks
-				// done at the point when device info has been
-				// successfully stored to state Secret.
-				// For all other containerboot instances, if we
-				// just get to this point the startup tasks can
-				// be considered done.
+				// For containerboot instances that act as TCP proxies (proxying traffic to an endpoint
+				// passed via one of the env vars that containerboot reads) and store state in a
+				// Kubernetes Secret, we consider startup tasks done at the point when device info has
+				// been successfully stored to state Secret. For all other containerboot instances, if
+				// we just get to this point the startup tasks can be considered done.
 				if !isL3Proxy(cfg) || !hasKubeStateStore(cfg) || (currentDeviceEndpoints != deephash.Sum{} && currentDeviceID != deephash.Sum{}) {
 					// This log message is used in tests to detect when all
 					// post-auth configuration is done.
 					log.Println("Startup complete, waiting for shutdown signal")
 					startupTasksDone = true
-					// Configure egress proxy. Egress proxy
-					// will set up firewall rules to proxy
-					// traffic to tailnet targets configured
-					// in the provided configuration file.
-					// It will then continuously monitor the
-					// config file and netmap updates and
-					// reconfigure the firewall rules as
-					// needed. If any of its operations
-					// fail, it will crash this node.
-					if cfg.EgressServicesPath != "" {
-						log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressServicesPath)
+					// Configure egress proxy. Egress proxy will set up firewall rules to proxy
+					// traffic to tailnet targets configured in the provided configuration file. It
+					// will then continuously monitor the config file and netmap updates and
+					// reconfigure the firewall rules as needed. If any of its operations fail, it
+					// will crash this node.
+					if cfg.EgressSvcsCfgPath != "" {
+						log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressSvcsCfgPath)
 						egressSvcsNotify = make(chan ipn.Notify)
 						ep := egressProxy{
-							cfgPath:     cfg.EgressServicesPath,
+							cfgPath:     cfg.EgressSvcsCfgPath,
 							nfr:         nfr,
 							kc:          kc,
 							stateSecret: cfg.KubeSecret,
@@ -619,16 +611,17 @@ runLoop:
 							podIP:        cfg.PodIP,
 							tailnetAddrs: addrs,
 						}
-						go ep.run(ctx, n)
+						go func() {
+							if err := ep.run(ctx, n); err != nil {
+								egressSvcsErrorChan <- err
+							}
+						}()
 					}
-					// Wait on tailscaled process. It won't
-					// be cleaned up by default when the
-					// container exits as it is not PID1.
-					// TODO (irbekrm): perhaps we can
-					// replace the reaper by a running
-					// cmd.Wait in a goroutine immediately
-					// after starting tailscaled?
+					// Wait on tailscaled process. It won't be cleaned up by default when the
+					// container exits as it is not PID1. TODO (irbekrm): perhaps we can replace the
+					// reaper by a running cmd.Wait in a goroutine immediately after starting
+					// tailscaled?
 					reaper := func() {
 						defer wg.Done()
 						for {
@@ -666,6 +659,8 @@ runLoop:
 			}
 			backendAddrs = newBackendAddrs
 			resetTimer(false)
+		case e := <-egressSvcsErrorChan:
+			log.Fatalf("egress proxy failed: %v", e)
 		}
 	}
 	wg.Wait()
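
For context: the hunks above stop calling log.Fatal inside the egress-proxy goroutine and instead forward its error over a channel that the main run loop drains, crashing the process from there. Below is a minimal, self-contained sketch of that pattern; the worker function, channel name, and error text are illustrative placeholders, not the actual containerboot code.

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// runWorker stands in for a long-lived task such as ep.run: it returns nil on
// clean shutdown and a non-nil error if it can no longer do its job.
func runWorker(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return nil
	case <-time.After(2 * time.Second):
		return errors.New("simulated failure")
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errChan := make(chan error) // plays the role of egressSvcsErrorChan
	go func() {
		// Forward the error instead of calling log.Fatal inside the
		// goroutine, so the main loop decides how to shut down.
		if err := runWorker(ctx); err != nil {
			errChan <- err
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case err := <-errChan:
			// Fail fast from the main loop, mirroring the
			// log.Fatalf("egress proxy failed: %v", e) case above.
			log.Fatalf("worker failed: %v", err)
		}
	}
}
```

Keeping the fatal exit in one place also means a future change (for example, graceful teardown instead of a crash) only has to touch the main loop.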

View File

@@ -63,7 +63,7 @@ type egressProxy struct {
 // - the mounted egress config has changed
 // - the proxy's tailnet IP addresses have changed
 // - tailnet IPs have changed for any backend targets specified by tailnet FQDN
-func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) {
+func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) error {
 	var tickChan <-chan time.Time
 	var eventChan <-chan fsnotify.Event
 	// TODO (irbekrm): take a look if this can be pulled into a single func
@@ -76,19 +76,19 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) {
 	} else {
 		defer w.Close()
 		if err := w.Add(filepath.Dir(ep.cfgPath)); err != nil {
-			log.Fatalf("failed to add fsnotify watch: %v", err)
+			return fmt.Errorf("failed to add fsnotify watch: %w", err)
 		}
 		eventChan = w.Events
 	}
 	if err := ep.sync(ctx, n); err != nil {
-		log.Fatal(err)
+		return err
 	}
 	for {
 		var err error
 		select {
 		case <-ctx.Done():
-			return
+			return nil
 		case <-tickChan:
 			err = ep.sync(ctx, n)
 		case <-eventChan:
@@ -102,14 +102,15 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) {
 			}
 		}
 		if err != nil {
-			log.Fatalf("error syncing egress service config: %v", err)
+			return fmt.Errorf("error syncing egress service config: %w", err)
 		}
 	}
 }
 
-// sync triggers an egress proxy config resync. The resync calculates the diff
-// between config and status to determine if any firewall rules need to be
-// updated.
+// sync triggers an egress proxy config resync. The resync calculates the diff between config and status to determine if
+// any firewall rules need to be updated. Currently using status in state Secret as a reference for what is the current
+// firewall configuration is good enough because - the status is keyed by the Pod IP - we crash the Pod on errors such
+// as failed firewall update
 func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error {
 	cfgs, err := ep.getConfigs()
 	if err != nil {
@@ -175,8 +176,8 @@ func (ep *egressProxy) syncEgressConfigs(cfgs *egressservices.Configs, status *e
 		mak.Set(&rulesPerSvcToDelete, svcName, rulesToDelete)
 	}
 	if len(rulesToAdd) != 0 || ep.addrsHaveChanged(n) {
-		// For each tailnet target, set up SNAT from the local
-		// tailnet device address of the matching family.
+		// For each tailnet target, set up SNAT from the local tailnet device address of the matching
+		// family.
 		for _, t := range tailnetTargetIPs {
 			if t.Is6() && !ep.nfr.HasIPV6NAT() {
 				continue
@@ -358,14 +359,15 @@ func (ep *egressProxy) getStatus(ctx context.Context) (*egressservices.Status, e
 	}
 	status := &egressservices.Status{}
 	raw, ok := secret.Data[egressservices.KeyEgressServices]
-	if ok {
+	if !ok {
+		return nil, nil
+	}
 	if err := json.Unmarshal([]byte(raw), status); err != nil {
 		return nil, fmt.Errorf("error unmarshalling previous config: %w", err)
 	}
 	if reflect.DeepEqual(status.PodIP, ep.podIP) {
 		return status, nil
 	}
-	}
 	return nil, nil
 }
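
For context: this file's hunks convert run() and its helpers from log.Fatal to returned, wrapped errors, so the caller (containerboot's main loop) decides when to crash. Below is a simplified, self-contained sketch of that error-returning watch-and-sync shape; watchAndSync, the ticker interval, the config path, and the injected sync func are placeholders rather than the real egressProxy code, and it uses the fsnotify library already referenced in the diff.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"path/filepath"
	"time"

	"github.com/fsnotify/fsnotify"
)

// watchAndSync watches the directory holding cfgPath, resyncs on ticks and on
// file events, and returns any failure to the caller instead of exiting the
// process. The sync func stands in for ep.sync.
func watchAndSync(ctx context.Context, cfgPath string, sync func() error) error {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("failed to create fsnotify watcher: %w", err)
	}
	defer w.Close()
	if err := w.Add(filepath.Dir(cfgPath)); err != nil {
		return fmt.Errorf("failed to add fsnotify watch: %w", err)
	}

	tick := time.NewTicker(time.Minute)
	defer tick.Stop()

	if err := sync(); err != nil {
		return err
	}
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-tick.C:
		case <-w.Events:
		}
		if err := sync(); err != nil {
			return fmt.Errorf("error syncing config: %w", err)
		}
	}
}

func main() {
	err := watchAndSync(context.Background(), "/etc/egress/cfg.json", func() error {
		log.Println("sync ran")
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```

The real run() is more selective (it only ticks when FQDN targets need periodic re-resolution and filters the fsnotify events), but the control flow and error propagation follow this shape.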

View File

@@ -64,7 +64,7 @@ type settings struct {
 	// target.
 	PodIP               string
 	HealthCheckAddrPort string
-	EgressServicesPath  string
+	EgressSvcsCfgPath   string
 }
 
 func (s *settings) validate() error {
@@ -199,7 +199,7 @@ func isOneStepConfig(cfg *settings) bool {
 // as an L3 proxy, proxying to an endpoint provided via one of the config env
 // vars.
 func isL3Proxy(cfg *settings) bool {
-	return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressServicesPath != ""
+	return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressSvcsCfgPath != ""
 }
 
 // hasKubeStateStore returns true if the state must be stored in a Kubernetes

View File

@@ -258,7 +258,7 @@ type JSONPatch struct {
 func (c *client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error {
 	for _, p := range patch {
 		if p.Op != "remove" && p.Op != "add" && p.Op != "replace" {
-			panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op))
+			return fmt.Errorf("unsupported JSON patch operation: %q", p.Op)
 		}
 	}
 	return c.doRequest(ctx, "PATCH", c.secretURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json"))
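
For context: this hunk swaps a panic for a returned error when a patch contains an operation the client does not support. A minimal sketch of the same validation, under the assumption of an RFC 6902-style patch shape; the jsonPatch type, field set, and example path/value here are illustrative, not the actual kube client types.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// jsonPatch mirrors the shape of an RFC 6902 patch operation.
type jsonPatch struct {
	Op    string `json:"op"`
	Path  string `json:"path"`
	Value any    `json:"value,omitempty"`
}

// validatePatch returns an error for unsupported operations instead of
// panicking, matching the behaviour change in JSONPatchSecret above.
func validatePatch(patch []jsonPatch) error {
	for _, p := range patch {
		switch p.Op {
		case "remove", "add", "replace":
			// supported
		default:
			return fmt.Errorf("unsupported JSON patch operation: %q", p.Op)
		}
	}
	return nil
}

func main() {
	patch := []jsonPatch{{Op: "replace", Path: "/data/egress-services", Value: "e30="}}
	if err := validatePatch(patch); err != nil {
		log.Fatal(err)
	}
	body, _ := json.Marshal(patch)
	// The marshalled body is sent with Content-Type: application/json-patch+json.
	fmt.Println(string(body))
}
```

Returning the error keeps a malformed caller-supplied patch from taking down the whole process and lets the caller report it alongside other request failures.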

View File

@@ -33,9 +33,9 @@ func (i *iptablesRunner) EnsurePortMapRuleForSvc(svc, tun string, targetIP netip
 // DeleteMapRuleForSvc constructs a prerouting rule as would be created by
 // EnsurePortMapRuleForSvc with the provided args and, if such a rule exists,
 // deletes it.
-func (i *iptablesRunner) DeletePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error {
+func (i *iptablesRunner) DeletePortMapRuleForSvc(svc, excludeI string, targetIP netip.Addr, pm PortMap) error {
 	table := i.getIPTByAddr(targetIP)
-	args := argsForPortMapRule(svc, tun, targetIP, pm)
+	args := argsForPortMapRule(svc, excludeI, targetIP, pm)
 	exists, err := table.Exists("nat", "PREROUTING", args...)
 	if err != nil {
 		return fmt.Errorf("error checking if rule exists: %w", err)
@@ -60,10 +60,10 @@ func (i *iptablesRunner) DeleteSvc(svc, tun string, targetIPs []netip.Addr, pms
 	return nil
 }
 
-func argsForPortMapRule(svc, tun string, targetIP netip.Addr, pm PortMap) []string {
+func argsForPortMapRule(svc, excludeI string, targetIP netip.Addr, pm PortMap) []string {
 	c := commentForSvc(svc, pm)
 	return []string{
-		"!", "-i", tun,
+		"!", "-i", excludeI,
 		"-p", pm.Protocol,
 		"--dport", fmt.Sprintf("%d", pm.MatchPort),
 		"-m", "comment", "--comment", c,