net/portmapper: fire an event when a port mapping is updated (#15371)

When an event bus is configured publish an event each time a new port mapping
is updated. Publication is unconditional and occurs prior to calling any
callback that is registered. For now, the callback is still fired in a separate
goroutine as before -- later, those callbacks should become subscriptions to
the published event.

For now, the event type is defined as a new type here in the package. We will
want to move it to a more central package when there are subscribers. The event
wrapper is effectively a subset of the data exported by the internal mapping
interface, but on a concrete struct so the bus plumbing can inspect it.

Updates #15160

Change-Id: I951f212429ac791223af8d75b6eb39a0d2a0053a
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger 2025-03-20 15:19:26 -07:00
parent 67fafe69ac
commit 1dcf8ee6a5
5 changed files with 115 additions and 50 deletions

View File

@ -19,6 +19,7 @@ import (
"tailscale.com/net/netmon"
"tailscale.com/syncs"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
// TestIGD is an IGD (Internet Gateway Device) for testing. It supports fake
@ -258,12 +259,16 @@ func (d *TestIGD) handlePCPQuery(pkt []byte, src netip.AddrPort) {
}
}
func newTestClient(t *testing.T, igd *TestIGD) *Client {
// newTestClient configures a new test client connected to igd for mapping updates.
// If bus != nil, update events are published to it.
// A cleanup for the resulting client is added to t.
func newTestClient(t *testing.T, igd *TestIGD, bus *eventbus.Bus) *Client {
var c *Client
c = NewClient(Config{
Logf: t.Logf,
NetMon: netmon.NewStatic(),
ControlKnobs: new(controlknobs.Knobs),
EventBus: bus,
OnChange: func() {
t.Logf("port map changed")
t.Logf("have mapping: %v", c.HaveMapping())
@ -273,5 +278,6 @@ func newTestClient(t *testing.T, igd *TestIGD) *Client {
c.testUPnPPort = igd.TestUPnPPort()
c.netMon = netmon.NewStatic()
c.SetGatewayLookupFunc(testIPAndGateway)
t.Cleanup(func() { c.Close() })
return c
}

View File

@ -85,7 +85,11 @@ const trustServiceStillAvailableDuration = 10 * time.Minute
// Client is a port mapping client.
type Client struct {
eventBus *eventbus.Bus
// The following two fields must either both be nil, or both non-nil.
// Both are immutable after construction.
pubClient *eventbus.Client
updates *eventbus.Publisher[Mapping]
logf logger.Logf
netMon *netmon.Monitor // optional; nil means interfaces will be looked up on-demand
controlKnobs *controlknobs.Knobs
@ -238,13 +242,16 @@ func NewClient(c Config) *Client {
panic("nil netMon")
}
ret := &Client{
eventBus: c.EventBus,
logf: c.Logf,
netMon: c.NetMon,
ipAndGateway: netmon.LikelyHomeRouterIP, // TODO(bradfitz): move this to method on netMon
onChange: c.OnChange,
controlKnobs: c.ControlKnobs,
}
if c.EventBus != nil {
ret.pubClient = c.EventBus.Client("portmapper")
ret.updates = eventbus.Publish[Mapping](ret.pubClient)
}
if ret.logf == nil {
ret.logf = logger.Discard
}
@ -279,6 +286,10 @@ func (c *Client) Close() error {
}
c.closed = true
c.invalidateMappingsLocked(true)
if c.updates != nil {
c.updates.Close()
c.pubClient.Close()
}
// TODO: close some future ever-listening UDP socket(s),
// waiting for multicast announcements from router.
return nil
@ -490,11 +501,30 @@ func (c *Client) createMapping() {
c.runningCreate = false
}()
if _, err := c.createOrGetMapping(ctx); err == nil && c.onChange != nil {
go c.onChange()
} else if err != nil && !IsNoMappingError(err) {
c.logf("createOrGetMapping: %v", err)
mapping, _, err := c.createOrGetMapping(ctx)
if err != nil {
if !IsNoMappingError(err) {
c.logf("createOrGetMapping: %v", err)
}
return
}
c.updates.Publish(Mapping{
External: mapping.External(),
Type: mapping.MappingType(),
GoodUntil: mapping.GoodUntil(),
})
if c.onChange != nil {
go c.onChange()
}
}
// Mapping is an event recording the allocation of a port mapping.
type Mapping struct {
External netip.AddrPort
Type string
GoodUntil time.Time
// TODO(creachadair): Record whether we reused an existing mapping?
}
// wildcardIP is used when the previous external IP is not known for PCP port mapping.
@ -505,19 +535,19 @@ var wildcardIP = netip.MustParseAddr("0.0.0.0")
//
// If no mapping is available, the error will be of type
// NoMappingError; see IsNoMappingError.
func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPort, err error) {
func (c *Client) createOrGetMapping(ctx context.Context) (mapping mapping, external netip.AddrPort, err error) {
if c.debug.disableAll() {
return netip.AddrPort{}, NoMappingError{ErrPortMappingDisabled}
return nil, netip.AddrPort{}, NoMappingError{ErrPortMappingDisabled}
}
if c.debug.DisableUPnP && c.debug.DisablePCP && c.debug.DisablePMP {
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
gw, myIP, ok := c.gatewayAndSelfIP()
if !ok {
return netip.AddrPort{}, NoMappingError{ErrGatewayRange}
return nil, netip.AddrPort{}, NoMappingError{ErrGatewayRange}
}
if gw.Is6() {
return netip.AddrPort{}, NoMappingError{ErrGatewayIPv6}
return nil, netip.AddrPort{}, NoMappingError{ErrGatewayIPv6}
}
now := time.Now()
@ -546,6 +576,17 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
return
}
// TODO(creachadair): This is more subtle than it should be. Ideally we
// would just return the mapping directly, but there are many different
// paths through the function with carefully-balanced locks, and not all
// the paths have a mapping to return. As a workaround, while we're here
// doing cleanup under the lock, grab the final mapping value and return
// it, so the caller does not need to grab the lock again and potentially
// race with a later update. The mapping itself is concurrency-safe.
//
// We should restructure this code so the locks are properly scoped.
mapping = c.mapping
// Print the internal details of each mapping if we're being verbose.
if c.debug.VerboseLogs {
c.logf("successfully obtained mapping: now=%d external=%v type=%s mapping=%s",
@ -571,7 +612,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if now.Before(m.RenewAfter()) {
defer c.mu.Unlock()
reusedExisting = true
return m.External(), nil
return nil, m.External(), nil
}
// The mapping might still be valid, so just try to renew it.
prevPort = m.External().Port()
@ -580,10 +621,10 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if c.debug.DisablePCP && c.debug.DisablePMP {
c.mu.Unlock()
if external, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok {
return external, nil
return nil, external, nil
}
c.vlogf("fallback to UPnP due to PCP and PMP being disabled failed")
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
// If we just did a Probe (e.g. via netchecker) but didn't
@ -610,16 +651,16 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
c.mu.Unlock()
// fallback to UPnP portmapping
if external, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok {
return external, nil
return nil, external, nil
}
c.vlogf("fallback to UPnP due to no PCP and PMP failed")
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
c.mu.Unlock()
uc, err := c.listenPacket(ctx, "udp4", ":0")
if err != nil {
return netip.AddrPort{}, err
return nil, netip.AddrPort{}, err
}
defer uc.Close()
@ -639,7 +680,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if neterror.TreatAsLostUDP(err) {
err = NoMappingError{ErrNoPortMappingServices}
}
return netip.AddrPort{}, err
return nil, netip.AddrPort{}, err
}
} else {
// Ask for our external address if needed.
@ -648,7 +689,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if neterror.TreatAsLostUDP(err) {
err = NoMappingError{ErrNoPortMappingServices}
}
return netip.AddrPort{}, err
return nil, netip.AddrPort{}, err
}
}
@ -657,7 +698,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if neterror.TreatAsLostUDP(err) {
err = NoMappingError{ErrNoPortMappingServices}
}
return netip.AddrPort{}, err
return nil, netip.AddrPort{}, err
}
}
@ -666,13 +707,13 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
n, src, err := uc.ReadFromUDPAddrPort(res)
if err != nil {
if ctx.Err() == context.Canceled {
return netip.AddrPort{}, err
return nil, netip.AddrPort{}, err
}
// fallback to UPnP portmapping
if mapping, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok {
return mapping, nil
return nil, mapping, nil
}
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
src = netaddr.Unmap(src)
if !src.IsValid() {
@ -688,7 +729,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
continue
}
if pres.ResultCode != 0 {
return netip.AddrPort{}, NoMappingError{fmt.Errorf("PMP response Op=0x%x,Res=0x%x", pres.OpCode, pres.ResultCode)}
return nil, netip.AddrPort{}, NoMappingError{fmt.Errorf("PMP response Op=0x%x,Res=0x%x", pres.OpCode, pres.ResultCode)}
}
if pres.OpCode == pmpOpReply|pmpOpMapPublicAddr {
m.external = netip.AddrPortFrom(pres.PublicAddr, m.external.Port())
@ -706,7 +747,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
if err != nil {
c.logf("failed to get PCP mapping: %v", err)
// PCP should only have a single packet response
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
pcpMapping.c = c
pcpMapping.internal = m.internal
@ -714,10 +755,10 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
c.mu.Lock()
defer c.mu.Unlock()
c.mapping = pcpMapping
return pcpMapping.external, nil
return pcpMapping, pcpMapping.external, nil
default:
c.logf("unknown PMP/PCP version number: %d %v", version, res[:n])
return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
}
@ -725,7 +766,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor
c.mu.Lock()
defer c.mu.Unlock()
c.mapping = m
return m.external, nil
return nil, m.external, nil
}
}
}

View File

@ -12,6 +12,7 @@ import (
"time"
"tailscale.com/control/controlknobs"
"tailscale.com/util/eventbus"
)
func TestCreateOrGetMapping(t *testing.T) {
@ -25,7 +26,7 @@ func TestCreateOrGetMapping(t *testing.T) {
if i > 0 {
time.Sleep(100 * time.Millisecond)
}
ext, err := c.createOrGetMapping(context.Background())
_, ext, err := c.createOrGetMapping(context.Background())
t.Logf("Got: %v, %v", ext, err)
}
}
@ -55,7 +56,7 @@ func TestClientProbeThenMap(t *testing.T) {
c.SetLocalPort(1234)
res, err := c.Probe(context.Background())
t.Logf("Probe: %+v, %v", res, err)
ext, err := c.createOrGetMapping(context.Background())
_, ext, err := c.createOrGetMapping(context.Background())
t.Logf("createOrGetMapping: %v, %v", ext, err)
}
@ -66,9 +67,8 @@ func TestProbeIntegration(t *testing.T) {
}
defer igd.Close()
c := newTestClient(t, igd)
c := newTestClient(t, igd, nil)
t.Logf("Listening on pxp=%v, upnp=%v", c.testPxPPort, c.testUPnPPort)
defer c.Close()
res, err := c.Probe(context.Background())
if err != nil {
@ -101,8 +101,7 @@ func TestPCPIntegration(t *testing.T) {
}
defer igd.Close()
c := newTestClient(t, igd)
defer c.Close()
c := newTestClient(t, igd, nil)
res, err := c.Probe(context.Background())
if err != nil {
t.Fatalf("probe failed: %v", err)
@ -114,7 +113,7 @@ func TestPCPIntegration(t *testing.T) {
t.Fatalf("probe did not see pcp: %+v", res)
}
external, err := c.createOrGetMapping(context.Background())
_, external, err := c.createOrGetMapping(context.Background())
if err != nil {
t.Fatalf("failed to get mapping: %v", err)
}
@ -136,3 +135,29 @@ func TestGetUPnPErrorsMetric(t *testing.T) {
getUPnPErrorsMetric(0)
getUPnPErrorsMetric(-100)
}
func TestUpdateEvent(t *testing.T) {
igd, err := NewTestIGD(t.Logf, TestIGDOptions{PCP: true})
if err != nil {
t.Fatalf("Create test gateway: %v", err)
}
bus := eventbus.New()
defer bus.Close()
sub := eventbus.Subscribe[Mapping](bus.Client("TestUpdateEvent"))
c := newTestClient(t, igd, bus)
if _, err := c.Probe(t.Context()); err != nil {
t.Fatalf("Probe failed: %v", err)
}
c.GetCachedMappingOrStartCreatingOne()
select {
case evt := <-sub.Events():
t.Logf("Received portmap update: %+v", evt)
case <-sub.Done():
t.Error("Subscriber closed prematurely")
case <-time.After(5 * time.Second):
t.Error("Timed out waiting for an update event")
}
}

View File

@ -163,9 +163,8 @@ func TestSelectBestService(t *testing.T) {
Desc: rootDesc,
Control: tt.control,
})
c := newTestClient(t, igd)
c := newTestClient(t, igd, nil)
t.Logf("Listening on upnp=%v", c.testUPnPPort)
defer c.Close()
// Ensure that we're using the HTTP client that talks to our test IGD server
ctx := context.Background()

View File

@ -586,9 +586,8 @@ func TestGetUPnPPortMapping(t *testing.T) {
},
})
c := newTestClient(t, igd)
c := newTestClient(t, igd, nil)
t.Logf("Listening on upnp=%v", c.testUPnPPort)
defer c.Close()
c.debug.VerboseLogs = true
@ -689,10 +688,9 @@ func TestGetUPnPPortMapping_LeaseDuration(t *testing.T) {
})
ctx := context.Background()
c := newTestClient(t, igd)
c := newTestClient(t, igd, nil)
c.debug.VerboseLogs = true
t.Logf("Listening on upnp=%v", c.testUPnPPort)
defer c.Close()
// Actually test the UPnP port mapping.
mustProbeUPnP(t, ctx, c)
@ -735,8 +733,7 @@ func TestGetUPnPPortMapping_NoValidServices(t *testing.T) {
Desc: noSupportedServicesRootDesc,
})
c := newTestClient(t, igd)
defer c.Close()
c := newTestClient(t, igd, nil)
c.debug.VerboseLogs = true
ctx := context.Background()
@ -778,8 +775,7 @@ func TestGetUPnPPortMapping_Legacy(t *testing.T) {
},
})
c := newTestClient(t, igd)
defer c.Close()
c := newTestClient(t, igd, nil)
c.debug.VerboseLogs = true
ctx := context.Background()
@ -806,9 +802,8 @@ func TestGetUPnPPortMappingNoResponses(t *testing.T) {
}
defer igd.Close()
c := newTestClient(t, igd)
c := newTestClient(t, igd, nil)
t.Logf("Listening on upnp=%v", c.testUPnPPort)
defer c.Close()
c.debug.VerboseLogs = true
@ -939,8 +934,7 @@ func TestGetUPnPPortMapping_Invalid(t *testing.T) {
},
})
c := newTestClient(t, igd)
defer c.Close()
c := newTestClient(t, igd, nil)
c.debug.VerboseLogs = true
ctx := context.Background()