util/eventbus: use unbounded event queues for DeliveredEvents in subscribers

Bounded DeliveredEvent queues reduce memory usage, but they can deadlock under load.
Two common scenarios trigger a deadlock when the number of events published in a short
period exceeds twice the queue capacity (the PublishedEvent queue has the same capacity):
 - a subscriber tries to acquire a mutex that is held by a publisher, or
 - a subscriber to A events publishes B events

Avoiding these scenarios is not practical: it would limit the eventbus's usefulness and reduce
its adoption, pushing us back to callbacks and other legacy mechanisms. These deadlocks have
already occurred on customer devices, dev machines, and in tests. They also make it harder to
identify and fix slow subscribers and similar issues we have been seeing recently.

Choosing an arbitrarily large fixed queue capacity would only mask the problem. A client running
in a sufficiently large and complex customer environment can exceed any meaningful constant limit,
since event volume depends on the number of peers and other factors. Behavior also changes with
the scheduling of publishers and subscribers by the Go runtime, OS, and hardware, as the issue
is essentially a race between publishers and subscribers. Additionally, on lower-end devices,
an unreasonably high constant capacity is practically the same as an unbounded queue.

Therefore, this PR changes the event queue implementation to be unbounded by default.
The PublishedEvent queue keeps its existing capacity of 16 items, while subscribers'
DeliveredEvent queues become unbounded.

This change fixes known deadlocks and makes the system stable under load,
at the cost of higher potential memory usage, including cases where a queue grows
during an event burst and does not shrink when load decreases.

Further improvements can be implemented in the future as needed.

Fixes #17973
Fixes #18012

Signed-off-by: Nick Khyl <nickk@tailscale.com>
commit 1ccece0f78
parent 9245c7131b
Author: Nick Khyl
Date:   2025-11-21 07:53:23 -06:00

3 changed files with 16 additions and 39 deletions


@@ -120,7 +120,14 @@ func (b *Bus) Close() {
 }
 
 func (b *Bus) pump(ctx context.Context) {
-	var vals queue[PublishedEvent]
+	// Limit how many published events we can buffer in the PublishedEvent queue.
+	//
+	// Subscribers have unbounded DeliveredEvent queues (see tailscale/tailscale#18020),
+	// so this queue doesn't need to be unbounded. Keeping it bounded may also help
+	// catch cases where subscribers stop pumping events completely, such as due to a bug
+	// in [subscribeState.pump], [Subscriber.dispatch], or [SubscriberFunc.dispatch].
+	const maxPublishedEvents = 16
+	vals := queue[PublishedEvent]{capacity: maxPublishedEvents}
 	acceptCh := func() chan PublishedEvent {
 		if vals.Full() {
 			return nil


@@ -594,23 +594,8 @@ func TestRegression(t *testing.T) {
 	})
 }
 
-const (
-	maxQueuedItems      = 16                 // same as in queue.go
-	totalMaxQueuedItems = maxQueuedItems * 2 // both publisher and subscriber sides
-)
-
 func TestPublishWithMutex(t *testing.T) {
-	t.Run("FewEvents", func(t *testing.T) {
-		// As of 2025-11-20, publishing up to [totalMaxQueuedItems] is fine.
-		testPublishWithMutex(t, totalMaxQueuedItems)
-	})
-
-	t.Run("ManyEvents", func(t *testing.T) {
-		// As of 2025-11-20, publishing more than [totalMaxQueuedItems] may deadlock.
-		t.Skip("TODO: fix deadlock in https://github.com/tailscale/tailscale/issues/17973")
-		const N = 3 // N larger than one increases the chance of deadlock.
-		testPublishWithMutex(t, totalMaxQueuedItems+N)
-	})
+	testPublishWithMutex(t, 1024) // arbitrary large number of events
 }
 
 // testPublishWithMutex publishes the specified number of events,
// testPublishWithMutex publishes the specified number of events,
@@ -637,13 +622,10 @@ func testPublishWithMutex(t *testing.T, n int) {
 	var mu sync.Mutex
 
 	eventbus.SubscribeFunc[EventA](c, func(e EventA) {
 		// Acquire the same mutex as the publisher.
-		// As of 2025-11-20, this can deadlock if n is large enough
-		// and event queues fill up.
 		mu.Lock()
 		mu.Unlock()
 
 		// Mark event as received, so we can check for lost events.
-		// Not required for the deadlock to occur.
 		exp.Got(e)
 	})
@@ -666,17 +648,7 @@ func testPublishWithMutex(t *testing.T, n int) {
 }
 
 func TestPublishFromSubscriber(t *testing.T) {
-	t.Run("FewEvents", func(t *testing.T) {
-		// Publishing up to [totalMaxQueuedItems]-1 is fine.
-		testPublishFromSubscriber(t, totalMaxQueuedItems-1)
-	})
-
-	t.Run("ManyEvents", func(t *testing.T) {
-		// As of 2025-11-20, publishing more than [totalMaxQueuedItems] may deadlock.
-		t.Skip("TODO: fix deadlock in https://github.com/tailscale/tailscale/issues/18012")
-		// Using 2x to increase chance of deadlock.
-		testPublishFromSubscriber(t, totalMaxQueuedItems*2)
-	})
+	testPublishFromSubscriber(t, 1024) // arbitrary large number of events
 }
 
 // testPublishFromSubscriber publishes the specified number of EventA events.
// testPublishFromSubscriber publishes the specified number of EventA events.
@@ -702,8 +674,6 @@ func testPublishFromSubscriber(t *testing.T, n int) {
 	eventbus.SubscribeFunc[EventA](c, func(e EventA) {
 		// Upon receiving EventA, publish EventB.
-		// As of 2025-11-20, this can deadlock if n is large enough
-		// and event queues fill up.
 		pubB.Publish(EventB{Counter: e.Counter})
 	})
 
 	eventbus.SubscribeFunc[EventB](c, func(e EventB) {


@@ -7,18 +7,18 @@ import (
 	"slices"
 )
 
-const maxQueuedItems = 16
-
-// queue is an ordered queue of length up to maxQueuedItems.
+// queue is an ordered queue of length up to capacity,
+// if capacity is non-zero. Otherwise it is unbounded.
 type queue[T any] struct {
-	vals  []T
-	start int
+	vals     []T
+	start    int
+	capacity int // zero means unbounded
 }
 
 // canAppend reports whether a value can be appended to q.vals without
 // shifting values around.
 func (q *queue[T]) canAppend() bool {
-	return cap(q.vals) < maxQueuedItems || len(q.vals) < cap(q.vals)
+	return q.capacity == 0 || cap(q.vals) < q.capacity || len(q.vals) < cap(q.vals)
 }
 
 func (q *queue[T]) Full() bool {