util/eventbus: adjust worker goroutine management helpers

This makes the helpers closer in behavior to cancelable contexts
and taskgroup.Single, and makes the worker code use a more normal
and easier to reason about context.Context for shutdown.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
This commit is contained in:
David Anderson 2025-03-04 12:08:32 -08:00 committed by Dave Anderson
parent 5eafce7e25
commit 24d4846f00
3 changed files with 113 additions and 94 deletions

View File

@ -15,8 +15,8 @@ import (
// Bus is an event bus that distributes published events to interested // Bus is an event bus that distributes published events to interested
// subscribers. // subscribers.
type Bus struct { type Bus struct {
router *worker
write chan any write chan any
stop goroutineShutdownControl
snapshot chan chan []any snapshot chan chan []any
topicsMu sync.Mutex // guards everything below. topicsMu sync.Mutex // guards everything below.
@ -30,15 +30,13 @@ type Bus struct {
// New returns a new bus. Use [PublisherOf] to make event publishers, // New returns a new bus. Use [PublisherOf] to make event publishers,
// and [Bus.Queue] and [Subscribe] to make event subscribers. // and [Bus.Queue] and [Subscribe] to make event subscribers.
func New() *Bus { func New() *Bus {
stopCtl, stopWorker := newGoroutineShutdown()
ret := &Bus{ ret := &Bus{
write: make(chan any), write: make(chan any),
stop: stopCtl,
snapshot: make(chan chan []any), snapshot: make(chan chan []any),
topics: map[reflect.Type][]*subscribeState{}, topics: map[reflect.Type][]*subscribeState{},
clients: set.Set[*Client]{}, clients: set.Set[*Client]{},
} }
go ret.pump(stopWorker) ret.router = runWorker(ret.pump)
return ret return ret
} }
@ -67,7 +65,7 @@ func (b *Bus) Client(name string) *Client {
// Close blocks until the bus is fully shut down. The bus is // Close blocks until the bus is fully shut down. The bus is
// permanently unusable after closing. // permanently unusable after closing.
func (b *Bus) Close() { func (b *Bus) Close() {
b.stop.StopAndWait() b.router.StopAndWait()
var clients set.Set[*Client] var clients set.Set[*Client]
b.topicsMu.Lock() b.topicsMu.Lock()
@ -79,8 +77,7 @@ func (b *Bus) Close() {
} }
} }
func (b *Bus) pump(stop goroutineShutdownWorker) { func (b *Bus) pump(ctx context.Context) {
defer stop.Done()
var vals queue var vals queue
acceptCh := func() chan any { acceptCh := func() chan any {
if vals.Full() { if vals.Full() {
@ -102,13 +99,13 @@ func (b *Bus) pump(stop goroutineShutdownWorker) {
select { select {
case d.write <- val: case d.write <- val:
break deliverOne break deliverOne
case <-d.stop.WaitChan(): case <-d.closed():
// Queue closed, don't block but continue // Queue closed, don't block but continue
// delivering to others. // delivering to others.
break deliverOne break deliverOne
case in := <-acceptCh(): case in := <-acceptCh():
vals.Add(in) vals.Add(in)
case <-stop.Stop(): case <-ctx.Done():
return return
case ch := <-b.snapshot: case ch := <-b.snapshot:
ch <- vals.Snapshot() ch <- vals.Snapshot()
@ -122,7 +119,7 @@ func (b *Bus) pump(stop goroutineShutdownWorker) {
// resuming. // resuming.
for vals.Empty() { for vals.Empty() {
select { select {
case <-stop.Stop(): case <-ctx.Done():
return return
case val := <-b.write: case val := <-b.write:
vals.Add(val) vals.Add(val)
@ -168,59 +165,89 @@ func (b *Bus) unsubscribe(t reflect.Type, q *subscribeState) {
b.topics[t] = slices.Delete(slices.Clone(b.topics[t]), i, i+1) b.topics[t] = slices.Delete(slices.Clone(b.topics[t]), i, i+1)
} }
func newGoroutineShutdown() (goroutineShutdownControl, goroutineShutdownWorker) { // A worker runs a worker goroutine and helps coordinate its shutdown.
ctx, cancel := context.WithCancel(context.Background()) type worker struct {
ctx context.Context
stop context.CancelFunc
stopped chan struct{}
}
ctl := goroutineShutdownControl{ // runWorker creates a worker goroutine running fn. The context passed
startShutdown: cancel, // to fn is canceled by [worker.Stop].
shutdownFinished: make(chan struct{}), func runWorker(fn func(context.Context)) *worker {
ctx, stop := context.WithCancel(context.Background())
ret := &worker{
ctx: ctx,
stop: stop,
stopped: make(chan struct{}),
} }
work := goroutineShutdownWorker{ go ret.run(fn)
startShutdown: ctx.Done(), return ret
shutdownFinished: ctl.shutdownFinished, }
func (w *worker) run(fn func(context.Context)) {
defer close(w.stopped)
fn(w.ctx)
}
// Stop signals the worker goroutine to shut down.
func (w *worker) Stop() { w.stop() }
// Done returns a channel that is closed when the worker goroutine
// exits.
func (w *worker) Done() <-chan struct{} { return w.stopped }
// Wait waits until the worker goroutine has exited.
func (w *worker) Wait() { <-w.stopped }
// StopAndWait signals the worker goroutine to shut down, then waits
// for it to exit.
func (w *worker) StopAndWait() {
w.stop()
<-w.stopped
}
// stopFlag is a value that can be watched for a notification. The
// zero value is ready for use.
//
// The flag is notified by running [stopFlag.Stop]. Stop can be called
// multiple times. Upon the first call to Stop, [stopFlag.Done] is
// closed, all pending [stopFlag.Wait] calls return, and future Wait
// calls return immediately.
//
// A stopFlag can only notify once, and is intended for use as a
// one-way shutdown signal that's lighter than a cancellable
// context.Context.
type stopFlag struct {
// guards the lazy construction of stopped, and the value of
// alreadyStopped.
mu sync.Mutex
stopped chan struct{}
alreadyStopped bool
}
func (s *stopFlag) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
if s.alreadyStopped {
return
} }
s.alreadyStopped = true
return ctl, work if s.stopped == nil {
s.stopped = make(chan struct{})
}
close(s.stopped)
} }
// goroutineShutdownControl is a helper type to manage the shutdown of func (s *stopFlag) Done() <-chan struct{} {
// a worker goroutine. The worker goroutine should use the s.mu.Lock()
// goroutineShutdownWorker related to this controller. defer s.mu.Unlock()
type goroutineShutdownControl struct { if s.stopped == nil {
startShutdown context.CancelFunc s.stopped = make(chan struct{})
shutdownFinished chan struct{} }
return s.stopped
} }
func (ctl *goroutineShutdownControl) Stop() { func (s *stopFlag) Wait() {
ctl.startShutdown() <-s.Done()
}
func (ctl *goroutineShutdownControl) Wait() {
<-ctl.shutdownFinished
}
func (ctl *goroutineShutdownControl) WaitChan() <-chan struct{} {
return ctl.shutdownFinished
}
func (ctl *goroutineShutdownControl) StopAndWait() {
ctl.Stop()
ctl.Wait()
}
// goroutineShutdownWorker is a helper type for a worker goroutine to
// be notified that it should shut down, and to report that shutdown
// has completed. The notification is triggered by the related
// goroutineShutdownControl.
type goroutineShutdownWorker struct {
startShutdown <-chan struct{}
shutdownFinished chan struct{}
}
func (work *goroutineShutdownWorker) Stop() <-chan struct{} {
return work.startShutdown
}
func (work *goroutineShutdownWorker) Done() {
close(work.shutdownFinished)
} }

View File

@ -4,7 +4,6 @@
package eventbus package eventbus
import ( import (
"context"
"reflect" "reflect"
) )
@ -17,17 +16,13 @@ type publisher interface {
// A Publisher publishes typed events on a bus. // A Publisher publishes typed events on a bus.
type Publisher[T any] struct { type Publisher[T any] struct {
client *Client client *Client
stopCtx context.Context stop stopFlag
stop context.CancelFunc
} }
func newPublisher[T any](c *Client) *Publisher[T] { func newPublisher[T any](c *Client) *Publisher[T] {
ctx, cancel := context.WithCancel(context.Background())
ret := &Publisher[T]{ ret := &Publisher[T]{
client: c, client: c,
stopCtx: ctx,
stop: cancel,
} }
c.addPublisher(ret) c.addPublisher(ret)
return ret return ret
@ -39,7 +34,7 @@ func newPublisher[T any](c *Client) *Publisher[T] {
func (p *Publisher[T]) Close() { func (p *Publisher[T]) Close() {
// Just unblocks any active calls to Publish, no other // Just unblocks any active calls to Publish, no other
// synchronization needed. // synchronization needed.
p.stop() p.stop.Stop()
p.client.deletePublisher(p) p.client.deletePublisher(p)
} }
@ -52,14 +47,14 @@ func (p *Publisher[T]) Publish(v T) {
// Check for just a stopped publisher or bus before trying to // Check for just a stopped publisher or bus before trying to
// write, so that once closed Publish consistently does nothing. // write, so that once closed Publish consistently does nothing.
select { select {
case <-p.stopCtx.Done(): case <-p.stop.Done():
return return
default: default:
} }
select { select {
case p.client.publish() <- v: case p.client.publish() <- v:
case <-p.stopCtx.Done(): case <-p.stop.Done():
} }
} }

View File

@ -27,7 +27,7 @@ type subscriber interface {
// processing other potential sources of wakeups, which is how we end // processing other potential sources of wakeups, which is how we end
// up at this awkward type signature and sharing of internal state // up at this awkward type signature and sharing of internal state
// through dispatch. // through dispatch.
dispatch(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool dispatch(ctx context.Context, vals *queue, acceptCh func() chan any) bool
Close() Close()
} }
@ -35,29 +35,26 @@ type subscriber interface {
type subscribeState struct { type subscribeState struct {
client *Client client *Client
write chan any dispatcher *worker
stop goroutineShutdownControl write chan any
snapshot chan chan []any snapshot chan chan []any
outputsMu sync.Mutex outputsMu sync.Mutex
outputs map[reflect.Type]subscriber outputs map[reflect.Type]subscriber
} }
func newSubscribeState(c *Client) *subscribeState { func newSubscribeState(c *Client) *subscribeState {
stopCtl, stopWorker := newGoroutineShutdown()
ret := &subscribeState{ ret := &subscribeState{
client: c, client: c,
write: make(chan any), write: make(chan any),
stop: stopCtl,
snapshot: make(chan chan []any), snapshot: make(chan chan []any),
outputs: map[reflect.Type]subscriber{}, outputs: map[reflect.Type]subscriber{},
} }
go ret.pump(stopWorker) ret.dispatcher = runWorker(ret.pump)
return ret return ret
} }
func (q *subscribeState) pump(stop goroutineShutdownWorker) { func (q *subscribeState) pump(ctx context.Context) {
defer stop.Done()
var vals queue var vals queue
acceptCh := func() chan any { acceptCh := func() chan any {
if vals.Full() { if vals.Full() {
@ -74,7 +71,7 @@ func (q *subscribeState) pump(stop goroutineShutdownWorker) {
vals.Drop() vals.Drop()
continue continue
} }
if !sub.dispatch(&vals, stop, acceptCh) { if !sub.dispatch(ctx, &vals, acceptCh) {
return return
} }
} else { } else {
@ -85,7 +82,7 @@ func (q *subscribeState) pump(stop goroutineShutdownWorker) {
select { select {
case val := <-q.write: case val := <-q.write:
vals.Add(val) vals.Add(val)
case <-stop.Stop(): case <-ctx.Done():
return return
case ch := <-q.snapshot: case ch := <-q.snapshot:
ch <- vals.Snapshot() ch <- vals.Snapshot()
@ -120,7 +117,7 @@ func (q *subscribeState) subscriberFor(val any) subscriber {
// Close closes the subscribeState. Implicitly closes all Subscribers // Close closes the subscribeState. Implicitly closes all Subscribers
// linked to this state, and any pending events are discarded. // linked to this state, and any pending events are discarded.
func (s *subscribeState) close() { func (s *subscribeState) close() {
s.stop.StopAndWait() s.dispatcher.StopAndWait()
var subs map[reflect.Type]subscriber var subs map[reflect.Type]subscriber
s.outputsMu.Lock() s.outputsMu.Lock()
@ -131,23 +128,23 @@ func (s *subscribeState) close() {
} }
} }
func (s *subscribeState) closed() <-chan struct{} {
return s.dispatcher.Done()
}
// A Subscriber delivers one type of event from a [Client]. // A Subscriber delivers one type of event from a [Client].
type Subscriber[T any] struct { type Subscriber[T any] struct {
doneCtx context.Context stop stopFlag
done context.CancelFunc recv *subscribeState
recv *subscribeState read chan T
read chan T
} }
func newSubscriber[T any](r *subscribeState) *Subscriber[T] { func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
t := reflect.TypeFor[T]() t := reflect.TypeFor[T]()
ctx, cancel := context.WithCancel(context.Background())
ret := &Subscriber[T]{ ret := &Subscriber[T]{
doneCtx: ctx, recv: r,
done: cancel, read: make(chan T),
recv: r,
read: make(chan T),
} }
r.addSubscriber(t, ret) r.addSubscriber(t, ret)
@ -158,7 +155,7 @@ func (s *Subscriber[T]) subscribeType() reflect.Type {
return reflect.TypeFor[T]() return reflect.TypeFor[T]()
} }
func (s *Subscriber[T]) dispatch(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool { func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue, acceptCh func() chan any) bool {
t := vals.Peek().(T) t := vals.Peek().(T)
for { for {
// Keep the cases in this select in sync with subscribeState.pump // Keep the cases in this select in sync with subscribeState.pump
@ -170,7 +167,7 @@ func (s *Subscriber[T]) dispatch(vals *queue, stop goroutineShutdownWorker, acce
return true return true
case val := <-acceptCh(): case val := <-acceptCh():
vals.Add(val) vals.Add(val)
case <-stop.Stop(): case <-ctx.Done():
return false return false
case ch := <-s.recv.snapshot: case ch := <-s.recv.snapshot:
ch <- vals.Snapshot() ch <- vals.Snapshot()
@ -187,13 +184,13 @@ func (s *Subscriber[T]) Events() <-chan T {
// Done returns a channel that is closed when the subscriber is // Done returns a channel that is closed when the subscriber is
// closed. // closed.
func (s *Subscriber[T]) Done() <-chan struct{} { func (s *Subscriber[T]) Done() <-chan struct{} {
return s.doneCtx.Done() return s.stop.Done()
} }
// Close closes the Subscriber, indicating the caller no longer wishes // Close closes the Subscriber, indicating the caller no longer wishes
// to receive this event type. After Close, receives on // to receive this event type. After Close, receives on
// [Subscriber.Events] block for ever. // [Subscriber.Events] block for ever.
func (s *Subscriber[T]) Close() { func (s *Subscriber[T]) Close() {
s.done() // unblock receivers s.stop.Stop() // unblock receivers
s.recv.deleteSubscriber(reflect.TypeFor[T]()) s.recv.deleteSubscriber(reflect.TypeFor[T]())
} }