health: remove direct callback and replace with eventbus (#17199)

Pulls out the last callback logic and ensures timers are still running.

The eventbustest package is updated support the absence of events.

Updates #15160

Signed-off-by: Claus Lensbøl <claus@tailscale.com>
This commit is contained in:
Claus Lensbøl
2025-09-19 14:58:37 -04:00
committed by GitHub
parent d559a21418
commit 009d702adf
5 changed files with 111 additions and 148 deletions

View File

@@ -23,6 +23,7 @@ import (
"tailscale.com/types/persist" "tailscale.com/types/persist"
"tailscale.com/types/structs" "tailscale.com/types/structs"
"tailscale.com/util/clientmetric" "tailscale.com/util/clientmetric"
"tailscale.com/util/eventbus"
"tailscale.com/util/execqueue" "tailscale.com/util/execqueue"
) )
@@ -122,7 +123,9 @@ type Auto struct {
observerQueue execqueue.ExecQueue observerQueue execqueue.ExecQueue
shutdownFn func() // to be called prior to shutdown or nil shutdownFn func() // to be called prior to shutdown or nil
unregisterHealthWatch func() eventClient *eventbus.Client
healthChangeSub *eventbus.Subscriber[health.Change]
subsDoneCh chan struct{} // close-only channel when eventClient has closed
mu sync.Mutex // mutex guards the following fields mu sync.Mutex // mutex guards the following fields
@@ -192,21 +195,42 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
updateDone: make(chan struct{}), updateDone: make(chan struct{}),
observer: opts.Observer, observer: opts.Observer,
shutdownFn: opts.Shutdown, shutdownFn: opts.Shutdown,
subsDoneCh: make(chan struct{}),
} }
c.eventClient = opts.Bus.Client("controlClient.Auto")
c.healthChangeSub = eventbus.Subscribe[health.Change](c.eventClient)
c.authCtx, c.authCancel = context.WithCancel(context.Background()) c.authCtx, c.authCancel = context.WithCancel(context.Background())
c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, opts.Logf) c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, opts.Logf)
c.mapCtx, c.mapCancel = context.WithCancel(context.Background()) c.mapCtx, c.mapCancel = context.WithCancel(context.Background())
c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto, opts.Logf) c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto, opts.Logf)
c.unregisterHealthWatch = opts.HealthTracker.RegisterWatcher(func(c health.Change) { go c.consumeEventbusTopics()
if c.WarnableChanged {
direct.ReportWarnableChange(c.Warnable, c.UnhealthyState)
}
})
return c, nil return c, nil
} }
// consumeEventbusTopics consumes events from all relevant
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
// always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the
// [eventbus.Client] is closed.
func (c *Auto) consumeEventbusTopics() {
defer close(c.subsDoneCh)
for {
select {
case <-c.eventClient.Done():
return
case change := <-c.healthChangeSub.Events():
if change.WarnableChanged {
c.direct.ReportWarnableChange(change.Warnable, change.UnhealthyState)
}
}
}
}
// SetPaused controls whether HTTP activity should be paused. // SetPaused controls whether HTTP activity should be paused.
// //
// The client can be paused and unpaused repeatedly, unlike Start and Shutdown, which can only be used once. // The client can be paused and unpaused repeatedly, unlike Start and Shutdown, which can only be used once.
@@ -760,6 +784,9 @@ func (c *Auto) UpdateEndpoints(endpoints []tailcfg.Endpoint) {
} }
func (c *Auto) Shutdown() { func (c *Auto) Shutdown() {
c.eventClient.Close()
<-c.subsDoneCh
c.mu.Lock() c.mu.Lock()
if c.closed { if c.closed {
c.mu.Unlock() c.mu.Unlock()
@@ -783,7 +810,6 @@ func (c *Auto) Shutdown() {
shutdownFn() shutdownFn()
} }
c.unregisterHealthWatch()
<-c.authDone <-c.authDone
<-c.mapDone <-c.mapDone
<-c.updateDone <-c.updateDone

View File

@@ -28,7 +28,6 @@ import (
"tailscale.com/util/eventbus" "tailscale.com/util/eventbus"
"tailscale.com/util/mak" "tailscale.com/util/mak"
"tailscale.com/util/multierr" "tailscale.com/util/multierr"
"tailscale.com/util/set"
"tailscale.com/util/usermetric" "tailscale.com/util/usermetric"
"tailscale.com/version" "tailscale.com/version"
) )
@@ -65,6 +64,21 @@ var receiveNames = []string{
// Tracker tracks the health of various Tailscale subsystems, // Tracker tracks the health of various Tailscale subsystems,
// comparing each subsystems' state with each other to make sure // comparing each subsystems' state with each other to make sure
// they're consistent based on the user's intended state. // they're consistent based on the user's intended state.
//
// If a client [Warnable] becomes unhealthy or its unhealthy state is updated,
// an event will be emitted with WarnableChanged set to true and the Warnable
// and its UnhealthyState:
//
// Change{WarnableChanged: true, Warnable: w, UnhealthyState: us}
//
// If a Warnable becomes healthy, an event will be emitted with
// WarnableChanged set to true, the Warnable set, and UnhealthyState set to nil:
//
// Change{WarnableChanged: true, Warnable: w, UnhealthyState: nil}
//
// If the health messages from the control-plane change, an event will be
// emitted with ControlHealthChanged set to true. Recipients can fetch the set of
// control-plane health messages by calling [Tracker.CurrentState]:
type Tracker struct { type Tracker struct {
// MagicSockReceiveFuncs tracks the state of the three // MagicSockReceiveFuncs tracks the state of the three
// magicsock receive functions: IPv4, IPv6, and DERP. // magicsock receive functions: IPv4, IPv6, and DERP.
@@ -91,9 +105,8 @@ type Tracker struct {
// sysErr maps subsystems to their current error (or nil if the subsystem is healthy) // sysErr maps subsystems to their current error (or nil if the subsystem is healthy)
// Deprecated: using Warnables should be preferred // Deprecated: using Warnables should be preferred
sysErr map[Subsystem]error sysErr map[Subsystem]error
watchers set.HandleSet[func(Change)] // opt func to run if error state changes timer tstime.TimerController
timer tstime.TimerController
latestVersion *tailcfg.ClientVersion // or nil latestVersion *tailcfg.ClientVersion // or nil
checkForUpdates bool checkForUpdates bool
@@ -131,10 +144,12 @@ func NewTracker(bus *eventbus.Bus) *Tracker {
} }
cli := bus.Client("health.Tracker") cli := bus.Client("health.Tracker")
return &Tracker{ t := &Tracker{
eventClient: cli, eventClient: cli,
changePub: eventbus.Publish[Change](cli), changePub: eventbus.Publish[Change](cli),
} }
t.timer = t.clock().AfterFunc(time.Minute, t.timerSelfCheck)
return t
} }
func (t *Tracker) now() time.Time { func (t *Tracker) now() time.Time {
@@ -455,33 +470,6 @@ func (t *Tracker) setUnhealthyLocked(w *Warnable, args Args) {
}) })
mak.Set(&t.pendingVisibleTimers, w, tc) mak.Set(&t.pendingVisibleTimers, w, tc)
} }
// Direct callbacks
// TODO(cmol): Remove once all watchers have been moved to events
for _, cb := range t.watchers {
// If the Warnable has been unhealthy for more than its TimeToVisible, the callback should be
// executed immediately. Otherwise, the callback should be enqueued to run once the Warnable
// becomes visible.
if w.IsVisible(ws, t.now) {
cb(change)
continue
}
// The time remaining until the Warnable will be visible to the user is the TimeToVisible
// minus the time that has already passed since the Warnable became unhealthy.
visibleIn := w.TimeToVisible - t.now().Sub(brokenSince)
var tc tstime.TimerController = t.clock().AfterFunc(visibleIn, func() {
t.mu.Lock()
defer t.mu.Unlock()
// Check if the Warnable is still unhealthy, as it could have become healthy between the time
// the timer was set for and the time it was executed.
if t.warnableVal[w] != nil {
cb(change)
delete(t.pendingVisibleTimers, w)
}
})
mak.Set(&t.pendingVisibleTimers, w, tc)
}
} }
} }
@@ -514,10 +502,6 @@ func (t *Tracker) setHealthyLocked(w *Warnable) {
Warnable: w, Warnable: w,
} }
t.changePub.Publish(change) t.changePub.Publish(change)
for _, cb := range t.watchers {
// TODO(cmol): Remove once all watchers have been moved to events
cb(change)
}
} }
// notifyWatchersControlChangedLocked calls each watcher to signal that control // notifyWatchersControlChangedLocked calls each watcher to signal that control
@@ -526,13 +510,7 @@ func (t *Tracker) notifyWatchersControlChangedLocked() {
change := Change{ change := Change{
ControlHealthChanged: true, ControlHealthChanged: true,
} }
if t.changePub != nil { t.changePub.Publish(change)
t.changePub.Publish(change)
}
for _, cb := range t.watchers {
// TODO(cmol): Remove once all watchers have been moved to events
cb(change)
}
} }
// AppendWarnableDebugFlags appends to base any health items that are currently in failed // AppendWarnableDebugFlags appends to base any health items that are currently in failed
@@ -577,62 +555,6 @@ type Change struct {
UnhealthyState *UnhealthyState UnhealthyState *UnhealthyState
} }
// RegisterWatcher adds a function that will be called its own goroutine
// whenever the health state of any client [Warnable] or control-plane health
// messages changes. The returned function can be used to unregister the
// callback.
//
// If a client [Warnable] becomes unhealthy or its unhealthy state is updated,
// the callback will be called with WarnableChanged set to true and the Warnable
// and its UnhealthyState:
//
// go cb(Change{WarnableChanged: true, Warnable: w, UnhealthyState: us})
//
// If a Warnable becomes healthy, the callback will be called with
// WarnableChanged set to true, the Warnable set, and UnhealthyState set to nil:
//
// go cb(Change{WarnableChanged: true, Warnable: w, UnhealthyState: nil})
//
// If the health messages from the control-plane change, the callback will be
// called with ControlHealthChanged set to true. Recipients can fetch the set of
// control-plane health messages by calling [Tracker.CurrentState]:
//
// go cb(Change{ControlHealthChanged: true})
func (t *Tracker) RegisterWatcher(cb func(Change)) (unregister func()) {
return t.registerSyncWatcher(func(c Change) {
go cb(c)
})
}
// registerSyncWatcher adds a function that will be called whenever the health
// state changes. The provided callback function will be executed synchronously.
// Call RegisterWatcher to register any callbacks that won't return from
// execution immediately.
func (t *Tracker) registerSyncWatcher(cb func(c Change)) (unregister func()) {
if t.nil() {
return func() {}
}
t.initOnce.Do(t.doOnceInit)
t.mu.Lock()
defer t.mu.Unlock()
if t.watchers == nil {
t.watchers = set.HandleSet[func(Change)]{}
}
handle := t.watchers.Add(cb)
if t.timer == nil {
t.timer = t.clock().AfterFunc(time.Minute, t.timerSelfCheck)
}
return func() {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.watchers, handle)
if len(t.watchers) == 0 && t.timer != nil {
t.timer.Stop()
t.timer = nil
}
}
}
// SetRouterHealth sets the state of the wgengine/router.Router. // SetRouterHealth sets the state of the wgengine/router.Router.
// //
// Deprecated: Warnables should be preferred over Subsystem errors. // Deprecated: Warnables should be preferred over Subsystem errors.

View File

@@ -4,6 +4,7 @@
package health package health
import ( import (
"errors"
"fmt" "fmt"
"maps" "maps"
"reflect" "reflect"
@@ -158,15 +159,6 @@ func TestWatcher(t *testing.T) {
name string name string
preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change)) preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change))
}{ }{
{
name: "with-callbacks",
preFunc: func(t *testing.T, tht *Tracker, _ *eventbus.Bus, fn func(c Change)) {
t.Cleanup(tht.RegisterWatcher(fn))
if len(tht.watchers) != 1 {
t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(tht.watchers))
}
},
},
{ {
name: "with-eventbus", name: "with-eventbus",
preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) { preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) {
@@ -254,15 +246,6 @@ func TestSetUnhealthyWithTimeToVisible(t *testing.T) {
name string name string
preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change)) preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change))
}{ }{
{
name: "with-callbacks",
preFunc: func(t *testing.T, tht *Tracker, _ *eventbus.Bus, fn func(c Change)) {
t.Cleanup(tht.RegisterWatcher(fn))
if len(tht.watchers) != 1 {
t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(tht.watchers))
}
},
},
{ {
name: "with-eventbus", name: "with-eventbus",
preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) { preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) {
@@ -668,7 +651,7 @@ func TestControlHealthNotifies(t *testing.T) {
name string name string
initialState map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage initialState map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage
newState map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage newState map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage
wantNotify bool wantEvents []any
} }
tests := []test{ tests := []test{
{ {
@@ -679,7 +662,7 @@ func TestControlHealthNotifies(t *testing.T) {
newState: map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{ newState: map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{
"test": {}, "test": {},
}, },
wantNotify: false, wantEvents: []any{},
}, },
{ {
name: "on-set", name: "on-set",
@@ -687,7 +670,9 @@ func TestControlHealthNotifies(t *testing.T) {
newState: map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{ newState: map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{
"test": {}, "test": {},
}, },
wantNotify: true, wantEvents: []any{
eventbustest.Type[Change](),
},
}, },
{ {
name: "details-change", name: "details-change",
@@ -701,7 +686,9 @@ func TestControlHealthNotifies(t *testing.T) {
Title: "Updated title", Title: "Updated title",
}, },
}, },
wantNotify: true, wantEvents: []any{
eventbustest.Type[Change](),
},
}, },
{ {
name: "action-changes", name: "action-changes",
@@ -721,42 +708,54 @@ func TestControlHealthNotifies(t *testing.T) {
}, },
}, },
}, },
wantNotify: true, wantEvents: []any{
eventbustest.Type[Change](),
},
}, },
} }
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
ht := NewTracker(eventbustest.NewBus(t)) bus := eventbustest.NewBus(t)
tw := eventbustest.NewWatcher(t, bus)
tw.TimeOut = time.Second
ht := NewTracker(bus)
ht.SetIPNState("NeedsLogin", true) ht.SetIPNState("NeedsLogin", true)
ht.GotStreamedMapResponse() ht.GotStreamedMapResponse()
if len(test.initialState) != 0 { // Expect events at starup, before doing anything else
ht.SetControlHealth(test.initialState) if err := eventbustest.ExpectExactly(tw,
eventbustest.Type[Change](), // warming-up
eventbustest.Type[Change](), // is-using-unstable-version
eventbustest.Type[Change](), // not-in-map-poll
); err != nil {
t.Errorf("startup error: %v", err)
} }
gotNotified := false // Only set initial state if we need to
ht.registerSyncWatcher(func(_ Change) { if len(test.initialState) != 0 {
gotNotified = true ht.SetControlHealth(test.initialState)
}) if err := eventbustest.ExpectExactly(tw, eventbustest.Type[Change]()); err != nil {
t.Errorf("initial state error: %v", err)
}
}
ht.SetControlHealth(test.newState) ht.SetControlHealth(test.newState)
if gotNotified != test.wantNotify { if err := eventbustest.ExpectExactly(tw, test.wantEvents...); err != nil {
t.Errorf("notified: got %v, want %v", gotNotified, test.wantNotify) t.Errorf("event error: %v", err)
} }
}) })
} }
} }
func TestControlHealthIgnoredOutsideMapPoll(t *testing.T) { func TestControlHealthIgnoredOutsideMapPoll(t *testing.T) {
ht := NewTracker(eventbustest.NewBus(t)) bus := eventbustest.NewBus(t)
tw := eventbustest.NewWatcher(t, bus)
tw.TimeOut = 100 * time.Millisecond
ht := NewTracker(bus)
ht.SetIPNState("NeedsLogin", true) ht.SetIPNState("NeedsLogin", true)
gotNotified := false
ht.registerSyncWatcher(func(_ Change) {
gotNotified = true
})
ht.SetControlHealth(map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{ ht.SetControlHealth(map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{
"control-health": {}, "control-health": {},
}) })
@@ -768,8 +767,19 @@ func TestControlHealthIgnoredOutsideMapPoll(t *testing.T) {
t.Error("got a warning with code 'control-health', want none") t.Error("got a warning with code 'control-health', want none")
} }
if gotNotified { // An event is emitted when SetIPNState is run above,
t.Error("watcher got called, want it to not be called") // so only fail on the second event.
eventCounter := 0
expectOne := func(c *Change) error {
eventCounter++
if eventCounter == 1 {
return nil
}
return errors.New("saw more than 1 event")
}
if err := eventbustest.Expect(tw, expectOne); err == nil {
t.Error("event got emitted, want it to not be called")
} }
} }

View File

@@ -120,7 +120,12 @@ func Expect(tw *Watcher, filters ...any) error {
// [Expect]. Use [Expect] if other events are allowed. // [Expect]. Use [Expect] if other events are allowed.
func ExpectExactly(tw *Watcher, filters ...any) error { func ExpectExactly(tw *Watcher, filters ...any) error {
if len(filters) == 0 { if len(filters) == 0 {
return errors.New("no event filters were provided") select {
case event := <-tw.events:
return fmt.Errorf("saw event type %s, expected none", reflect.TypeOf(event))
case <-time.After(tw.TimeOut):
return nil
}
} }
eventCount := 0 eventCount := 0
for pos, next := range filters { for pos, next := range filters {

View File

@@ -250,7 +250,7 @@ func TestExpectEvents(t *testing.T) {
tw := eventbustest.NewWatcher(t, bus) tw := eventbustest.NewWatcher(t, bus)
// TODO(cmol): When synctest is out of experimental, use that instead: // TODO(cmol): When synctest is out of experimental, use that instead:
// https://go.dev/blog/synctest // https://go.dev/blog/synctest
tw.TimeOut = 10 * time.Millisecond tw.TimeOut = 100 * time.Millisecond
client := bus.Client("testClient") client := bus.Client("testClient")
defer client.Close() defer client.Close()