From 43c9228f4a6750bc70d03d4af7130537171a81eb Mon Sep 17 00:00:00 2001 From: David Anderson Date: Wed, 5 Mar 2025 10:07:48 -0800 Subject: [PATCH] WIP: internal debugging machinery Signed-off-by: David Anderson --- util/eventbus/bus.go | 29 ++++++++++-- util/eventbus/debug.go | 90 ++++++++++++++++++++++++++++++++++++++ util/eventbus/publish.go | 12 +++++ util/eventbus/subscribe.go | 18 ++++++++ 4 files changed, 145 insertions(+), 4 deletions(-) create mode 100644 util/eventbus/debug.go diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index 3520be828..ce888a46b 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -8,6 +8,7 @@ import ( "reflect" "slices" "sync" + "time" "tailscale.com/util/set" ) @@ -18,12 +19,14 @@ type Bus struct { router *worker write chan any snapshot chan chan []any + debug hook[routedEvent] topicsMu sync.Mutex // guards everything below. topics map[reflect.Type][]*subscribeState // Used for introspection/debugging only, not in the normal event // publishing path. + debugMu sync.Mutex clients set.Set[*Client] } @@ -53,8 +56,8 @@ func (b *Bus) Client(name string) *Client { bus: b, pub: set.Set[publisher]{}, } - b.topicsMu.Lock() - defer b.topicsMu.Unlock() + b.debugMu.Lock() + defer b.debugMu.Unlock() b.clients.Add(ret) return ret } @@ -68,9 +71,9 @@ func (b *Bus) Close() { b.router.StopAndWait() var clients set.Set[*Client] - b.topicsMu.Lock() + b.debugMu.Lock() clients, b.clients = b.clients, set.Set[*Client]{} - b.topicsMu.Unlock() + b.debugMu.Unlock() for c := range clients { c.Close() @@ -91,8 +94,26 @@ func (b *Bus) pump(ctx context.Context) { // opportunistically accept more incoming events, if we have // queue space for it. for !vals.Empty() { + popped := time.Now() val := vals.Peek() dests := b.dest(reflect.ValueOf(val).Type()) + routed := time.Now() + + if !b.debug.active() { + subscribers := make([]*Client, len(dests)) + for i := range len(dests) { + subscribers[i] = dests[i].client + } + b.debug.run(routedEvent{ + Event: val, + From: nil, // TODO: publisher queue needs to be of publishedEvent + To: subscribers, + Published: time.Time{}, // TODO: same + ReachedRouter: popped, + DestinationsPicked: routed, + }) + } + for _, d := range dests { deliverOne: for { diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go new file mode 100644 index 000000000..9a7c2b643 --- /dev/null +++ b/util/eventbus/debug.go @@ -0,0 +1,90 @@ +package eventbus + +import ( + "slices" + "sync" + "sync/atomic" + "time" +) + +type publishedEvent struct { + Event any + From *Client + Published time.Time +} + +type routedEvent struct { + Event any + From *Client // publisher's name + To []*Client // target names + + Published time.Time + ReachedRouter time.Time + DestinationsPicked time.Time +} + +type subscribedEvent struct { + Event any + From *Client + To *Client + + Published time.Time + ReachedRouter time.Time + DestinationsPicked time.Time + QueuedAtSubscriber time.Time + NextToDeliver time.Time +} + +// A hook is a hook point to which functions can be attached. When +// the hook is run, attached callbacks are invoked synchronously, in +// the order they were added. +type hook[T any] struct { + sync.Mutex + fns []hookFn[T] +} + +// add registers fn to be called when the hook is run. +// +// Returns a cleanup function that unregisters fn when called. +func (h *hook[T]) add(fn func(T)) (remove func()) { + id := hookID.Add(1) + h.Lock() + defer h.Unlock() + h.fns = append(h.fns, hookFn[T]{id, fn}) + return func() { h.remove(id) } +} + +// remove unregisters the hook function with the given ID. +func (h *hook[T]) remove(id uint64) { + h.Lock() + defer h.Unlock() + h.fns = slices.DeleteFunc(h.fns, func(f hookFn[T]) bool { return f.ID == id }) +} + +// run calls all registered hooks functions with v. +func (h *hook[T]) run(v T) { + h.Lock() + defer h.Unlock() + for _, f := range h.fns { + f.run(v) + } +} + +// active reports whether any hook functions are registered. Hook call +// sites can use this to skip doing work if nobody's listening. +func (h *hook[T]) active() bool { + h.Lock() + defer h.Unlock() + return len(h.fns) > 0 +} + +var hookID atomic.Uint64 + +// hookFn attaches a comparable ID to a hook function, so that hooks +// can be found and deleted during cleanup. +type hookFn[T any] struct { + ID uint64 + Fn func(T) +} + +func (h hookFn[T]) run(v T) { h.Fn(v) } diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go index b2d0641d9..a8eefdb97 100644 --- a/util/eventbus/publish.go +++ b/util/eventbus/publish.go @@ -5,6 +5,7 @@ package eventbus import ( "reflect" + "time" ) // publisher is a uniformly typed wrapper around Publisher[T], so that @@ -18,6 +19,7 @@ type publisher interface { type Publisher[T any] struct { client *Client stop stopFlag + debug hook[publishedEvent] } func newPublisher[T any](c *Client) *Publisher[T] { @@ -44,6 +46,8 @@ func (p *Publisher[T]) publishType() reflect.Type { // Publish publishes event v on the bus. func (p *Publisher[T]) Publish(v T) { + now := time.Now() + // Check for just a stopped publisher or bus before trying to // write, so that once closed Publish consistently does nothing. select { @@ -52,6 +56,14 @@ func (p *Publisher[T]) Publish(v T) { default: } + if p.debug.active() { + p.debug.run(publishedEvent{ + Event: v, + From: p.client, + Published: now, + }) + } + select { case p.client.publish() <- v: case <-p.stop.Done(): diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 606410c8e..77ed194ff 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -8,11 +8,13 @@ import ( "fmt" "reflect" "sync" + "time" ) // subscriber is a uniformly typed wrapper around Subscriber[T], so // that debugging facilities can look at active subscribers. type subscriber interface { + client() *Client subscribeType() reflect.Type // dispatch is a function that dispatches the head value in vals to // a subscriber, while also handling stop and incoming queue write @@ -38,6 +40,7 @@ type subscribeState struct { dispatcher *worker write chan any snapshot chan chan []any + debug hook[subscribedEvent] outputsMu sync.Mutex outputs map[reflect.Type]subscriber @@ -64,6 +67,7 @@ func (q *subscribeState) pump(ctx context.Context) { } for { if !vals.Empty() { + popped := time.Now() val := vals.Peek() sub := q.subscriberFor(val) if sub == nil { @@ -71,6 +75,20 @@ func (q *subscribeState) pump(ctx context.Context) { vals.Drop() continue } + + if q.debug.active() { + q.debug.run(subscribedEvent{ + Event: val, + From: nil, // TODO: plumb more + To: q.client, + Published: time.Time{}, // TODO: plumb + ReachedRouter: time.Time{}, + DestinationsPicked: time.Time{}, + QueuedAtSubscriber: time.Time{}, + NextToDeliver: popped, + }) + } + if !sub.dispatch(ctx, &vals, acceptCh) { return }