diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 8190fa145..6a9bd7598 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -138,6 +138,7 @@ type Conn struct { // struct. Initialized once at construction, then constant. eventBus *eventbus.Bus + eventClient *eventbus.Client logf logger.Logf epFunc func([]tailcfg.Endpoint) derpActiveFunc func() @@ -547,6 +548,31 @@ func NewConn(opts Options) (*Conn, error) { c.testOnlyPacketListener = opts.TestOnlyPacketListener c.noteRecvActivity = opts.NoteRecvActivity + // If an event bus is enabled, subscribe to portmapping changes; otherwise + // use the callback mechanism of portmapper.Client. + // + // TODO(creachadair): Remove the switch once the event bus is mandatory. + onPortMapChanged := c.onPortMapChanged + if c.eventBus != nil { + c.eventClient = c.eventBus.Client("magicsock.Conn") + + pmSub := eventbus.Subscribe[portmapper.Mapping](c.eventClient) + go func() { + defer pmSub.Close() + for { + select { + case <-pmSub.Events(): + c.onPortMapChanged() + case <-pmSub.Done(): + return + } + } + }() + + // Disable the explicit callback from the portmapper, the subscriber handles it. + onPortMapChanged = nil + } + // Don't log the same log messages possibly every few seconds in our // portmapper. portmapperLogf := logger.WithPrefix(c.logf, "portmapper: ") @@ -560,7 +586,7 @@ func NewConn(opts Options) (*Conn, error) { NetMon: opts.NetMon, DebugKnobs: portMapOpts, ControlKnobs: opts.ControlKnobs, - OnChange: c.onPortMapChanged, + OnChange: onPortMapChanged, }) c.portMapper.SetGatewayLookupFunc(opts.NetMon.GatewayAndSelfIP) c.netMon = opts.NetMon @@ -2478,6 +2504,9 @@ func (c *connBind) Close() error { if c.closeDisco6 != nil { c.closeDisco6.Close() } + if c.eventClient != nil { + c.eventClient.Close() + } // Send an empty read result to unblock receiveDERP, // which will then check connBind.Closed. // connBind.Closed takes c.mu, but c.derpRecvCh is buffered.