mirror of
https://github.com/tailscale/tailscale.git
synced 2025-12-25 20:23:43 +00:00
util/eventbus: block for the subscriber during SubscribeFunc close (#17642)
Prior to this change a SubscriberFunc treated the call to the subscriber's function as the completion of delivery. But that means when we are closing the subscriber, that callback could continue to execute for some time after the close returns. For channel-based subscribers that works OK because the close takes effect before the subscriber ever sees the event. To make the two subscriber types symmetric, we should also wait for the callback to finish before returning. This ensures that a Close of the client means the same thing with both kinds of subscriber. Updates #17638 Change-Id: I82fd31bcaa4e92fab07981ac0e57e6e3a7d9d60b Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
@@ -89,6 +89,61 @@ func TestSubscriberFunc(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("CloseWait", func(t *testing.T) {
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
b := eventbus.New()
|
||||
defer b.Close()
|
||||
|
||||
c := b.Client(t.Name())
|
||||
|
||||
eventbus.SubscribeFunc[EventA](c, func(e EventA) {
|
||||
time.Sleep(2 * time.Second)
|
||||
})
|
||||
|
||||
p := eventbus.Publish[EventA](c)
|
||||
p.Publish(EventA{12345})
|
||||
|
||||
synctest.Wait() // subscriber has the event
|
||||
c.Close()
|
||||
|
||||
// If close does not wait for the subscriber, the test will fail
|
||||
// because an active goroutine remains in the bubble.
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("CloseWait/Belated", func(t *testing.T) {
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
buf := swapLogBuf(t)
|
||||
|
||||
b := eventbus.New()
|
||||
defer b.Close()
|
||||
|
||||
c := b.Client(t.Name())
|
||||
|
||||
// This subscriber stalls for a long time, so that when we try to
|
||||
// close the client it gives up and returns in the timeout condition.
|
||||
eventbus.SubscribeFunc[EventA](c, func(e EventA) {
|
||||
time.Sleep(time.Minute) // notably, longer than the wait period
|
||||
})
|
||||
|
||||
p := eventbus.Publish[EventA](c)
|
||||
p.Publish(EventA{12345})
|
||||
|
||||
synctest.Wait() // subscriber has the event
|
||||
c.Close()
|
||||
|
||||
// Verify that the logger recorded that Close gave up on the slowpoke.
|
||||
want := regexp.MustCompile(`^.* tailscale.com/util/eventbus_test bus_test.go:\d+: ` +
|
||||
`giving up on subscriber for eventbus_test.EventA after \d+s at close.*`)
|
||||
if got := buf.String(); !want.MatchString(got) {
|
||||
t.Errorf("Wrong log output\ngot: %q\nwant %s", got, want)
|
||||
}
|
||||
|
||||
// Wait for the subscriber to actually finish to clean up the goroutine.
|
||||
time.Sleep(2 * time.Minute)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("SubscriberPublishes", func(t *testing.T) {
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
b := eventbus.New()
|
||||
@@ -440,14 +495,6 @@ func TestMonitor(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSlowSubs(t *testing.T) {
|
||||
swapLogBuf := func(t *testing.T) *bytes.Buffer {
|
||||
logBuf := new(bytes.Buffer)
|
||||
save := log.Writer()
|
||||
log.SetOutput(logBuf)
|
||||
t.Cleanup(func() { log.SetOutput(save) })
|
||||
return logBuf
|
||||
}
|
||||
|
||||
t.Run("Subscriber", func(t *testing.T) {
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
buf := swapLogBuf(t)
|
||||
@@ -571,3 +618,11 @@ func (q *queueChecker) Got(v any) {
|
||||
func (q *queueChecker) Empty() bool {
|
||||
return len(q.want) == 0
|
||||
}
|
||||
|
||||
func swapLogBuf(t *testing.T) *bytes.Buffer {
|
||||
logBuf := new(bytes.Buffer)
|
||||
save := log.Writer()
|
||||
log.SetOutput(logBuf)
|
||||
t.Cleanup(func() { log.SetOutput(save) })
|
||||
return logBuf
|
||||
}
|
||||
|
||||
@@ -324,6 +324,13 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE
|
||||
case val := <-acceptCh():
|
||||
vals.Add(val)
|
||||
case <-ctx.Done():
|
||||
// Wait for the callback to be complete, but not forever.
|
||||
s.slow.Reset(5 * slowSubscriberTimeout)
|
||||
select {
|
||||
case <-s.slow.C:
|
||||
s.logf("giving up on subscriber for %T after %v at close", t, time.Since(start))
|
||||
case <-callDone:
|
||||
}
|
||||
return false
|
||||
case ch := <-snapshot:
|
||||
ch <- vals.Snapshot()
|
||||
|
||||
Reference in New Issue
Block a user