From 4c856078e4912a3f3a6d1e31d0db03e423685f47 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Fri, 31 Oct 2025 09:58:09 -0700 Subject: [PATCH] util/eventbus: block for the subscriber during SubscribeFunc close (#17642) Prior to this change a SubscriberFunc treated the call to the subscriber's function as the completion of delivery. But that means when we are closing the subscriber, that callback could continue to execute for some time after the close returns. For channel-based subscribers that works OK because the close takes effect before the subscriber ever sees the event. To make the two subscriber types symmetric, we should also wait for the callback to finish before returning. This ensures that a Close of the client means the same thing with both kinds of subscriber. Updates #17638 Change-Id: I82fd31bcaa4e92fab07981ac0e57e6e3a7d9d60b Signed-off-by: M. J. Fromberger --- util/eventbus/bus_test.go | 71 +++++++++++++++++++++++++++++++++----- util/eventbus/subscribe.go | 7 ++++ 2 files changed, 70 insertions(+), 8 deletions(-) diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index 1e0cd8abf..61728fbfd 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -89,6 +89,61 @@ func TestSubscriberFunc(t *testing.T) { } }) + t.Run("CloseWait", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + b := eventbus.New() + defer b.Close() + + c := b.Client(t.Name()) + + eventbus.SubscribeFunc[EventA](c, func(e EventA) { + time.Sleep(2 * time.Second) + }) + + p := eventbus.Publish[EventA](c) + p.Publish(EventA{12345}) + + synctest.Wait() // subscriber has the event + c.Close() + + // If close does not wait for the subscriber, the test will fail + // because an active goroutine remains in the bubble. + }) + }) + + t.Run("CloseWait/Belated", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + buf := swapLogBuf(t) + + b := eventbus.New() + defer b.Close() + + c := b.Client(t.Name()) + + // This subscriber stalls for a long time, so that when we try to + // close the client it gives up and returns in the timeout condition. + eventbus.SubscribeFunc[EventA](c, func(e EventA) { + time.Sleep(time.Minute) // notably, longer than the wait period + }) + + p := eventbus.Publish[EventA](c) + p.Publish(EventA{12345}) + + synctest.Wait() // subscriber has the event + c.Close() + + // Verify that the logger recorded that Close gave up on the slowpoke. + want := regexp.MustCompile(`^.* tailscale.com/util/eventbus_test bus_test.go:\d+: ` + + `giving up on subscriber for eventbus_test.EventA after \d+s at close.*`) + if got := buf.String(); !want.MatchString(got) { + t.Errorf("Wrong log output\ngot: %q\nwant %s", got, want) + } + + // Wait for the subscriber to actually finish to clean up the goroutine. + time.Sleep(2 * time.Minute) + }) + }) + t.Run("SubscriberPublishes", func(t *testing.T) { synctest.Test(t, func(t *testing.T) { b := eventbus.New() @@ -440,14 +495,6 @@ func TestMonitor(t *testing.T) { } func TestSlowSubs(t *testing.T) { - swapLogBuf := func(t *testing.T) *bytes.Buffer { - logBuf := new(bytes.Buffer) - save := log.Writer() - log.SetOutput(logBuf) - t.Cleanup(func() { log.SetOutput(save) }) - return logBuf - } - t.Run("Subscriber", func(t *testing.T) { synctest.Test(t, func(t *testing.T) { buf := swapLogBuf(t) @@ -571,3 +618,11 @@ func (q *queueChecker) Got(v any) { func (q *queueChecker) Empty() bool { return len(q.want) == 0 } + +func swapLogBuf(t *testing.T) *bytes.Buffer { + logBuf := new(bytes.Buffer) + save := log.Writer() + log.SetOutput(logBuf) + t.Cleanup(func() { log.SetOutput(save) }) + return logBuf +} diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 0b821b3f5..03d577f27 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -324,6 +324,13 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE case val := <-acceptCh(): vals.Add(val) case <-ctx.Done(): + // Wait for the callback to be complete, but not forever. + s.slow.Reset(5 * slowSubscriberTimeout) + select { + case <-s.slow.C: + s.logf("giving up on subscriber for %T after %v at close", t, time.Since(start)) + case <-callDone: + } return false case ch := <-snapshot: ch <- vals.Snapshot()