diff --git a/control/controlclient/auto.go b/control/controlclient/auto.go index 7bca6c8d8..bbc129c5e 100644 --- a/control/controlclient/auto.go +++ b/control/controlclient/auto.go @@ -23,6 +23,7 @@ import ( "tailscale.com/types/persist" "tailscale.com/types/structs" "tailscale.com/util/clientmetric" + "tailscale.com/util/eventbus" "tailscale.com/util/execqueue" ) @@ -122,7 +123,9 @@ type Auto struct { observerQueue execqueue.ExecQueue shutdownFn func() // to be called prior to shutdown or nil - unregisterHealthWatch func() + eventClient *eventbus.Client + healthChangeSub *eventbus.Subscriber[health.Change] + subsDoneCh chan struct{} // close-only channel when eventClient has closed mu sync.Mutex // mutex guards the following fields @@ -192,21 +195,42 @@ func NewNoStart(opts Options) (_ *Auto, err error) { updateDone: make(chan struct{}), observer: opts.Observer, shutdownFn: opts.Shutdown, + subsDoneCh: make(chan struct{}), } + + c.eventClient = opts.Bus.Client("controlClient.Auto") + c.healthChangeSub = eventbus.Subscribe[health.Change](c.eventClient) + c.authCtx, c.authCancel = context.WithCancel(context.Background()) c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, opts.Logf) c.mapCtx, c.mapCancel = context.WithCancel(context.Background()) c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto, opts.Logf) - c.unregisterHealthWatch = opts.HealthTracker.RegisterWatcher(func(c health.Change) { - if c.WarnableChanged { - direct.ReportWarnableChange(c.Warnable, c.UnhealthyState) - } - }) + go c.consumeEventbusTopics() return c, nil } +// consumeEventbusTopics consumes events from all relevant +// [eventbus.Subscriber]'s and passes them to their related handler. Events are +// always handled in the order they are received, i.e. the next event is not +// read until the previous event's handler has returned. It returns when the +// [eventbus.Client] is closed. 
+func (c *Auto) consumeEventbusTopics() { + defer close(c.subsDoneCh) + + for { + select { + case <-c.eventClient.Done(): + return + case change := <-c.healthChangeSub.Events(): + if change.WarnableChanged { + c.direct.ReportWarnableChange(change.Warnable, change.UnhealthyState) + } + } + } +} + // SetPaused controls whether HTTP activity should be paused. // // The client can be paused and unpaused repeatedly, unlike Start and Shutdown, which can only be used once. @@ -760,6 +784,9 @@ func (c *Auto) UpdateEndpoints(endpoints []tailcfg.Endpoint) { } func (c *Auto) Shutdown() { + c.eventClient.Close() + <-c.subsDoneCh + c.mu.Lock() if c.closed { c.mu.Unlock() @@ -783,7 +810,6 @@ func (c *Auto) Shutdown() { shutdownFn() } - c.unregisterHealthWatch() <-c.authDone <-c.mapDone <-c.updateDone diff --git a/health/health.go b/health/health.go index c456b53cb..3d1c46a3d 100644 --- a/health/health.go +++ b/health/health.go @@ -28,7 +28,6 @@ import ( "tailscale.com/util/eventbus" "tailscale.com/util/mak" "tailscale.com/util/multierr" - "tailscale.com/util/set" "tailscale.com/util/usermetric" "tailscale.com/version" ) @@ -65,6 +64,21 @@ var receiveNames = []string{ // Tracker tracks the health of various Tailscale subsystems, // comparing each subsystems' state with each other to make sure // they're consistent based on the user's intended state. +// +// If a client [Warnable] becomes unhealthy or its unhealthy state is updated, +// an event will be emitted with WarnableChanged set to true and the Warnable +// and its UnhealthyState: +// +// Change{WarnableChanged: true, Warnable: w, UnhealthyState: us} +// +// If a Warnable becomes healthy, an event will be emitted with +// WarnableChanged set to true, the Warnable set, and UnhealthyState set to nil: +// +// Change{WarnableChanged: true, Warnable: w, UnhealthyState: nil} +// +// If the health messages from the control-plane change, an event will be +// emitted with ControlHealthChanged set to true. 
Recipients can fetch the set of +// control-plane health messages by calling [Tracker.CurrentState]. type Tracker struct { // MagicSockReceiveFuncs tracks the state of the three // magicsock receive functions: IPv4, IPv6, and DERP. @@ -91,9 +105,8 @@ type Tracker struct { // sysErr maps subsystems to their current error (or nil if the subsystem is healthy) // Deprecated: using Warnables should be preferred - sysErr map[Subsystem]error - watchers set.HandleSet[func(Change)] // opt func to run if error state changes - timer tstime.TimerController + sysErr map[Subsystem]error + timer tstime.TimerController latestVersion *tailcfg.ClientVersion // or nil checkForUpdates bool @@ -131,10 +144,12 @@ func NewTracker(bus *eventbus.Bus) *Tracker { } cli := bus.Client("health.Tracker") - return &Tracker{ + t := &Tracker{ eventClient: cli, changePub: eventbus.Publish[Change](cli), } + t.timer = t.clock().AfterFunc(time.Minute, t.timerSelfCheck) + return t } func (t *Tracker) now() time.Time { @@ -455,33 +470,6 @@ func (t *Tracker) setUnhealthyLocked(w *Warnable, args Args) { }) mak.Set(&t.pendingVisibleTimers, w, tc) } - - // Direct callbacks - // TODO(cmol): Remove once all watchers have been moved to events - for _, cb := range t.watchers { - // If the Warnable has been unhealthy for more than its TimeToVisible, the callback should be - // executed immediately. Otherwise, the callback should be enqueued to run once the Warnable - // becomes visible. - if w.IsVisible(ws, t.now) { - cb(change) - continue - } - - // The time remaining until the Warnable will be visible to the user is the TimeToVisible - // minus the time that has already passed since the Warnable became unhealthy. 
- visibleIn := w.TimeToVisible - t.now().Sub(brokenSince) - var tc tstime.TimerController = t.clock().AfterFunc(visibleIn, func() { - t.mu.Lock() - defer t.mu.Unlock() - // Check if the Warnable is still unhealthy, as it could have become healthy between the time - // the timer was set for and the time it was executed. - if t.warnableVal[w] != nil { - cb(change) - delete(t.pendingVisibleTimers, w) - } - }) - mak.Set(&t.pendingVisibleTimers, w, tc) - } } } @@ -514,10 +502,6 @@ func (t *Tracker) setHealthyLocked(w *Warnable) { Warnable: w, } t.changePub.Publish(change) - for _, cb := range t.watchers { - // TODO(cmol): Remove once all watchers have been moved to events - cb(change) - } } // notifyWatchersControlChangedLocked calls each watcher to signal that control @@ -526,13 +510,7 @@ func (t *Tracker) notifyWatchersControlChangedLocked() { change := Change{ ControlHealthChanged: true, } - if t.changePub != nil { - t.changePub.Publish(change) - } - for _, cb := range t.watchers { - // TODO(cmol): Remove once all watchers have been moved to events - cb(change) - } + t.changePub.Publish(change) } // AppendWarnableDebugFlags appends to base any health items that are currently in failed @@ -577,62 +555,6 @@ type Change struct { UnhealthyState *UnhealthyState } -// RegisterWatcher adds a function that will be called its own goroutine -// whenever the health state of any client [Warnable] or control-plane health -// messages changes. The returned function can be used to unregister the -// callback. 
-// -// If a client [Warnable] becomes unhealthy or its unhealthy state is updated, -// the callback will be called with WarnableChanged set to true and the Warnable -// and its UnhealthyState: -// -// go cb(Change{WarnableChanged: true, Warnable: w, UnhealthyState: us}) -// -// If a Warnable becomes healthy, the callback will be called with -// WarnableChanged set to true, the Warnable set, and UnhealthyState set to nil: -// -// go cb(Change{WarnableChanged: true, Warnable: w, UnhealthyState: nil}) -// -// If the health messages from the control-plane change, the callback will be -// called with ControlHealthChanged set to true. Recipients can fetch the set of -// control-plane health messages by calling [Tracker.CurrentState]: -// -// go cb(Change{ControlHealthChanged: true}) -func (t *Tracker) RegisterWatcher(cb func(Change)) (unregister func()) { - return t.registerSyncWatcher(func(c Change) { - go cb(c) - }) -} - -// registerSyncWatcher adds a function that will be called whenever the health -// state changes. The provided callback function will be executed synchronously. -// Call RegisterWatcher to register any callbacks that won't return from -// execution immediately. -func (t *Tracker) registerSyncWatcher(cb func(c Change)) (unregister func()) { - if t.nil() { - return func() {} - } - t.initOnce.Do(t.doOnceInit) - t.mu.Lock() - defer t.mu.Unlock() - if t.watchers == nil { - t.watchers = set.HandleSet[func(Change)]{} - } - handle := t.watchers.Add(cb) - if t.timer == nil { - t.timer = t.clock().AfterFunc(time.Minute, t.timerSelfCheck) - } - return func() { - t.mu.Lock() - defer t.mu.Unlock() - delete(t.watchers, handle) - if len(t.watchers) == 0 && t.timer != nil { - t.timer.Stop() - t.timer = nil - } - } -} - // SetRouterHealth sets the state of the wgengine/router.Router. // // Deprecated: Warnables should be preferred over Subsystem errors. 
diff --git a/health/health_test.go b/health/health_test.go index c55b0e1f3..3ada37755 100644 --- a/health/health_test.go +++ b/health/health_test.go @@ -4,6 +4,7 @@ package health import ( + "errors" "fmt" "maps" "reflect" @@ -158,15 +159,6 @@ func TestWatcher(t *testing.T) { name string preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change)) }{ - { - name: "with-callbacks", - preFunc: func(t *testing.T, tht *Tracker, _ *eventbus.Bus, fn func(c Change)) { - t.Cleanup(tht.RegisterWatcher(fn)) - if len(tht.watchers) != 1 { - t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(tht.watchers)) - } - }, - }, { name: "with-eventbus", preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) { @@ -254,15 +246,6 @@ func TestSetUnhealthyWithTimeToVisible(t *testing.T) { name string preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change)) }{ - { - name: "with-callbacks", - preFunc: func(t *testing.T, tht *Tracker, _ *eventbus.Bus, fn func(c Change)) { - t.Cleanup(tht.RegisterWatcher(fn)) - if len(tht.watchers) != 1 { - t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(tht.watchers)) - } - }, - }, { name: "with-eventbus", preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) { @@ -668,7 +651,7 @@ func TestControlHealthNotifies(t *testing.T) { name string initialState map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage newState map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage - wantNotify bool + wantEvents []any } tests := []test{ { @@ -679,7 +662,7 @@ func TestControlHealthNotifies(t *testing.T) { newState: map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{ "test": {}, }, - wantNotify: false, + wantEvents: []any{}, }, { name: "on-set", @@ -687,7 +670,9 @@ func TestControlHealthNotifies(t *testing.T) { newState: map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{ "test": {}, }, - wantNotify: true, + wantEvents: []any{ + 
eventbustest.Type[Change](), + }, }, { name: "details-change", @@ -701,7 +686,9 @@ func TestControlHealthNotifies(t *testing.T) { Title: "Updated title", }, }, - wantNotify: true, + wantEvents: []any{ + eventbustest.Type[Change](), + }, }, { name: "action-changes", @@ -721,42 +708,54 @@ func TestControlHealthNotifies(t *testing.T) { }, }, }, - wantNotify: true, + wantEvents: []any{ + eventbustest.Type[Change](), + }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ht := NewTracker(eventbustest.NewBus(t)) + bus := eventbustest.NewBus(t) + tw := eventbustest.NewWatcher(t, bus) + tw.TimeOut = time.Second + + ht := NewTracker(bus) ht.SetIPNState("NeedsLogin", true) ht.GotStreamedMapResponse() - if len(test.initialState) != 0 { - ht.SetControlHealth(test.initialState) + // Expect events at startup, before doing anything else + if err := eventbustest.ExpectExactly(tw, + eventbustest.Type[Change](), // warming-up + eventbustest.Type[Change](), // is-using-unstable-version + eventbustest.Type[Change](), // not-in-map-poll + ); err != nil { + t.Errorf("startup error: %v", err) } - gotNotified := false - ht.registerSyncWatcher(func(_ Change) { - gotNotified = true - }) + // Only set initial state if we need to + if len(test.initialState) != 0 { + ht.SetControlHealth(test.initialState) + if err := eventbustest.ExpectExactly(tw, eventbustest.Type[Change]()); err != nil { + t.Errorf("initial state error: %v", err) + } + } ht.SetControlHealth(test.newState) - if gotNotified != test.wantNotify { - t.Errorf("notified: got %v, want %v", gotNotified, test.wantNotify) + if err := eventbustest.ExpectExactly(tw, test.wantEvents...); err != nil { + t.Errorf("event error: %v", err) } }) } } func TestControlHealthIgnoredOutsideMapPoll(t *testing.T) { - ht := NewTracker(eventbustest.NewBus(t)) + bus := eventbustest.NewBus(t) + tw := eventbustest.NewWatcher(t, bus) + tw.TimeOut = 100 * time.Millisecond + ht := NewTracker(bus) ht.SetIPNState("NeedsLogin", true) - 
gotNotified := false - ht.registerSyncWatcher(func(_ Change) { - gotNotified = true - }) - ht.SetControlHealth(map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{ "control-health": {}, }) @@ -768,8 +767,19 @@ func TestControlHealthIgnoredOutsideMapPoll(t *testing.T) { t.Error("got a warning with code 'control-health', want none") } - if gotNotified { - t.Error("watcher got called, want it to not be called") + // An event is emitted when SetIPNState is run above, + // so only fail on the second event. + eventCounter := 0 + expectOne := func(c *Change) error { + eventCounter++ + if eventCounter == 1 { + return nil + } + return errors.New("saw more than 1 event") + } + + if err := eventbustest.Expect(tw, expectOne); err == nil { + t.Error("event got emitted, want it to not be called") } } diff --git a/util/eventbus/eventbustest/eventbustest.go b/util/eventbus/eventbustest/eventbustest.go index 0916ae522..3f7bf4553 100644 --- a/util/eventbus/eventbustest/eventbustest.go +++ b/util/eventbus/eventbustest/eventbustest.go @@ -120,7 +120,12 @@ func Expect(tw *Watcher, filters ...any) error { // [Expect]. Use [Expect] if other events are allowed. 
func ExpectExactly(tw *Watcher, filters ...any) error { if len(filters) == 0 { - return errors.New("no event filters were provided") + select { + case event := <-tw.events: + return fmt.Errorf("saw event type %s, expected none", reflect.TypeOf(event)) + case <-time.After(tw.TimeOut): + return nil + } } eventCount := 0 for pos, next := range filters { diff --git a/util/eventbus/eventbustest/eventbustest_test.go b/util/eventbus/eventbustest/eventbustest_test.go index 7a6b511c7..2d126767d 100644 --- a/util/eventbus/eventbustest/eventbustest_test.go +++ b/util/eventbus/eventbustest/eventbustest_test.go @@ -250,7 +250,7 @@ func TestExpectEvents(t *testing.T) { tw := eventbustest.NewWatcher(t, bus) // TODO(cmol): When synctest is out of experimental, use that instead: // https://go.dev/blog/synctest - tw.TimeOut = 10 * time.Millisecond + tw.TimeOut = 100 * time.Millisecond client := bus.Client("testClient") defer client.Close()