From ba11b5deb5e6a85b4868a79c8229da561b1ca35e Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Thu, 20 Mar 2025 15:19:26 -0700 Subject: [PATCH] net/portmapper: fire an event when a port mapping is updated (#15371) When an event bus is configured publish an event each time a new port mapping is updated. Publication is unconditional and occurs prior to calling any callback that is registered. For now, the callback is still fired in a separate goroutine as before -- later, those callbacks should become subscriptions to the published event. For now, the event type is defined as a new type here in the package. We will want to move it to a more central package when there are subscribers. The event wrapper is effectively a subset of the data exported by the internal mapping interface, but on a concrete struct so the bus plumbing can inspect it. Updates #15160 Change-Id: I951f212429ac791223af8d75b6eb39a0d2a0053a Signed-off-by: M. J. Fromberger --- net/portmapper/igd_test.go | 8 ++- net/portmapper/portmapper.go | 97 ++++++++++++++++++++++--------- net/portmapper/portmapper_test.go | 39 ++++++++++--- net/portmapper/select_test.go | 3 +- net/portmapper/upnp_test.go | 18 ++---- 5 files changed, 115 insertions(+), 50 deletions(-) diff --git a/net/portmapper/igd_test.go b/net/portmapper/igd_test.go index 67d873c35..319115896 100644 --- a/net/portmapper/igd_test.go +++ b/net/portmapper/igd_test.go @@ -19,6 +19,7 @@ import ( "tailscale.com/net/netmon" "tailscale.com/syncs" "tailscale.com/types/logger" + "tailscale.com/util/eventbus" ) // TestIGD is an IGD (Internet Gateway Device) for testing. It supports fake @@ -258,12 +259,16 @@ func (d *TestIGD) handlePCPQuery(pkt []byte, src netip.AddrPort) { } } -func newTestClient(t *testing.T, igd *TestIGD) *Client { +// newTestClient configures a new test client connected to igd for mapping updates. +// If bus != nil, update events are published to it. +// A cleanup for the resulting client is added to t. +func newTestClient(t *testing.T, igd *TestIGD, bus *eventbus.Bus) *Client { var c *Client c = NewClient(Config{ Logf: t.Logf, NetMon: netmon.NewStatic(), ControlKnobs: new(controlknobs.Knobs), + EventBus: bus, OnChange: func() { t.Logf("port map changed") t.Logf("have mapping: %v", c.HaveMapping()) @@ -273,5 +278,6 @@ func newTestClient(t *testing.T, igd *TestIGD) *Client { c.testUPnPPort = igd.TestUPnPPort() c.netMon = netmon.NewStatic() c.SetGatewayLookupFunc(testIPAndGateway) + t.Cleanup(func() { c.Close() }) return c } diff --git a/net/portmapper/portmapper.go b/net/portmapper/portmapper.go index 8fe9ba493..f95d6503a 100644 --- a/net/portmapper/portmapper.go +++ b/net/portmapper/portmapper.go @@ -85,7 +85,11 @@ const trustServiceStillAvailableDuration = 10 * time.Minute // Client is a port mapping client. type Client struct { - eventBus *eventbus.Bus + // The following two fields must either both be nil, or both non-nil. + // Both are immutable after construction. + pubClient *eventbus.Client + updates *eventbus.Publisher[Mapping] + logf logger.Logf netMon *netmon.Monitor // optional; nil means interfaces will be looked up on-demand controlKnobs *controlknobs.Knobs @@ -238,13 +242,16 @@ func NewClient(c Config) *Client { panic("nil netMon") } ret := &Client{ - eventBus: c.EventBus, logf: c.Logf, netMon: c.NetMon, ipAndGateway: netmon.LikelyHomeRouterIP, // TODO(bradfitz): move this to method on netMon onChange: c.OnChange, controlKnobs: c.ControlKnobs, } + if c.EventBus != nil { + ret.pubClient = c.EventBus.Client("portmapper") + ret.updates = eventbus.Publish[Mapping](ret.pubClient) + } if ret.logf == nil { ret.logf = logger.Discard } @@ -279,6 +286,10 @@ func (c *Client) Close() error { } c.closed = true c.invalidateMappingsLocked(true) + if c.updates != nil { + c.updates.Close() + c.pubClient.Close() + } // TODO: close some future ever-listening UDP socket(s), // waiting for multicast announcements from router. return nil @@ -490,11 +501,30 @@ func (c *Client) createMapping() { c.runningCreate = false }() - if _, err := c.createOrGetMapping(ctx); err == nil && c.onChange != nil { - go c.onChange() - } else if err != nil && !IsNoMappingError(err) { - c.logf("createOrGetMapping: %v", err) + mapping, _, err := c.createOrGetMapping(ctx) + if err != nil { + if !IsNoMappingError(err) { + c.logf("createOrGetMapping: %v", err) + } + return } + c.updates.Publish(Mapping{ + External: mapping.External(), + Type: mapping.MappingType(), + GoodUntil: mapping.GoodUntil(), + }) + if c.onChange != nil { + go c.onChange() + } +} + +// Mapping is an event recording the allocation of a port mapping. +type Mapping struct { + External netip.AddrPort + Type string + GoodUntil time.Time + + // TODO(creachadair): Record whether we reused an existing mapping? } // wildcardIP is used when the previous external IP is not known for PCP port mapping. @@ -505,19 +535,19 @@ var wildcardIP = netip.MustParseAddr("0.0.0.0") // // If no mapping is available, the error will be of type // NoMappingError; see IsNoMappingError. -func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPort, err error) { +func (c *Client) createOrGetMapping(ctx context.Context) (mapping mapping, external netip.AddrPort, err error) { if c.debug.disableAll() { - return netip.AddrPort{}, NoMappingError{ErrPortMappingDisabled} + return nil, netip.AddrPort{}, NoMappingError{ErrPortMappingDisabled} } if c.debug.DisableUPnP && c.debug.DisablePCP && c.debug.DisablePMP { - return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} + return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} } gw, myIP, ok := c.gatewayAndSelfIP() if !ok { - return netip.AddrPort{}, NoMappingError{ErrGatewayRange} + return nil, netip.AddrPort{}, NoMappingError{ErrGatewayRange} } if gw.Is6() { - return netip.AddrPort{}, NoMappingError{ErrGatewayIPv6} + return nil, netip.AddrPort{}, NoMappingError{ErrGatewayIPv6} } now := time.Now() @@ -546,6 +576,17 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor return } + // TODO(creachadair): This is more subtle than it should be. Ideally we + // would just return the mapping directly, but there are many different + // paths through the function with carefully-balanced locks, and not all + // the paths have a mapping to return. As a workaround, while we're here + // doing cleanup under the lock, grab the final mapping value and return + // it, so the caller does not need to grab the lock again and potentially + // race with a later update. The mapping itself is concurrency-safe. + // + // We should restructure this code so the locks are properly scoped. + mapping = c.mapping + // Print the internal details of each mapping if we're being verbose. if c.debug.VerboseLogs { c.logf("successfully obtained mapping: now=%d external=%v type=%s mapping=%s", @@ -571,7 +612,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor if now.Before(m.RenewAfter()) { defer c.mu.Unlock() reusedExisting = true - return m.External(), nil + return nil, m.External(), nil } // The mapping might still be valid, so just try to renew it. prevPort = m.External().Port() @@ -580,10 +621,10 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor if c.debug.DisablePCP && c.debug.DisablePMP { c.mu.Unlock() if external, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok { - return external, nil + return nil, external, nil } c.vlogf("fallback to UPnP due to PCP and PMP being disabled failed") - return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} + return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} } // If we just did a Probe (e.g. via netchecker) but didn't @@ -610,16 +651,16 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor c.mu.Unlock() // fallback to UPnP portmapping if external, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok { - return external, nil + return nil, external, nil } c.vlogf("fallback to UPnP due to no PCP and PMP failed") - return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} + return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} } c.mu.Unlock() uc, err := c.listenPacket(ctx, "udp4", ":0") if err != nil { - return netip.AddrPort{}, err + return nil, netip.AddrPort{}, err } defer uc.Close() @@ -639,7 +680,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor if neterror.TreatAsLostUDP(err) { err = NoMappingError{ErrNoPortMappingServices} } - return netip.AddrPort{}, err + return nil, netip.AddrPort{}, err } } else { // Ask for our external address if needed. @@ -648,7 +689,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor if neterror.TreatAsLostUDP(err) { err = NoMappingError{ErrNoPortMappingServices} } - return netip.AddrPort{}, err + return nil, netip.AddrPort{}, err } } @@ -657,7 +698,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor if neterror.TreatAsLostUDP(err) { err = NoMappingError{ErrNoPortMappingServices} } - return netip.AddrPort{}, err + return nil, netip.AddrPort{}, err } } @@ -666,13 +707,13 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor n, src, err := uc.ReadFromUDPAddrPort(res) if err != nil { if ctx.Err() == context.Canceled { - return netip.AddrPort{}, err + return nil, netip.AddrPort{}, err } // fallback to UPnP portmapping if mapping, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok { - return mapping, nil + return nil, mapping, nil } - return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} + return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} } src = netaddr.Unmap(src) if !src.IsValid() { @@ -688,7 +729,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor continue } if pres.ResultCode != 0 { - return netip.AddrPort{}, NoMappingError{fmt.Errorf("PMP response Op=0x%x,Res=0x%x", pres.OpCode, pres.ResultCode)} + return nil, netip.AddrPort{}, NoMappingError{fmt.Errorf("PMP response Op=0x%x,Res=0x%x", pres.OpCode, pres.ResultCode)} } if pres.OpCode == pmpOpReply|pmpOpMapPublicAddr { m.external = netip.AddrPortFrom(pres.PublicAddr, m.external.Port()) @@ -706,7 +747,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor if err != nil { c.logf("failed to get PCP mapping: %v", err) // PCP should only have a single packet response - return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} + return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} } pcpMapping.c = c pcpMapping.internal = m.internal @@ -714,10 +755,10 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor c.mu.Lock() defer c.mu.Unlock() c.mapping = pcpMapping - return pcpMapping.external, nil + return pcpMapping, pcpMapping.external, nil default: c.logf("unknown PMP/PCP version number: %d %v", version, res[:n]) - return netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} + return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices} } } @@ -725,7 +766,7 @@ func (c *Client) createOrGetMapping(ctx context.Context) (external netip.AddrPor c.mu.Lock() defer c.mu.Unlock() c.mapping = m - return m.external, nil + return nil, m.external, nil } } } diff --git a/net/portmapper/portmapper_test.go b/net/portmapper/portmapper_test.go index c815f21d1..32302e461 100644 --- a/net/portmapper/portmapper_test.go +++ b/net/portmapper/portmapper_test.go @@ -12,6 +12,7 @@ import ( "time" "tailscale.com/control/controlknobs" + "tailscale.com/util/eventbus" ) func TestCreateOrGetMapping(t *testing.T) { @@ -25,7 +26,7 @@ func TestCreateOrGetMapping(t *testing.T) { if i > 0 { time.Sleep(100 * time.Millisecond) } - ext, err := c.createOrGetMapping(context.Background()) + _, ext, err := c.createOrGetMapping(context.Background()) t.Logf("Got: %v, %v", ext, err) } } @@ -55,7 +56,7 @@ func TestClientProbeThenMap(t *testing.T) { c.SetLocalPort(1234) res, err := c.Probe(context.Background()) t.Logf("Probe: %+v, %v", res, err) - ext, err := c.createOrGetMapping(context.Background()) + _, ext, err := c.createOrGetMapping(context.Background()) t.Logf("createOrGetMapping: %v, %v", ext, err) } @@ -66,9 +67,8 @@ func TestProbeIntegration(t *testing.T) { } defer igd.Close() - c := newTestClient(t, igd) + c := newTestClient(t, igd, nil) t.Logf("Listening on pxp=%v, upnp=%v", c.testPxPPort, c.testUPnPPort) - defer c.Close() res, err := c.Probe(context.Background()) if err != nil { @@ -101,8 +101,7 @@ func TestPCPIntegration(t *testing.T) { } defer igd.Close() - c := newTestClient(t, igd) - defer c.Close() + c := newTestClient(t, igd, nil) res, err := c.Probe(context.Background()) if err != nil { t.Fatalf("probe failed: %v", err) @@ -114,7 +113,7 @@ func TestPCPIntegration(t *testing.T) { t.Fatalf("probe did not see pcp: %+v", res) } - external, err := c.createOrGetMapping(context.Background()) + _, external, err := c.createOrGetMapping(context.Background()) if err != nil { t.Fatalf("failed to get mapping: %v", err) } @@ -136,3 +135,29 @@ func TestGetUPnPErrorsMetric(t *testing.T) { getUPnPErrorsMetric(0) getUPnPErrorsMetric(-100) } + +func TestUpdateEvent(t *testing.T) { + igd, err := NewTestIGD(t.Logf, TestIGDOptions{PCP: true}) + if err != nil { + t.Fatalf("Create test gateway: %v", err) + } + + bus := eventbus.New() + defer bus.Close() + + sub := eventbus.Subscribe[Mapping](bus.Client("TestUpdateEvent")) + c := newTestClient(t, igd, bus) + if _, err := c.Probe(t.Context()); err != nil { + t.Fatalf("Probe failed: %v", err) + } + c.GetCachedMappingOrStartCreatingOne() + + select { + case evt := <-sub.Events(): + t.Logf("Received portmap update: %+v", evt) + case <-sub.Done(): + t.Error("Subscriber closed prematurely") + case <-time.After(5 * time.Second): + t.Error("Timed out waiting for an update event") + } +} diff --git a/net/portmapper/select_test.go b/net/portmapper/select_test.go index 9e99c9a9d..6c210d70a 100644 --- a/net/portmapper/select_test.go +++ b/net/portmapper/select_test.go @@ -163,9 +163,8 @@ func TestSelectBestService(t *testing.T) { Desc: rootDesc, Control: tt.control, }) - c := newTestClient(t, igd) + c := newTestClient(t, igd, nil) t.Logf("Listening on upnp=%v", c.testUPnPPort) - defer c.Close() // Ensure that we're using the HTTP client that talks to our test IGD server ctx := context.Background() diff --git a/net/portmapper/upnp_test.go b/net/portmapper/upnp_test.go index 0c296813f..1e1278abc 100644 --- a/net/portmapper/upnp_test.go +++ b/net/portmapper/upnp_test.go @@ -586,9 +586,8 @@ func TestGetUPnPPortMapping(t *testing.T) { }, }) - c := newTestClient(t, igd) + c := newTestClient(t, igd, nil) t.Logf("Listening on upnp=%v", c.testUPnPPort) - defer c.Close() c.debug.VerboseLogs = true @@ -689,10 +688,9 @@ func TestGetUPnPPortMapping_LeaseDuration(t *testing.T) { }) ctx := context.Background() - c := newTestClient(t, igd) + c := newTestClient(t, igd, nil) c.debug.VerboseLogs = true t.Logf("Listening on upnp=%v", c.testUPnPPort) - defer c.Close() // Actually test the UPnP port mapping. mustProbeUPnP(t, ctx, c) @@ -735,8 +733,7 @@ func TestGetUPnPPortMapping_NoValidServices(t *testing.T) { Desc: noSupportedServicesRootDesc, }) - c := newTestClient(t, igd) - defer c.Close() + c := newTestClient(t, igd, nil) c.debug.VerboseLogs = true ctx := context.Background() @@ -778,8 +775,7 @@ func TestGetUPnPPortMapping_Legacy(t *testing.T) { }, }) - c := newTestClient(t, igd) - defer c.Close() + c := newTestClient(t, igd, nil) c.debug.VerboseLogs = true ctx := context.Background() @@ -806,9 +802,8 @@ func TestGetUPnPPortMappingNoResponses(t *testing.T) { } defer igd.Close() - c := newTestClient(t, igd) + c := newTestClient(t, igd, nil) t.Logf("Listening on upnp=%v", c.testUPnPPort) - defer c.Close() c.debug.VerboseLogs = true @@ -939,8 +934,7 @@ func TestGetUPnPPortMapping_Invalid(t *testing.T) { }, }) - c := newTestClient(t, igd) - defer c.Close() + c := newTestClient(t, igd, nil) c.debug.VerboseLogs = true ctx := context.Background()