util/eventbus: track additional event context in publish queue

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
This commit is contained in:
David Anderson 2025-03-05 10:39:06 -08:00 committed by Dave Anderson
parent bf40bc4fa0
commit a1192dd686
3 changed files with 23 additions and 10 deletions

View File

@ -16,8 +16,8 @@ import (
// subscribers. // subscribers.
type Bus struct { type Bus struct {
router *worker router *worker
write chan any write chan publishedEvent
snapshot chan chan []any snapshot chan chan []publishedEvent
topicsMu sync.Mutex // guards everything below. topicsMu sync.Mutex // guards everything below.
topics map[reflect.Type][]*subscribeState topics map[reflect.Type][]*subscribeState
@ -31,8 +31,8 @@ type Bus struct {
// and [Bus.Queue] and [Subscribe] to make event subscribers. // and [Bus.Queue] and [Subscribe] to make event subscribers.
func New() *Bus { func New() *Bus {
ret := &Bus{ ret := &Bus{
write: make(chan any), write: make(chan publishedEvent),
snapshot: make(chan chan []any), snapshot: make(chan chan []publishedEvent),
topics: map[reflect.Type][]*subscribeState{}, topics: map[reflect.Type][]*subscribeState{},
clients: set.Set[*Client]{}, clients: set.Set[*Client]{},
} }
@ -78,8 +78,8 @@ func (b *Bus) Close() {
} }
func (b *Bus) pump(ctx context.Context) { func (b *Bus) pump(ctx context.Context) {
var vals queue[any] var vals queue[publishedEvent]
acceptCh := func() chan any { acceptCh := func() chan publishedEvent {
if vals.Full() { if vals.Full() {
return nil return nil
} }
@ -92,12 +92,12 @@ func (b *Bus) pump(ctx context.Context) {
// queue space for it. // queue space for it.
for !vals.Empty() { for !vals.Empty() {
val := vals.Peek() val := vals.Peek()
dests := b.dest(reflect.ValueOf(val).Type()) dests := b.dest(reflect.ValueOf(val.Event).Type())
for _, d := range dests { for _, d := range dests {
deliverOne: deliverOne:
for { for {
select { select {
case d.write <- val: case d.write <- val.Event:
break deliverOne break deliverOne
case <-d.closed(): case <-d.closed():
// Queue closed, don't block but continue // Queue closed, don't block but continue

View File

@ -75,7 +75,7 @@ func (c *Client) deleteSubscriber(t reflect.Type, s *subscribeState) {
c.bus.unsubscribe(t, s) c.bus.unsubscribe(t, s)
} }
func (c *Client) publish() chan<- any { func (c *Client) publish() chan<- publishedEvent {
return c.bus.write return c.bus.write
} }

View File

@ -5,8 +5,15 @@ package eventbus
import ( import (
"reflect" "reflect"
"time"
) )
type publishedEvent struct {
Event any
From *Client
Published time.Time
}
// publisher is a uniformly typed wrapper around Publisher[T], so that // publisher is a uniformly typed wrapper around Publisher[T], so that
// debugging facilities can look at active publishers. // debugging facilities can look at active publishers.
type publisher interface { type publisher interface {
@ -52,8 +59,14 @@ func (p *Publisher[T]) Publish(v T) {
default: default:
} }
evt := publishedEvent{
Event: v,
From: p.client,
Published: time.Now(),
}
select { select {
case p.client.publish() <- v: case p.client.publish() <- evt:
case <-p.stop.Done(): case <-p.stop.Done():
} }
} }