diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index 393596d75..3520be828 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -15,8 +15,8 @@ import ( // Bus is an event bus that distributes published events to interested // subscribers. type Bus struct { + router *worker write chan any - stop goroutineShutdownControl snapshot chan chan []any topicsMu sync.Mutex // guards everything below. @@ -30,15 +30,13 @@ type Bus struct { // New returns a new bus. Use [PublisherOf] to make event publishers, // and [Bus.Queue] and [Subscribe] to make event subscribers. func New() *Bus { - stopCtl, stopWorker := newGoroutineShutdown() ret := &Bus{ write: make(chan any), - stop: stopCtl, snapshot: make(chan chan []any), topics: map[reflect.Type][]*subscribeState{}, clients: set.Set[*Client]{}, } - go ret.pump(stopWorker) + ret.router = runWorker(ret.pump) return ret } @@ -67,7 +65,7 @@ func (b *Bus) Client(name string) *Client { // Close blocks until the bus is fully shut down. The bus is // permanently unusable after closing. func (b *Bus) Close() { - b.stop.StopAndWait() + b.router.StopAndWait() var clients set.Set[*Client] b.topicsMu.Lock() @@ -79,8 +77,7 @@ func (b *Bus) Close() { } } -func (b *Bus) pump(stop goroutineShutdownWorker) { - defer stop.Done() +func (b *Bus) pump(ctx context.Context) { var vals queue acceptCh := func() chan any { if vals.Full() { @@ -102,13 +99,13 @@ func (b *Bus) pump(stop goroutineShutdownWorker) { select { case d.write <- val: break deliverOne - case <-d.stop.WaitChan(): + case <-d.closed(): // Queue closed, don't block but continue // delivering to others. break deliverOne case in := <-acceptCh(): vals.Add(in) - case <-stop.Stop(): + case <-ctx.Done(): return case ch := <-b.snapshot: ch <- vals.Snapshot() @@ -122,7 +119,7 @@ func (b *Bus) pump(stop goroutineShutdownWorker) { // resuming. for vals.Empty() { select { - case <-stop.Stop(): + case <-ctx.Done(): return case val := <-b.write: vals.Add(val) @@ -168,59 +165,89 @@ func (b *Bus) unsubscribe(t reflect.Type, q *subscribeState) { b.topics[t] = slices.Delete(slices.Clone(b.topics[t]), i, i+1) } -func newGoroutineShutdown() (goroutineShutdownControl, goroutineShutdownWorker) { - ctx, cancel := context.WithCancel(context.Background()) +// A worker runs a worker goroutine and helps coordinate its shutdown. +type worker struct { + ctx context.Context + stop context.CancelFunc + stopped chan struct{} +} - ctl := goroutineShutdownControl{ - startShutdown: cancel, - shutdownFinished: make(chan struct{}), +// runWorker creates a worker goroutine running fn. The context passed +// to fn is canceled by [worker.Stop]. +func runWorker(fn func(context.Context)) *worker { + ctx, stop := context.WithCancel(context.Background()) + ret := &worker{ + ctx: ctx, + stop: stop, + stopped: make(chan struct{}), } - work := goroutineShutdownWorker{ - startShutdown: ctx.Done(), - shutdownFinished: ctl.shutdownFinished, + go ret.run(fn) + return ret +} + +func (w *worker) run(fn func(context.Context)) { + defer close(w.stopped) + fn(w.ctx) +} + +// Stop signals the worker goroutine to shut down. +func (w *worker) Stop() { w.stop() } + +// Done returns a channel that is closed when the worker goroutine +// exits. +func (w *worker) Done() <-chan struct{} { return w.stopped } + +// Wait waits until the worker goroutine has exited. +func (w *worker) Wait() { <-w.stopped } + +// StopAndWait signals the worker goroutine to shut down, then waits +// for it to exit. +func (w *worker) StopAndWait() { + w.stop() + <-w.stopped +} + +// stopFlag is a value that can be watched for a notification. The +// zero value is ready for use. +// +// The flag is notified by running [stopFlag.Stop]. Stop can be called +// multiple times. Upon the first call to Stop, [stopFlag.Done] is +// closed, all pending [stopFlag.Wait] calls return, and future Wait +// calls return immediately. +// +// A stopFlag can only notify once, and is intended for use as a +// one-way shutdown signal that's lighter than a cancellable +// context.Context. +type stopFlag struct { + // guards the lazy construction of stopped, and the value of + // alreadyStopped. + mu sync.Mutex + stopped chan struct{} + alreadyStopped bool +} + +func (s *stopFlag) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + if s.alreadyStopped { + return } - - return ctl, work + s.alreadyStopped = true + if s.stopped == nil { + s.stopped = make(chan struct{}) + } + close(s.stopped) } -// goroutineShutdownControl is a helper type to manage the shutdown of -// a worker goroutine. The worker goroutine should use the -// goroutineShutdownWorker related to this controller. -type goroutineShutdownControl struct { - startShutdown context.CancelFunc - shutdownFinished chan struct{} +func (s *stopFlag) Done() <-chan struct{} { + s.mu.Lock() + defer s.mu.Unlock() + if s.stopped == nil { + s.stopped = make(chan struct{}) + } + return s.stopped } -func (ctl *goroutineShutdownControl) Stop() { - ctl.startShutdown() -} - -func (ctl *goroutineShutdownControl) Wait() { - <-ctl.shutdownFinished -} - -func (ctl *goroutineShutdownControl) WaitChan() <-chan struct{} { - return ctl.shutdownFinished -} - -func (ctl *goroutineShutdownControl) StopAndWait() { - ctl.Stop() - ctl.Wait() -} - -// goroutineShutdownWorker is a helper type for a worker goroutine to -// be notified that it should shut down, and to report that shutdown -// has completed. The notification is triggered by the related -// goroutineShutdownControl. -type goroutineShutdownWorker struct { - startShutdown <-chan struct{} - shutdownFinished chan struct{} -} - -func (work *goroutineShutdownWorker) Stop() <-chan struct{} { - return work.startShutdown -} - -func (work *goroutineShutdownWorker) Done() { - close(work.shutdownFinished) +func (s *stopFlag) Wait() { + <-s.Done() } diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go index 19ddc1256..b2d0641d9 100644 --- a/util/eventbus/publish.go +++ b/util/eventbus/publish.go @@ -4,7 +4,6 @@ package eventbus import ( - "context" "reflect" ) @@ -17,17 +16,13 @@ type publisher interface { // A Publisher publishes typed events on a bus. type Publisher[T any] struct { - client *Client - stopCtx context.Context - stop context.CancelFunc + client *Client + stop stopFlag } func newPublisher[T any](c *Client) *Publisher[T] { - ctx, cancel := context.WithCancel(context.Background()) ret := &Publisher[T]{ - client: c, - stopCtx: ctx, - stop: cancel, + client: c, } c.addPublisher(ret) return ret @@ -39,7 +34,7 @@ func newPublisher[T any](c *Client) *Publisher[T] { func (p *Publisher[T]) Close() { // Just unblocks any active calls to Publish, no other // synchronization needed. - p.stop() + p.stop.Stop() p.client.deletePublisher(p) } @@ -52,14 +47,14 @@ func (p *Publisher[T]) Publish(v T) { // Check for just a stopped publisher or bus before trying to // write, so that once closed Publish consistently does nothing. select { - case <-p.stopCtx.Done(): + case <-p.stop.Done(): return default: } select { case p.client.publish() <- v: - case <-p.stopCtx.Done(): + case <-p.stop.Done(): } } diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 896f0ce1f..606410c8e 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -27,7 +27,7 @@ type subscriber interface { // processing other potential sources of wakeups, which is how we end // up at this awkward type signature and sharing of internal state // through dispatch. - dispatch(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool + dispatch(ctx context.Context, vals *queue, acceptCh func() chan any) bool Close() } @@ -35,29 +35,26 @@ type subscriber interface { type subscribeState struct { client *Client - write chan any - stop goroutineShutdownControl - snapshot chan chan []any + dispatcher *worker + write chan any + snapshot chan chan []any outputsMu sync.Mutex outputs map[reflect.Type]subscriber } func newSubscribeState(c *Client) *subscribeState { - stopCtl, stopWorker := newGoroutineShutdown() ret := &subscribeState{ client: c, write: make(chan any), - stop: stopCtl, snapshot: make(chan chan []any), outputs: map[reflect.Type]subscriber{}, } - go ret.pump(stopWorker) + ret.dispatcher = runWorker(ret.pump) return ret } -func (q *subscribeState) pump(stop goroutineShutdownWorker) { - defer stop.Done() +func (q *subscribeState) pump(ctx context.Context) { var vals queue acceptCh := func() chan any { if vals.Full() { @@ -74,7 +71,7 @@ func (q *subscribeState) pump(stop goroutineShutdownWorker) { vals.Drop() continue } - if !sub.dispatch(&vals, stop, acceptCh) { + if !sub.dispatch(ctx, &vals, acceptCh) { return } } else { @@ -85,7 +82,7 @@ func (q *subscribeState) pump(stop goroutineShutdownWorker) { select { case val := <-q.write: vals.Add(val) - case <-stop.Stop(): + case <-ctx.Done(): return case ch := <-q.snapshot: ch <- vals.Snapshot() @@ -120,7 +117,7 @@ func (q *subscribeState) subscriberFor(val any) subscriber { // Close closes the subscribeState. Implicitly closes all Subscribers // linked to this state, and any pending events are discarded. func (s *subscribeState) close() { - s.stop.StopAndWait() + s.dispatcher.StopAndWait() var subs map[reflect.Type]subscriber s.outputsMu.Lock() @@ -131,23 +128,23 @@ func (s *subscribeState) close() { } } +func (s *subscribeState) closed() <-chan struct{} { + return s.dispatcher.Done() +} + // A Subscriber delivers one type of event from a [Client]. type Subscriber[T any] struct { - doneCtx context.Context - done context.CancelFunc - recv *subscribeState - read chan T + stop stopFlag + recv *subscribeState + read chan T } func newSubscriber[T any](r *subscribeState) *Subscriber[T] { t := reflect.TypeFor[T]() - ctx, cancel := context.WithCancel(context.Background()) ret := &Subscriber[T]{ - doneCtx: ctx, - done: cancel, - recv: r, - read: make(chan T), + recv: r, + read: make(chan T), } r.addSubscriber(t, ret) @@ -158,7 +155,7 @@ func (s *Subscriber[T]) subscribeType() reflect.Type { return reflect.TypeFor[T]() } -func (s *Subscriber[T]) dispatch(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool { +func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue, acceptCh func() chan any) bool { t := vals.Peek().(T) for { // Keep the cases in this select in sync with subscribeState.pump @@ -170,7 +167,7 @@ func (s *Subscriber[T]) dispatch(vals *queue, stop goroutineShutdownWorker, acce return true case val := <-acceptCh(): vals.Add(val) - case <-stop.Stop(): + case <-ctx.Done(): return false case ch := <-s.recv.snapshot: ch <- vals.Snapshot() @@ -187,13 +184,13 @@ func (s *Subscriber[T]) Events() <-chan T { // Done returns a channel that is closed when the subscriber is // closed. func (s *Subscriber[T]) Done() <-chan struct{} { - return s.doneCtx.Done() + return s.stop.Done() } // Close closes the Subscriber, indicating the caller no longer wishes // to receive this event type. After Close, receives on // [Subscriber.Events] block for ever. func (s *Subscriber[T]) Close() { - s.done() // unblock receivers + s.stop.Stop() // unblock receivers s.recv.deleteSubscriber(reflect.TypeFor[T]()) }