wgengine/magicsock: subscribe to portmapper updates

When an event bus is plumbed in, use it to subscribe and react to port mapping
updates instead of using the client's callback mechanism. For now, the callback
remains available as a fallback when an event bus is not provided.

Updates #15160

Change-Id: I026adca44bf6187692ee87ae8ec02641c12f7774
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger 2025-03-24 09:47:29 -07:00
parent 1f3ca21ca1
commit 48f5c7b892

View File

@ -138,6 +138,7 @@ type Conn struct {
// struct. Initialized once at construction, then constant.
eventBus *eventbus.Bus
eventClient *eventbus.Client
logf logger.Logf
epFunc func([]tailcfg.Endpoint)
derpActiveFunc func()
@ -547,6 +548,31 @@ func NewConn(opts Options) (*Conn, error) {
c.testOnlyPacketListener = opts.TestOnlyPacketListener
c.noteRecvActivity = opts.NoteRecvActivity
// If an event bus is enabled, subscribe to portmapping changes; otherwise
// use the callback mechanism of portmapper.Client.
//
// TODO(creachadair): Remove the switch once the event bus is mandatory.
onPortMapChanged := c.onPortMapChanged
if c.eventBus != nil {
c.eventClient = c.eventBus.Client("magicsock.Conn")
pmSub := eventbus.Subscribe[portmapper.Mapping](c.eventClient)
go func() {
defer pmSub.Close()
for {
select {
case <-pmSub.Events():
c.onPortMapChanged()
case <-pmSub.Done():
return
}
}
}()
// Disable the explicit callback from the portmapper, the subscriber handles it.
onPortMapChanged = nil
}
// Don't log the same log messages possibly every few seconds in our
// portmapper.
portmapperLogf := logger.WithPrefix(c.logf, "portmapper: ")
@ -560,7 +586,7 @@ func NewConn(opts Options) (*Conn, error) {
NetMon: opts.NetMon,
DebugKnobs: portMapOpts,
ControlKnobs: opts.ControlKnobs,
OnChange: c.onPortMapChanged,
OnChange: onPortMapChanged,
})
c.portMapper.SetGatewayLookupFunc(opts.NetMon.GatewayAndSelfIP)
c.netMon = opts.NetMon
@ -2478,6 +2504,9 @@ func (c *connBind) Close() error {
if c.closeDisco6 != nil {
c.closeDisco6.Close()
}
if c.eventClient != nil {
c.eventClient.Close()
}
// Send an empty read result to unblock receiveDERP,
// which will then check connBind.Closed.
// connBind.Closed takes c.mu, but c.derpRecvCh is buffered.