From 48f5c7b892e8d1fdc3bc701eb63c6127c8a48693 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Mon, 24 Mar 2025 09:47:29 -0700 Subject: [PATCH] wgengine/magicsock: subscribe to portmapper updates When an event bus is plumbed in, use it to subscribe and react to port mapping updates instead of using the client's callback mechanism. For now, the callback remains available as a fallback when an event bus is not provided. Updates #15160 Change-Id: I026adca44bf6187692ee87ae8ec02641c12f7774 Signed-off-by: M. J. Fromberger --- wgengine/magicsock/magicsock.go | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 8190fa145..6a9bd7598 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -138,6 +138,7 @@ type Conn struct { // struct. Initialized once at construction, then constant. eventBus *eventbus.Bus + eventClient *eventbus.Client logf logger.Logf epFunc func([]tailcfg.Endpoint) derpActiveFunc func() @@ -547,6 +548,31 @@ func NewConn(opts Options) (*Conn, error) { c.testOnlyPacketListener = opts.TestOnlyPacketListener c.noteRecvActivity = opts.NoteRecvActivity + // If an event bus is enabled, subscribe to portmapping changes; otherwise + // use the callback mechanism of portmapper.Client. + // + // TODO(creachadair): Remove the switch once the event bus is mandatory. + onPortMapChanged := c.onPortMapChanged + if c.eventBus != nil { + c.eventClient = c.eventBus.Client("magicsock.Conn") + + pmSub := eventbus.Subscribe[portmapper.Mapping](c.eventClient) + go func() { + defer pmSub.Close() + for { + select { + case <-pmSub.Events(): + c.onPortMapChanged() + case <-pmSub.Done(): + return + } + } + }() + + // Disable the explicit callback from the portmapper, the subscriber handles it. + onPortMapChanged = nil + } + // Don't log the same log messages possibly every few seconds in our // portmapper. portmapperLogf := logger.WithPrefix(c.logf, "portmapper: ") @@ -560,7 +586,7 @@ func NewConn(opts Options) (*Conn, error) { NetMon: opts.NetMon, DebugKnobs: portMapOpts, ControlKnobs: opts.ControlKnobs, - OnChange: c.onPortMapChanged, + OnChange: onPortMapChanged, }) c.portMapper.SetGatewayLookupFunc(opts.NetMon.GatewayAndSelfIP) c.netMon = opts.NetMon @@ -2478,6 +2504,9 @@ func (c *connBind) Close() error { if c.closeDisco6 != nil { c.closeDisco6.Close() } + if c.eventClient != nil { + c.eventClient.Close() + } // Send an empty read result to unblock receiveDERP, // which will then check connBind.Closed. // connBind.Closed takes c.mu, but c.derpRecvCh is buffered.