health: remove direct callback and replace with eventbus (#17199)

Pulls out the last callback logic and ensures timers are still running.

The eventbustest package is updated support the absence of events.

Updates #15160

Signed-off-by: Claus Lensbøl <claus@tailscale.com>
This commit is contained in:
Claus Lensbøl
2025-09-19 14:58:37 -04:00
committed by GitHub
parent d559a21418
commit 009d702adf
5 changed files with 111 additions and 148 deletions

View File

@@ -23,6 +23,7 @@ import (
"tailscale.com/types/persist"
"tailscale.com/types/structs"
"tailscale.com/util/clientmetric"
"tailscale.com/util/eventbus"
"tailscale.com/util/execqueue"
)
@@ -122,7 +123,9 @@ type Auto struct {
observerQueue execqueue.ExecQueue
shutdownFn func() // to be called prior to shutdown or nil
unregisterHealthWatch func()
eventClient *eventbus.Client
healthChangeSub *eventbus.Subscriber[health.Change]
subsDoneCh chan struct{} // close-only channel when eventClient has closed
mu sync.Mutex // mutex guards the following fields
@@ -192,21 +195,42 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
updateDone: make(chan struct{}),
observer: opts.Observer,
shutdownFn: opts.Shutdown,
subsDoneCh: make(chan struct{}),
}
c.eventClient = opts.Bus.Client("controlClient.Auto")
c.healthChangeSub = eventbus.Subscribe[health.Change](c.eventClient)
c.authCtx, c.authCancel = context.WithCancel(context.Background())
c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, opts.Logf)
c.mapCtx, c.mapCancel = context.WithCancel(context.Background())
c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto, opts.Logf)
c.unregisterHealthWatch = opts.HealthTracker.RegisterWatcher(func(c health.Change) {
if c.WarnableChanged {
direct.ReportWarnableChange(c.Warnable, c.UnhealthyState)
}
})
go c.consumeEventbusTopics()
return c, nil
}
// consumeEventbusTopics consumes events from all relevant
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
// always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the
// [eventbus.Client] is closed.
func (c *Auto) consumeEventbusTopics() {
defer close(c.subsDoneCh)
for {
select {
case <-c.eventClient.Done():
return
case change := <-c.healthChangeSub.Events():
if change.WarnableChanged {
c.direct.ReportWarnableChange(change.Warnable, change.UnhealthyState)
}
}
}
}
// SetPaused controls whether HTTP activity should be paused.
//
// The client can be paused and unpaused repeatedly, unlike Start and Shutdown, which can only be used once.
@@ -760,6 +784,9 @@ func (c *Auto) UpdateEndpoints(endpoints []tailcfg.Endpoint) {
}
func (c *Auto) Shutdown() {
c.eventClient.Close()
<-c.subsDoneCh
c.mu.Lock()
if c.closed {
c.mu.Unlock()
@@ -783,7 +810,6 @@ func (c *Auto) Shutdown() {
shutdownFn()
}
c.unregisterHealthWatch()
<-c.authDone
<-c.mapDone
<-c.updateDone