WIP: internal debugging machinery

Signed-off-by: David Anderson <dave@tailscale.com>
This commit is contained in:
David Anderson 2025-03-05 10:07:48 -08:00
parent 24d4846f00
commit 43c9228f4a
4 changed files with 145 additions and 4 deletions

View File

@ -8,6 +8,7 @@ import (
"reflect"
"slices"
"sync"
"time"
"tailscale.com/util/set"
)
@ -18,12 +19,14 @@ type Bus struct {
router *worker
write chan any
snapshot chan chan []any
debug hook[routedEvent]
topicsMu sync.Mutex // guards everything below.
topics map[reflect.Type][]*subscribeState
// Used for introspection/debugging only, not in the normal event
// publishing path.
debugMu sync.Mutex
clients set.Set[*Client]
}
@ -53,8 +56,8 @@ func (b *Bus) Client(name string) *Client {
bus: b,
pub: set.Set[publisher]{},
}
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
b.debugMu.Lock()
defer b.debugMu.Unlock()
b.clients.Add(ret)
return ret
}
@ -68,9 +71,9 @@ func (b *Bus) Close() {
b.router.StopAndWait()
var clients set.Set[*Client]
b.topicsMu.Lock()
b.debugMu.Lock()
clients, b.clients = b.clients, set.Set[*Client]{}
b.topicsMu.Unlock()
b.debugMu.Unlock()
for c := range clients {
c.Close()
@ -91,8 +94,26 @@ func (b *Bus) pump(ctx context.Context) {
// opportunistically accept more incoming events, if we have
// queue space for it.
for !vals.Empty() {
popped := time.Now()
val := vals.Peek()
dests := b.dest(reflect.ValueOf(val).Type())
routed := time.Now()
if !b.debug.active() {
subscribers := make([]*Client, len(dests))
for i := range len(dests) {
subscribers[i] = dests[i].client
}
b.debug.run(routedEvent{
Event: val,
From: nil, // TODO: publisher queue needs to be of publishedEvent
To: subscribers,
Published: time.Time{}, // TODO: same
ReachedRouter: popped,
DestinationsPicked: routed,
})
}
for _, d := range dests {
deliverOne:
for {

90
util/eventbus/debug.go Normal file
View File

@ -0,0 +1,90 @@
package eventbus
import (
"slices"
"sync"
"sync/atomic"
"time"
)
type publishedEvent struct {
Event any
From *Client
Published time.Time
}
type routedEvent struct {
Event any
From *Client // publisher's name
To []*Client // target names
Published time.Time
ReachedRouter time.Time
DestinationsPicked time.Time
}
type subscribedEvent struct {
Event any
From *Client
To *Client
Published time.Time
ReachedRouter time.Time
DestinationsPicked time.Time
QueuedAtSubscriber time.Time
NextToDeliver time.Time
}
// A hook is a hook point to which functions can be attached. When
// the hook is run, attached callbacks are invoked synchronously, in
// the order they were added.
type hook[T any] struct {
sync.Mutex
fns []hookFn[T]
}
// add registers fn to be called when the hook is run.
//
// Returns a cleanup function that unregisters fn when called.
func (h *hook[T]) add(fn func(T)) (remove func()) {
id := hookID.Add(1)
h.Lock()
defer h.Unlock()
h.fns = append(h.fns, hookFn[T]{id, fn})
return func() { h.remove(id) }
}
// remove unregisters the hook function with the given ID.
func (h *hook[T]) remove(id uint64) {
h.Lock()
defer h.Unlock()
h.fns = slices.DeleteFunc(h.fns, func(f hookFn[T]) bool { return f.ID == id })
}
// run calls all registered hooks functions with v.
func (h *hook[T]) run(v T) {
h.Lock()
defer h.Unlock()
for _, f := range h.fns {
f.run(v)
}
}
// active reports whether any hook functions are registered. Hook call
// sites can use this to skip doing work if nobody's listening.
func (h *hook[T]) active() bool {
h.Lock()
defer h.Unlock()
return len(h.fns) > 0
}
var hookID atomic.Uint64
// hookFn attaches a comparable ID to a hook function, so that hooks
// can be found and deleted during cleanup.
type hookFn[T any] struct {
ID uint64
Fn func(T)
}
func (h hookFn[T]) run(v T) { h.Fn(v) }

View File

@ -5,6 +5,7 @@ package eventbus
import (
"reflect"
"time"
)
// publisher is a uniformly typed wrapper around Publisher[T], so that
@ -18,6 +19,7 @@ type publisher interface {
type Publisher[T any] struct {
client *Client
stop stopFlag
debug hook[publishedEvent]
}
func newPublisher[T any](c *Client) *Publisher[T] {
@ -44,6 +46,8 @@ func (p *Publisher[T]) publishType() reflect.Type {
// Publish publishes event v on the bus.
func (p *Publisher[T]) Publish(v T) {
now := time.Now()
// Check for just a stopped publisher or bus before trying to
// write, so that once closed Publish consistently does nothing.
select {
@ -52,6 +56,14 @@ func (p *Publisher[T]) Publish(v T) {
default:
}
if p.debug.active() {
p.debug.run(publishedEvent{
Event: v,
From: p.client,
Published: now,
})
}
select {
case p.client.publish() <- v:
case <-p.stop.Done():

View File

@ -8,11 +8,13 @@ import (
"fmt"
"reflect"
"sync"
"time"
)
// subscriber is a uniformly typed wrapper around Subscriber[T], so
// that debugging facilities can look at active subscribers.
type subscriber interface {
client() *Client
subscribeType() reflect.Type
// dispatch is a function that dispatches the head value in vals to
// a subscriber, while also handling stop and incoming queue write
@ -38,6 +40,7 @@ type subscribeState struct {
dispatcher *worker
write chan any
snapshot chan chan []any
debug hook[subscribedEvent]
outputsMu sync.Mutex
outputs map[reflect.Type]subscriber
@ -64,6 +67,7 @@ func (q *subscribeState) pump(ctx context.Context) {
}
for {
if !vals.Empty() {
popped := time.Now()
val := vals.Peek()
sub := q.subscriberFor(val)
if sub == nil {
@ -71,6 +75,20 @@ func (q *subscribeState) pump(ctx context.Context) {
vals.Drop()
continue
}
if q.debug.active() {
q.debug.run(subscribedEvent{
Event: val,
From: nil, // TODO: plumb more
To: q.client,
Published: time.Time{}, // TODO: plumb
ReachedRouter: time.Time{},
DestinationsPicked: time.Time{},
QueuedAtSubscriber: time.Time{},
NextToDeliver: popped,
})
}
if !sub.dispatch(ctx, &vals, acceptCh) {
return
}