From 1b65630e8361c64fd7c0932c65c4484d9a8a6241 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Wed, 7 Dec 2022 12:29:45 -0800 Subject: [PATCH] cmd/containerboot: switch to IPN bus monitoring instead of polling. We still have to shell out to `tailscale up` because the container image's API includes "arbitrary flags to tailscale up", unfortunately. But this should still speed up startup a little, and also enables k8s-bound containers to update their device information as new netmap updates come in. Fixes #6657 Signed-off-by: David Anderson --- cmd/containerboot/kube.go | 10 +- cmd/containerboot/main.go | 226 +++++++++++++++++++++------------ cmd/containerboot/main_test.go | 220 +++++++++++++++++--------------- 3 files changed, 268 insertions(+), 188 deletions(-) diff --git a/cmd/containerboot/kube.go b/cmd/containerboot/kube.go index 03eff50c4..1e142ae73 100644 --- a/cmd/containerboot/kube.go +++ b/cmd/containerboot/kube.go @@ -22,6 +22,7 @@ "strings" "time" + "tailscale.com/tailcfg" "tailscale.com/util/multierr" ) @@ -140,9 +141,9 @@ func findKeyInKubeSecret(ctx context.Context, secretName string) (string, error) return "", nil } -// storeDeviceID writes deviceID into the "device_id" data field of -// the kube secret secretName. -func storeDeviceID(ctx context.Context, secretName, deviceID string) error { +// storeDeviceInfo writes deviceID into the "device_id" data field of the kube +// secret secretName. +func storeDeviceInfo(ctx context.Context, secretName string, deviceID tailcfg.StableNodeID, fqdn string) error { // First check if the secret exists at all. Even if running on // kubernetes, we do not necessarily store state in a k8s secret. req, err := http.NewRequest("GET", fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s", kubeNamespace, secretName), nil) @@ -161,7 +162,8 @@ func storeDeviceID(ctx context.Context, secretName, deviceID string) error { m := map[string]map[string]string{ "stringData": { - "device_id": deviceID, + "device_id": string(deviceID), + "device_fqdn": fqdn, }, } var b bytes.Buffer diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go index 1e92f5237..8e9ff7185 100644 --- a/cmd/containerboot/main.go +++ b/cmd/containerboot/main.go @@ -60,7 +60,8 @@ "golang.org/x/sys/unix" "tailscale.com/client/tailscale" - "tailscale.com/ipn/ipnstate" + "tailscale.com/ipn" + "tailscale.com/util/deephash" ) func main() { @@ -152,46 +153,149 @@ func main() { log.Fatalf("failed to bring up tailscale: %v", err) } - st, err := authTailscaled(ctx, client, cfg) + w, err := client.WatchIPNBus(ctx, ipn.NotifyInitialNetMap|ipn.NotifyInitialPrefs|ipn.NotifyInitialState) if err != nil { - log.Fatalf("failed to auth tailscale: %v", err) + log.Fatalf("failed to watch tailscaled for updates: %v", err) } - if cfg.ProxyTo != "" { - if err := installIPTablesRule(ctx, cfg.ProxyTo, st.TailscaleIPs); err != nil { - log.Fatalf("installing proxy rules: %v", err) + // Because we're still shelling out to `tailscale up` to get access to its + // flag parser, we have to stop watching the IPN bus so that we can block on + // the subcommand without stalling anything. Then once it's done, we resume + // watching the bus. + // + // Depending on the requested mode of operation, this auth step happens at + // different points in containerboot's lifecycle, hence the helper function. 
+ didLogin := false + authTailscale := func() error { + if didLogin { + return nil + } + didLogin = true + w.Close() + if err := tailscaleUp(ctx, cfg); err != nil { + return fmt.Errorf("failed to auth tailscale: %v", err) + } + w, err = client.WatchIPNBus(ctx, ipn.NotifyInitialNetMap|ipn.NotifyInitialState) + if err != nil { + return fmt.Errorf("rewatching tailscaled for updates after auth: %v", err) + } + return nil + } + + if !cfg.AuthOnce { + if err := authTailscale(); err != nil { + log.Fatalf("failed to auth tailscale: %v", err) } } - if cfg.InKubernetes && cfg.KubernetesCanPatch && cfg.KubeSecret != "" { - if err := storeDeviceID(ctx, cfg.KubeSecret, string(st.Self.ID)); err != nil { - log.Fatalf("storing device ID in kube secret: %v", err) + +authLoop: + for { + n, err := w.Next() + if err != nil { + log.Fatalf("failed to read from tailscaled: %v", err) } - if cfg.AuthOnce { - // We were told to only auth once, so any secret-bound - // authkey is no longer needed. We don't strictly need to - // wipe it, but it's good hygiene. - log.Printf("Deleting authkey from kube secret") - if err := deleteAuthKey(ctx, cfg.KubeSecret); err != nil { - log.Fatalf("deleting authkey from kube secret: %v", err) + + if n.State != nil { + switch *n.State { + case ipn.NeedsLogin: + if err := authTailscale(); err != nil { + log.Fatalf("failed to auth tailscale: %v", err) + } + case ipn.NeedsMachineAuth: + log.Printf("machine authorization required, please visit the admin panel") + case ipn.Running: + // Technically, all we want is to keep monitoring the bus for + // netmap updates. However, in order to make the container crash + // if tailscale doesn't initially come up, the watch has a + // startup deadline on it. So, we have to break out of this + // watch loop, cancel the watch, and watch again with no + // deadline to continue monitoring for changes. + break authLoop + default: + log.Printf("tailscaled in state %q, waiting", *n.State) } } } - log.Println("Startup complete, waiting for shutdown signal") - // Reap all processes, since we are PID1 and need to collect - // zombies. + w.Close() + + if cfg.InKubernetes && cfg.KubeSecret != "" && cfg.KubernetesCanPatch && cfg.AuthOnce { + // We were told to only auth once, so any secret-bound + // authkey is no longer needed. We don't strictly need to + // wipe it, but it's good hygiene. + log.Printf("Deleting authkey from kube secret") + if err := deleteAuthKey(ctx, cfg.KubeSecret); err != nil { + log.Fatalf("deleting authkey from kube secret: %v", err) + } + } + + w, err = client.WatchIPNBus(context.Background(), ipn.NotifyInitialNetMap|ipn.NotifyInitialState) + if err != nil { + log.Fatalf("rewatching tailscaled for updates after auth: %v", err) + } + + var ( + wantProxy = cfg.ProxyTo != "" + wantDeviceInfo = cfg.InKubernetes && cfg.KubeSecret != "" && cfg.KubernetesCanPatch + startupTasksDone = false + currentIPs deephash.Sum // tailscale IPs assigned to device + currentDeviceInfo deephash.Sum // device ID and fqdn + ) for { - var status unix.WaitStatus - pid, err := unix.Wait4(-1, &status, 0, nil) - if errors.Is(err, unix.EINTR) { - continue - } + n, err := w.Next() if err != nil { - log.Fatalf("Waiting for exited processes: %v", err) + log.Fatalf("failed to read from tailscaled: %v", err) } - if pid == daemonPid { - log.Printf("Tailscaled exited") - os.Exit(0) + + if n.State != nil && *n.State != ipn.Running { + // Something's gone wrong and we've left the authenticated state. 
+			// Our container image never recovered gracefully from this, and the
+			// control flow required to make it work now is hard. So, just crash
+			// the container and rely on the container runtime to restart us,
+			// whereupon we'll go through initial auth again.
+			log.Fatalf("tailscaled left running state (now in state %q), exiting", *n.State)
+		}
+		if n.NetMap != nil {
+			if cfg.ProxyTo != "" && len(n.NetMap.Addresses) > 0 && deephash.Update(&currentIPs, &n.NetMap.Addresses) {
+				if err := installIPTablesRule(ctx, cfg.ProxyTo, n.NetMap.Addresses); err != nil {
+					log.Fatalf("installing proxy rules: %v", err)
+				}
+			}
+			deviceInfo := []any{n.NetMap.SelfNode.StableID, n.NetMap.SelfNode.Name}
+			if cfg.InKubernetes && cfg.KubernetesCanPatch && cfg.KubeSecret != "" && deephash.Update(&currentDeviceInfo, &deviceInfo) {
+				if err := storeDeviceInfo(ctx, cfg.KubeSecret, n.NetMap.SelfNode.StableID, n.NetMap.SelfNode.Name); err != nil {
+					log.Fatalf("storing device ID in kube secret: %v", err)
+				}
+			}
+		}
+		if !startupTasksDone {
+			if (!wantProxy || currentIPs != deephash.Sum{}) && (!wantDeviceInfo || currentDeviceInfo != deephash.Sum{}) {
+				// This log message is used in tests to detect when all
+				// post-auth configuration is done.
+				log.Println("Startup complete, waiting for shutdown signal")
+				startupTasksDone = true
+
+				// Reap all processes, since we are PID1 and need to collect zombies. We can
+				// only start doing this once we've stopped shelling out to things like
+				// `tailscale up`, otherwise this goroutine can reap the CLI subprocesses
+				// and wedge bringup.
+				go func() {
+					for {
+						var status unix.WaitStatus
+						pid, err := unix.Wait4(-1, &status, 0, nil)
+						if errors.Is(err, unix.EINTR) {
+							continue
+						}
+						if err != nil {
+							log.Fatalf("Waiting for exited processes: %v", err)
+						}
+						if pid == daemonPid {
+							log.Printf("Tailscaled exited")
+							os.Exit(0)
+						}
+					}
+				}()
+			}
+		}
 	}
 }
@@ -242,58 +346,6 @@ func startTailscaled(ctx context.Context, cfg *settings) (*tailscale.LocalClient
 	return tsClient, cmd.Process.Pid, nil
 }
 
-// startAndAuthTailscaled starts the tailscale daemon and attempts to
-// auth it, according to the settings in cfg. If successful, returns
-// tailscaled's Status and pid.
-func authTailscaled(ctx context.Context, client *tailscale.LocalClient, cfg *settings) (*ipnstate.Status, error) {
-	didLogin := false
-	if !cfg.AuthOnce {
-		if err := tailscaleUp(ctx, cfg); err != nil {
-			return nil, fmt.Errorf("couldn't log in: %v", err)
-		}
-		didLogin = true
-	}
-
-	// Poll for daemon state until it goes to either Running or
-	// NeedsLogin. The latter only happens if cfg.AuthOnce is true,
-	// because in that case we only try to auth when it's necessary to
-	// reach the running state.
-	for {
-		if ctx.Err() != nil {
-			return nil, ctx.Err()
-		}
-
-		loopCtx, cancel := context.WithTimeout(ctx, time.Second)
-		st, err := client.Status(loopCtx)
-		cancel()
-		if err != nil {
-			return nil, fmt.Errorf("Getting tailscaled state: %w", err)
-		}
-
-		switch st.BackendState {
-		case "Running":
-			if len(st.TailscaleIPs) > 0 {
-				return st, nil
-			}
-			log.Printf("No Tailscale IPs assigned yet")
-		case "NeedsLogin":
-			if !didLogin {
-				// Alas, we cannot currently trigger an authkey login from
-				// LocalAPI, so we still have to shell out to the
-				// tailscale CLI for this bit.
- if err := tailscaleUp(ctx, cfg); err != nil { - return nil, fmt.Errorf("couldn't log in: %v", err) - } - didLogin = true - } - default: - log.Printf("tailscaled in state %q, waiting", st.BackendState) - } - - time.Sleep(100 * time.Millisecond) - } -} - // tailscaledArgs uses cfg to construct the argv for tailscaled. func tailscaledArgs(cfg *settings) []string { args := []string{"--socket=" + cfg.Socket} @@ -428,7 +480,7 @@ func ensureIPForwarding(root, proxyTo, routes string) error { return nil } -func installIPTablesRule(ctx context.Context, dstStr string, tsIPs []netip.Addr) error { +func installIPTablesRule(ctx context.Context, dstStr string, tsIPs []netip.Prefix) error { dst, err := netip.ParseAddr(dstStr) if err != nil { return err @@ -438,16 +490,22 @@ func installIPTablesRule(ctx context.Context, dstStr string, tsIPs []netip.Addr) argv0 = "ip6tables" } var local string - for _, ip := range tsIPs { - if ip.Is4() != dst.Is4() { + for _, pfx := range tsIPs { + if !pfx.IsSingleIP() { continue } - local = ip.String() + if pfx.Addr().Is4() != dst.Is4() { + continue + } + local = pfx.Addr().String() break } if local == "" { return fmt.Errorf("no tailscale IP matching family of %s found in %v", dstStr, tsIPs) } + // Technically, if the control server ever changes the IPs assigned to this + // node, we'll slowly accumulate iptables rules. This shouldn't happen, so + // for now we'll live with it. cmd := exec.CommandContext(ctx, argv0, "-t", "nat", "-I", "PREROUTING", "1", "-d", local, "-j", "DNAT", "--to-destination", dstStr) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr diff --git a/cmd/containerboot/main_test.go b/cmd/containerboot/main_test.go index 8fb7c6c9f..94adf8b2e 100644 --- a/cmd/containerboot/main_test.go +++ b/cmd/containerboot/main_test.go @@ -31,9 +31,11 @@ "github.com/google/go-cmp/cmp" "golang.org/x/sys/unix" - "tailscale.com/ipn/ipnstate" + "tailscale.com/ipn" "tailscale.com/tailcfg" "tailscale.com/tstest" + "tailscale.com/types/netmap" + "tailscale.com/types/ptr" ) func TestContainerBoot(t *testing.T) { @@ -91,18 +93,13 @@ func TestContainerBoot(t *testing.T) { } argFile := filepath.Join(d, "args") - tsIPs := []netip.Addr{netip.MustParseAddr("100.64.0.1")} runningSockPath := filepath.Join(d, "tmp/tailscaled.sock") - // TODO: refactor this 1-2 stuff if we ever need a third - // step. Right now all of containerboot's modes either converge - // with no further interaction needed, or with one extra step - // only. type phase struct { - // Make LocalAPI report this status, then wait for the Wants below to be - // satisfied. A zero Status is a valid state for a just-started - // tailscaled. - Status ipnstate.Status + // If non-nil, send this IPN bus notification (and remember it as the + // initial update for any future new watchers, then wait for all the + // Waits below to be true before proceeding to the next phase. + Notify *ipn.Notify // WantCmds is the commands that containerboot should run in this phase. WantCmds []string @@ -113,6 +110,16 @@ type phase struct { // contents. 
WantFiles map[string]string } + runningNotify := &ipn.Notify{ + State: ptr.To(ipn.Running), + NetMap: &netmap.NetworkMap{ + SelfNode: &tailcfg.Node{ + StableID: tailcfg.StableNodeID("myID"), + Name: "test-node.test.ts.net", + }, + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, + }, + } tests := []struct { Name string Env map[string]string @@ -132,10 +139,7 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, }, }, }, @@ -153,10 +157,7 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, }, }, }, @@ -174,10 +175,7 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, }, }, }, @@ -195,10 +193,7 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, WantFiles: map[string]string{ "proc/sys/net/ipv4/ip_forward": "0", "proc/sys/net/ipv6/conf/all/forwarding": "0", @@ -221,10 +216,7 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, WantFiles: map[string]string{ "proc/sys/net/ipv4/ip_forward": "1", "proc/sys/net/ipv6/conf/all/forwarding": "0", @@ -247,10 +239,7 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, WantFiles: map[string]string{ "proc/sys/net/ipv4/ip_forward": "0", "proc/sys/net/ipv6/conf/all/forwarding": "1", @@ -273,10 +262,7 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, WantFiles: map[string]string{ "proc/sys/net/ipv4/ip_forward": "1", "proc/sys/net/ipv6/conf/all/forwarding": "1", @@ -299,10 +285,7 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, WantCmds: []string{ "/usr/bin/iptables -t nat -I PREROUTING 1 -d 100.64.0.1 -j DNAT --to-destination 1.2.3.4", }, @@ -322,18 +305,15 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "NeedsLogin", + Notify: &ipn.Notify{ + State: ptr.To(ipn.NeedsLogin), }, WantCmds: []string{ "/usr/bin/tailscale --socket=/tmp/tailscaled.sock up --accept-dns=false --authkey=tskey-key", }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, }, }, }, @@ -357,16 +337,11 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - Self: &ipnstate.PeerStatus{ - ID: tailcfg.StableNodeID("myID"), - }, - }, + Notify: runningNotify, WantKubeSecret: map[string]string{ - "authkey": "tskey-key", - "device_id": "myID", + "authkey": "tskey-key", + "device_fqdn": "test-node.test.ts.net", + "device_id": "myID", }, }, }, @@ -389,13 +364,7 @@ type phase struct { WantKubeSecret: map[string]string{}, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - Self: &ipnstate.PeerStatus{ - ID: tailcfg.StableNodeID("myID"), - }, - }, + Notify: runningNotify, WantKubeSecret: map[string]string{}, }, }, @@ -421,8 +390,8 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "NeedsLogin", + Notify: &ipn.Notify{ + State: ptr.To(ipn.NeedsLogin), }, WantCmds: []string{ "/usr/bin/tailscale 
--socket=/tmp/tailscaled.sock up --accept-dns=false --authkey=tskey-key", @@ -432,15 +401,56 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - Self: &ipnstate.PeerStatus{ - ID: tailcfg.StableNodeID("myID"), + Notify: runningNotify, + WantKubeSecret: map[string]string{ + "device_fqdn": "test-node.test.ts.net", + "device_id": "myID", + }, + }, + }, + }, + { + Name: "kube_storage_updates", + Env: map[string]string{ + "KUBERNETES_SERVICE_HOST": kube.Host, + "KUBERNETES_SERVICE_PORT_HTTPS": kube.Port, + }, + KubeSecret: map[string]string{ + "authkey": "tskey-key", + }, + Phases: []phase{ + { + WantCmds: []string{ + "/usr/bin/tailscaled --socket=/tmp/tailscaled.sock --state=kube:tailscale --statedir=/tmp --tun=userspace-networking", + "/usr/bin/tailscale --socket=/tmp/tailscaled.sock up --accept-dns=false --authkey=tskey-key", + }, + WantKubeSecret: map[string]string{ + "authkey": "tskey-key", + }, + }, + { + Notify: runningNotify, + WantKubeSecret: map[string]string{ + "authkey": "tskey-key", + "device_fqdn": "test-node.test.ts.net", + "device_id": "myID", + }, + }, + { + Notify: &ipn.Notify{ + State: ptr.To(ipn.Running), + NetMap: &netmap.NetworkMap{ + SelfNode: &tailcfg.Node{ + StableID: tailcfg.StableNodeID("newID"), + Name: "new-name.test.ts.net", + }, + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, }, }, WantKubeSecret: map[string]string{ - "device_id": "myID", + "authkey": "tskey-key", + "device_fqdn": "new-name.test.ts.net", + "device_id": "newID", }, }, }, @@ -459,13 +469,7 @@ type phase struct { }, }, { - // The tailscale up call blocks until auth is complete, so - // by the time it returns the next converged state is - // Running. - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, }, }, }, @@ -482,10 +486,7 @@ type phase struct { }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, }, }, }, @@ -502,10 +503,7 @@ type phase struct { "/usr/bin/tailscale --socket=/tmp/tailscaled.sock up --accept-dns=false --widget=rotated", }, }, { - Status: ipnstate.Status{ - BackendState: "Running", - TailscaleIPs: tsIPs, - }, + Notify: runningNotify, }, }, }, @@ -552,7 +550,7 @@ type phase struct { var wantCmds []string for _, p := range test.Phases { - lapi.SetStatus(p.Status) + lapi.Notify(p.Notify) wantCmds = append(wantCmds, p.WantCmds...) 
waitArgs(t, 2*time.Second, d, argFile, strings.Join(wantCmds, "\n")) err := tstest.WaitFor(2*time.Second, func() error { @@ -688,7 +686,8 @@ type localAPI struct { srv *http.Server sync.Mutex - status ipnstate.Status + cond *sync.Cond + notify *ipn.Notify } func (l *localAPI) Start() error { @@ -706,6 +705,7 @@ func (l *localAPI) Start() error { Handler: l, } l.Path = path + l.cond = sync.NewCond(&l.Mutex) go l.srv.Serve(ln) return nil } @@ -715,29 +715,49 @@ func (l *localAPI) Close() { } func (l *localAPI) Reset() { - l.SetStatus(ipnstate.Status{ - BackendState: "NoState", - }) -} - -func (l *localAPI) SetStatus(st ipnstate.Status) { l.Lock() defer l.Unlock() - l.status = st + l.notify = nil + l.cond.Broadcast() +} + +func (l *localAPI) Notify(n *ipn.Notify) { + if n == nil { + return + } + l.Lock() + defer l.Unlock() + l.notify = n + l.cond.Broadcast() } func (l *localAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { panic(fmt.Sprintf("unsupported method %q", r.Method)) } - if r.URL.Path != "/localapi/v0/status" { - panic(fmt.Sprintf("unsupported localAPI path %q", r.URL.Path)) + if r.URL.Path != "/localapi/v0/watch-ipn-bus" { + panic(fmt.Sprintf("unsupported path %q", r.URL.Path)) } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + enc := json.NewEncoder(w) l.Lock() defer l.Unlock() - if err := json.NewEncoder(w).Encode(l.status); err != nil { - panic("json encode failed") + for { + if l.notify != nil { + if err := enc.Encode(l.notify); err != nil { + // Usually broken pipe as the test client disconnects. + return + } + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + } + l.cond.Wait() } }
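
For reference, the watch pattern this patch adopts can be reduced to the following minimal standalone sketch. It reuses only APIs already exercised in the diff above (LocalClient.WatchIPNBus, ipn.Notify, deephash.Update); the watchAddrs/main wrapper, its names, and its log output are illustrative assumptions rather than containerboot code:

	package main

	import (
		"context"
		"log"

		"tailscale.com/client/tailscale"
		"tailscale.com/ipn"
		"tailscale.com/util/deephash"
	)

	// watchAddrs subscribes to the IPN bus and blocks, waiting for the backend
	// to reach ipn.Running and logging whenever the node's Tailscale addresses
	// change. deephash suppresses notifications that leave the addresses unchanged.
	func watchAddrs(ctx context.Context, lc *tailscale.LocalClient) error {
		w, err := lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap|ipn.NotifyInitialState)
		if err != nil {
			return err
		}
		defer w.Close()

		var currentIPs deephash.Sum // hash of the last-seen address list
		for {
			n, err := w.Next()
			if err != nil {
				return err
			}
			if n.State != nil && *n.State != ipn.Running {
				log.Printf("tailscaled in state %q, waiting", *n.State)
				continue
			}
			if n.NetMap != nil && len(n.NetMap.Addresses) > 0 && deephash.Update(&currentIPs, &n.NetMap.Addresses) {
				log.Printf("tailscale addresses now %v", n.NetMap.Addresses)
			}
		}
	}

	func main() {
		// A zero-value LocalClient talks to the local tailscaled socket.
		var lc tailscale.LocalClient
		if err := watchAddrs(context.Background(), &lc); err != nil {
			log.Fatal(err)
		}
	}

WatchIPNBus keeps a single long-lived stream open, so state changes and netmap updates arrive as they happen instead of being sampled every 100ms the way the removed authTailscaled loop did.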
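Two concrete outputs of that loop, as pinned down by the tests above: with a proxy destination configured (cfg.ProxyTo), a change in the node's Tailscale addresses installs a DNAT rule of the form

	iptables -t nat -I PREROUTING 1 -d <tailscale IP> -j DNAT --to-destination <cfg.ProxyTo>

and, when running in Kubernetes with permission to patch the state secret, storeDeviceInfo merges data equivalent to

	{"stringData": {"device_id": "<SelfNode.StableID>", "device_fqdn": "<SelfNode.Name>"}}

into that secret. The deephash guards ensure both writes happen only when the underlying values actually change.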