util/eventbus: rework to have a Client abstraction

The Client carries both publishers and subscribers for a single
actor. This makes the APIs for publish and subscribe look more
similar, and this structure is a better fit for upcoming debug
facilities.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
This commit is contained in:
David Anderson 2025-03-04 11:22:30 -08:00 committed by Dave Anderson
parent f840aad49e
commit 3e18434595
6 changed files with 346 additions and 209 deletions

View File

@ -20,12 +20,11 @@ type Bus struct {
snapshot chan chan []any snapshot chan chan []any
topicsMu sync.Mutex // guards everything below. topicsMu sync.Mutex // guards everything below.
topics map[reflect.Type][]*Queue topics map[reflect.Type][]*subscribeState
// Used for introspection/debugging only, not in the normal event // Used for introspection/debugging only, not in the normal event
// publishing path. // publishing path.
publishers set.Set[publisher] clients set.Set[*Client]
queues set.Set[*Queue]
} }
// New returns a new bus. Use [PublisherOf] to make event publishers, // New returns a new bus. Use [PublisherOf] to make event publishers,
@ -33,17 +32,53 @@ type Bus struct {
func New() *Bus { func New() *Bus {
stopCtl, stopWorker := newGoroutineShutdown() stopCtl, stopWorker := newGoroutineShutdown()
ret := &Bus{ ret := &Bus{
write: make(chan any), write: make(chan any),
stop: stopCtl, stop: stopCtl,
snapshot: make(chan chan []any), snapshot: make(chan chan []any),
topics: map[reflect.Type][]*Queue{}, topics: map[reflect.Type][]*subscribeState{},
publishers: set.Set[publisher]{}, clients: set.Set[*Client]{},
queues: set.Set[*Queue]{},
} }
go ret.pump(stopWorker) go ret.pump(stopWorker)
return ret return ret
} }
// Client returns a new client with no subscriptions. Use [Subscribe]
// to receive events, and [Publish] to emit events.
//
// The client's name is used only for debugging, to tell humans what
// piece of code a publisher/subscriber belongs to. Aim for something
// short but unique, for example "kernel-route-monitor" or "taildrop",
// not "watcher".
func (b *Bus) Client(name string) *Client {
ret := &Client{
name: name,
bus: b,
pub: set.Set[publisher]{},
}
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
b.clients.Add(ret)
return ret
}
// Close closes the bus. Implicitly closes all clients, publishers and
// subscribers attached to the bus.
//
// Close blocks until the bus is fully shut down. The bus is
// permanently unusable after closing.
func (b *Bus) Close() {
b.stop.StopAndWait()
var clients set.Set[*Client]
b.topicsMu.Lock()
clients, b.clients = b.clients, set.Set[*Client]{}
b.topicsMu.Unlock()
for c := range clients {
c.Close()
}
}
func (b *Bus) pump(stop goroutineShutdownWorker) { func (b *Bus) pump(stop goroutineShutdownWorker) {
defer stop.Done() defer stop.Done()
var vals queue var vals queue
@ -98,13 +133,19 @@ func (b *Bus) pump(stop goroutineShutdownWorker) {
} }
} }
func (b *Bus) dest(t reflect.Type) []*Queue { func (b *Bus) dest(t reflect.Type) []*subscribeState {
b.topicsMu.Lock() b.topicsMu.Lock()
defer b.topicsMu.Unlock() defer b.topicsMu.Unlock()
return b.topics[t] return b.topics[t]
} }
func (b *Bus) subscribe(t reflect.Type, q *Queue) (cancel func()) { func (b *Bus) shouldPublish(t reflect.Type) bool {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
return len(b.topics[t]) > 0
}
func (b *Bus) subscribe(t reflect.Type, q *subscribeState) (cancel func()) {
b.topicsMu.Lock() b.topicsMu.Lock()
defer b.topicsMu.Unlock() defer b.topicsMu.Unlock()
b.topics[t] = append(b.topics[t], q) b.topics[t] = append(b.topics[t], q)
@ -113,7 +154,7 @@ func (b *Bus) subscribe(t reflect.Type, q *Queue) (cancel func()) {
} }
} }
func (b *Bus) unsubscribe(t reflect.Type, q *Queue) { func (b *Bus) unsubscribe(t reflect.Type, q *subscribeState) {
b.topicsMu.Lock() b.topicsMu.Lock()
defer b.topicsMu.Unlock() defer b.topicsMu.Unlock()
// Topic slices are accessed by pump without holding a lock, so we // Topic slices are accessed by pump without holding a lock, so we
@ -127,44 +168,6 @@ func (b *Bus) unsubscribe(t reflect.Type, q *Queue) {
b.topics[t] = slices.Delete(slices.Clone(b.topics[t]), i, i+1) b.topics[t] = slices.Delete(slices.Clone(b.topics[t]), i, i+1)
} }
func (b *Bus) Close() {
b.stop.StopAndWait()
}
// Queue returns a new queue with no subscriptions. Use [Subscribe] to
// atach subscriptions to it.
//
// The queue's name should be a short, human-readable string that
// identifies this queue. The name is only visible through debugging
// APIs.
func (b *Bus) Queue(name string) *Queue {
return newQueue(b, name)
}
func (b *Bus) addQueue(q *Queue) {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
b.queues.Add(q)
}
func (b *Bus) deleteQueue(q *Queue) {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
b.queues.Delete(q)
}
func (b *Bus) addPublisher(p publisher) {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
b.publishers.Add(p)
}
func (b *Bus) deletePublisher(p publisher) {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
b.publishers.Delete(p)
}
func newGoroutineShutdown() (goroutineShutdownControl, goroutineShutdownWorker) { func newGoroutineShutdown() (goroutineShutdownControl, goroutineShutdownWorker) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())

View File

@ -26,14 +26,16 @@ func TestBus(t *testing.T) {
b := eventbus.New() b := eventbus.New()
defer b.Close() defer b.Close()
q := b.Queue("TestBus") c := b.Client("TestSub")
defer q.Close() defer c.Close()
s := eventbus.Subscribe[EventA](q) s := eventbus.Subscribe[EventA](c)
go func() { go func() {
pa := eventbus.PublisherOf[EventA](b, "TestBusA") p := b.Client("TestPub")
defer p.Close()
pa := eventbus.Publish[EventA](p)
defer pa.Close() defer pa.Close()
pb := eventbus.PublisherOf[EventB](b, "TestBusB") pb := eventbus.Publish[EventB](p)
defer pb.Close() defer pb.Close()
pa.Publish(EventA{1}) pa.Publish(EventA{1})
pb.Publish(EventB{2}) pb.Publish(EventB{2})
@ -45,7 +47,7 @@ func TestBus(t *testing.T) {
select { select {
case got := <-s.Events(): case got := <-s.Events():
want.Got(got) want.Got(got)
case <-q.Done(): case <-s.Done():
t.Fatalf("queue closed unexpectedly") t.Fatalf("queue closed unexpectedly")
case <-time.After(time.Second): case <-time.After(time.Second):
t.Fatalf("timed out waiting for event") t.Fatalf("timed out waiting for event")
@ -57,19 +59,21 @@ func TestBusMultipleConsumers(t *testing.T) {
b := eventbus.New() b := eventbus.New()
defer b.Close() defer b.Close()
q1 := b.Queue("TestBusA") c1 := b.Client("TestSubA")
defer q1.Close() defer c1.Close()
s1 := eventbus.Subscribe[EventA](q1) s1 := eventbus.Subscribe[EventA](c1)
q2 := b.Queue("TestBusAB") c2 := b.Client("TestSubB")
defer q2.Close() defer c2.Close()
s2A := eventbus.Subscribe[EventA](q2) s2A := eventbus.Subscribe[EventA](c2)
s2B := eventbus.Subscribe[EventB](q2) s2B := eventbus.Subscribe[EventB](c2)
go func() { go func() {
pa := eventbus.PublisherOf[EventA](b, "TestBusA") p := b.Client("TestPub")
defer p.Close()
pa := eventbus.Publish[EventA](p)
defer pa.Close() defer pa.Close()
pb := eventbus.PublisherOf[EventB](b, "TestBusB") pb := eventbus.Publish[EventB](p)
defer pb.Close() defer pb.Close()
pa.Publish(EventA{1}) pa.Publish(EventA{1})
pb.Publish(EventB{2}) pb.Publish(EventB{2})
@ -86,9 +90,11 @@ func TestBusMultipleConsumers(t *testing.T) {
wantB.Got(got) wantB.Got(got)
case got := <-s2B.Events(): case got := <-s2B.Events():
wantB.Got(got) wantB.Got(got)
case <-q1.Done(): case <-s1.Done():
t.Fatalf("queue closed unexpectedly") t.Fatalf("queue closed unexpectedly")
case <-q2.Done(): case <-s2A.Done():
t.Fatalf("queue closed unexpectedly")
case <-s2B.Done():
t.Fatalf("queue closed unexpectedly") t.Fatalf("queue closed unexpectedly")
case <-time.After(time.Second): case <-time.After(time.Second):
t.Fatalf("timed out waiting for event") t.Fatalf("timed out waiting for event")
@ -111,15 +117,15 @@ func TestSpam(t *testing.T) {
received := make([][]EventA, subscribers) received := make([][]EventA, subscribers)
for i := range subscribers { for i := range subscribers {
q := b.Queue(fmt.Sprintf("Subscriber%d", i)) c := b.Client(fmt.Sprintf("Subscriber%d", i))
defer q.Close() defer c.Close()
s := eventbus.Subscribe[EventA](q) s := eventbus.Subscribe[EventA](c)
g.Go(func() error { g.Go(func() error {
for range wantEvents { for range wantEvents {
select { select {
case evt := <-s.Events(): case evt := <-s.Events():
received[i] = append(received[i], evt) received[i] = append(received[i], evt)
case <-q.Done(): case <-s.Done():
t.Errorf("queue done before expected number of events received") t.Errorf("queue done before expected number of events received")
return errors.New("queue prematurely closed") return errors.New("queue prematurely closed")
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
@ -134,7 +140,8 @@ func TestSpam(t *testing.T) {
published := make([][]EventA, publishers) published := make([][]EventA, publishers)
for i := range publishers { for i := range publishers {
g.Run(func() { g.Run(func() {
p := eventbus.PublisherOf[EventA](b, fmt.Sprintf("Publisher%d", i)) c := b.Client(fmt.Sprintf("Publisher%d", i))
p := eventbus.Publish[EventA](c)
for j := range eventsPerPublisher { for j := range eventsPerPublisher {
evt := EventA{i*eventsPerPublisher + j} evt := EventA{i*eventsPerPublisher + j}
p.Publish(evt) p.Publish(evt)

100
util/eventbus/client.go Normal file
View File

@ -0,0 +1,100 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package eventbus
import (
"reflect"
"sync"
"tailscale.com/util/set"
)
// A Client can publish and subscribe to events on its attached
// bus. See [Publish] to publish events, and [Subscribe] to receive
// events.
//
// Subscribers that share the same client receive events one at a
// time, in the order they were published.
type Client struct {
name string
bus *Bus
mu sync.Mutex
pub set.Set[publisher]
sub *subscribeState // Lazily created on first subscribe
}
// Close closes the client. Implicitly closes all publishers and
// subscribers obtained from this client.
func (c *Client) Close() {
var (
pub set.Set[publisher]
sub *subscribeState
)
c.mu.Lock()
pub, c.pub = c.pub, nil
sub, c.sub = c.sub, nil
c.mu.Unlock()
if sub != nil {
sub.close()
}
for p := range pub {
p.Close()
}
}
func (c *Client) subscribeState() *subscribeState {
c.mu.Lock()
defer c.mu.Unlock()
if c.sub == nil {
c.sub = newSubscribeState(c)
}
return c.sub
}
func (c *Client) addPublisher(pub publisher) {
c.mu.Lock()
defer c.mu.Unlock()
c.pub.Add(pub)
}
func (c *Client) deletePublisher(pub publisher) {
c.mu.Lock()
defer c.mu.Unlock()
c.pub.Delete(pub)
}
func (c *Client) addSubscriber(t reflect.Type, s *subscribeState) {
c.bus.subscribe(t, s)
}
func (c *Client) deleteSubscriber(t reflect.Type, s *subscribeState) {
c.bus.unsubscribe(t, s)
}
func (c *Client) publish() chan<- any {
return c.bus.write
}
func (c *Client) shouldPublish(t reflect.Type) bool {
return c.bus.shouldPublish(t)
}
// Subscribe requests delivery of events of type T through the given
// Queue. Panics if the queue already has a subscriber for T.
func Subscribe[T any](c *Client) *Subscriber[T] {
return newSubscriber[T](c.subscribeState())
}
// Publisher returns a publisher for event type T using the given
// client.
func Publish[T any](c *Client) *Publisher[T] {
ret := newPublisher[T](c)
c.mu.Lock()
defer c.mu.Unlock()
c.pub.Add(ret)
return ret
}

View File

@ -3,56 +3,59 @@
// Package eventbus provides an in-process event bus. // Package eventbus provides an in-process event bus.
// //
// The event bus connects publishers of typed events with subscribers // An event bus connects publishers of typed events with subscribers
// interested in those events. // interested in those events. Typically, there is one global event
// bus per process.
// //
// # Usage // # Usage
// //
// To publish events, use [PublisherOf] to get a typed publisher for // To send or receive events, first use [Bus.Client] to register with
// your event type, then call [Publisher.Publish] as needed. If your // the bus. Clients should register with a human-readable name that
// event is expensive to construct, you can optionally use // identifies the code using the client, to aid in debugging.
// [Publisher.ShouldPublish] to skip the work if nobody is listening
// for the event.
// //
// To receive events, first use [Bus.Queue] to create an event // To publish events, use [Publish] on a Client to get a typed
// delivery queue, then use [Subscribe] to get a [Subscriber] for each // publisher for your event type, then call [Publisher.Publish] as
// event type you're interested in. Receive the events themselves by // needed. If your event is expensive to construct, you can optionally
// selecting over all your [Subscriber.Chan] channels, as well as // use [Publisher.ShouldPublish] to skip the work if nobody is
// [Queue.Done] for shutdown notifications. // listening for the event.
//
// To receive events, use [Subscribe] to get a typed subscriber for
// each event type you're interested in. Receive the events themselves
// by selecting over all your [Subscriber.Events] channels, as well as
// [Subscriber.Done] for shutdown notifications.
// //
// # Concurrency properties // # Concurrency properties
// //
// The bus serializes all published events, and preserves that // The bus serializes all published events across all publishers, and
// ordering when delivering to subscribers that are attached to the // preserves that ordering when delivering to subscribers that are
// same Queue. In more detail: // attached to the same Client. In more detail:
// //
// - An event is published to the bus at some instant between the // - An event is published to the bus at some instant between the
// start and end of the call to [Publisher.Publish]. // start and end of the call to [Publisher.Publish].
// - Events cannot be published at the same instant, and so are // - Two events cannot be published at the same instant, and so are
// totally ordered by their publication time. Given two events E1 // totally ordered by their publication time. Given two events E1
// and E2, either E1 happens before E2, or E2 happens before E1. // and E2, either E1 happens before E2, or E2 happens before E1.
// - Queues dispatch events to their Subscribers in publication // - Clients dispatch events to their Subscribers in publication
// order: if E1 happens before E2, the queue always delivers E1 // order: if E1 happens before E2, the client always delivers E1
// before E2. // before E2.
// - Queues do not synchronize with each other: given queues Q1 and // - Clients do not synchronize subscriptions with each other: given
// Q2, both subscribed to events E1 and E2, Q1 may deliver both E1 // clients C1 and C2, both subscribed to events E1 and E2, C1 may
// and E2 before Q2 delivers E1. // deliver both E1 and E2 before C2 delivers E1.
// //
// Less formally: there is one true timeline of all published events. // Less formally: there is one true timeline of all published events.
// If you make a Queue and subscribe to events on it, you will receive // If you make a Client and subscribe to events, you will receive
// those events one at a time, in the same order as the one true // events one at a time, in the same order as the one true
// timeline. You will "skip over" events you didn't subscribe to, but // timeline. You will "skip over" events you didn't subscribe to, but
// your view of the world always moves forward in time, never // your view of the world always moves forward in time, never
// backwards, and you will observe events in the same order as // backwards, and you will observe events in the same order as
// everyone else. // everyone else.
// //
// However, you cannot assume that what your subscribers on your queue // However, you cannot assume that what your client see as "now" is
// see as "now" is the same as what other subscribers on other // the same as what other clients. They may be further behind you in
// queues. Their queue may be further behind you in the timeline, or // working through the timeline, or running ahead of you. This means
// running ahead of you. This means you should be careful about // you should be careful about reaching out to another component
// reaching out to another component directly after receiving an // directly after receiving an event, as its view of the world may not
// event, as its view of the world may not yet (or ever) be exactly // yet (or ever) be exactly consistent with yours.
// consistent with yours.
// //
// To make your code more testable and understandable, you should try // To make your code more testable and understandable, you should try
// to structure it following the actor model: you have some local // to structure it following the actor model: you have some local
@ -63,7 +66,7 @@
// # Expected subscriber behavior // # Expected subscriber behavior
// //
// Subscribers are expected to promptly receive their events on // Subscribers are expected to promptly receive their events on
// [Subscriber.Chan]. The bus has a small, fixed amount of internal // [Subscriber.Events]. The bus has a small, fixed amount of internal
// buffering, meaning that a slow subscriber will eventually cause // buffering, meaning that a slow subscriber will eventually cause
// backpressure and block publication of all further events. // backpressure and block publication of all further events.
// //

View File

@ -11,35 +11,41 @@ import (
// publisher is a uniformly typed wrapper around Publisher[T], so that // publisher is a uniformly typed wrapper around Publisher[T], so that
// debugging facilities can look at active publishers. // debugging facilities can look at active publishers.
type publisher interface { type publisher interface {
publisherName() string publishType() reflect.Type
Close()
} }
// A Publisher publishes events on the bus. // A Publisher publishes typed events on a bus.
type Publisher[T any] struct { type Publisher[T any] struct {
bus *Bus client *Client
name string
stopCtx context.Context stopCtx context.Context
stop context.CancelFunc stop context.CancelFunc
} }
// PublisherOf returns a publisher for event type T on the given bus. func newPublisher[T any](c *Client) *Publisher[T] {
//
// The publisher's name should be a short, human-readable string that
// identifies this event publisher. The name is only visible through
// debugging APIs.
func PublisherOf[T any](b *Bus, name string) *Publisher[T] {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ret := &Publisher[T]{ ret := &Publisher[T]{
bus: b, client: c,
name: name,
stopCtx: ctx, stopCtx: ctx,
stop: cancel, stop: cancel,
} }
b.addPublisher(ret) c.addPublisher(ret)
return ret return ret
} }
func (p *Publisher[T]) publisherName() string { return p.name } // Close closes the publisher.
//
// Calls to Publish after Close silently do nothing.
func (p *Publisher[T]) Close() {
// Just unblocks any active calls to Publish, no other
// synchronization needed.
p.stop()
p.client.deletePublisher(p)
}
func (p *Publisher[T]) publishType() reflect.Type {
return reflect.TypeFor[T]()
}
// Publish publishes event v on the bus. // Publish publishes event v on the bus.
func (p *Publisher[T]) Publish(v T) { func (p *Publisher[T]) Publish(v T) {
@ -48,32 +54,21 @@ func (p *Publisher[T]) Publish(v T) {
select { select {
case <-p.stopCtx.Done(): case <-p.stopCtx.Done():
return return
case <-p.bus.stop.WaitChan():
return
default: default:
} }
select { select {
case p.bus.write <- v: case p.client.publish() <- v:
case <-p.stopCtx.Done(): case <-p.stopCtx.Done():
case <-p.bus.stop.WaitChan():
} }
} }
// ShouldPublish reports whether anyone is subscribed to events of // ShouldPublish reports whether anyone is subscribed to the events
// type T. // that this publisher emits.
// //
// ShouldPublish can be used to skip expensive event construction if // ShouldPublish can be used to skip expensive event construction if
// nobody seems to care. Publishers must not assume that someone will // nobody seems to care. Publishers must not assume that someone will
// definitely receive an event if ShouldPublish returns true. // definitely receive an event if ShouldPublish returns true.
func (p *Publisher[T]) ShouldPublish() bool { func (p *Publisher[T]) ShouldPublish() bool {
dests := p.bus.dest(reflect.TypeFor[T]()) return p.client.shouldPublish(reflect.TypeFor[T]())
return len(dests) > 0
}
// Close closes the publisher, indicating that no further events will
// be published with it.
func (p *Publisher[T]) Close() {
p.stop()
p.bus.deletePublisher(p)
} }

View File

@ -4,46 +4,59 @@
package eventbus package eventbus
import ( import (
"context"
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
) )
type dispatchFn func(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool // subscriber is a uniformly typed wrapper around Subscriber[T], so
// that debugging facilities can look at active subscribers.
type subscriber interface {
subscribeType() reflect.Type
// dispatch is a function that dispatches the head value in vals to
// a subscriber, while also handling stop and incoming queue write
// events.
//
// dispatch exists because of the strongly typed Subscriber[T]
// wrapper around subscriptions: within the bus events are boxed in an
// 'any', and need to be unpacked to their full type before delivery
// to the subscriber. This involves writing to a strongly-typed
// channel, so subscribeState cannot handle that dispatch by itself -
// but if that strongly typed send blocks, we also need to keep
// processing other potential sources of wakeups, which is how we end
// up at this awkward type signature and sharing of internal state
// through dispatch.
dispatch(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool
Close()
}
// A Queue receives events from a Bus. // subscribeState handles dispatching of events received from a Bus.
// type subscribeState struct {
// To receive events through the queue, see [Subscribe]. Subscribers client *Client
// that share the same Queue receive events one at time, in the order
// they were published.
type Queue struct {
bus *Bus
name string
write chan any write chan any
stop goroutineShutdownControl stop goroutineShutdownControl
snapshot chan chan []any snapshot chan chan []any
outputsMu sync.Mutex outputsMu sync.Mutex
outputs map[reflect.Type]dispatchFn outputs map[reflect.Type]subscriber
} }
func newQueue(b *Bus, name string) *Queue { func newSubscribeState(c *Client) *subscribeState {
stopCtl, stopWorker := newGoroutineShutdown() stopCtl, stopWorker := newGoroutineShutdown()
ret := &Queue{ ret := &subscribeState{
bus: b, client: c,
name: name,
write: make(chan any), write: make(chan any),
stop: stopCtl, stop: stopCtl,
snapshot: make(chan chan []any), snapshot: make(chan chan []any),
outputs: map[reflect.Type]dispatchFn{}, outputs: map[reflect.Type]subscriber{},
} }
b.addQueue(ret)
go ret.pump(stopWorker) go ret.pump(stopWorker)
return ret return ret
} }
func (q *Queue) pump(stop goroutineShutdownWorker) { func (q *subscribeState) pump(stop goroutineShutdownWorker) {
defer stop.Done() defer stop.Done()
var vals queue var vals queue
acceptCh := func() chan any { acceptCh := func() chan any {
@ -55,13 +68,13 @@ func (q *Queue) pump(stop goroutineShutdownWorker) {
for { for {
if !vals.Empty() { if !vals.Empty() {
val := vals.Peek() val := vals.Peek()
fn := q.dispatchFn(val) sub := q.subscriberFor(val)
if fn == nil { if sub == nil {
// Raced with unsubscribe. // Raced with unsubscribe.
vals.Drop() vals.Drop()
continue continue
} }
if !fn(&vals, stop, acceptCh) { if !sub.dispatch(&vals, stop, acceptCh) {
return return
} }
} else { } else {
@ -81,16 +94,74 @@ func (q *Queue) pump(stop goroutineShutdownWorker) {
} }
} }
// A Subscriber delivers one type of event from a [Queue]. func (s *subscribeState) addSubscriber(t reflect.Type, sub subscriber) {
s.outputsMu.Lock()
defer s.outputsMu.Unlock()
if s.outputs[t] != nil {
panic(fmt.Errorf("double subscription for event %s", t))
}
s.outputs[t] = sub
s.client.addSubscriber(t, s)
}
func (s *subscribeState) deleteSubscriber(t reflect.Type) {
s.outputsMu.Lock()
defer s.outputsMu.Unlock()
delete(s.outputs, t)
s.client.deleteSubscriber(t, s)
}
func (q *subscribeState) subscriberFor(val any) subscriber {
q.outputsMu.Lock()
defer q.outputsMu.Unlock()
return q.outputs[reflect.TypeOf(val)]
}
// Close closes the subscribeState. Implicitly closes all Subscribers
// linked to this state, and any pending events are discarded.
func (s *subscribeState) close() {
s.stop.StopAndWait()
var subs map[reflect.Type]subscriber
s.outputsMu.Lock()
subs, s.outputs = s.outputs, nil
s.outputsMu.Unlock()
for _, sub := range subs {
sub.Close()
}
}
// A Subscriber delivers one type of event from a [Client].
type Subscriber[T any] struct { type Subscriber[T any] struct {
recv *Queue doneCtx context.Context
read chan T done context.CancelFunc
recv *subscribeState
read chan T
}
func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
t := reflect.TypeFor[T]()
ctx, cancel := context.WithCancel(context.Background())
ret := &Subscriber[T]{
doneCtx: ctx,
done: cancel,
recv: r,
read: make(chan T),
}
r.addSubscriber(t, ret)
return ret
}
func (s *Subscriber[T]) subscribeType() reflect.Type {
return reflect.TypeFor[T]()
} }
func (s *Subscriber[T]) dispatch(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool { func (s *Subscriber[T]) dispatch(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool {
t := vals.Peek().(T) t := vals.Peek().(T)
for { for {
// Keep the cases in this select in sync with Queue.pump // Keep the cases in this select in sync with subscribeState.pump
// above. The only different should be that this select // above. The only different should be that this select
// delivers a value on s.read. // delivers a value on s.read.
select { select {
@ -113,58 +184,16 @@ func (s *Subscriber[T]) Events() <-chan T {
return s.read return s.read
} }
// Close shuts down the Subscriber, indicating the caller no longer // Done returns a channel that is closed when the subscriber is
// wishes to receive these events. After Close, receives on // closed.
// [Subscriber.Chan] block for ever. func (s *Subscriber[T]) Done() <-chan struct{} {
return s.doneCtx.Done()
}
// Close closes the Subscriber, indicating the caller no longer wishes
// to receive this event type. After Close, receives on
// [Subscriber.Events] block for ever.
func (s *Subscriber[T]) Close() { func (s *Subscriber[T]) Close() {
t := reflect.TypeFor[T]() s.done() // unblock receivers
s.recv.bus.unsubscribe(t, s.recv) s.recv.deleteSubscriber(reflect.TypeFor[T]())
s.recv.deleteDispatchFn(t)
}
func (q *Queue) dispatchFn(val any) dispatchFn {
q.outputsMu.Lock()
defer q.outputsMu.Unlock()
return q.outputs[reflect.ValueOf(val).Type()]
}
func (q *Queue) addDispatchFn(t reflect.Type, fn dispatchFn) {
q.outputsMu.Lock()
defer q.outputsMu.Unlock()
if q.outputs[t] != nil {
panic(fmt.Errorf("double subscription for event %s", t))
}
q.outputs[t] = fn
}
func (q *Queue) deleteDispatchFn(t reflect.Type) {
q.outputsMu.Lock()
defer q.outputsMu.Unlock()
delete(q.outputs, t)
}
// Done returns a channel that is closed when the Queue is closed.
func (q *Queue) Done() <-chan struct{} {
return q.stop.WaitChan()
}
// Close closes the queue. All Subscribers attached to the queue are
// implicitly closed, and any pending events are discarded.
func (q *Queue) Close() {
q.stop.StopAndWait()
q.bus.deleteQueue(q)
}
// Subscribe requests delivery of events of type T through the given
// Queue. Panics if the queue already has a subscriber for T.
func Subscribe[T any](r *Queue) Subscriber[T] {
t := reflect.TypeFor[T]()
ret := Subscriber[T]{
recv: r,
read: make(chan T),
}
r.addDispatchFn(t, ret.dispatch)
r.bus.subscribe(t, r)
return ret
} }