From eec734a5787214abb98d5af3421094e56280e6ed Mon Sep 17 00:00:00 2001 From: salman Date: Tue, 17 Jan 2023 20:59:03 +0000 Subject: [PATCH] ipn/{ipnlocal,localapi}: ensure watcher is installed before /watch-ipn-bus/ responds with 200 This change delays the first flush in the /watch-ipn-bus/ handler until after the watcher has been successfully installed on the IPN bus. It does this by adding a new onWatchAdded callback to LocalBackend.WatchNotifications(). Without this, the endpoint returns a 200 almost immediatly, and only then installs a watcher for IPN events. This means there's a small window where events could be missed by clients after calling WatchIPNBus(). Fixes tailscale/corp#8594. Signed-off-by: salman --- ipn/ipnlocal/local.go | 14 +++++++++++--- ipn/ipnlocal/local_test.go | 36 ++++++++++++++++++++++++++++++++++++ ipn/localapi/localapi.go | 3 +-- 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 371a911db..d9f513cac 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -1802,13 +1802,17 @@ func (b *LocalBackend) readPoller() { // // WatchNotifications blocks until ctx is done. // -// The provided fn will only be called with non-nil pointers. The caller must -// not modify roNotify. If fn returns false, the watch also stops. +// The provided onWatchAdded, if non-nil, will be called once the watcher +// is installed. +// +// The provided fn will be called for each notification. It will only be +// called with non-nil pointers. The caller must not modify roNotify. If +// fn returns false, the watch also stops. // // Failure to consume many notifications in a row will result in dropped // notifications. There is currently (2022-11-22) no mechanism provided to // detect when a message has been dropped. -func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWatchOpt, fn func(roNotify *ipn.Notify) (keepGoing bool)) { +func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWatchOpt, onWatchAdded func(), fn func(roNotify *ipn.Notify) (keepGoing bool)) { ch := make(chan *ipn.Notify, 128) origFn := fn @@ -1858,6 +1862,10 @@ func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWa b.mu.Unlock() }() + if onWatchAdded != nil { + onWatchAdded() + } + if ini != nil { if !fn(ini) { return diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go index 4c462f75d..c57666f93 100644 --- a/ipn/ipnlocal/local_test.go +++ b/ipn/ipnlocal/local_test.go @@ -5,6 +5,7 @@ package ipnlocal import ( + "context" "fmt" "net" "net/http" @@ -820,3 +821,38 @@ type legacyBackend interface { // Verify that LocalBackend still implements the legacyBackend interface // for now, at least until the macOS and iOS clients move off of it. var _ legacyBackend = (*LocalBackend)(nil) + +func TestWatchNotificationsCallbacks(t *testing.T) { + b := new(LocalBackend) + n := new(ipn.Notify) + b.WatchNotifications(context.Background(), 0, func() { + b.mu.Lock() + defer b.mu.Unlock() + + // Ensure a watcher has been installed. + if len(b.notifyWatchers) != 1 { + t.Fatalf("unexpected number of watchers in new LocalBackend, want: 1 got: %v", len(b.notifyWatchers)) + } + // Send a notification. Range over notifyWatchers to get the channel + // because WatchNotifications doesn't expose the handle for it. + for _, c := range b.notifyWatchers { + select { + case c <- n: + default: + t.Fatalf("could not send notification") + } + } + }, func(roNotify *ipn.Notify) bool { + if roNotify != n { + t.Fatalf("unexpected notification received. want: %v got: %v", n, roNotify) + } + return false + }) + + // Ensure watchers have been cleaned up. + b.mu.Lock() + defer b.mu.Unlock() + if len(b.notifyWatchers) != 0 { + t.Fatalf("unexpected number of watchers in new LocalBackend, want: 0 got: %v", len(b.notifyWatchers)) + } +} diff --git a/ipn/localapi/localapi.go b/ipn/localapi/localapi.go index 9cefb1728..4af93e6ac 100644 --- a/ipn/localapi/localapi.go +++ b/ipn/localapi/localapi.go @@ -721,7 +721,6 @@ func (h *Handler) serveWatchIPNBus(w http.ResponseWriter, r *http.Request) { return } w.Header().Set("Content-Type", "application/json") - f.Flush() var mask ipn.NotifyWatchOpt if s := r.FormValue("mask"); s != "" { @@ -733,7 +732,7 @@ func (h *Handler) serveWatchIPNBus(w http.ResponseWriter, r *http.Request) { mask = ipn.NotifyWatchOpt(v) } ctx := r.Context() - h.b.WatchNotifications(ctx, mask, func(roNotify *ipn.Notify) (keepGoing bool) { + h.b.WatchNotifications(ctx, mask, f.Flush, func(roNotify *ipn.Notify) (keepGoing bool) { js, err := json.Marshal(roNotify) if err != nil { h.logf("json.Marshal: %v", err)