diff --git a/util/eventbus/client.go b/util/eventbus/client.go index a9ef40771..a7a88c0a1 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -59,6 +59,20 @@ func (c *Client) peekSubscribeState() *subscribeState { return c.sub } +func (c *Client) publishTypes() []reflect.Type { + c.mu.Lock() + defer c.mu.Unlock() + ret := make([]reflect.Type, 0, len(c.pub)) + for pub := range c.pub { + ret = append(ret, pub.publishType()) + } + return ret +} + +func (c *Client) subscribeTypes() []reflect.Type { + return c.peekSubscribeState().subscribeTypes() +} + func (c *Client) subscribeState() *subscribeState { c.mu.Lock() defer c.mu.Unlock() diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go index d41fc0385..31123e6ba 100644 --- a/util/eventbus/debug.go +++ b/util/eventbus/debug.go @@ -5,6 +5,7 @@ package eventbus import ( "fmt" + "reflect" "slices" "sync" "sync/atomic" @@ -108,6 +109,27 @@ func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent] { return newMonitor(client.subscribeState().debug.add) } +// PublishTypes returns the list of types being published by client. +// +// The returned types are those for which the client has obtained a +// [Publisher]. The client may not have ever sent the type in +// question. +func (d *Debugger) PublishTypes(client *Client) []reflect.Type { + d.checkClient(client) + return client.publishTypes() +} + +// SubscribeTypes returns the list of types being subscribed to by +// client. +// +// The returned types are those for which the client has obtained a +// [Subscriber]. The client may not have ever received the type in +// question, and here may not be any publishers of the type. +func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type { + d.checkClient(client) + return client.subscribeTypes() +} + // A hook collects hook functions that can be run as a group. type hook[T any] struct { sync.Mutex diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 60e91edd5..ba17e8548 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -120,6 +120,20 @@ func (s *subscribeState) snapshotQueue() []DeliveredEvent { } } +func (s *subscribeState) subscribeTypes() []reflect.Type { + if s == nil { + return nil + } + + s.outputsMu.Lock() + defer s.outputsMu.Unlock() + ret := make([]reflect.Type, 0, len(s.outputs)) + for t := range s.outputs { + ret = append(ret, t) + } + return ret +} + func (s *subscribeState) addSubscriber(t reflect.Type, sub subscriber) { s.outputsMu.Lock() defer s.outputsMu.Unlock()