wgengine: use eventbus.Client.Monitor to simplify subscriber maintenance (#17203)

This commit does not change the order or meaning of any eventbus activity; it
only updates how the plumbing is set up.

Updates #15160

Change-Id: I40c23b183c2a6a6ea3feec7767c8e5417019fc07
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
Author:     M. J. Fromberger <fromberger@tailscale.com>
Date:       2025-09-19 13:20:50 -07:00
Committer:  GitHub
Parent:     ca9d795006
Commit:     2b6bc11586
2 changed files with 56 additions and 71 deletions
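
To make the new plumbing concrete before reading the diffs, here is a minimal, self-contained Go sketch of the pattern this change adopts. The `Client`, `Monitor`, and the setup/consume split below are illustrative stand-ins whose shapes are inferred from the diff; the real types live in tailscale.com/util/eventbus and may differ in detail.

```go
// Illustrative stand-ins only: the real eventbus.Client, eventbus.Monitor,
// and eventbus.Subscribe live in tailscale.com/util/eventbus and may differ
// in detail from what this diff implies.
package main

import (
	"fmt"
	"sync"
)

// Client is a minimal stand-in for an event bus client.
type Client struct {
	closeOnce sync.Once
	done      chan struct{}
}

func NewClient() *Client { return &Client{done: make(chan struct{})} }

// Done is closed when the client is closed.
func (c *Client) Done() <-chan struct{} { return c.done }

// Close closes the client, unblocking any loop selecting on Done.
func (c *Client) Close() { c.closeOnce.Do(func() { close(c.done) }) }

// Monitor records a goroutine started by Client.Monitor so that Close can
// both stop it and wait for it, replacing the hand-rolled
// "close the client, then <-subsDoneCh" dance in the old code.
type Monitor struct {
	cli  *Client
	done chan struct{}
}

// Monitor runs fn on its own goroutine and returns a handle for shutdown.
func (c *Client) Monitor(fn func(*Client)) Monitor {
	m := Monitor{cli: c, done: make(chan struct{})}
	go func() {
		defer close(m.done)
		fn(c)
	}()
	return m
}

// Close closes the underlying client and waits for fn to return.
func (m Monitor) Close() {
	m.cli.Close()
	<-m.done
}

func main() {
	events := make(chan string, 1) // stand-in for a subscriber's Events() channel
	handled := make(chan struct{}, 1)
	cli := NewClient()

	// setup mirrors consumeEventbusTopics in the diff: the subscribe half runs
	// synchronously (before the constructor returns) so no published event can
	// be missed, and the returned closure is the consume loop Monitor will run.
	setup := func(cli *Client) func(*Client) {
		sub := events // stand-in for eventbus.Subscribe[...](cli)
		return func(cli *Client) {
			for {
				select {
				case <-cli.Done():
					return
				case ev := <-sub:
					fmt.Println("handled:", ev)
					handled <- struct{}{}
				}
			}
		}
	}

	mon := cli.Monitor(setup(cli))
	events <- "node views update"
	<-handled
	mon.Close() // one call: stop the loop and wait for it to exit
}
```

The property the diff relies on is that Monitor.Close both signals shutdown and waits for the consume loop to return, which is what allows the per-struct subscriber fields, the subsDoneCh channels, and the explicit <-subsDoneCh waits to be deleted.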

wgengine/magicsock/magicsock.go

@@ -156,7 +156,7 @@ type Conn struct {
 	// struct. Initialized once at construction, then constant.
 	eventBus       *eventbus.Bus
-	eventClient    *eventbus.Client
+	eventSubs      eventbus.Monitor
 	logf           logger.Logf
 	epFunc         func([]tailcfg.Endpoint)
 	derpActiveFunc func()
@@ -176,17 +176,10 @@ type Conn struct {
 	connCtxCancel func()          // closes connCtx
 	donec         <-chan struct{} // connCtx.Done()'s to avoid context.cancelCtx.Done()'s mutex per call
 
-	// These [eventbus.Subscriber] fields are solely accessed by
-	// consumeEventbusTopics once initialized.
-	pmSub                 *eventbus.Subscriber[portmappertype.Mapping]
-	filterSub             *eventbus.Subscriber[FilterUpdate]
-	nodeViewsSub          *eventbus.Subscriber[NodeViewsUpdate]
-	nodeMutsSub           *eventbus.Subscriber[NodeMutationsUpdate]
-	syncSub               *eventbus.Subscriber[syncPoint]
+	// A publisher for synchronization points to ensure correct ordering of
+	// config changes between magicsock and wireguard.
 	syncPub               *eventbus.Publisher[syncPoint]
 	allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq]
-	allocRelayEndpointSub *eventbus.Subscriber[UDPRelayAllocResp]
-	subsDoneCh            chan struct{} // closed when consumeEventbusTopics returns
 
 	// pconn4 and pconn6 are the underlying UDP sockets used to
 	// send/receive packets for wireguard and other magicsock
@@ -643,26 +636,34 @@ func newConn(logf logger.Logf) *Conn {
 // always handled in the order they are received, i.e. the next event is not
 // read until the previous event's handler has returned. It returns when the
 // [eventbus.Client] is closed.
-func (c *Conn) consumeEventbusTopics() {
-	defer close(c.subsDoneCh)
-
-	for {
-		select {
-		case <-c.eventClient.Done():
-			return
-		case <-c.pmSub.Events():
-			c.onPortMapChanged()
-		case filterUpdate := <-c.filterSub.Events():
-			c.onFilterUpdate(filterUpdate)
-		case nodeViews := <-c.nodeViewsSub.Events():
-			c.onNodeViewsUpdate(nodeViews)
-		case nodeMuts := <-c.nodeMutsSub.Events():
-			c.onNodeMutationsUpdate(nodeMuts)
-		case syncPoint := <-c.syncSub.Events():
-			c.dlogf("magicsock: received sync point after reconfig")
-			syncPoint.Signal()
-		case allocResp := <-c.allocRelayEndpointSub.Events():
-			c.onUDPRelayAllocResp(allocResp)
+func (c *Conn) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) {
+	// Subscribe calls must return before NewConn otherwise published
+	// events can be missed.
+	pmSub := eventbus.Subscribe[portmappertype.Mapping](cli)
+	filterSub := eventbus.Subscribe[FilterUpdate](cli)
+	nodeViewsSub := eventbus.Subscribe[NodeViewsUpdate](cli)
+	nodeMutsSub := eventbus.Subscribe[NodeMutationsUpdate](cli)
+	syncSub := eventbus.Subscribe[syncPoint](cli)
+	allocRelayEndpointSub := eventbus.Subscribe[UDPRelayAllocResp](cli)
+	return func(cli *eventbus.Client) {
+		for {
+			select {
+			case <-cli.Done():
+				return
+			case <-pmSub.Events():
+				c.onPortMapChanged()
+			case filterUpdate := <-filterSub.Events():
+				c.onFilterUpdate(filterUpdate)
+			case nodeViews := <-nodeViewsSub.Events():
+				c.onNodeViewsUpdate(nodeViews)
+			case nodeMuts := <-nodeMutsSub.Events():
+				c.onNodeMutationsUpdate(nodeMuts)
+			case syncPoint := <-syncSub.Events():
+				c.dlogf("magicsock: received sync point after reconfig")
+				syncPoint.Signal()
+			case allocResp := <-allocRelayEndpointSub.Events():
+				c.onUDPRelayAllocResp(allocResp)
+			}
 		}
 	}
 }
@@ -729,20 +730,12 @@ func NewConn(opts Options) (*Conn, error) {
 	c.testOnlyPacketListener = opts.TestOnlyPacketListener
 	c.noteRecvActivity = opts.NoteRecvActivity
 
-	c.eventClient = c.eventBus.Client("magicsock.Conn")
-
-	// Subscribe calls must return before NewConn otherwise published
-	// events can be missed.
-	c.pmSub = eventbus.Subscribe[portmappertype.Mapping](c.eventClient)
-	c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient)
-	c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient)
-	c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient)
-	c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient)
-	c.syncPub = eventbus.Publish[syncPoint](c.eventClient)
-	c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient)
-	c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient)
-	c.subsDoneCh = make(chan struct{})
-	go c.consumeEventbusTopics()
+	// Set up publishers and subscribers. Subscribe calls must return before
+	// NewConn otherwise published events can be missed.
+	cli := c.eventBus.Client("magicsock.Conn")
+	c.syncPub = eventbus.Publish[syncPoint](cli)
+	c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli)
+	c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli))
 
 	c.connCtx, c.connCtxCancel = context.WithCancel(context.Background())
 	c.donec = c.connCtx.Done()
@@ -3313,14 +3306,13 @@ func (c *connBind) isClosed() bool {
 //
 // Only the first close does anything. Any later closes return nil.
 func (c *Conn) Close() error {
-	// Close the [eventbus.Client] and wait for Conn.consumeEventbusTopics to
-	// return. Do this before acquiring c.mu:
+	// Close the [eventbus.Client] and wait for c.consumeEventbusTopics to
+	// return before acquiring c.mu:
 	// 1. Conn.consumeEventbusTopics event handlers also acquire c.mu, they can
 	//    deadlock with c.Close().
 	// 2. Conn.consumeEventbusTopics event handlers may not guard against
 	//    undesirable post/in-progress Conn.Close() behaviors.
-	c.eventClient.Close()
-	<-c.subsDoneCh
+	c.eventSubs.Close()
 
 	c.mu.Lock()
 	defer c.mu.Unlock()
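
The ordering spelled out in that comment is why the close-and-wait must finish before c.mu is taken, with the new eventSubs.Close just as with the old eventClient.Close plus <-subsDoneCh pair. A hypothetical sketch of the hazard, using a stand-in conn type rather than the real magicsock.Conn:

```go
// Hypothetical sketch only; not the real magicsock.Conn.
package sketch

import "sync"

type conn struct {
	mu          sync.Mutex
	stopAndWait func() // e.g. eventSubs.Close: stop the consume loop and wait for it to return
}

// onSomeEvent stands in for the event handlers run by the consume loop;
// like the real handlers, it acquires the same mutex Close uses.
func (c *conn) onSomeEvent() {
	c.mu.Lock()
	defer c.mu.Unlock()
	// ... apply the update ...
}

func (c *conn) Close() error {
	// Stop the event loop and wait for any in-flight handler to return
	// BEFORE taking c.mu. If the order were reversed, Close would hold c.mu
	// while waiting on a handler blocked in onSomeEvent's c.mu.Lock(), and
	// the two would deadlock.
	c.stopAndWait()

	c.mu.Lock()
	defer c.mu.Unlock()
	// ... remaining teardown ...
	return nil
}
```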

wgengine/userspace.go

@@ -93,10 +93,8 @@ const networkLoggerUploadTimeout = 5 * time.Second
 type userspaceEngine struct {
 	// eventBus will eventually become required, but for now may be nil.
 	// TODO(creachadair): Enforce that this is non-nil at construction.
-	eventBus       *eventbus.Bus
-	eventClient    *eventbus.Client
-	changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta]
-	subsDoneCh     chan struct{} // closed when consumeEventbusTopics returns
+	eventBus  *eventbus.Bus
+	eventSubs eventbus.Monitor
 
 	logf     logger.Logf
 	wgLogger *wglog.Logger // a wireguard-go logging wrapper
@@ -354,11 +352,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
 		controlKnobs:   conf.ControlKnobs,
 		reconfigureVPN: conf.ReconfigureVPN,
 		health:         conf.HealthTracker,
-		subsDoneCh:     make(chan struct{}),
 	}
-	e.eventClient = e.eventBus.Client("userspaceEngine")
-	e.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](e.eventClient)
-	closePool.addFunc(e.eventClient.Close)
 
 	if e.birdClient != nil {
 		// Disable the protocol at start time.
@@ -545,8 +539,8 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
 		}
 	}
 
-	go e.consumeEventbusTopics()
-
+	cli := e.eventBus.Client("userspaceEngine")
+	e.eventSubs = cli.Monitor(e.consumeEventbusTopics(cli))
 	e.logf("Engine created.")
 	return e, nil
 }
@@ -556,16 +550,17 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
 // always handled in the order they are received, i.e. the next event is not
 // read until the previous event's handler has returned. It returns when the
 // [eventbus.Client] is closed.
-func (e *userspaceEngine) consumeEventbusTopics() {
-	defer close(e.subsDoneCh)
-
-	for {
-		select {
-		case <-e.eventClient.Done():
-			return
-		case changeDelta := <-e.changeDeltaSub.Events():
-			tshttpproxy.InvalidateCache()
-			e.linkChange(&changeDelta)
+func (e *userspaceEngine) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) {
+	changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](cli)
+	return func(cli *eventbus.Client) {
+		for {
+			select {
+			case <-cli.Done():
+				return
+			case changeDelta := <-changeDeltaSub.Events():
+				tshttpproxy.InvalidateCache()
+				e.linkChange(&changeDelta)
+			}
 		}
 	}
 }
@@ -1228,9 +1223,7 @@ func (e *userspaceEngine) RequestStatus() {
 }
 
 func (e *userspaceEngine) Close() {
-	e.eventClient.Close()
-	<-e.subsDoneCh
-
+	e.eventSubs.Close()
 	e.mu.Lock()
 	if e.closing {
 		e.mu.Unlock()