cmd/containerboot: switch to IPN bus monitoring instead of polling.

We still have to shell out to `tailscale up` because the container image's
API includes "arbitrary flags to tailscale up", unfortunately. But this
should still speed up startup a little, and also enables k8s-bound containers
to update their device information as new netmap updates come in.

Fixes #6657

Signed-off-by: David Anderson <danderson@tailscale.com>
This commit is contained in:
David Anderson 2022-12-07 12:29:45 -08:00 committed by Dave Anderson
parent 55e0512a05
commit 1b65630e83
3 changed files with 268 additions and 188 deletions

View File

@ -22,6 +22,7 @@
"strings" "strings"
"time" "time"
"tailscale.com/tailcfg"
"tailscale.com/util/multierr" "tailscale.com/util/multierr"
) )
@ -140,9 +141,9 @@ func findKeyInKubeSecret(ctx context.Context, secretName string) (string, error)
return "", nil return "", nil
} }
// storeDeviceID writes deviceID into the "device_id" data field of // storeDeviceInfo writes deviceID into the "device_id" data field of the kube
// the kube secret secretName. // secret secretName.
func storeDeviceID(ctx context.Context, secretName, deviceID string) error { func storeDeviceInfo(ctx context.Context, secretName string, deviceID tailcfg.StableNodeID, fqdn string) error {
// First check if the secret exists at all. Even if running on // First check if the secret exists at all. Even if running on
// kubernetes, we do not necessarily store state in a k8s secret. // kubernetes, we do not necessarily store state in a k8s secret.
req, err := http.NewRequest("GET", fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s", kubeNamespace, secretName), nil) req, err := http.NewRequest("GET", fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s", kubeNamespace, secretName), nil)
@ -161,7 +162,8 @@ func storeDeviceID(ctx context.Context, secretName, deviceID string) error {
m := map[string]map[string]string{ m := map[string]map[string]string{
"stringData": { "stringData": {
"device_id": deviceID, "device_id": string(deviceID),
"device_fqdn": fqdn,
}, },
} }
var b bytes.Buffer var b bytes.Buffer

View File

@ -60,7 +60,8 @@
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"tailscale.com/client/tailscale" "tailscale.com/client/tailscale"
"tailscale.com/ipn/ipnstate" "tailscale.com/ipn"
"tailscale.com/util/deephash"
) )
func main() { func main() {
@ -152,46 +153,149 @@ func main() {
log.Fatalf("failed to bring up tailscale: %v", err) log.Fatalf("failed to bring up tailscale: %v", err)
} }
st, err := authTailscaled(ctx, client, cfg) w, err := client.WatchIPNBus(ctx, ipn.NotifyInitialNetMap|ipn.NotifyInitialPrefs|ipn.NotifyInitialState)
if err != nil { if err != nil {
log.Fatalf("failed to auth tailscale: %v", err) log.Fatalf("failed to watch tailscaled for updates: %v", err)
} }
if cfg.ProxyTo != "" { // Because we're still shelling out to `tailscale up` to get access to its
if err := installIPTablesRule(ctx, cfg.ProxyTo, st.TailscaleIPs); err != nil { // flag parser, we have to stop watching the IPN bus so that we can block on
log.Fatalf("installing proxy rules: %v", err) // the subcommand without stalling anything. Then once it's done, we resume
// watching the bus.
//
// Depending on the requested mode of operation, this auth step happens at
// different points in containerboot's lifecycle, hence the helper function.
didLogin := false
authTailscale := func() error {
if didLogin {
return nil
}
didLogin = true
w.Close()
if err := tailscaleUp(ctx, cfg); err != nil {
return fmt.Errorf("failed to auth tailscale: %v", err)
}
w, err = client.WatchIPNBus(ctx, ipn.NotifyInitialNetMap|ipn.NotifyInitialState)
if err != nil {
return fmt.Errorf("rewatching tailscaled for updates after auth: %v", err)
}
return nil
}
if !cfg.AuthOnce {
if err := authTailscale(); err != nil {
log.Fatalf("failed to auth tailscale: %v", err)
} }
} }
if cfg.InKubernetes && cfg.KubernetesCanPatch && cfg.KubeSecret != "" {
if err := storeDeviceID(ctx, cfg.KubeSecret, string(st.Self.ID)); err != nil { authLoop:
log.Fatalf("storing device ID in kube secret: %v", err) for {
n, err := w.Next()
if err != nil {
log.Fatalf("failed to read from tailscaled: %v", err)
} }
if cfg.AuthOnce {
// We were told to only auth once, so any secret-bound if n.State != nil {
// authkey is no longer needed. We don't strictly need to switch *n.State {
// wipe it, but it's good hygiene. case ipn.NeedsLogin:
log.Printf("Deleting authkey from kube secret") if err := authTailscale(); err != nil {
if err := deleteAuthKey(ctx, cfg.KubeSecret); err != nil { log.Fatalf("failed to auth tailscale: %v", err)
log.Fatalf("deleting authkey from kube secret: %v", err) }
case ipn.NeedsMachineAuth:
log.Printf("machine authorization required, please visit the admin panel")
case ipn.Running:
// Technically, all we want is to keep monitoring the bus for
// netmap updates. However, in order to make the container crash
// if tailscale doesn't initially come up, the watch has a
// startup deadline on it. So, we have to break out of this
// watch loop, cancel the watch, and watch again with no
// deadline to continue monitoring for changes.
break authLoop
default:
log.Printf("tailscaled in state %q, waiting", *n.State)
} }
} }
} }
log.Println("Startup complete, waiting for shutdown signal") w.Close()
// Reap all processes, since we are PID1 and need to collect
// zombies. if cfg.InKubernetes && cfg.KubeSecret != "" && cfg.KubernetesCanPatch && cfg.AuthOnce {
// We were told to only auth once, so any secret-bound
// authkey is no longer needed. We don't strictly need to
// wipe it, but it's good hygiene.
log.Printf("Deleting authkey from kube secret")
if err := deleteAuthKey(ctx, cfg.KubeSecret); err != nil {
log.Fatalf("deleting authkey from kube secret: %v", err)
}
}
w, err = client.WatchIPNBus(context.Background(), ipn.NotifyInitialNetMap|ipn.NotifyInitialState)
if err != nil {
log.Fatalf("rewatching tailscaled for updates after auth: %v", err)
}
var (
wantProxy = cfg.ProxyTo != ""
wantDeviceInfo = cfg.InKubernetes && cfg.KubeSecret != "" && cfg.KubernetesCanPatch
startupTasksDone = false
currentIPs deephash.Sum // tailscale IPs assigned to device
currentDeviceInfo deephash.Sum // device ID and fqdn
)
for { for {
var status unix.WaitStatus n, err := w.Next()
pid, err := unix.Wait4(-1, &status, 0, nil)
if errors.Is(err, unix.EINTR) {
continue
}
if err != nil { if err != nil {
log.Fatalf("Waiting for exited processes: %v", err) log.Fatalf("failed to read from tailscaled: %v", err)
} }
if pid == daemonPid {
log.Printf("Tailscaled exited") if n.State != nil && *n.State != ipn.Running {
os.Exit(0) // Something's gone wrong and we've left the authenticated state.
// Our container image never recovered gracefully from this, and the
// control flow required to make it work now is hard. So, just crash
// the container and rely on the container runtime to restart us,
// whereupon we'll go through initial auth again.
log.Fatalf("tailscaled left running state (now in state %q), exiting", *n.State)
}
if n.NetMap != nil {
if cfg.ProxyTo != "" && len(n.NetMap.Addresses) > 0 && deephash.Update(&currentIPs, &n.NetMap.Addresses) {
if err := installIPTablesRule(ctx, cfg.ProxyTo, n.NetMap.Addresses); err != nil {
log.Fatalf("installing proxy rules: %v", err)
}
}
deviceInfo := []any{n.NetMap.SelfNode.StableID, n.NetMap.SelfNode.Name}
if cfg.InKubernetes && cfg.KubernetesCanPatch && cfg.KubeSecret != "" && deephash.Update(&currentDeviceInfo, &deviceInfo) {
if err := storeDeviceInfo(ctx, cfg.KubeSecret, n.NetMap.SelfNode.StableID, n.NetMap.SelfNode.Name); err != nil {
log.Fatalf("storing device ID in kube secret: %v", err)
}
}
}
if !startupTasksDone {
if (!wantProxy || currentIPs != deephash.Sum{}) && (!wantDeviceInfo || currentDeviceInfo != deephash.Sum{}) {
// This log message is used in tests to detect when all
// post-auth configuration is done.
log.Println("Startup complete, waiting for shutdown signal")
startupTasksDone = true
// Reap all processes, since we are PID1 and need to collect zombies. We can
// only start doing this once we've stopped shelling out to things
// `tailscale up`, otherwise this goroutine can reap the CLI subprocesses
// and wedge bringup.
go func() {
for {
var status unix.WaitStatus
pid, err := unix.Wait4(-1, &status, 0, nil)
if errors.Is(err, unix.EINTR) {
continue
}
if err != nil {
log.Fatalf("Waiting for exited processes: %v", err)
}
if pid == daemonPid {
log.Printf("Tailscaled exited")
os.Exit(0)
}
}
}()
}
} }
} }
} }
@ -242,58 +346,6 @@ func startTailscaled(ctx context.Context, cfg *settings) (*tailscale.LocalClient
return tsClient, cmd.Process.Pid, nil return tsClient, cmd.Process.Pid, nil
} }
// startAndAuthTailscaled starts the tailscale daemon and attempts to
// auth it, according to the settings in cfg. If successful, returns
// tailscaled's Status and pid.
func authTailscaled(ctx context.Context, client *tailscale.LocalClient, cfg *settings) (*ipnstate.Status, error) {
didLogin := false
if !cfg.AuthOnce {
if err := tailscaleUp(ctx, cfg); err != nil {
return nil, fmt.Errorf("couldn't log in: %v", err)
}
didLogin = true
}
// Poll for daemon state until it goes to either Running or
// NeedsLogin. The latter only happens if cfg.AuthOnce is true,
// because in that case we only try to auth when it's necessary to
// reach the running state.
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
loopCtx, cancel := context.WithTimeout(ctx, time.Second)
st, err := client.Status(loopCtx)
cancel()
if err != nil {
return nil, fmt.Errorf("Getting tailscaled state: %w", err)
}
switch st.BackendState {
case "Running":
if len(st.TailscaleIPs) > 0 {
return st, nil
}
log.Printf("No Tailscale IPs assigned yet")
case "NeedsLogin":
if !didLogin {
// Alas, we cannot currently trigger an authkey login from
// LocalAPI, so we still have to shell out to the
// tailscale CLI for this bit.
if err := tailscaleUp(ctx, cfg); err != nil {
return nil, fmt.Errorf("couldn't log in: %v", err)
}
didLogin = true
}
default:
log.Printf("tailscaled in state %q, waiting", st.BackendState)
}
time.Sleep(100 * time.Millisecond)
}
}
// tailscaledArgs uses cfg to construct the argv for tailscaled. // tailscaledArgs uses cfg to construct the argv for tailscaled.
func tailscaledArgs(cfg *settings) []string { func tailscaledArgs(cfg *settings) []string {
args := []string{"--socket=" + cfg.Socket} args := []string{"--socket=" + cfg.Socket}
@ -428,7 +480,7 @@ func ensureIPForwarding(root, proxyTo, routes string) error {
return nil return nil
} }
func installIPTablesRule(ctx context.Context, dstStr string, tsIPs []netip.Addr) error { func installIPTablesRule(ctx context.Context, dstStr string, tsIPs []netip.Prefix) error {
dst, err := netip.ParseAddr(dstStr) dst, err := netip.ParseAddr(dstStr)
if err != nil { if err != nil {
return err return err
@ -438,16 +490,22 @@ func installIPTablesRule(ctx context.Context, dstStr string, tsIPs []netip.Addr)
argv0 = "ip6tables" argv0 = "ip6tables"
} }
var local string var local string
for _, ip := range tsIPs { for _, pfx := range tsIPs {
if ip.Is4() != dst.Is4() { if !pfx.IsSingleIP() {
continue continue
} }
local = ip.String() if pfx.Addr().Is4() != dst.Is4() {
continue
}
local = pfx.Addr().String()
break break
} }
if local == "" { if local == "" {
return fmt.Errorf("no tailscale IP matching family of %s found in %v", dstStr, tsIPs) return fmt.Errorf("no tailscale IP matching family of %s found in %v", dstStr, tsIPs)
} }
// Technically, if the control server ever changes the IPs assigned to this
// node, we'll slowly accumulate iptables rules. This shouldn't happen, so
// for now we'll live with it.
cmd := exec.CommandContext(ctx, argv0, "-t", "nat", "-I", "PREROUTING", "1", "-d", local, "-j", "DNAT", "--to-destination", dstStr) cmd := exec.CommandContext(ctx, argv0, "-t", "nat", "-I", "PREROUTING", "1", "-d", local, "-j", "DNAT", "--to-destination", dstStr)
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr

View File

@ -31,9 +31,11 @@
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"tailscale.com/ipn/ipnstate" "tailscale.com/ipn"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/tstest" "tailscale.com/tstest"
"tailscale.com/types/netmap"
"tailscale.com/types/ptr"
) )
func TestContainerBoot(t *testing.T) { func TestContainerBoot(t *testing.T) {
@ -91,18 +93,13 @@ func TestContainerBoot(t *testing.T) {
} }
argFile := filepath.Join(d, "args") argFile := filepath.Join(d, "args")
tsIPs := []netip.Addr{netip.MustParseAddr("100.64.0.1")}
runningSockPath := filepath.Join(d, "tmp/tailscaled.sock") runningSockPath := filepath.Join(d, "tmp/tailscaled.sock")
// TODO: refactor this 1-2 stuff if we ever need a third
// step. Right now all of containerboot's modes either converge
// with no further interaction needed, or with one extra step
// only.
type phase struct { type phase struct {
// Make LocalAPI report this status, then wait for the Wants below to be // If non-nil, send this IPN bus notification (and remember it as the
// satisfied. A zero Status is a valid state for a just-started // initial update for any future new watchers, then wait for all the
// tailscaled. // Waits below to be true before proceeding to the next phase.
Status ipnstate.Status Notify *ipn.Notify
// WantCmds is the commands that containerboot should run in this phase. // WantCmds is the commands that containerboot should run in this phase.
WantCmds []string WantCmds []string
@ -113,6 +110,16 @@ type phase struct {
// contents. // contents.
WantFiles map[string]string WantFiles map[string]string
} }
runningNotify := &ipn.Notify{
State: ptr.To(ipn.Running),
NetMap: &netmap.NetworkMap{
SelfNode: &tailcfg.Node{
StableID: tailcfg.StableNodeID("myID"),
Name: "test-node.test.ts.net",
},
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")},
},
}
tests := []struct { tests := []struct {
Name string Name string
Env map[string]string Env map[string]string
@ -132,10 +139,7 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
},
}, },
}, },
}, },
@ -153,10 +157,7 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
},
}, },
}, },
}, },
@ -174,10 +175,7 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
},
}, },
}, },
}, },
@ -195,10 +193,7 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
},
WantFiles: map[string]string{ WantFiles: map[string]string{
"proc/sys/net/ipv4/ip_forward": "0", "proc/sys/net/ipv4/ip_forward": "0",
"proc/sys/net/ipv6/conf/all/forwarding": "0", "proc/sys/net/ipv6/conf/all/forwarding": "0",
@ -221,10 +216,7 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
},
WantFiles: map[string]string{ WantFiles: map[string]string{
"proc/sys/net/ipv4/ip_forward": "1", "proc/sys/net/ipv4/ip_forward": "1",
"proc/sys/net/ipv6/conf/all/forwarding": "0", "proc/sys/net/ipv6/conf/all/forwarding": "0",
@ -247,10 +239,7 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
},
WantFiles: map[string]string{ WantFiles: map[string]string{
"proc/sys/net/ipv4/ip_forward": "0", "proc/sys/net/ipv4/ip_forward": "0",
"proc/sys/net/ipv6/conf/all/forwarding": "1", "proc/sys/net/ipv6/conf/all/forwarding": "1",
@ -273,10 +262,7 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
},
WantFiles: map[string]string{ WantFiles: map[string]string{
"proc/sys/net/ipv4/ip_forward": "1", "proc/sys/net/ipv4/ip_forward": "1",
"proc/sys/net/ipv6/conf/all/forwarding": "1", "proc/sys/net/ipv6/conf/all/forwarding": "1",
@ -299,10 +285,7 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
},
WantCmds: []string{ WantCmds: []string{
"/usr/bin/iptables -t nat -I PREROUTING 1 -d 100.64.0.1 -j DNAT --to-destination 1.2.3.4", "/usr/bin/iptables -t nat -I PREROUTING 1 -d 100.64.0.1 -j DNAT --to-destination 1.2.3.4",
}, },
@ -322,18 +305,15 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: &ipn.Notify{
BackendState: "NeedsLogin", State: ptr.To(ipn.NeedsLogin),
}, },
WantCmds: []string{ WantCmds: []string{
"/usr/bin/tailscale --socket=/tmp/tailscaled.sock up --accept-dns=false --authkey=tskey-key", "/usr/bin/tailscale --socket=/tmp/tailscaled.sock up --accept-dns=false --authkey=tskey-key",
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
},
}, },
}, },
}, },
@ -357,16 +337,11 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
Self: &ipnstate.PeerStatus{
ID: tailcfg.StableNodeID("myID"),
},
},
WantKubeSecret: map[string]string{ WantKubeSecret: map[string]string{
"authkey": "tskey-key", "authkey": "tskey-key",
"device_id": "myID", "device_fqdn": "test-node.test.ts.net",
"device_id": "myID",
}, },
}, },
}, },
@ -389,13 +364,7 @@ type phase struct {
WantKubeSecret: map[string]string{}, WantKubeSecret: map[string]string{},
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
Self: &ipnstate.PeerStatus{
ID: tailcfg.StableNodeID("myID"),
},
},
WantKubeSecret: map[string]string{}, WantKubeSecret: map[string]string{},
}, },
}, },
@ -421,8 +390,8 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: &ipn.Notify{
BackendState: "NeedsLogin", State: ptr.To(ipn.NeedsLogin),
}, },
WantCmds: []string{ WantCmds: []string{
"/usr/bin/tailscale --socket=/tmp/tailscaled.sock up --accept-dns=false --authkey=tskey-key", "/usr/bin/tailscale --socket=/tmp/tailscaled.sock up --accept-dns=false --authkey=tskey-key",
@ -432,15 +401,56 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running", WantKubeSecret: map[string]string{
TailscaleIPs: tsIPs, "device_fqdn": "test-node.test.ts.net",
Self: &ipnstate.PeerStatus{ "device_id": "myID",
ID: tailcfg.StableNodeID("myID"), },
},
},
},
{
Name: "kube_storage_updates",
Env: map[string]string{
"KUBERNETES_SERVICE_HOST": kube.Host,
"KUBERNETES_SERVICE_PORT_HTTPS": kube.Port,
},
KubeSecret: map[string]string{
"authkey": "tskey-key",
},
Phases: []phase{
{
WantCmds: []string{
"/usr/bin/tailscaled --socket=/tmp/tailscaled.sock --state=kube:tailscale --statedir=/tmp --tun=userspace-networking",
"/usr/bin/tailscale --socket=/tmp/tailscaled.sock up --accept-dns=false --authkey=tskey-key",
},
WantKubeSecret: map[string]string{
"authkey": "tskey-key",
},
},
{
Notify: runningNotify,
WantKubeSecret: map[string]string{
"authkey": "tskey-key",
"device_fqdn": "test-node.test.ts.net",
"device_id": "myID",
},
},
{
Notify: &ipn.Notify{
State: ptr.To(ipn.Running),
NetMap: &netmap.NetworkMap{
SelfNode: &tailcfg.Node{
StableID: tailcfg.StableNodeID("newID"),
Name: "new-name.test.ts.net",
},
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")},
}, },
}, },
WantKubeSecret: map[string]string{ WantKubeSecret: map[string]string{
"device_id": "myID", "authkey": "tskey-key",
"device_fqdn": "new-name.test.ts.net",
"device_id": "newID",
}, },
}, },
}, },
@ -459,13 +469,7 @@ type phase struct {
}, },
}, },
{ {
// The tailscale up call blocks until auth is complete, so Notify: runningNotify,
// by the time it returns the next converged state is
// Running.
Status: ipnstate.Status{
BackendState: "Running",
TailscaleIPs: tsIPs,
},
}, },
}, },
}, },
@ -482,10 +486,7 @@ type phase struct {
}, },
}, },
{ {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
},
}, },
}, },
}, },
@ -502,10 +503,7 @@ type phase struct {
"/usr/bin/tailscale --socket=/tmp/tailscaled.sock up --accept-dns=false --widget=rotated", "/usr/bin/tailscale --socket=/tmp/tailscaled.sock up --accept-dns=false --widget=rotated",
}, },
}, { }, {
Status: ipnstate.Status{ Notify: runningNotify,
BackendState: "Running",
TailscaleIPs: tsIPs,
},
}, },
}, },
}, },
@ -552,7 +550,7 @@ type phase struct {
var wantCmds []string var wantCmds []string
for _, p := range test.Phases { for _, p := range test.Phases {
lapi.SetStatus(p.Status) lapi.Notify(p.Notify)
wantCmds = append(wantCmds, p.WantCmds...) wantCmds = append(wantCmds, p.WantCmds...)
waitArgs(t, 2*time.Second, d, argFile, strings.Join(wantCmds, "\n")) waitArgs(t, 2*time.Second, d, argFile, strings.Join(wantCmds, "\n"))
err := tstest.WaitFor(2*time.Second, func() error { err := tstest.WaitFor(2*time.Second, func() error {
@ -688,7 +686,8 @@ type localAPI struct {
srv *http.Server srv *http.Server
sync.Mutex sync.Mutex
status ipnstate.Status cond *sync.Cond
notify *ipn.Notify
} }
func (l *localAPI) Start() error { func (l *localAPI) Start() error {
@ -706,6 +705,7 @@ func (l *localAPI) Start() error {
Handler: l, Handler: l,
} }
l.Path = path l.Path = path
l.cond = sync.NewCond(&l.Mutex)
go l.srv.Serve(ln) go l.srv.Serve(ln)
return nil return nil
} }
@ -715,29 +715,49 @@ func (l *localAPI) Close() {
} }
func (l *localAPI) Reset() { func (l *localAPI) Reset() {
l.SetStatus(ipnstate.Status{
BackendState: "NoState",
})
}
func (l *localAPI) SetStatus(st ipnstate.Status) {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
l.status = st l.notify = nil
l.cond.Broadcast()
}
func (l *localAPI) Notify(n *ipn.Notify) {
if n == nil {
return
}
l.Lock()
defer l.Unlock()
l.notify = n
l.cond.Broadcast()
} }
func (l *localAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (l *localAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" { if r.Method != "GET" {
panic(fmt.Sprintf("unsupported method %q", r.Method)) panic(fmt.Sprintf("unsupported method %q", r.Method))
} }
if r.URL.Path != "/localapi/v0/status" { if r.URL.Path != "/localapi/v0/watch-ipn-bus" {
panic(fmt.Sprintf("unsupported localAPI path %q", r.URL.Path)) panic(fmt.Sprintf("unsupported path %q", r.URL.Path))
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
enc := json.NewEncoder(w)
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
if err := json.NewEncoder(w).Encode(l.status); err != nil { for {
panic("json encode failed") if l.notify != nil {
if err := enc.Encode(l.notify); err != nil {
// Usually broken pipe as the test client disconnects.
return
}
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}
l.cond.Wait()
} }
} }