diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index 9f6adbfb7..33c0ae84d 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -16,8 +16,8 @@ import ( // subscribers. type Bus struct { router *worker - write chan any - snapshot chan chan []any + write chan publishedEvent + snapshot chan chan []publishedEvent topicsMu sync.Mutex // guards everything below. topics map[reflect.Type][]*subscribeState @@ -31,8 +31,8 @@ type Bus struct { // and [Bus.Queue] and [Subscribe] to make event subscribers. func New() *Bus { ret := &Bus{ - write: make(chan any), - snapshot: make(chan chan []any), + write: make(chan publishedEvent), + snapshot: make(chan chan []publishedEvent), topics: map[reflect.Type][]*subscribeState{}, clients: set.Set[*Client]{}, } @@ -78,8 +78,8 @@ func (b *Bus) Close() { } func (b *Bus) pump(ctx context.Context) { - var vals queue[any] - acceptCh := func() chan any { + var vals queue[publishedEvent] + acceptCh := func() chan publishedEvent { if vals.Full() { return nil } @@ -92,12 +92,12 @@ func (b *Bus) pump(ctx context.Context) { // queue space for it. for !vals.Empty() { val := vals.Peek() - dests := b.dest(reflect.ValueOf(val).Type()) + dests := b.dest(reflect.ValueOf(val.Event).Type()) for _, d := range dests { deliverOne: for { select { - case d.write <- val: + case d.write <- val.Event: break deliverOne case <-d.closed(): // Queue closed, don't block but continue diff --git a/util/eventbus/client.go b/util/eventbus/client.go index ff8eea6ee..174cc5ea5 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -75,7 +75,7 @@ func (c *Client) deleteSubscriber(t reflect.Type, s *subscribeState) { c.bus.unsubscribe(t, s) } -func (c *Client) publish() chan<- any { +func (c *Client) publish() chan<- publishedEvent { return c.bus.write } diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go index b2d0641d9..fdabdcb23 100644 --- a/util/eventbus/publish.go +++ b/util/eventbus/publish.go @@ -5,8 +5,15 @@ package eventbus import ( "reflect" + "time" ) +type publishedEvent struct { + Event any + From *Client + Published time.Time +} + // publisher is a uniformly typed wrapper around Publisher[T], so that // debugging facilities can look at active publishers. type publisher interface { @@ -52,8 +59,14 @@ func (p *Publisher[T]) Publish(v T) { default: } + evt := publishedEvent{ + Event: v, + From: p.client, + Published: time.Now(), + } + select { - case p.client.publish() <- v: + case p.client.publish() <- evt: case <-p.stop.Done(): } }