util/eventbus: add debugger methods to list pub/sub types

This lets debug tools list the types that clients are wielding, so
that they can build a dataflow graph and other debugging views.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
This commit is contained in:
David Anderson 2025-03-07 08:16:53 -08:00 committed by Dave Anderson
parent e71e95b841
commit 346a35f612
3 changed files with 50 additions and 0 deletions

View File

@ -59,6 +59,20 @@ func (c *Client) peekSubscribeState() *subscribeState {
return c.sub
}
func (c *Client) publishTypes() []reflect.Type {
c.mu.Lock()
defer c.mu.Unlock()
ret := make([]reflect.Type, 0, len(c.pub))
for pub := range c.pub {
ret = append(ret, pub.publishType())
}
return ret
}
func (c *Client) subscribeTypes() []reflect.Type {
return c.peekSubscribeState().subscribeTypes()
}
func (c *Client) subscribeState() *subscribeState {
c.mu.Lock()
defer c.mu.Unlock()

View File

@ -5,6 +5,7 @@ package eventbus
import (
"fmt"
"reflect"
"slices"
"sync"
"sync/atomic"
@ -108,6 +109,27 @@ func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent] {
return newMonitor(client.subscribeState().debug.add)
}
// PublishTypes returns the list of types being published by client.
//
// The returned types are those for which the client has obtained a
// [Publisher]. The client may not have ever sent the type in
// question.
func (d *Debugger) PublishTypes(client *Client) []reflect.Type {
d.checkClient(client)
return client.publishTypes()
}
// SubscribeTypes returns the list of types being subscribed to by
// client.
//
// The returned types are those for which the client has obtained a
// [Subscriber]. The client may not have ever received the type in
// question, and here may not be any publishers of the type.
func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type {
d.checkClient(client)
return client.subscribeTypes()
}
// A hook collects hook functions that can be run as a group.
type hook[T any] struct {
sync.Mutex

View File

@ -120,6 +120,20 @@ func (s *subscribeState) snapshotQueue() []DeliveredEvent {
}
}
func (s *subscribeState) subscribeTypes() []reflect.Type {
if s == nil {
return nil
}
s.outputsMu.Lock()
defer s.outputsMu.Unlock()
ret := make([]reflect.Type, 0, len(s.outputs))
for t := range s.outputs {
ret = append(ret, t)
}
return ret
}
func (s *subscribeState) addSubscriber(t reflect.Type, sub subscriber) {
s.outputsMu.Lock()
defer s.outputsMu.Unlock()