From 866614202c96fa1e5116116acf50834ee787ed6c Mon Sep 17 00:00:00 2001 From: Nick Khyl Date: Fri, 13 Jun 2025 18:08:22 -0500 Subject: [PATCH] util/eventbus: remove redundant code from eventbus.Publish MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit eventbus.Publish() calls newPublisher(), which in turn invokes (*Client).addPublisher(). That method adds the new publisher to c.pub, so we don’t need to add it again in eventbus.Publish. Updates #cleanup Signed-off-by: Nick Khyl --- util/eventbus/client.go | 13 +++++++------ util/eventbus/publish.go | 6 +----- util/eventbus/subscribe.go | 14 +++++--------- 3 files changed, 13 insertions(+), 20 deletions(-) diff --git a/util/eventbus/client.go b/util/eventbus/client.go index a7a88c0a1..f4261b13c 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -113,15 +113,16 @@ func (c *Client) shouldPublish(t reflect.Type) bool { // Subscribe requests delivery of events of type T through the given // Queue. Panics if the queue already has a subscriber for T. func Subscribe[T any](c *Client) *Subscriber[T] { - return newSubscriber[T](c.subscribeState()) + r := c.subscribeState() + s := newSubscriber[T](r) + r.addSubscriber(s) + return s } // Publisher returns a publisher for event type T using the given // client. func Publish[T any](c *Client) *Publisher[T] { - ret := newPublisher[T](c) - c.mu.Lock() - defer c.mu.Unlock() - c.pub.Add(ret) - return ret + p := newPublisher[T](c) + c.addPublisher(p) + return p } diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go index 9897114b6..4a4bdfb7e 100644 --- a/util/eventbus/publish.go +++ b/util/eventbus/publish.go @@ -21,11 +21,7 @@ type Publisher[T any] struct { } func newPublisher[T any](c *Client) *Publisher[T] { - ret := &Publisher[T]{ - client: c, - } - c.addPublisher(ret) - return ret + return &Publisher[T]{client: c} } // Close closes the publisher. diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index ba17e8548..ee534781a 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -91,7 +91,7 @@ func (q *subscribeState) pump(ctx context.Context) { } } else { // Keep the cases in this select in sync with - // Subscriber.dispatch below. The only different should be + // Subscriber.dispatch below. The only difference should be // that this select doesn't deliver queued values to // anyone, and unconditionally accepts new values. select { @@ -134,9 +134,10 @@ func (s *subscribeState) subscribeTypes() []reflect.Type { return ret } -func (s *subscribeState) addSubscriber(t reflect.Type, sub subscriber) { +func (s *subscribeState) addSubscriber(sub subscriber) { s.outputsMu.Lock() defer s.outputsMu.Unlock() + t := sub.subscribeType() if s.outputs[t] != nil { panic(fmt.Errorf("double subscription for event %s", t)) } @@ -183,15 +184,10 @@ type Subscriber[T any] struct { } func newSubscriber[T any](r *subscribeState) *Subscriber[T] { - t := reflect.TypeFor[T]() - - ret := &Subscriber[T]{ + return &Subscriber[T]{ read: make(chan T), - unregister: func() { r.deleteSubscriber(t) }, + unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, } - r.addSubscriber(t, ret) - - return ret } func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {