diff --git a/go.mod b/go.mod
index 106538e94..970e2e63c 100644
--- a/go.mod
+++ b/go.mod
@@ -20,6 +20,7 @@ require (
 	github.com/coder/websocket v1.8.12
 	github.com/coreos/go-iptables v0.7.1-0.20240112124308-65c67c9f46e6
 	github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
+	github.com/creachadair/taskgroup v0.13.2
 	github.com/creack/pty v1.1.23
 	github.com/dblohm7/wingoes v0.0.0-20240119213807-a09d6be7affa
 	github.com/digitalocean/go-smbios v0.0.0-20180907143718-390a4f403a8e
diff --git a/go.sum b/go.sum
index efbf8ae2b..1707effd5 100644
--- a/go.sum
+++ b/go.sum
@@ -231,6 +231,8 @@ github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7
 github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
 github.com/creachadair/mds v0.17.1 h1:lXQbTGKmb3nE3aK6OEp29L1gCx6B5ynzlQ6c1KOBurc=
 github.com/creachadair/mds v0.17.1/go.mod h1:4b//mUiL8YldH6TImXjmW45myzTLNS1LLjOmrk888eg=
+github.com/creachadair/taskgroup v0.13.2 h1:3KyqakBuFsm3KkXi/9XIb0QcA8tEzLHLgaoidf0MdVc=
+github.com/creachadair/taskgroup v0.13.2/go.mod h1:i3V1Zx7H8RjwljUEeUWYT30Lmb9poewSb2XI1yTwD0g=
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
 github.com/creack/pty v1.1.23 h1:4M6+isWdcStXEf15G/RbrMPOQj1dZ7HPZCGwE4kOeP0=
 github.com/creack/pty v1.1.23/go.mod h1:08sCNb52WyoAwi2QDyzUCTgcvVFhUzewun7wtTfvcwE=
@@ -298,6 +300,8 @@ github.com/firefart/nonamedreturns v1.0.4 h1:abzI1p7mAEPYuR4A+VLKn4eNDOycjYo2phm
 github.com/firefart/nonamedreturns v1.0.4/go.mod h1:TDhe/tjI1BXo48CmYbUduTV7BdIga8MAO/xbKdcVsGI=
 github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
 github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
+github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
+github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
 github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
 github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
 github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go
new file mode 100644
index 000000000..85d73b15e
--- /dev/null
+++ b/util/eventbus/bus.go
@@ -0,0 +1,223 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus
+
+import (
+	"context"
+	"reflect"
+	"slices"
+	"sync"
+
+	"tailscale.com/util/set"
+)
+
+// Bus is an event bus that distributes published events to interested
+// subscribers.
+type Bus struct {
+	write    chan any
+	stop     goroutineShutdownControl
+	snapshot chan chan []any
+
+	topicsMu sync.Mutex // guards everything below.
+	topics   map[reflect.Type][]*Queue
+
+	// Used for introspection/debugging only, not in the normal event
+	// publishing path.
+	publishers set.Set[publisher]
+	queues     set.Set[*Queue]
+}
+
+// New returns a new bus. Use [PublisherOf] to make event publishers,
+// and [Bus.Queue] and [Subscribe] to make event subscribers.
+func New() *Bus {
+	stopCtl, stopWorker := newGoroutineShutdown()
+	ret := &Bus{
+		write:      make(chan any),
+		stop:       stopCtl,
+		snapshot:   make(chan chan []any),
+		topics:     map[reflect.Type][]*Queue{},
+		publishers: set.Set[publisher]{},
+		queues:     set.Set[*Queue]{},
+	}
+	go ret.pump(stopWorker)
+	return ret
+}
+
+func (b *Bus) pump(stop goroutineShutdownWorker) {
+	defer stop.Done()
+	var vals queue
+	acceptCh := func() chan any {
+		if vals.Full() {
+			return nil
+		}
+		return b.write
+	}
+	for {
+		// Drain all pending events. Note that while we're draining
+		// events into subscriber queues, we continue to
+		// opportunistically accept more incoming events, if we have
+		// queue space for them.
+		for !vals.Empty() {
+			val := vals.Peek()
+			dests := b.dest(reflect.ValueOf(val).Type())
+			for _, d := range dests {
+			deliverOne:
+				for {
+					select {
+					case d.write <- val:
+						break deliverOne
+					case <-d.stop.WaitChan():
+						// Queue closed, don't block but continue
+						// delivering to others.
+						break deliverOne
+					case in := <-acceptCh():
+						vals.Add(in)
+					case <-stop.Stop():
+						return
+					case ch := <-b.snapshot:
+						ch <- vals.Snapshot()
+					}
+				}
+			}
+			vals.Drop()
+		}
+
+		// Inbound queue empty, wait for at least 1 work item before
+		// resuming.
+		for vals.Empty() {
+			select {
+			case <-stop.Stop():
+				return
+			case val := <-b.write:
+				vals.Add(val)
+			case ch := <-b.snapshot:
+				ch <- nil
+			}
+		}
+	}
+}
+
+func (b *Bus) dest(t reflect.Type) []*Queue {
+	b.topicsMu.Lock()
+	defer b.topicsMu.Unlock()
+	return b.topics[t]
+}
+
+func (b *Bus) subscribe(t reflect.Type, q *Queue) (cancel func()) {
+	b.topicsMu.Lock()
+	defer b.topicsMu.Unlock()
+	b.topics[t] = append(b.topics[t], q)
+	return func() {
+		b.unsubscribe(t, q)
+	}
+}
+
+func (b *Bus) unsubscribe(t reflect.Type, q *Queue) {
+	b.topicsMu.Lock()
+	defer b.topicsMu.Unlock()
+	// Topic slices are accessed by pump without holding a lock, so we
+	// have to replace the entire slice when unsubscribing.
+	// Unsubscribing should be infrequent enough that this won't
+	// matter.
+	i := slices.Index(b.topics[t], q)
+	if i < 0 {
+		return
+	}
+	b.topics[t] = slices.Delete(slices.Clone(b.topics[t]), i, i+1)
+}
+
+func (b *Bus) Close() {
+	b.stop.StopAndWait()
+}
+
+// Queue returns a new queue with no subscriptions. Use [Subscribe] to
+// attach subscriptions to it.
+//
+// The queue's name should be a short, human-readable string that
+// identifies this queue. The name is only visible through debugging
+// APIs.
+func (b *Bus) Queue(name string) *Queue {
+	return newQueue(b, name)
+}
+
+func (b *Bus) addQueue(q *Queue) {
+	b.topicsMu.Lock()
+	defer b.topicsMu.Unlock()
+	b.queues.Add(q)
+}
+
+func (b *Bus) deleteQueue(q *Queue) {
+	b.topicsMu.Lock()
+	defer b.topicsMu.Unlock()
+	b.queues.Delete(q)
+}
+
+func (b *Bus) addPublisher(p publisher) {
+	b.topicsMu.Lock()
+	defer b.topicsMu.Unlock()
+	b.publishers.Add(p)
+}
+
+func (b *Bus) deletePublisher(p publisher) {
+	b.topicsMu.Lock()
+	defer b.topicsMu.Unlock()
+	b.publishers.Delete(p)
+}
+
+func newGoroutineShutdown() (goroutineShutdownControl, goroutineShutdownWorker) {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	ctl := goroutineShutdownControl{
+		startShutdown:    cancel,
+		shutdownFinished: make(chan struct{}),
+	}
+	work := goroutineShutdownWorker{
+		startShutdown:    ctx.Done(),
+		shutdownFinished: ctl.shutdownFinished,
+	}
+
+	return ctl, work
+}
+
+// goroutineShutdownControl is a helper type to manage the shutdown of
+// a worker goroutine.
+// The worker goroutine should use the goroutineShutdownWorker
+// related to this controller.
+type goroutineShutdownControl struct {
+	startShutdown    context.CancelFunc
+	shutdownFinished chan struct{}
+}
+
+func (ctl *goroutineShutdownControl) Stop() {
+	ctl.startShutdown()
+}
+
+func (ctl *goroutineShutdownControl) Wait() {
+	<-ctl.shutdownFinished
+}
+
+func (ctl *goroutineShutdownControl) WaitChan() <-chan struct{} {
+	return ctl.shutdownFinished
+}
+
+func (ctl *goroutineShutdownControl) StopAndWait() {
+	ctl.Stop()
+	ctl.Wait()
+}
+
+// goroutineShutdownWorker is a helper type for a worker goroutine to
+// be notified that it should shut down, and to report that shutdown
+// has completed. The notification is triggered by the related
+// goroutineShutdownControl.
+type goroutineShutdownWorker struct {
+	startShutdown    <-chan struct{}
+	shutdownFinished chan struct{}
+}
+
+func (work *goroutineShutdownWorker) Stop() <-chan struct{} {
+	return work.startShutdown
+}
+
+func (work *goroutineShutdownWorker) Done() {
+	close(work.shutdownFinished)
+}
diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go
new file mode 100644
index 000000000..180f4164a
--- /dev/null
+++ b/util/eventbus/bus_test.go
@@ -0,0 +1,196 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus_test
+
+import (
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/creachadair/taskgroup"
+	"github.com/google/go-cmp/cmp"
+	"tailscale.com/util/eventbus"
+)
+
+type EventA struct {
+	Counter int
+}
+
+type EventB struct {
+	Counter int
+}
+
+func TestBus(t *testing.T) {
+	b := eventbus.New()
+	defer b.Close()
+
+	q := b.Queue("TestBus")
+	defer q.Close()
+	s := eventbus.Subscribe[EventA](q)
+
+	go func() {
+		pa := eventbus.PublisherOf[EventA](b, "TestBusA")
+		defer pa.Close()
+		pb := eventbus.PublisherOf[EventB](b, "TestBusB")
+		defer pb.Close()
+		pa.Publish(EventA{1})
+		pb.Publish(EventB{2})
+		pa.Publish(EventA{3})
+	}()
+
+	want := expectEvents(t, EventA{1}, EventA{3})
+	for !want.Empty() {
+		select {
+		case got := <-s.Events():
+			want.Got(got)
+		case <-q.Done():
+			t.Fatalf("queue closed unexpectedly")
+		case <-time.After(time.Second):
+			t.Fatalf("timed out waiting for event")
+		}
+	}
+}
+
+func TestBusMultipleConsumers(t *testing.T) {
+	b := eventbus.New()
+	defer b.Close()
+
+	q1 := b.Queue("TestBusA")
+	defer q1.Close()
+	s1 := eventbus.Subscribe[EventA](q1)
+
+	q2 := b.Queue("TestBusAB")
+	defer q2.Close()
+	s2A := eventbus.Subscribe[EventA](q2)
+	s2B := eventbus.Subscribe[EventB](q2)
+
+	go func() {
+		pa := eventbus.PublisherOf[EventA](b, "TestBusA")
+		defer pa.Close()
+		pb := eventbus.PublisherOf[EventB](b, "TestBusB")
+		defer pb.Close()
+		pa.Publish(EventA{1})
+		pb.Publish(EventB{2})
+		pa.Publish(EventA{3})
+	}()
+
+	wantA := expectEvents(t, EventA{1}, EventA{3})
+	wantB := expectEvents(t, EventA{1}, EventB{2}, EventA{3})
+	for !wantA.Empty() || !wantB.Empty() {
+		select {
+		case got := <-s1.Events():
+			wantA.Got(got)
+		case got := <-s2A.Events():
+			wantB.Got(got)
+		case got := <-s2B.Events():
+			wantB.Got(got)
+		case <-q1.Done():
+			t.Fatalf("queue closed unexpectedly")
+		case <-q2.Done():
+			t.Fatalf("queue closed unexpectedly")
+		case <-time.After(time.Second):
+			t.Fatalf("timed out waiting for event")
+		}
+	}
+}
+
+func TestSpam(t *testing.T) {
+	b := eventbus.New()
+	defer b.Close()
+
+	const (
+		publishers         = 100
+		eventsPerPublisher = 20
+		wantEvents         = publishers * eventsPerPublisher
+		subscribers        = 100
+	)
+
+	var g taskgroup.Group
+
+	received := make([][]EventA, subscribers)
+	for i := range subscribers {
+		q := b.Queue(fmt.Sprintf("Subscriber%d", i))
+		defer q.Close()
+		s := eventbus.Subscribe[EventA](q)
+		g.Go(func() error {
+			for range wantEvents {
+				select {
+				case evt := <-s.Events():
+					received[i] = append(received[i], evt)
+				case <-q.Done():
+					t.Errorf("queue done before expected number of events received")
+					return errors.New("queue prematurely closed")
+				case <-time.After(5 * time.Second):
+					t.Errorf("timed out waiting for expected bus event after %d events", len(received[i]))
+					return errors.New("timeout")
+				}
+			}
+			return nil
+		})
+	}
+
+	published := make([][]EventA, publishers)
+	for i := range publishers {
+		g.Run(func() {
+			p := eventbus.PublisherOf[EventA](b, fmt.Sprintf("Publisher%d", i))
+			for j := range eventsPerPublisher {
+				evt := EventA{i*eventsPerPublisher + j}
+				p.Publish(evt)
+				published[i] = append(published[i], evt)
+			}
+		})
+	}
+
+	if err := g.Wait(); err != nil {
+		t.Fatal(err)
+	}
+	var last []EventA
+	for i, got := range received {
+		if len(got) != wantEvents {
+			// Receiving goroutine already reported an error, we just need
+			// to fail early within the main test goroutine.
+			t.FailNow()
+		}
+		if last != nil {
+			if diff := cmp.Diff(got, last); diff != "" {
+				t.Errorf("Subscriber %d did not see the same events as %d (-got+want):\n%s", i, i-1, diff)
+			}
+		}
+		last = got
+	}
+	for i, sent := range published {
+		if got := len(sent); got != eventsPerPublisher {
+			t.Fatalf("Publisher %d sent %d events, want %d", i, got, eventsPerPublisher)
+		}
+	}
+
+	// TODO: check that the published sequences are proper
+	// subsequences of the received slices.
+}
+
+type queueChecker struct {
+	t    *testing.T
+	want []any
+}
+
+func expectEvents(t *testing.T, want ...any) *queueChecker {
+	return &queueChecker{t, want}
+}
+
+func (q *queueChecker) Got(v any) {
+	q.t.Helper()
+	if q.Empty() {
+		q.t.Fatalf("queue got unexpected %v", v)
+	}
+	if v != q.want[0] {
+		q.t.Fatalf("queue got %#v, want %#v", v, q.want[0])
+	}
+	q.want = q.want[1:]
+}
+
+func (q *queueChecker) Empty() bool {
+	return len(q.want) == 0
+}
diff --git a/util/eventbus/doc.go b/util/eventbus/doc.go
new file mode 100644
index 000000000..136823c42
--- /dev/null
+++ b/util/eventbus/doc.go
@@ -0,0 +1,100 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+// Package eventbus provides an in-process event bus.
+//
+// The event bus connects publishers of typed events with subscribers
+// interested in those events.
+//
+// # Usage
+//
+// To publish events, use [PublisherOf] to get a typed publisher for
+// your event type, then call [Publisher.Publish] as needed. If your
+// event is expensive to construct, you can optionally use
+// [Publisher.ShouldPublish] to skip the work if nobody is listening
+// for the event.
+//
+// To receive events, first use [Bus.Queue] to create an event
+// delivery queue, then use [Subscribe] to get a [Subscriber] for each
+// event type you're interested in. Receive the events themselves by
+// selecting over all your [Subscriber.Events] channels, as well as
+// [Queue.Done] for shutdown notifications.
+//
+// # Concurrency properties
+//
+// The bus serializes all published events, and preserves that
+// ordering when delivering to subscribers that are attached to the
+// same Queue. In more detail:
+//
+// - An event is published to the bus at some instant between the
+//   start and end of the call to [Publisher.Publish].
+// - Events cannot be published at the same instant, and so are
+//   totally ordered by their publication time. Given two events E1
+//   and E2, either E1 happens before E2, or E2 happens before E1.
+// - Queues dispatch events to their Subscribers in publication
+//   order: if E1 happens before E2, the queue always delivers E1
+//   before E2.
+// - Queues do not synchronize with each other: given queues Q1 and
+//   Q2, both subscribed to events E1 and E2, Q1 may deliver both E1
+//   and E2 before Q2 delivers E1.
+//
+// Less formally: there is one true timeline of all published events.
+// If you make a Queue and subscribe to events on it, you will receive
+// those events one at a time, in the same order as the one true
+// timeline. You will "skip over" events you didn't subscribe to, but
+// your view of the world always moves forward in time, never
+// backwards, and you will observe events in the same order as
+// everyone else.
+//
+// However, you cannot assume that what the subscribers on your queue
+// see as "now" is the same as what subscribers on other queues see.
+// Their queue may be running behind yours in the timeline, or ahead
+// of it. This means you should be careful about reaching out to
+// another component directly after receiving an event, as its view
+// of the world may not yet (or ever) be exactly consistent with
+// yours.
+//
+// To make your code more testable and understandable, you should try
+// to structure it following the actor model: you have some local
+// state over which you have authority, but your only way to interact
+// with state elsewhere in the program is to receive and process
+// events coming from elsewhere, or to emit events of your own.
+//
+// # Expected subscriber behavior
+//
+// Subscribers are expected to promptly receive their events on
+// [Subscriber.Events]. The bus has a small, fixed amount of internal
+// buffering, meaning that a slow subscriber will eventually cause
+// backpressure and block publication of all further events.
+//
+// In general, you should receive from your subscriber(s) in a loop,
+// and only do fast state updates within that loop. Any heavier work
+// should be offloaded to another goroutine.
+//
+// Causing publishers to block from backpressure is considered a bug
+// in the slow subscriber that causes it, and should be addressed
+// there. Publishers should assume that Publish will not block for
+// extended periods of time, and should not make exceptional effort
+// to behave gracefully if they do get blocked.
+//
+// These blocking semantics are provisional and subject to
+// change. Please speak up if this causes development pain, so that we
+// can adapt the semantics to better suit our needs.
+//
+// # Debugging facilities
+//
+// (TODO, not implemented yet, sorry, I promise we're working on it next!)
+//
+// The bus comes with introspection facilities to help reason about
+// the state of the client, and diagnose issues such as slow
+// subscribers.
+//
+// The bus provides a tsweb debugging page that shows the current
+// state of the bus, including all publishers, subscribers, and
+// queued events.
+//
+// The bus also has a snooping and tracing facility, which lets you
+// observe all events flowing through the bus, along with their
+// source, destination(s) and timing information such as the time of
+// delivery to each subscriber and end-to-end bus delays.
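+//
+// # Example
+//
+// A minimal sketch of the publish/subscribe flow. EventA stands in
+// for an event type that callers would define for themselves; the
+// queue and publisher names are illustrative:
+//
+//	b := eventbus.New()
+//	defer b.Close()
+//
+//	q := b.Queue("example-consumer")
+//	defer q.Close()
+//	sub := eventbus.Subscribe[EventA](q)
+//
+//	p := eventbus.PublisherOf[EventA](b, "example-producer")
+//	defer p.Close()
+//	p.Publish(EventA{Counter: 1})
+//
+//	select {
+//	case evt := <-sub.Events():
+//		// Handle evt with a fast state update.
+//		_ = evt
+//	case <-q.Done():
+//		// The queue was closed; no more events will arrive.
+//	}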
+package eventbus
diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go
new file mode 100644
index 000000000..14828812b
--- /dev/null
+++ b/util/eventbus/publish.go
@@ -0,0 +1,79 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus
+
+import (
+	"context"
+	"reflect"
+)
+
+// publisher is a uniformly typed wrapper around Publisher[T], so that
+// debugging facilities can look at active publishers.
+type publisher interface {
+	publisherName() string
+}
+
+// A Publisher publishes events on the bus.
+type Publisher[T any] struct {
+	bus     *Bus
+	name    string
+	stopCtx context.Context
+	stop    context.CancelFunc
+}
+
+// PublisherOf returns a publisher for event type T on the given bus.
+//
+// The publisher's name should be a short, human-readable string that
+// identifies this event publisher. The name is only visible through
+// debugging APIs.
+func PublisherOf[T any](b *Bus, name string) *Publisher[T] {
+	ctx, cancel := context.WithCancel(context.Background())
+	ret := &Publisher[T]{
+		bus:     b,
+		name:    name,
+		stopCtx: ctx,
+		stop:    cancel,
+	}
+	b.addPublisher(ret)
+	return ret
+}
+
+func (p *Publisher[T]) publisherName() string { return p.name }
+
+// Publish publishes event v on the bus.
+func (p *Publisher[T]) Publish(v T) {
+	// Check for a stopped publisher or bus before trying to write,
+	// so that once closed, Publish consistently does nothing.
+	select {
+	case <-p.stopCtx.Done():
+		return
+	case <-p.bus.stop.WaitChan():
+		return
+	default:
+	}
+
+	select {
+	case p.bus.write <- v:
+	case <-p.stopCtx.Done():
+	case <-p.bus.stop.WaitChan():
+	}
+}
+
+// ShouldPublish reports whether anyone is subscribed to events of
+// type T.
+//
+// ShouldPublish can be used to skip expensive event construction if
+// nobody seems to care. Publishers must not assume that someone will
+// definitely receive an event if ShouldPublish returns true.
+func (p *Publisher[T]) ShouldPublish() bool {
+	dests := p.bus.dest(reflect.TypeFor[T]())
+	return len(dests) > 0
+}
+
+// Close closes the publisher, indicating that no further events will
+// be published with it.
+func (p *Publisher[T]) Close() {
+	p.stop()
+	p.bus.deletePublisher(p)
+}
diff --git a/util/eventbus/queue.go b/util/eventbus/queue.go
new file mode 100644
index 000000000..8f6bda748
--- /dev/null
+++ b/util/eventbus/queue.go
@@ -0,0 +1,83 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus
+
+import (
+	"slices"
+)
+
+const maxQueuedItems = 16
+
+// queue is an ordered queue of length up to maxQueuedItems.
+type queue struct {
+	vals  []any
+	start int
+}
+
+// canAppend reports whether a value can be appended to q.vals without
+// shifting values around.
+func (q *queue) canAppend() bool {
+	return cap(q.vals) < maxQueuedItems || len(q.vals) < cap(q.vals)
+}
+
+func (q *queue) Full() bool {
+	return q.start == 0 && !q.canAppend()
+}
+
+func (q *queue) Empty() bool {
+	return q.start == len(q.vals)
+}
+
+func (q *queue) Len() int {
+	return len(q.vals) - q.start
+}
+
+// Add adds v to the end of the queue. It panics if the queue is
+// full; callers must check Full before adding.
+func (q *queue) Add(v any) {
+	if !q.canAppend() {
+		if q.start == 0 {
+			panic("Add on a full queue")
+		}
+
+		// Slide remaining values back to the start of the array.
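+		// After the copy below, the n surviving values occupy
+		// vals[:n]; the vacated tail is cleared so dropped values
+		// don't stay reachable and can be garbage collected.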
+		n := copy(q.vals, q.vals[q.start:])
+		toClear := len(q.vals) - n
+		clear(q.vals[len(q.vals)-toClear:])
+		q.vals = q.vals[:n]
+		q.start = 0
+	}
+
+	q.vals = append(q.vals, v)
+}
+
+// Peek returns the first value in the queue, without removing it from
+// the queue, or nil if the queue is empty.
+func (q *queue) Peek() any {
+	if q.Empty() {
+		return nil
+	}
+
+	return q.vals[q.start]
+}
+
+// Drop discards the first value in the queue, if any.
+func (q *queue) Drop() {
+	if q.Empty() {
+		return
+	}
+
+	q.vals[q.start] = nil
+	q.start++
+	if q.Empty() {
+		// Reset cursor to start of array, it's free to do.
+		q.start = 0
+		q.vals = q.vals[:0]
+	}
+}
+
+// Snapshot returns a copy of the queue's contents.
+func (q *queue) Snapshot() []any {
+	return slices.Clone(q.vals[q.start:])
+}
diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go
new file mode 100644
index 000000000..ade834d77
--- /dev/null
+++ b/util/eventbus/subscribe.go
@@ -0,0 +1,170 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus
+
+import (
+	"fmt"
+	"reflect"
+	"sync"
+)
+
+type dispatchFn func(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool
+
+// A Queue receives events from a Bus.
+//
+// To receive events through the queue, see [Subscribe]. Subscribers
+// that share the same Queue receive events one at a time, in the
+// order they were published.
+type Queue struct {
+	bus  *Bus
+	name string
+
+	write    chan any
+	stop     goroutineShutdownControl
+	snapshot chan chan []any
+
+	outputsMu sync.Mutex
+	outputs   map[reflect.Type]dispatchFn
+}
+
+func newQueue(b *Bus, name string) *Queue {
+	stopCtl, stopWorker := newGoroutineShutdown()
+	ret := &Queue{
+		bus:      b,
+		name:     name,
+		write:    make(chan any),
+		stop:     stopCtl,
+		snapshot: make(chan chan []any),
+		outputs:  map[reflect.Type]dispatchFn{},
+	}
+	b.addQueue(ret)
+	go ret.pump(stopWorker)
+	return ret
+}
+
+func (q *Queue) pump(stop goroutineShutdownWorker) {
+	defer stop.Done()
+	var vals queue
+	acceptCh := func() chan any {
+		if vals.Full() {
+			return nil
+		}
+		return q.write
+	}
+	for {
+		if !vals.Empty() {
+			val := vals.Peek()
+			fn := q.dispatchFn(val)
+			if fn == nil {
+				// Raced with unsubscribe.
+				vals.Drop()
+				continue
+			}
+			if !fn(&vals, stop, acceptCh) {
+				return
+			}
+		} else {
+			// Keep the cases in this select in sync with
+			// Subscriber.dispatch below. The only difference should
+			// be that this select doesn't deliver queued values to
+			// anyone, and unconditionally accepts new values.
+			select {
+			case val := <-q.write:
+				vals.Add(val)
+			case <-stop.Stop():
+				return
+			case ch := <-q.snapshot:
+				ch <- vals.Snapshot()
+			}
+		}
+	}
+}
+
+// A Subscriber delivers one type of event from a [Queue].
+type Subscriber[T any] struct {
+	recv *Queue
+	read chan T
+}
+
+func (s *Subscriber[T]) dispatch(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool {
+	t := vals.Peek().(T)
+	for {
+		// Keep the cases in this select in sync with Queue.pump
+		// above. The only difference should be that this select
+		// delivers a value on s.read.
+		select {
+		case s.read <- t:
+			vals.Drop()
+			return true
+		case val := <-acceptCh():
+			vals.Add(val)
+		case <-stop.Stop():
+			return false
+		case ch := <-s.recv.snapshot:
+			ch <- vals.Snapshot()
+		}
+	}
+}
+
+// Events returns a channel on which the subscriber's events are
+// delivered.
+func (s *Subscriber[T]) Events() <-chan T {
+	return s.read
+}
+
+// Close shuts down the Subscriber, indicating the caller no longer
+// wishes to receive these events. After Close, receives on
+// [Subscriber.Events] block forever.
+func (s *Subscriber[T]) Close() {
+	t := reflect.TypeFor[T]()
+	s.recv.bus.unsubscribe(t, s.recv)
+	s.recv.deleteDispatchFn(t)
+}
+
+func (q *Queue) dispatchFn(val any) dispatchFn {
+	q.outputsMu.Lock()
+	defer q.outputsMu.Unlock()
+	return q.outputs[reflect.ValueOf(val).Type()]
+}
+
+func (q *Queue) addDispatchFn(t reflect.Type, fn dispatchFn) {
+	q.outputsMu.Lock()
+	defer q.outputsMu.Unlock()
+	if q.outputs[t] != nil {
+		panic(fmt.Errorf("double subscription for event %s", t))
+	}
+	q.outputs[t] = fn
+}
+
+func (q *Queue) deleteDispatchFn(t reflect.Type) {
+	q.outputsMu.Lock()
+	defer q.outputsMu.Unlock()
+	delete(q.outputs, t)
+}
+
+// Done returns a channel that is closed when the Queue is closed.
+func (q *Queue) Done() <-chan struct{} {
+	return q.stop.WaitChan()
+}
+
+// Close closes the queue. All Subscribers attached to the queue are
+// implicitly closed, and any pending events are discarded.
+func (q *Queue) Close() {
+	q.stop.StopAndWait()
+	q.bus.deleteQueue(q)
+}
+
+// Subscribe requests delivery of events of type T through the given
+// Queue. Panics if the queue already has a subscriber for T.
+func Subscribe[T any](r *Queue) Subscriber[T] {
+	t := reflect.TypeFor[T]()
+	ret := Subscriber[T]{
+		recv: r,
+		read: make(chan T),
+	}
+	r.addDispatchFn(t, ret.dispatch)
+	r.bus.subscribe(t, r)
+
+	return ret
+}
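For illustration, here is a minimal consumer sketch in the receive-loop style the package docs recommend. It is not part of the change above; the package name, the ConfigChanged event type, and watchConfig are hypothetical stand-ins:

	package example

	import "tailscale.com/util/eventbus"

	// ConfigChanged is a hypothetical event type, defined here only
	// for the sketch.
	type ConfigChanged struct{ Version int }

	func watchConfig(bus *eventbus.Bus) {
		q := bus.Queue("config-watcher") // name is visible in debug APIs only
		defer q.Close()
		sub := eventbus.Subscribe[ConfigChanged](q)

		for {
			select {
			case evt := <-sub.Events():
				// Do fast state updates only; offload heavy work to
				// another goroutine so the bus doesn't back up.
				_ = evt.Version
			case <-q.Done():
				// Queue closed (e.g. bus shutdown); stop receiving.
				return
			}
		}
	}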