mirror of
https://github.com/tailscale/tailscale.git
synced 2025-04-03 23:05:50 +00:00
189 lines
5.7 KiB
Go
189 lines
5.7 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package eventbus
|
|
|
|
import (
|
|
"cmp"
|
|
"fmt"
|
|
"reflect"
|
|
"slices"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"tailscale.com/tsweb"
|
|
)
|
|
|
|
// A Debugger offers access to a bus's privileged introspection and
|
|
// debugging facilities.
|
|
//
|
|
// The debugger's functionality is intended for humans and their tools
|
|
// to examine and troubleshoot bus clients, and should not be used in
|
|
// normal codepaths.
|
|
//
|
|
// In particular, the debugger provides access to information that is
|
|
// deliberately withheld from bus clients to encourage more robust and
|
|
// maintainable code - for example, the sender of an event, or the
|
|
// event streams of other clients. Please don't use the debugger to
|
|
// circumvent these restrictions for purposes other than debugging.
|
|
type Debugger struct {
|
|
bus *Bus
|
|
}
|
|
|
|
// Clients returns a list of all clients attached to the bus.
|
|
func (d *Debugger) Clients() []*Client {
|
|
ret := d.bus.listClients()
|
|
slices.SortFunc(ret, func(a, b *Client) int {
|
|
return cmp.Compare(a.Name(), b.Name())
|
|
})
|
|
return ret
|
|
}
|
|
|
|
// PublishQueue returns the contents of the publish queue.
|
|
//
|
|
// The publish queue contains events that have been accepted by the
|
|
// bus from Publish() calls, but have not yet been routed to relevant
|
|
// subscribers.
|
|
//
|
|
// This queue is expected to be almost empty in normal operation. A
|
|
// full publish queue indicates that a slow subscriber downstream is
|
|
// causing backpressure and stalling the bus.
|
|
func (d *Debugger) PublishQueue() []PublishedEvent {
|
|
return d.bus.snapshotPublishQueue()
|
|
}
|
|
|
|
// checkClient verifies that client is attached to the same bus as the
|
|
// Debugger, and panics if not.
|
|
func (d *Debugger) checkClient(client *Client) {
|
|
if client.bus != d.bus {
|
|
panic(fmt.Errorf("SubscribeQueue given client belonging to wrong bus"))
|
|
}
|
|
}
|
|
|
|
// SubscribeQueue returns the contents of the given client's subscribe
|
|
// queue.
|
|
//
|
|
// The subscribe queue contains events that are to be delivered to the
|
|
// client, but haven't yet been handed off to the relevant
|
|
// [Subscriber].
|
|
//
|
|
// This queue is expected to be almost empty in normal operation. A
|
|
// full subscribe queue indicates that the client is accepting events
|
|
// too slowly, and may be causing the rest of the bus to stall.
|
|
func (d *Debugger) SubscribeQueue(client *Client) []DeliveredEvent {
|
|
d.checkClient(client)
|
|
return client.snapshotSubscribeQueue()
|
|
}
|
|
|
|
// WatchBus streams information about all events passing through the
|
|
// bus.
|
|
//
|
|
// Monitored events are delivered in the bus's global publication
|
|
// order (see "Concurrency properties" in the package docs).
|
|
//
|
|
// The caller must consume monitoring events promptly to avoid
|
|
// stalling the bus (see "Expected subscriber behavior" in the package
|
|
// docs).
|
|
func (d *Debugger) WatchBus() *Subscriber[RoutedEvent] {
|
|
return newMonitor(d.bus.routeDebug.add)
|
|
}
|
|
|
|
// WatchPublish streams information about all events published by the
|
|
// given client.
|
|
//
|
|
// Monitored events are delivered in the bus's global publication
|
|
// order (see "Concurrency properties" in the package docs).
|
|
//
|
|
// The caller must consume monitoring events promptly to avoid
|
|
// stalling the bus (see "Expected subscriber behavior" in the package
|
|
// docs).
|
|
func (d *Debugger) WatchPublish(client *Client) *Subscriber[PublishedEvent] {
|
|
d.checkClient(client)
|
|
return newMonitor(client.publishDebug.add)
|
|
}
|
|
|
|
// WatchSubscribe streams information about all events received by the
|
|
// given client.
|
|
//
|
|
// Monitored events are delivered in the bus's global publication
|
|
// order (see "Concurrency properties" in the package docs).
|
|
//
|
|
// The caller must consume monitoring events promptly to avoid
|
|
// stalling the bus (see "Expected subscriber behavior" in the package
|
|
// docs).
|
|
func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent] {
|
|
d.checkClient(client)
|
|
return newMonitor(client.subscribeState().debug.add)
|
|
}
|
|
|
|
// PublishTypes returns the list of types being published by client.
|
|
//
|
|
// The returned types are those for which the client has obtained a
|
|
// [Publisher]. The client may not have ever sent the type in
|
|
// question.
|
|
func (d *Debugger) PublishTypes(client *Client) []reflect.Type {
|
|
d.checkClient(client)
|
|
return client.publishTypes()
|
|
}
|
|
|
|
// SubscribeTypes returns the list of types being subscribed to by
|
|
// client.
|
|
//
|
|
// The returned types are those for which the client has obtained a
|
|
// [Subscriber]. The client may not have ever received the type in
|
|
// question, and here may not be any publishers of the type.
|
|
func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type {
|
|
d.checkClient(client)
|
|
return client.subscribeTypes()
|
|
}
|
|
|
|
func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler) { registerHTTPDebugger(d, td) }
|
|
|
|
// A hook collects hook functions that can be run as a group.
|
|
type hook[T any] struct {
|
|
sync.Mutex
|
|
fns []hookFn[T]
|
|
}
|
|
|
|
var hookID atomic.Uint64
|
|
|
|
// add registers fn to be called when the hook is run. Returns an
|
|
// unregistration function that removes fn from the hook when called.
|
|
func (h *hook[T]) add(fn func(T)) (remove func()) {
|
|
id := hookID.Add(1)
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
h.fns = append(h.fns, hookFn[T]{id, fn})
|
|
return func() { h.remove(id) }
|
|
}
|
|
|
|
// remove removes the function with the given ID from the hook.
|
|
func (h *hook[T]) remove(id uint64) {
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
h.fns = slices.DeleteFunc(h.fns, func(f hookFn[T]) bool { return f.ID == id })
|
|
}
|
|
|
|
// active reports whether any functions are registered with the
|
|
// hook. This can be used to skip expensive work when the hook is
|
|
// inactive.
|
|
func (h *hook[T]) active() bool {
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
return len(h.fns) > 0
|
|
}
|
|
|
|
// run calls all registered functions with the value v.
|
|
func (h *hook[T]) run(v T) {
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
for _, fn := range h.fns {
|
|
fn.Fn(v)
|
|
}
|
|
}
|
|
|
|
type hookFn[T any] struct {
|
|
ID uint64
|
|
Fn func(T)
|
|
}
|