mirror of
https://github.com/tailscale/tailscale.git
synced 2025-10-15 02:30:07 +00:00
health: remove direct callback and replace with eventbus (#17199)
Pulls out the last callback logic and ensures timers are still running. The eventbustest package is updated to support the absence of events. Updates #15160 Signed-off-by: Claus Lensbøl <claus@tailscale.com>
This commit is contained in:
@@ -23,6 +23,7 @@ import (
|
||||
"tailscale.com/types/persist"
|
||||
"tailscale.com/types/structs"
|
||||
"tailscale.com/util/clientmetric"
|
||||
"tailscale.com/util/eventbus"
|
||||
"tailscale.com/util/execqueue"
|
||||
)
|
||||
|
||||
@@ -122,7 +123,9 @@ type Auto struct {
|
||||
observerQueue execqueue.ExecQueue
|
||||
shutdownFn func() // to be called prior to shutdown or nil
|
||||
|
||||
unregisterHealthWatch func()
|
||||
eventClient *eventbus.Client
|
||||
healthChangeSub *eventbus.Subscriber[health.Change]
|
||||
subsDoneCh chan struct{} // close-only channel when eventClient has closed
|
||||
|
||||
mu sync.Mutex // mutex guards the following fields
|
||||
|
||||
@@ -192,21 +195,42 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
|
||||
updateDone: make(chan struct{}),
|
||||
observer: opts.Observer,
|
||||
shutdownFn: opts.Shutdown,
|
||||
subsDoneCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
c.eventClient = opts.Bus.Client("controlClient.Auto")
|
||||
c.healthChangeSub = eventbus.Subscribe[health.Change](c.eventClient)
|
||||
|
||||
c.authCtx, c.authCancel = context.WithCancel(context.Background())
|
||||
c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, opts.Logf)
|
||||
|
||||
c.mapCtx, c.mapCancel = context.WithCancel(context.Background())
|
||||
c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto, opts.Logf)
|
||||
|
||||
c.unregisterHealthWatch = opts.HealthTracker.RegisterWatcher(func(c health.Change) {
|
||||
if c.WarnableChanged {
|
||||
direct.ReportWarnableChange(c.Warnable, c.UnhealthyState)
|
||||
}
|
||||
})
|
||||
go c.consumeEventbusTopics()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// consumeEventbusTopics consumes events from all relevant
|
||||
// [eventbus.Subscriber]s and passes them to their related handler. Events are
|
||||
// always handled in the order they are received, i.e. the next event is not
|
||||
// read until the previous event's handler has returned. It returns when the
|
||||
// [eventbus.Client] is closed.
|
||||
func (c *Auto) consumeEventbusTopics() {
|
||||
defer close(c.subsDoneCh)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.eventClient.Done():
|
||||
return
|
||||
case change := <-c.healthChangeSub.Events():
|
||||
if change.WarnableChanged {
|
||||
c.direct.ReportWarnableChange(change.Warnable, change.UnhealthyState)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SetPaused controls whether HTTP activity should be paused.
|
||||
//
|
||||
// The client can be paused and unpaused repeatedly, unlike Start and Shutdown, which can only be used once.
|
||||
@@ -760,6 +784,9 @@ func (c *Auto) UpdateEndpoints(endpoints []tailcfg.Endpoint) {
|
||||
}
|
||||
|
||||
func (c *Auto) Shutdown() {
|
||||
c.eventClient.Close()
|
||||
<-c.subsDoneCh
|
||||
|
||||
c.mu.Lock()
|
||||
if c.closed {
|
||||
c.mu.Unlock()
|
||||
@@ -783,7 +810,6 @@ func (c *Auto) Shutdown() {
|
||||
shutdownFn()
|
||||
}
|
||||
|
||||
c.unregisterHealthWatch()
|
||||
<-c.authDone
|
||||
<-c.mapDone
|
||||
<-c.updateDone
|
||||
|
120
health/health.go
120
health/health.go
@@ -28,7 +28,6 @@ import (
|
||||
"tailscale.com/util/eventbus"
|
||||
"tailscale.com/util/mak"
|
||||
"tailscale.com/util/multierr"
|
||||
"tailscale.com/util/set"
|
||||
"tailscale.com/util/usermetric"
|
||||
"tailscale.com/version"
|
||||
)
|
||||
@@ -65,6 +64,21 @@ var receiveNames = []string{
|
||||
// Tracker tracks the health of various Tailscale subsystems,
|
||||
// comparing each subsystem's state with each other to make sure
|
||||
// they're consistent based on the user's intended state.
|
||||
//
|
||||
// If a client [Warnable] becomes unhealthy or its unhealthy state is updated,
|
||||
// an event will be emitted with WarnableChanged set to true and the Warnable
|
||||
// and its UnhealthyState:
|
||||
//
|
||||
// Change{WarnableChanged: true, Warnable: w, UnhealthyState: us}
|
||||
//
|
||||
// If a Warnable becomes healthy, an event will be emitted with
|
||||
// WarnableChanged set to true, the Warnable set, and UnhealthyState set to nil:
|
||||
//
|
||||
// Change{WarnableChanged: true, Warnable: w, UnhealthyState: nil}
|
||||
//
|
||||
// If the health messages from the control-plane change, an event will be
|
||||
// emitted with ControlHealthChanged set to true. Recipients can fetch the set of
|
||||
// control-plane health messages by calling [Tracker.CurrentState].
|
||||
type Tracker struct {
|
||||
// MagicSockReceiveFuncs tracks the state of the three
|
||||
// magicsock receive functions: IPv4, IPv6, and DERP.
|
||||
@@ -91,9 +105,8 @@ type Tracker struct {
|
||||
|
||||
// sysErr maps subsystems to their current error (or nil if the subsystem is healthy)
|
||||
// Deprecated: using Warnables should be preferred
|
||||
sysErr map[Subsystem]error
|
||||
watchers set.HandleSet[func(Change)] // opt func to run if error state changes
|
||||
timer tstime.TimerController
|
||||
sysErr map[Subsystem]error
|
||||
timer tstime.TimerController
|
||||
|
||||
latestVersion *tailcfg.ClientVersion // or nil
|
||||
checkForUpdates bool
|
||||
@@ -131,10 +144,12 @@ func NewTracker(bus *eventbus.Bus) *Tracker {
|
||||
}
|
||||
|
||||
cli := bus.Client("health.Tracker")
|
||||
return &Tracker{
|
||||
t := &Tracker{
|
||||
eventClient: cli,
|
||||
changePub: eventbus.Publish[Change](cli),
|
||||
}
|
||||
t.timer = t.clock().AfterFunc(time.Minute, t.timerSelfCheck)
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *Tracker) now() time.Time {
|
||||
@@ -455,33 +470,6 @@ func (t *Tracker) setUnhealthyLocked(w *Warnable, args Args) {
|
||||
})
|
||||
mak.Set(&t.pendingVisibleTimers, w, tc)
|
||||
}
|
||||
|
||||
// Direct callbacks
|
||||
// TODO(cmol): Remove once all watchers have been moved to events
|
||||
for _, cb := range t.watchers {
|
||||
// If the Warnable has been unhealthy for more than its TimeToVisible, the callback should be
|
||||
// executed immediately. Otherwise, the callback should be enqueued to run once the Warnable
|
||||
// becomes visible.
|
||||
if w.IsVisible(ws, t.now) {
|
||||
cb(change)
|
||||
continue
|
||||
}
|
||||
|
||||
// The time remaining until the Warnable will be visible to the user is the TimeToVisible
|
||||
// minus the time that has already passed since the Warnable became unhealthy.
|
||||
visibleIn := w.TimeToVisible - t.now().Sub(brokenSince)
|
||||
var tc tstime.TimerController = t.clock().AfterFunc(visibleIn, func() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
// Check if the Warnable is still unhealthy, as it could have become healthy between the time
|
||||
// the timer was set for and the time it was executed.
|
||||
if t.warnableVal[w] != nil {
|
||||
cb(change)
|
||||
delete(t.pendingVisibleTimers, w)
|
||||
}
|
||||
})
|
||||
mak.Set(&t.pendingVisibleTimers, w, tc)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -514,10 +502,6 @@ func (t *Tracker) setHealthyLocked(w *Warnable) {
|
||||
Warnable: w,
|
||||
}
|
||||
t.changePub.Publish(change)
|
||||
for _, cb := range t.watchers {
|
||||
// TODO(cmol): Remove once all watchers have been moved to events
|
||||
cb(change)
|
||||
}
|
||||
}
|
||||
|
||||
// notifyWatchersControlChangedLocked calls each watcher to signal that control
|
||||
@@ -526,13 +510,7 @@ func (t *Tracker) notifyWatchersControlChangedLocked() {
|
||||
change := Change{
|
||||
ControlHealthChanged: true,
|
||||
}
|
||||
if t.changePub != nil {
|
||||
t.changePub.Publish(change)
|
||||
}
|
||||
for _, cb := range t.watchers {
|
||||
// TODO(cmol): Remove once all watchers have been moved to events
|
||||
cb(change)
|
||||
}
|
||||
t.changePub.Publish(change)
|
||||
}
|
||||
|
||||
// AppendWarnableDebugFlags appends to base any health items that are currently in failed
|
||||
@@ -577,62 +555,6 @@ type Change struct {
|
||||
UnhealthyState *UnhealthyState
|
||||
}
|
||||
|
||||
// RegisterWatcher adds a function that will be called its own goroutine
|
||||
// whenever the health state of any client [Warnable] or control-plane health
|
||||
// messages changes. The returned function can be used to unregister the
|
||||
// callback.
|
||||
//
|
||||
// If a client [Warnable] becomes unhealthy or its unhealthy state is updated,
|
||||
// the callback will be called with WarnableChanged set to true and the Warnable
|
||||
// and its UnhealthyState:
|
||||
//
|
||||
// go cb(Change{WarnableChanged: true, Warnable: w, UnhealthyState: us})
|
||||
//
|
||||
// If a Warnable becomes healthy, the callback will be called with
|
||||
// WarnableChanged set to true, the Warnable set, and UnhealthyState set to nil:
|
||||
//
|
||||
// go cb(Change{WarnableChanged: true, Warnable: w, UnhealthyState: nil})
|
||||
//
|
||||
// If the health messages from the control-plane change, the callback will be
|
||||
// called with ControlHealthChanged set to true. Recipients can fetch the set of
|
||||
// control-plane health messages by calling [Tracker.CurrentState]:
|
||||
//
|
||||
// go cb(Change{ControlHealthChanged: true})
|
||||
func (t *Tracker) RegisterWatcher(cb func(Change)) (unregister func()) {
|
||||
return t.registerSyncWatcher(func(c Change) {
|
||||
go cb(c)
|
||||
})
|
||||
}
|
||||
|
||||
// registerSyncWatcher adds a function that will be called whenever the health
|
||||
// state changes. The provided callback function will be executed synchronously.
|
||||
// Call RegisterWatcher to register any callbacks that won't return from
|
||||
// execution immediately.
|
||||
func (t *Tracker) registerSyncWatcher(cb func(c Change)) (unregister func()) {
|
||||
if t.nil() {
|
||||
return func() {}
|
||||
}
|
||||
t.initOnce.Do(t.doOnceInit)
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if t.watchers == nil {
|
||||
t.watchers = set.HandleSet[func(Change)]{}
|
||||
}
|
||||
handle := t.watchers.Add(cb)
|
||||
if t.timer == nil {
|
||||
t.timer = t.clock().AfterFunc(time.Minute, t.timerSelfCheck)
|
||||
}
|
||||
return func() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
delete(t.watchers, handle)
|
||||
if len(t.watchers) == 0 && t.timer != nil {
|
||||
t.timer.Stop()
|
||||
t.timer = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SetRouterHealth sets the state of the wgengine/router.Router.
|
||||
//
|
||||
// Deprecated: Warnables should be preferred over Subsystem errors.
|
||||
|
@@ -4,6 +4,7 @@
|
||||
package health
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"reflect"
|
||||
@@ -158,15 +159,6 @@ func TestWatcher(t *testing.T) {
|
||||
name string
|
||||
preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change))
|
||||
}{
|
||||
{
|
||||
name: "with-callbacks",
|
||||
preFunc: func(t *testing.T, tht *Tracker, _ *eventbus.Bus, fn func(c Change)) {
|
||||
t.Cleanup(tht.RegisterWatcher(fn))
|
||||
if len(tht.watchers) != 1 {
|
||||
t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(tht.watchers))
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "with-eventbus",
|
||||
preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) {
|
||||
@@ -254,15 +246,6 @@ func TestSetUnhealthyWithTimeToVisible(t *testing.T) {
|
||||
name string
|
||||
preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change))
|
||||
}{
|
||||
{
|
||||
name: "with-callbacks",
|
||||
preFunc: func(t *testing.T, tht *Tracker, _ *eventbus.Bus, fn func(c Change)) {
|
||||
t.Cleanup(tht.RegisterWatcher(fn))
|
||||
if len(tht.watchers) != 1 {
|
||||
t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(tht.watchers))
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "with-eventbus",
|
||||
preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) {
|
||||
@@ -668,7 +651,7 @@ func TestControlHealthNotifies(t *testing.T) {
|
||||
name string
|
||||
initialState map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage
|
||||
newState map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage
|
||||
wantNotify bool
|
||||
wantEvents []any
|
||||
}
|
||||
tests := []test{
|
||||
{
|
||||
@@ -679,7 +662,7 @@ func TestControlHealthNotifies(t *testing.T) {
|
||||
newState: map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{
|
||||
"test": {},
|
||||
},
|
||||
wantNotify: false,
|
||||
wantEvents: []any{},
|
||||
},
|
||||
{
|
||||
name: "on-set",
|
||||
@@ -687,7 +670,9 @@ func TestControlHealthNotifies(t *testing.T) {
|
||||
newState: map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{
|
||||
"test": {},
|
||||
},
|
||||
wantNotify: true,
|
||||
wantEvents: []any{
|
||||
eventbustest.Type[Change](),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "details-change",
|
||||
@@ -701,7 +686,9 @@ func TestControlHealthNotifies(t *testing.T) {
|
||||
Title: "Updated title",
|
||||
},
|
||||
},
|
||||
wantNotify: true,
|
||||
wantEvents: []any{
|
||||
eventbustest.Type[Change](),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "action-changes",
|
||||
@@ -721,42 +708,54 @@ func TestControlHealthNotifies(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
wantNotify: true,
|
||||
wantEvents: []any{
|
||||
eventbustest.Type[Change](),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
ht := NewTracker(eventbustest.NewBus(t))
|
||||
bus := eventbustest.NewBus(t)
|
||||
tw := eventbustest.NewWatcher(t, bus)
|
||||
tw.TimeOut = time.Second
|
||||
|
||||
ht := NewTracker(bus)
|
||||
ht.SetIPNState("NeedsLogin", true)
|
||||
ht.GotStreamedMapResponse()
|
||||
|
||||
if len(test.initialState) != 0 {
|
||||
ht.SetControlHealth(test.initialState)
|
||||
// Expect events at startup, before doing anything else
|
||||
if err := eventbustest.ExpectExactly(tw,
|
||||
eventbustest.Type[Change](), // warming-up
|
||||
eventbustest.Type[Change](), // is-using-unstable-version
|
||||
eventbustest.Type[Change](), // not-in-map-poll
|
||||
); err != nil {
|
||||
t.Errorf("startup error: %v", err)
|
||||
}
|
||||
|
||||
gotNotified := false
|
||||
ht.registerSyncWatcher(func(_ Change) {
|
||||
gotNotified = true
|
||||
})
|
||||
// Only set initial state if we need to
|
||||
if len(test.initialState) != 0 {
|
||||
ht.SetControlHealth(test.initialState)
|
||||
if err := eventbustest.ExpectExactly(tw, eventbustest.Type[Change]()); err != nil {
|
||||
t.Errorf("initial state error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
ht.SetControlHealth(test.newState)
|
||||
|
||||
if gotNotified != test.wantNotify {
|
||||
t.Errorf("notified: got %v, want %v", gotNotified, test.wantNotify)
|
||||
if err := eventbustest.ExpectExactly(tw, test.wantEvents...); err != nil {
|
||||
t.Errorf("event error: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestControlHealthIgnoredOutsideMapPoll(t *testing.T) {
|
||||
ht := NewTracker(eventbustest.NewBus(t))
|
||||
bus := eventbustest.NewBus(t)
|
||||
tw := eventbustest.NewWatcher(t, bus)
|
||||
tw.TimeOut = 100 * time.Millisecond
|
||||
ht := NewTracker(bus)
|
||||
ht.SetIPNState("NeedsLogin", true)
|
||||
|
||||
gotNotified := false
|
||||
ht.registerSyncWatcher(func(_ Change) {
|
||||
gotNotified = true
|
||||
})
|
||||
|
||||
ht.SetControlHealth(map[tailcfg.DisplayMessageID]tailcfg.DisplayMessage{
|
||||
"control-health": {},
|
||||
})
|
||||
@@ -768,8 +767,19 @@ func TestControlHealthIgnoredOutsideMapPoll(t *testing.T) {
|
||||
t.Error("got a warning with code 'control-health', want none")
|
||||
}
|
||||
|
||||
if gotNotified {
|
||||
t.Error("watcher got called, want it to not be called")
|
||||
// An event is emitted when SetIPNState is run above,
|
||||
// so only fail on the second event.
|
||||
eventCounter := 0
|
||||
expectOne := func(c *Change) error {
|
||||
eventCounter++
|
||||
if eventCounter == 1 {
|
||||
return nil
|
||||
}
|
||||
return errors.New("saw more than 1 event")
|
||||
}
|
||||
|
||||
if err := eventbustest.Expect(tw, expectOne); err == nil {
|
||||
t.Error("event got emitted, want it to not be called")
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -120,7 +120,12 @@ func Expect(tw *Watcher, filters ...any) error {
|
||||
// [Expect]. Use [Expect] if other events are allowed.
|
||||
func ExpectExactly(tw *Watcher, filters ...any) error {
|
||||
if len(filters) == 0 {
|
||||
return errors.New("no event filters were provided")
|
||||
select {
|
||||
case event := <-tw.events:
|
||||
return fmt.Errorf("saw event type %s, expected none", reflect.TypeOf(event))
|
||||
case <-time.After(tw.TimeOut):
|
||||
return nil
|
||||
}
|
||||
}
|
||||
eventCount := 0
|
||||
for pos, next := range filters {
|
||||
|
@@ -250,7 +250,7 @@ func TestExpectEvents(t *testing.T) {
|
||||
tw := eventbustest.NewWatcher(t, bus)
|
||||
// TODO(cmol): When synctest is out of experimental, use that instead:
|
||||
// https://go.dev/blog/synctest
|
||||
tw.TimeOut = 10 * time.Millisecond
|
||||
tw.TimeOut = 100 * time.Millisecond
|
||||
|
||||
client := bus.Client("testClient")
|
||||
defer client.Close()
|
||||
|
Reference in New Issue
Block a user