diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index 85d73b15e..393596d75 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -20,12 +20,11 @@ type Bus struct { snapshot chan chan []any topicsMu sync.Mutex // guards everything below. - topics map[reflect.Type][]*Queue + topics map[reflect.Type][]*subscribeState // Used for introspection/debugging only, not in the normal event // publishing path. - publishers set.Set[publisher] - queues set.Set[*Queue] + clients set.Set[*Client] } // New returns a new bus. Use [PublisherOf] to make event publishers, @@ -33,17 +32,53 @@ type Bus struct { func New() *Bus { stopCtl, stopWorker := newGoroutineShutdown() ret := &Bus{ - write: make(chan any), - stop: stopCtl, - snapshot: make(chan chan []any), - topics: map[reflect.Type][]*Queue{}, - publishers: set.Set[publisher]{}, - queues: set.Set[*Queue]{}, + write: make(chan any), + stop: stopCtl, + snapshot: make(chan chan []any), + topics: map[reflect.Type][]*subscribeState{}, + clients: set.Set[*Client]{}, } go ret.pump(stopWorker) return ret } +// Client returns a new client with no subscriptions. Use [Subscribe] +// to receive events, and [Publish] to emit events. +// +// The client's name is used only for debugging, to tell humans what +// piece of code a publisher/subscriber belongs to. Aim for something +// short but unique, for example "kernel-route-monitor" or "taildrop", +// not "watcher". +func (b *Bus) Client(name string) *Client { + ret := &Client{ + name: name, + bus: b, + pub: set.Set[publisher]{}, + } + b.topicsMu.Lock() + defer b.topicsMu.Unlock() + b.clients.Add(ret) + return ret +} + +// Close closes the bus. Implicitly closes all clients, publishers and +// subscribers attached to the bus. +// +// Close blocks until the bus is fully shut down. The bus is +// permanently unusable after closing. +func (b *Bus) Close() { + b.stop.StopAndWait() + + var clients set.Set[*Client] + b.topicsMu.Lock() + clients, b.clients = b.clients, set.Set[*Client]{} + b.topicsMu.Unlock() + + for c := range clients { + c.Close() + } +} + func (b *Bus) pump(stop goroutineShutdownWorker) { defer stop.Done() var vals queue @@ -98,13 +133,19 @@ func (b *Bus) pump(stop goroutineShutdownWorker) { } } -func (b *Bus) dest(t reflect.Type) []*Queue { +func (b *Bus) dest(t reflect.Type) []*subscribeState { b.topicsMu.Lock() defer b.topicsMu.Unlock() return b.topics[t] } -func (b *Bus) subscribe(t reflect.Type, q *Queue) (cancel func()) { +func (b *Bus) shouldPublish(t reflect.Type) bool { + b.topicsMu.Lock() + defer b.topicsMu.Unlock() + return len(b.topics[t]) > 0 +} + +func (b *Bus) subscribe(t reflect.Type, q *subscribeState) (cancel func()) { b.topicsMu.Lock() defer b.topicsMu.Unlock() b.topics[t] = append(b.topics[t], q) @@ -113,7 +154,7 @@ func (b *Bus) subscribe(t reflect.Type, q *Queue) (cancel func()) { } } -func (b *Bus) unsubscribe(t reflect.Type, q *Queue) { +func (b *Bus) unsubscribe(t reflect.Type, q *subscribeState) { b.topicsMu.Lock() defer b.topicsMu.Unlock() // Topic slices are accessed by pump without holding a lock, so we @@ -127,44 +168,6 @@ func (b *Bus) unsubscribe(t reflect.Type, q *Queue) { b.topics[t] = slices.Delete(slices.Clone(b.topics[t]), i, i+1) } -func (b *Bus) Close() { - b.stop.StopAndWait() -} - -// Queue returns a new queue with no subscriptions. Use [Subscribe] to -// atach subscriptions to it. -// -// The queue's name should be a short, human-readable string that -// identifies this queue. The name is only visible through debugging -// APIs. -func (b *Bus) Queue(name string) *Queue { - return newQueue(b, name) -} - -func (b *Bus) addQueue(q *Queue) { - b.topicsMu.Lock() - defer b.topicsMu.Unlock() - b.queues.Add(q) -} - -func (b *Bus) deleteQueue(q *Queue) { - b.topicsMu.Lock() - defer b.topicsMu.Unlock() - b.queues.Delete(q) -} - -func (b *Bus) addPublisher(p publisher) { - b.topicsMu.Lock() - defer b.topicsMu.Unlock() - b.publishers.Add(p) -} - -func (b *Bus) deletePublisher(p publisher) { - b.topicsMu.Lock() - defer b.topicsMu.Unlock() - b.publishers.Delete(p) -} - func newGoroutineShutdown() (goroutineShutdownControl, goroutineShutdownWorker) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index 180f4164a..e159b6a12 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -26,14 +26,16 @@ func TestBus(t *testing.T) { b := eventbus.New() defer b.Close() - q := b.Queue("TestBus") - defer q.Close() - s := eventbus.Subscribe[EventA](q) + c := b.Client("TestSub") + defer c.Close() + s := eventbus.Subscribe[EventA](c) go func() { - pa := eventbus.PublisherOf[EventA](b, "TestBusA") + p := b.Client("TestPub") + defer p.Close() + pa := eventbus.Publish[EventA](p) defer pa.Close() - pb := eventbus.PublisherOf[EventB](b, "TestBusB") + pb := eventbus.Publish[EventB](p) defer pb.Close() pa.Publish(EventA{1}) pb.Publish(EventB{2}) @@ -45,7 +47,7 @@ func TestBus(t *testing.T) { select { case got := <-s.Events(): want.Got(got) - case <-q.Done(): + case <-s.Done(): t.Fatalf("queue closed unexpectedly") case <-time.After(time.Second): t.Fatalf("timed out waiting for event") @@ -57,19 +59,21 @@ func TestBusMultipleConsumers(t *testing.T) { b := eventbus.New() defer b.Close() - q1 := b.Queue("TestBusA") - defer q1.Close() - s1 := eventbus.Subscribe[EventA](q1) + c1 := b.Client("TestSubA") + defer c1.Close() + s1 := eventbus.Subscribe[EventA](c1) - q2 := b.Queue("TestBusAB") - defer q2.Close() - s2A := eventbus.Subscribe[EventA](q2) - s2B := eventbus.Subscribe[EventB](q2) + c2 := b.Client("TestSubB") + defer c2.Close() + s2A := eventbus.Subscribe[EventA](c2) + s2B := eventbus.Subscribe[EventB](c2) go func() { - pa := eventbus.PublisherOf[EventA](b, "TestBusA") + p := b.Client("TestPub") + defer p.Close() + pa := eventbus.Publish[EventA](p) defer pa.Close() - pb := eventbus.PublisherOf[EventB](b, "TestBusB") + pb := eventbus.Publish[EventB](p) defer pb.Close() pa.Publish(EventA{1}) pb.Publish(EventB{2}) @@ -86,9 +90,11 @@ func TestBusMultipleConsumers(t *testing.T) { wantB.Got(got) case got := <-s2B.Events(): wantB.Got(got) - case <-q1.Done(): + case <-s1.Done(): t.Fatalf("queue closed unexpectedly") - case <-q2.Done(): + case <-s2A.Done(): + t.Fatalf("queue closed unexpectedly") + case <-s2B.Done(): t.Fatalf("queue closed unexpectedly") case <-time.After(time.Second): t.Fatalf("timed out waiting for event") @@ -111,15 +117,15 @@ func TestSpam(t *testing.T) { received := make([][]EventA, subscribers) for i := range subscribers { - q := b.Queue(fmt.Sprintf("Subscriber%d", i)) - defer q.Close() - s := eventbus.Subscribe[EventA](q) + c := b.Client(fmt.Sprintf("Subscriber%d", i)) + defer c.Close() + s := eventbus.Subscribe[EventA](c) g.Go(func() error { for range wantEvents { select { case evt := <-s.Events(): received[i] = append(received[i], evt) - case <-q.Done(): + case <-s.Done(): t.Errorf("queue done before expected number of events received") return errors.New("queue prematurely closed") case <-time.After(5 * time.Second): @@ -134,7 +140,8 @@ func TestSpam(t *testing.T) { published := make([][]EventA, publishers) for i := range publishers { g.Run(func() { - p := eventbus.PublisherOf[EventA](b, fmt.Sprintf("Publisher%d", i)) + c := b.Client(fmt.Sprintf("Publisher%d", i)) + p := eventbus.Publish[EventA](c) for j := range eventsPerPublisher { evt := EventA{i*eventsPerPublisher + j} p.Publish(evt) diff --git a/util/eventbus/client.go b/util/eventbus/client.go new file mode 100644 index 000000000..ff8eea6ee --- /dev/null +++ b/util/eventbus/client.go @@ -0,0 +1,100 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package eventbus + +import ( + "reflect" + "sync" + + "tailscale.com/util/set" +) + +// A Client can publish and subscribe to events on its attached +// bus. See [Publish] to publish events, and [Subscribe] to receive +// events. +// +// Subscribers that share the same client receive events one at a +// time, in the order they were published. +type Client struct { + name string + bus *Bus + + mu sync.Mutex + pub set.Set[publisher] + sub *subscribeState // Lazily created on first subscribe +} + +// Close closes the client. Implicitly closes all publishers and +// subscribers obtained from this client. +func (c *Client) Close() { + var ( + pub set.Set[publisher] + sub *subscribeState + ) + + c.mu.Lock() + pub, c.pub = c.pub, nil + sub, c.sub = c.sub, nil + c.mu.Unlock() + + if sub != nil { + sub.close() + } + for p := range pub { + p.Close() + } +} + +func (c *Client) subscribeState() *subscribeState { + c.mu.Lock() + defer c.mu.Unlock() + if c.sub == nil { + c.sub = newSubscribeState(c) + } + return c.sub +} + +func (c *Client) addPublisher(pub publisher) { + c.mu.Lock() + defer c.mu.Unlock() + c.pub.Add(pub) +} + +func (c *Client) deletePublisher(pub publisher) { + c.mu.Lock() + defer c.mu.Unlock() + c.pub.Delete(pub) +} + +func (c *Client) addSubscriber(t reflect.Type, s *subscribeState) { + c.bus.subscribe(t, s) +} + +func (c *Client) deleteSubscriber(t reflect.Type, s *subscribeState) { + c.bus.unsubscribe(t, s) +} + +func (c *Client) publish() chan<- any { + return c.bus.write +} + +func (c *Client) shouldPublish(t reflect.Type) bool { + return c.bus.shouldPublish(t) +} + +// Subscribe requests delivery of events of type T through the given +// Queue. Panics if the queue already has a subscriber for T. +func Subscribe[T any](c *Client) *Subscriber[T] { + return newSubscriber[T](c.subscribeState()) +} + +// Publisher returns a publisher for event type T using the given +// client. +func Publish[T any](c *Client) *Publisher[T] { + ret := newPublisher[T](c) + c.mu.Lock() + defer c.mu.Unlock() + c.pub.Add(ret) + return ret +} diff --git a/util/eventbus/doc.go b/util/eventbus/doc.go index 136823c42..b3509b48b 100644 --- a/util/eventbus/doc.go +++ b/util/eventbus/doc.go @@ -3,56 +3,59 @@ // Package eventbus provides an in-process event bus. // -// The event bus connects publishers of typed events with subscribers -// interested in those events. +// An event bus connects publishers of typed events with subscribers +// interested in those events. Typically, there is one global event +// bus per process. // // # Usage // -// To publish events, use [PublisherOf] to get a typed publisher for -// your event type, then call [Publisher.Publish] as needed. If your -// event is expensive to construct, you can optionally use -// [Publisher.ShouldPublish] to skip the work if nobody is listening -// for the event. +// To send or receive events, first use [Bus.Client] to register with +// the bus. Clients should register with a human-readable name that +// identifies the code using the client, to aid in debugging. // -// To receive events, first use [Bus.Queue] to create an event -// delivery queue, then use [Subscribe] to get a [Subscriber] for each -// event type you're interested in. Receive the events themselves by -// selecting over all your [Subscriber.Chan] channels, as well as -// [Queue.Done] for shutdown notifications. +// To publish events, use [Publish] on a Client to get a typed +// publisher for your event type, then call [Publisher.Publish] as +// needed. If your event is expensive to construct, you can optionally +// use [Publisher.ShouldPublish] to skip the work if nobody is +// listening for the event. +// +// To receive events, use [Subscribe] to get a typed subscriber for +// each event type you're interested in. Receive the events themselves +// by selecting over all your [Subscriber.Events] channels, as well as +// [Subscriber.Done] for shutdown notifications. // // # Concurrency properties // -// The bus serializes all published events, and preserves that -// ordering when delivering to subscribers that are attached to the -// same Queue. In more detail: +// The bus serializes all published events across all publishers, and +// preserves that ordering when delivering to subscribers that are +// attached to the same Client. In more detail: // // - An event is published to the bus at some instant between the // start and end of the call to [Publisher.Publish]. -// - Events cannot be published at the same instant, and so are +// - Two events cannot be published at the same instant, and so are // totally ordered by their publication time. Given two events E1 // and E2, either E1 happens before E2, or E2 happens before E1. -// - Queues dispatch events to their Subscribers in publication -// order: if E1 happens before E2, the queue always delivers E1 +// - Clients dispatch events to their Subscribers in publication +// order: if E1 happens before E2, the client always delivers E1 // before E2. -// - Queues do not synchronize with each other: given queues Q1 and -// Q2, both subscribed to events E1 and E2, Q1 may deliver both E1 -// and E2 before Q2 delivers E1. +// - Clients do not synchronize subscriptions with each other: given +// clients C1 and C2, both subscribed to events E1 and E2, C1 may +// deliver both E1 and E2 before C2 delivers E1. // // Less formally: there is one true timeline of all published events. -// If you make a Queue and subscribe to events on it, you will receive -// those events one at a time, in the same order as the one true +// If you make a Client and subscribe to events, you will receive +// events one at a time, in the same order as the one true // timeline. You will "skip over" events you didn't subscribe to, but // your view of the world always moves forward in time, never // backwards, and you will observe events in the same order as // everyone else. // -// However, you cannot assume that what your subscribers on your queue -// see as "now" is the same as what other subscribers on other -// queues. Their queue may be further behind you in the timeline, or -// running ahead of you. This means you should be careful about -// reaching out to another component directly after receiving an -// event, as its view of the world may not yet (or ever) be exactly -// consistent with yours. +// However, you cannot assume that what your client see as "now" is +// the same as what other clients. They may be further behind you in +// working through the timeline, or running ahead of you. This means +// you should be careful about reaching out to another component +// directly after receiving an event, as its view of the world may not +// yet (or ever) be exactly consistent with yours. // // To make your code more testable and understandable, you should try // to structure it following the actor model: you have some local @@ -63,7 +66,7 @@ // # Expected subscriber behavior // // Subscribers are expected to promptly receive their events on -// [Subscriber.Chan]. The bus has a small, fixed amount of internal +// [Subscriber.Events]. The bus has a small, fixed amount of internal // buffering, meaning that a slow subscriber will eventually cause // backpressure and block publication of all further events. // diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go index 14828812b..19ddc1256 100644 --- a/util/eventbus/publish.go +++ b/util/eventbus/publish.go @@ -11,35 +11,41 @@ import ( // publisher is a uniformly typed wrapper around Publisher[T], so that // debugging facilities can look at active publishers. type publisher interface { - publisherName() string + publishType() reflect.Type + Close() } -// A Publisher publishes events on the bus. +// A Publisher publishes typed events on a bus. type Publisher[T any] struct { - bus *Bus - name string + client *Client stopCtx context.Context stop context.CancelFunc } -// PublisherOf returns a publisher for event type T on the given bus. -// -// The publisher's name should be a short, human-readable string that -// identifies this event publisher. The name is only visible through -// debugging APIs. -func PublisherOf[T any](b *Bus, name string) *Publisher[T] { +func newPublisher[T any](c *Client) *Publisher[T] { ctx, cancel := context.WithCancel(context.Background()) ret := &Publisher[T]{ - bus: b, - name: name, + client: c, stopCtx: ctx, stop: cancel, } - b.addPublisher(ret) + c.addPublisher(ret) return ret } -func (p *Publisher[T]) publisherName() string { return p.name } +// Close closes the publisher. +// +// Calls to Publish after Close silently do nothing. +func (p *Publisher[T]) Close() { + // Just unblocks any active calls to Publish, no other + // synchronization needed. + p.stop() + p.client.deletePublisher(p) +} + +func (p *Publisher[T]) publishType() reflect.Type { + return reflect.TypeFor[T]() +} // Publish publishes event v on the bus. func (p *Publisher[T]) Publish(v T) { @@ -48,32 +54,21 @@ func (p *Publisher[T]) Publish(v T) { select { case <-p.stopCtx.Done(): return - case <-p.bus.stop.WaitChan(): - return default: } select { - case p.bus.write <- v: + case p.client.publish() <- v: case <-p.stopCtx.Done(): - case <-p.bus.stop.WaitChan(): } } -// ShouldPublish reports whether anyone is subscribed to events of -// type T. +// ShouldPublish reports whether anyone is subscribed to the events +// that this publisher emits. // // ShouldPublish can be used to skip expensive event construction if // nobody seems to care. Publishers must not assume that someone will // definitely receive an event if ShouldPublish returns true. func (p *Publisher[T]) ShouldPublish() bool { - dests := p.bus.dest(reflect.TypeFor[T]()) - return len(dests) > 0 -} - -// Close closes the publisher, indicating that no further events will -// be published with it. -func (p *Publisher[T]) Close() { - p.stop() - p.bus.deletePublisher(p) + return p.client.shouldPublish(reflect.TypeFor[T]()) } diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index ade834d77..896f0ce1f 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -4,46 +4,59 @@ package eventbus import ( + "context" "fmt" "reflect" "sync" ) -type dispatchFn func(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool +// subscriber is a uniformly typed wrapper around Subscriber[T], so +// that debugging facilities can look at active subscribers. +type subscriber interface { + subscribeType() reflect.Type + // dispatch is a function that dispatches the head value in vals to + // a subscriber, while also handling stop and incoming queue write + // events. + // + // dispatch exists because of the strongly typed Subscriber[T] + // wrapper around subscriptions: within the bus events are boxed in an + // 'any', and need to be unpacked to their full type before delivery + // to the subscriber. This involves writing to a strongly-typed + // channel, so subscribeState cannot handle that dispatch by itself - + // but if that strongly typed send blocks, we also need to keep + // processing other potential sources of wakeups, which is how we end + // up at this awkward type signature and sharing of internal state + // through dispatch. + dispatch(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool + Close() +} -// A Queue receives events from a Bus. -// -// To receive events through the queue, see [Subscribe]. Subscribers -// that share the same Queue receive events one at time, in the order -// they were published. -type Queue struct { - bus *Bus - name string +// subscribeState handles dispatching of events received from a Bus. +type subscribeState struct { + client *Client write chan any stop goroutineShutdownControl snapshot chan chan []any outputsMu sync.Mutex - outputs map[reflect.Type]dispatchFn + outputs map[reflect.Type]subscriber } -func newQueue(b *Bus, name string) *Queue { +func newSubscribeState(c *Client) *subscribeState { stopCtl, stopWorker := newGoroutineShutdown() - ret := &Queue{ - bus: b, - name: name, + ret := &subscribeState{ + client: c, write: make(chan any), stop: stopCtl, snapshot: make(chan chan []any), - outputs: map[reflect.Type]dispatchFn{}, + outputs: map[reflect.Type]subscriber{}, } - b.addQueue(ret) go ret.pump(stopWorker) return ret } -func (q *Queue) pump(stop goroutineShutdownWorker) { +func (q *subscribeState) pump(stop goroutineShutdownWorker) { defer stop.Done() var vals queue acceptCh := func() chan any { @@ -55,13 +68,13 @@ func (q *Queue) pump(stop goroutineShutdownWorker) { for { if !vals.Empty() { val := vals.Peek() - fn := q.dispatchFn(val) - if fn == nil { + sub := q.subscriberFor(val) + if sub == nil { // Raced with unsubscribe. vals.Drop() continue } - if !fn(&vals, stop, acceptCh) { + if !sub.dispatch(&vals, stop, acceptCh) { return } } else { @@ -81,16 +94,74 @@ func (q *Queue) pump(stop goroutineShutdownWorker) { } } -// A Subscriber delivers one type of event from a [Queue]. +func (s *subscribeState) addSubscriber(t reflect.Type, sub subscriber) { + s.outputsMu.Lock() + defer s.outputsMu.Unlock() + if s.outputs[t] != nil { + panic(fmt.Errorf("double subscription for event %s", t)) + } + s.outputs[t] = sub + s.client.addSubscriber(t, s) +} + +func (s *subscribeState) deleteSubscriber(t reflect.Type) { + s.outputsMu.Lock() + defer s.outputsMu.Unlock() + delete(s.outputs, t) + s.client.deleteSubscriber(t, s) +} + +func (q *subscribeState) subscriberFor(val any) subscriber { + q.outputsMu.Lock() + defer q.outputsMu.Unlock() + return q.outputs[reflect.TypeOf(val)] +} + +// Close closes the subscribeState. Implicitly closes all Subscribers +// linked to this state, and any pending events are discarded. +func (s *subscribeState) close() { + s.stop.StopAndWait() + + var subs map[reflect.Type]subscriber + s.outputsMu.Lock() + subs, s.outputs = s.outputs, nil + s.outputsMu.Unlock() + for _, sub := range subs { + sub.Close() + } +} + +// A Subscriber delivers one type of event from a [Client]. type Subscriber[T any] struct { - recv *Queue - read chan T + doneCtx context.Context + done context.CancelFunc + recv *subscribeState + read chan T +} + +func newSubscriber[T any](r *subscribeState) *Subscriber[T] { + t := reflect.TypeFor[T]() + + ctx, cancel := context.WithCancel(context.Background()) + ret := &Subscriber[T]{ + doneCtx: ctx, + done: cancel, + recv: r, + read: make(chan T), + } + r.addSubscriber(t, ret) + + return ret +} + +func (s *Subscriber[T]) subscribeType() reflect.Type { + return reflect.TypeFor[T]() } func (s *Subscriber[T]) dispatch(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool { t := vals.Peek().(T) for { - // Keep the cases in this select in sync with Queue.pump + // Keep the cases in this select in sync with subscribeState.pump // above. The only different should be that this select // delivers a value on s.read. select { @@ -113,58 +184,16 @@ func (s *Subscriber[T]) Events() <-chan T { return s.read } -// Close shuts down the Subscriber, indicating the caller no longer -// wishes to receive these events. After Close, receives on -// [Subscriber.Chan] block for ever. +// Done returns a channel that is closed when the subscriber is +// closed. +func (s *Subscriber[T]) Done() <-chan struct{} { + return s.doneCtx.Done() +} + +// Close closes the Subscriber, indicating the caller no longer wishes +// to receive this event type. After Close, receives on +// [Subscriber.Events] block for ever. func (s *Subscriber[T]) Close() { - t := reflect.TypeFor[T]() - s.recv.bus.unsubscribe(t, s.recv) - s.recv.deleteDispatchFn(t) -} - -func (q *Queue) dispatchFn(val any) dispatchFn { - q.outputsMu.Lock() - defer q.outputsMu.Unlock() - return q.outputs[reflect.ValueOf(val).Type()] -} - -func (q *Queue) addDispatchFn(t reflect.Type, fn dispatchFn) { - q.outputsMu.Lock() - defer q.outputsMu.Unlock() - if q.outputs[t] != nil { - panic(fmt.Errorf("double subscription for event %s", t)) - } - q.outputs[t] = fn -} - -func (q *Queue) deleteDispatchFn(t reflect.Type) { - q.outputsMu.Lock() - defer q.outputsMu.Unlock() - delete(q.outputs, t) -} - -// Done returns a channel that is closed when the Queue is closed. -func (q *Queue) Done() <-chan struct{} { - return q.stop.WaitChan() -} - -// Close closes the queue. All Subscribers attached to the queue are -// implicitly closed, and any pending events are discarded. -func (q *Queue) Close() { - q.stop.StopAndWait() - q.bus.deleteQueue(q) -} - -// Subscribe requests delivery of events of type T through the given -// Queue. Panics if the queue already has a subscriber for T. -func Subscribe[T any](r *Queue) Subscriber[T] { - t := reflect.TypeFor[T]() - ret := Subscriber[T]{ - recv: r, - read: make(chan T), - } - r.addDispatchFn(t, ret.dispatch) - r.bus.subscribe(t, r) - - return ret + s.done() // unblock receivers + s.recv.deleteSubscriber(reflect.TypeFor[T]()) }