diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index 33c0ae84d..b479f3940 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -8,6 +8,7 @@ import ( "reflect" "slices" "sync" + "time" "tailscale.com/util/set" ) @@ -93,11 +94,18 @@ func (b *Bus) pump(ctx context.Context) { for !vals.Empty() { val := vals.Peek() dests := b.dest(reflect.ValueOf(val.Event).Type()) + routed := time.Now() for _, d := range dests { + evt := queuedEvent{ + Event: val.Event, + From: val.From, + Published: val.Published, + Routed: routed, + } deliverOne: for { select { - case d.write <- val.Event: + case d.write <- evt: break deliverOne case <-d.closed(): // Queue closed, don't block but continue diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 85aa1ff6a..71201aa40 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -8,8 +8,16 @@ import ( "fmt" "reflect" "sync" + "time" ) +type queuedEvent struct { + Event any + From *Client + Published time.Time + Routed time.Time +} + // subscriber is a uniformly typed wrapper around Subscriber[T], so // that debugging facilities can look at active subscribers. type subscriber interface { @@ -27,7 +35,7 @@ type subscriber interface { // processing other potential sources of wakeups, which is how we end // up at this awkward type signature and sharing of internal state // through dispatch. - dispatch(ctx context.Context, vals *queue[any], acceptCh func() chan any) bool + dispatch(ctx context.Context, vals *queue[queuedEvent], acceptCh func() chan queuedEvent) bool Close() } @@ -36,8 +44,8 @@ type subscribeState struct { client *Client dispatcher *worker - write chan any - snapshot chan chan []any + write chan queuedEvent + snapshot chan chan []queuedEvent outputsMu sync.Mutex outputs map[reflect.Type]subscriber @@ -46,8 +54,8 @@ type subscribeState struct { func newSubscribeState(c *Client) *subscribeState { ret := &subscribeState{ client: c, - write: make(chan any), - snapshot: make(chan chan []any), + write: make(chan queuedEvent), + snapshot: make(chan chan []queuedEvent), outputs: map[reflect.Type]subscriber{}, } ret.dispatcher = runWorker(ret.pump) @@ -55,8 +63,8 @@ func newSubscribeState(c *Client) *subscribeState { } func (q *subscribeState) pump(ctx context.Context) { - var vals queue[any] - acceptCh := func() chan any { + var vals queue[queuedEvent] + acceptCh := func() chan queuedEvent { if vals.Full() { return nil } @@ -65,7 +73,7 @@ func (q *subscribeState) pump(ctx context.Context) { for { if !vals.Empty() { val := vals.Peek() - sub := q.subscriberFor(val) + sub := q.subscriberFor(val.Event) if sub == nil { // Raced with unsubscribe. vals.Drop() @@ -155,8 +163,8 @@ func (s *Subscriber[T]) subscribeType() reflect.Type { return reflect.TypeFor[T]() } -func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[any], acceptCh func() chan any) bool { - t := vals.Peek().(T) +func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[queuedEvent], acceptCh func() chan queuedEvent) bool { + t := vals.Peek().Event.(T) for { // Keep the cases in this select in sync with subscribeState.pump // above. The only different should be that this select