From 853abf86619d2994157012fec3cd123b64475d5f Mon Sep 17 00:00:00 2001 From: David Anderson Date: Thu, 6 Mar 2025 21:51:18 -0800 Subject: [PATCH] util/eventbus: initial debugging facilities for the event bus Enables monitoring events as they flow, listing bus clients, and snapshotting internal queues to troubleshoot stalls. Updates #15160 Signed-off-by: David Anderson --- util/eventbus/bus.go | 63 +++++++++++++++-------- util/eventbus/client.go | 16 +++++- util/eventbus/debug.go | 103 +++++++++++++++++++++++++++++++++++-- util/eventbus/doc.go | 17 ++---- util/eventbus/publish.go | 2 +- util/eventbus/subscribe.go | 72 +++++++++++++++++--------- 6 files changed, 207 insertions(+), 66 deletions(-) diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index a9b6f0dec..fc497add2 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -12,12 +12,12 @@ import ( "tailscale.com/util/set" ) -type publishedEvent struct { +type PublishedEvent struct { Event any From *Client } -type routedEvent struct { +type RoutedEvent struct { Event any From *Client To []*Client @@ -27,24 +27,25 @@ type routedEvent struct { // subscribers. type Bus struct { router *worker - write chan publishedEvent - snapshot chan chan []publishedEvent - routeDebug hook[routedEvent] + write chan PublishedEvent + snapshot chan chan []PublishedEvent + routeDebug hook[RoutedEvent] - topicsMu sync.Mutex // guards everything below. + topicsMu sync.Mutex topics map[reflect.Type][]*subscribeState // Used for introspection/debugging only, not in the normal event // publishing path. - clients set.Set[*Client] + clientsMu sync.Mutex + clients set.Set[*Client] } // New returns a new bus. Use [PublisherOf] to make event publishers, // and [Bus.Queue] and [Subscribe] to make event subscribers. 
func New() *Bus { ret := &Bus{ - write: make(chan publishedEvent), - snapshot: make(chan chan []publishedEvent), + write: make(chan PublishedEvent), + snapshot: make(chan chan []PublishedEvent), topics: map[reflect.Type][]*subscribeState{}, clients: set.Set[*Client]{}, } @@ -65,12 +66,17 @@ func (b *Bus) Client(name string) *Client { bus: b, pub: set.Set[publisher]{}, } - b.topicsMu.Lock() - defer b.topicsMu.Unlock() + b.clientsMu.Lock() + defer b.clientsMu.Unlock() b.clients.Add(ret) return ret } +// Debugger returns the debugging facility for the bus. +func (b *Bus) Debugger() Debugger { + return Debugger{b} +} + // Close closes the bus. Implicitly closes all clients, publishers and // subscribers attached to the bus. // @@ -79,19 +85,17 @@ func (b *Bus) Client(name string) *Client { func (b *Bus) Close() { b.router.StopAndWait() - var clients set.Set[*Client] - b.topicsMu.Lock() - clients, b.clients = b.clients, set.Set[*Client]{} - b.topicsMu.Unlock() - - for c := range clients { + b.clientsMu.Lock() + defer b.clientsMu.Unlock() + for c := range b.clients { c.Close() } + b.clients = nil } func (b *Bus) pump(ctx context.Context) { - var vals queue[publishedEvent] - acceptCh := func() chan publishedEvent { + var vals queue[PublishedEvent] + acceptCh := func() chan PublishedEvent { if vals.Full() { return nil } @@ -111,7 +115,7 @@ func (b *Bus) pump(ctx context.Context) { for i := range len(dests) { clients[i] = dests[i].client } - b.routeDebug.run(routedEvent{ + b.routeDebug.run(RoutedEvent{ Event: val.Event, From: val.From, To: clients, @@ -119,9 +123,10 @@ func (b *Bus) pump(ctx context.Context) { } for _, d := range dests { - evt := queuedEvent{ + evt := DeliveredEvent{ Event: val.Event, From: val.From, + To: d.client, } deliverOne: for { @@ -173,6 +178,22 @@ func (b *Bus) shouldPublish(t reflect.Type) bool { return len(b.topics[t]) > 0 } +func (b *Bus) listClients() []*Client { + b.clientsMu.Lock() + defer b.clientsMu.Unlock() + return b.clients.Slice() +} + 
+func (b *Bus) snapshotPublishQueue() []PublishedEvent { + resp := make(chan []PublishedEvent) + select { + case b.snapshot <- resp: + return <-resp + case <-b.router.Done(): + return nil + } +} + func (b *Bus) subscribe(t reflect.Type, q *subscribeState) (cancel func()) { b.topicsMu.Lock() defer b.topicsMu.Unlock() diff --git a/util/eventbus/client.go b/util/eventbus/client.go index 17f7e8608..5cf7f97f5 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -19,13 +19,15 @@ import ( type Client struct { name string bus *Bus - publishDebug hook[publishedEvent] + publishDebug hook[PublishedEvent] mu sync.Mutex pub set.Set[publisher] sub *subscribeState // Lazily created on first subscribe } +func (c *Client) Name() string { return c.name } + // Close closes the client. Implicitly closes all publishers and // subscribers obtained from this client. func (c *Client) Close() { @@ -47,6 +49,16 @@ func (c *Client) Close() { } } +func (c *Client) snapshotSubscribeQueue() []DeliveredEvent { + return c.peekSubscribeState().snapshotQueue() +} + +func (c *Client) peekSubscribeState() *subscribeState { + c.mu.Lock() + defer c.mu.Unlock() + return c.sub +} + func (c *Client) subscribeState() *subscribeState { c.mu.Lock() defer c.mu.Unlock() @@ -76,7 +88,7 @@ func (c *Client) deleteSubscriber(t reflect.Type, s *subscribeState) { c.bus.unsubscribe(t, s) } -func (c *Client) publish() chan<- publishedEvent { +func (c *Client) publish() chan<- PublishedEvent { return c.bus.write } diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go index 912fe7623..d41fc0385 100644 --- a/util/eventbus/debug.go +++ b/util/eventbus/debug.go @@ -4,11 +4,110 @@ package eventbus import ( + "fmt" "slices" "sync" "sync/atomic" ) +// A Debugger offers access to a bus's privileged introspection and +// debugging facilities. 
+// +// The debugger's functionality is intended for humans and their tools +// to examine and troubleshoot bus clients, and should not be used in +// normal codepaths. +// +// In particular, the debugger provides access to information that is +// deliberately withheld from bus clients to encourage more robust and +// maintainable code - for example, the sender of an event, or the +// event streams of other clients. Please don't use the debugger to +// circumvent these restrictions for purposes other than debugging. +type Debugger struct { + bus *Bus +} + +// Clients returns a list of all clients attached to the bus. +func (d *Debugger) Clients() []*Client { + return d.bus.listClients() +} + +// PublishQueue returns the contents of the publish queue. +// +// The publish queue contains events that have been accepted by the +// bus from Publish() calls, but have not yet been routed to relevant +// subscribers. +// +// This queue is expected to be almost empty in normal operation. A +// full publish queue indicates that a slow subscriber downstream is +// causing backpressure and stalling the bus. +func (d *Debugger) PublishQueue() []PublishedEvent { + return d.bus.snapshotPublishQueue() +} + +// checkClient verifies that client is attached to the same bus as the +// Debugger, and panics if not. +func (d *Debugger) checkClient(client *Client) { + if client.bus != d.bus { + panic(fmt.Errorf("eventbus: Debugger given client %q belonging to wrong bus", client.name)) + } +} + +// SubscribeQueue returns the contents of the given client's subscribe +// queue. +// +// The subscribe queue contains events that are to be delivered to the +// client, but haven't yet been handed off to the relevant +// [Subscriber]. +// +// This queue is expected to be almost empty in normal operation. A +// full subscribe queue indicates that the client is accepting events +// too slowly, and may be causing the rest of the bus to stall. 
+func (d *Debugger) SubscribeQueue(client *Client) []DeliveredEvent { + d.checkClient(client) + return client.snapshotSubscribeQueue() +} + +// WatchBus streams information about all events passing through the +// bus. +// +// Monitored events are delivered in the bus's global publication +// order (see "Concurrency properties" in the package docs). +// +// The caller must consume monitoring events promptly to avoid +// stalling the bus (see "Expected subscriber behavior" in the package +// docs). +func (d *Debugger) WatchBus() *Subscriber[RoutedEvent] { + return newMonitor(d.bus.routeDebug.add) +} + +// WatchPublish streams information about all events published by the +// given client. +// +// Monitored events are delivered in the bus's global publication +// order (see "Concurrency properties" in the package docs). +// +// The caller must consume monitoring events promptly to avoid +// stalling the bus (see "Expected subscriber behavior" in the package +// docs). +func (d *Debugger) WatchPublish(client *Client) *Subscriber[PublishedEvent] { + d.checkClient(client) + return newMonitor(client.publishDebug.add) +} + +// WatchSubscribe streams information about all events received by the +// given client. +// +// Monitored events are delivered in the bus's global publication +// order (see "Concurrency properties" in the package docs). +// +// The caller must consume monitoring events promptly to avoid +// stalling the bus (see "Expected subscriber behavior" in the package +// docs). +func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent] { + d.checkClient(client) + return newMonitor(client.subscribeState().debug.add) +} + // A hook collects hook functions that can be run as a group. type hook[T any] struct { sync.Mutex @@ -19,8 +118,6 @@ var hookID atomic.Uint64 // add registers fn to be called when the hook is run. Returns an // unregistration function that removes fn from the hook when called. 
-// -//lint:ignore U1000 Not used yet, but will be in an upcoming change func (h *hook[T]) add(fn func(T)) (remove func()) { id := hookID.Add(1) h.Lock() @@ -30,8 +127,6 @@ func (h *hook[T]) add(fn func(T)) (remove func()) { } // remove removes the function with the given ID from the hook. -// -//lint:ignore U1000 Not used yet, but will be in an upcoming change func (h *hook[T]) remove(id uint64) { h.Lock() defer h.Unlock() diff --git a/util/eventbus/doc.go b/util/eventbus/doc.go index b3509b48b..964a686ea 100644 --- a/util/eventbus/doc.go +++ b/util/eventbus/doc.go @@ -86,18 +86,7 @@ // // # Debugging facilities // -// (TODO, not implemented yet, sorry, I promise we're working on it next!) -// -// The bus comes with introspection facilities to help reason about -// the state of the client, and diagnose issues such as slow -// subscribers. -// -// The bus provide a tsweb debugging page that shows the current state -// of the bus, including all publishers, subscribers, and queued -// events. -// -// The bus also has a snooping and tracing facility, which lets you -// observe all events flowing through the bus, along with their -// source, destination(s) and timing information such as the time of -// delivery to each subscriber and end-to-end bus delays. +// The [Debugger], obtained through [Bus.Debugger], provides +// introspection facilities to monitor events flowing through the bus, +// and inspect publisher and subscriber state. 
package eventbus diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go index b228708ac..9897114b6 100644 --- a/util/eventbus/publish.go +++ b/util/eventbus/publish.go @@ -52,7 +52,7 @@ func (p *Publisher[T]) Publish(v T) { default: } - evt := publishedEvent{ + evt := PublishedEvent{ Event: v, From: p.client, } diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index c38949d9d..60e91edd5 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -10,17 +10,12 @@ import ( "sync" ) -type deliveredEvent struct { +type DeliveredEvent struct { Event any From *Client To *Client } -type queuedEvent struct { - Event any - From *Client -} - // subscriber is a uniformly typed wrapper around Subscriber[T], so // that debugging facilities can look at active subscribers. type subscriber interface { @@ -38,7 +33,7 @@ type subscriber interface { // processing other potential sources of wakeups, which is how we end // up at this awkward type signature and sharing of internal state // through dispatch. 
- dispatch(ctx context.Context, vals *queue[queuedEvent], acceptCh func() chan queuedEvent) bool + dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool Close() } @@ -47,9 +42,9 @@ type subscribeState struct { client *Client dispatcher *worker - write chan queuedEvent - snapshot chan chan []queuedEvent - debug hook[deliveredEvent] + write chan DeliveredEvent + snapshot chan chan []DeliveredEvent + debug hook[DeliveredEvent] outputsMu sync.Mutex outputs map[reflect.Type]subscriber @@ -58,8 +53,8 @@ type subscribeState struct { func newSubscribeState(c *Client) *subscribeState { ret := &subscribeState{ client: c, - write: make(chan queuedEvent), - snapshot: make(chan chan []queuedEvent), + write: make(chan DeliveredEvent), + snapshot: make(chan chan []DeliveredEvent), outputs: map[reflect.Type]subscriber{}, } ret.dispatcher = runWorker(ret.pump) @@ -67,8 +62,8 @@ func newSubscribeState(c *Client) *subscribeState { } func (q *subscribeState) pump(ctx context.Context) { - var vals queue[queuedEvent] - acceptCh := func() chan queuedEvent { + var vals queue[DeliveredEvent] + acceptCh := func() chan DeliveredEvent { if vals.Full() { return nil } @@ -83,12 +78,12 @@ func (q *subscribeState) pump(ctx context.Context) { vals.Drop() continue } - if !sub.dispatch(ctx, &vals, acceptCh) { + if !sub.dispatch(ctx, &vals, acceptCh, q.snapshot) { return } if q.debug.active() { - q.debug.run(deliveredEvent{ + q.debug.run(DeliveredEvent{ Event: val.Event, From: val.From, To: q.client, @@ -111,6 +106,20 @@ func (q *subscribeState) pump(ctx context.Context) { } } +func (s *subscribeState) snapshotQueue() []DeliveredEvent { + if s == nil { + return nil + } + + resp := make(chan []DeliveredEvent) + select { + case s.snapshot <- resp: + return <-resp + case <-s.dispatcher.Done(): + return nil + } +} + func (s *subscribeState) addSubscriber(t reflect.Type, sub subscriber) { s.outputsMu.Lock() defer 
s.outputsMu.Unlock() @@ -154,28 +163,43 @@ func (s *subscribeState) closed() <-chan struct{} { // A Subscriber delivers one type of event from a [Client]. type Subscriber[T any] struct { - stop stopFlag - recv *subscribeState - read chan T + stop stopFlag + read chan T + unregister func() } func newSubscriber[T any](r *subscribeState) *Subscriber[T] { t := reflect.TypeFor[T]() ret := &Subscriber[T]{ - recv: r, - read: make(chan T), + read: make(chan T), + unregister: func() { r.deleteSubscriber(t) }, } r.addSubscriber(t, ret) return ret } +func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] { + ret := &Subscriber[T]{ + read: make(chan T, 100), // arbitrary, large + } + ret.unregister = attach(ret.monitor) + return ret +} + func (s *Subscriber[T]) subscribeType() reflect.Type { return reflect.TypeFor[T]() } -func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[queuedEvent], acceptCh func() chan queuedEvent) bool { +func (s *Subscriber[T]) monitor(debugEvent T) { + select { + case s.read <- debugEvent: + case <-s.stop.Done(): + } +} + +func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { t := vals.Peek().Event.(T) for { // Keep the cases in this select in sync with subscribeState.pump @@ -189,7 +213,7 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[queuedEvent], vals.Add(val) case <-ctx.Done(): return false - case ch := <-s.recv.snapshot: + case ch := <-snapshot: ch <- vals.Snapshot() } } @@ -212,5 +236,5 @@ func (s *Subscriber[T]) Done() <-chan struct{} { // [Subscriber.Events] block for ever. func (s *Subscriber[T]) Close() { s.stop.Stop() // unblock receivers - s.recv.deleteSubscriber(reflect.TypeFor[T]()) + s.unregister() }