Merge 48f5c7b892e8d1fdc3bc701eb63c6127c8a48693 into 1f3ca21ca18f00ad8d0a9984315fdc6582cb0b70

This commit is contained in:
M. J. Fromberger 2025-03-24 14:47:22 -07:00 committed by GitHub
commit 1fdadc04b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -138,6 +138,7 @@ type Conn struct {
// struct. Initialized once at construction, then constant.
eventBus *eventbus.Bus
eventClient *eventbus.Client
logf logger.Logf
epFunc func([]tailcfg.Endpoint)
derpActiveFunc func()
@ -547,6 +548,31 @@ func NewConn(opts Options) (*Conn, error) {
c.testOnlyPacketListener = opts.TestOnlyPacketListener
c.noteRecvActivity = opts.NoteRecvActivity
// If an event bus is enabled, subscribe to portmapping changes; otherwise
// use the callback mechanism of portmapper.Client.
//
// TODO(creachadair): Remove the switch once the event bus is mandatory.
onPortMapChanged := c.onPortMapChanged
if c.eventBus != nil {
c.eventClient = c.eventBus.Client("magicsock.Conn")
pmSub := eventbus.Subscribe[portmapper.Mapping](c.eventClient)
go func() {
defer pmSub.Close()
for {
select {
case <-pmSub.Events():
c.onPortMapChanged()
case <-pmSub.Done():
return
}
}
}()
// Disable the explicit callback from the portmapper, the subscriber handles it.
onPortMapChanged = nil
}
// Don't log the same log messages possibly every few seconds in our
// portmapper.
portmapperLogf := logger.WithPrefix(c.logf, "portmapper: ")
@ -560,7 +586,7 @@ func NewConn(opts Options) (*Conn, error) {
NetMon: opts.NetMon,
DebugKnobs: portMapOpts,
ControlKnobs: opts.ControlKnobs,
OnChange: c.onPortMapChanged,
OnChange: onPortMapChanged,
})
c.portMapper.SetGatewayLookupFunc(opts.NetMon.GatewayAndSelfIP)
c.netMon = opts.NetMon
@ -2478,6 +2504,9 @@ func (c *connBind) Close() error {
if c.closeDisco6 != nil {
c.closeDisco6.Close()
}
if c.eventClient != nil {
c.eventClient.Close()
}
// Send an empty read result to unblock receiveDERP,
// which will then check connBind.Closed.
// connBind.Closed takes c.mu, but c.derpRecvCh is buffered.