net/netcheck,wgengine/magicsock: reduce coupling between netcheck and magicsock

Netcheck no longer performs I/O itself; instead, it sends requests via
SendPacket and expects users to route reply traffic to
ReceiveSTUNPacket.

Netcheck gains a Standalone function that stands up sockets and
goroutines to implement I/O when used in a standalone fashion.

Magicsock now unconditionally routes STUN traffic to the netcheck.Client
that it hosts, and plumbs the send packet sink.

The CLI is updated to make use of the Standalone mode.

Fixes #8723

Signed-off-by: James Tucker <james@tailscale.com>
This commit is contained in:
James Tucker
2023-07-26 17:01:32 -07:00
committed by James Tucker
parent d5ac18d2c4
commit de8e55fda6
5 changed files with 156 additions and 143 deletions

View File

@@ -52,7 +52,6 @@ func runNetcheck(ctx context.Context, args []string) error {
return err return err
} }
c := &netcheck.Client{ c := &netcheck.Client{
UDPBindAddr: envknob.String("TS_DEBUG_NETCHECK_UDP_BIND"),
PortMapper: portmapper.NewClient(logf, netMon, nil, nil), PortMapper: portmapper.NewClient(logf, netMon, nil, nil),
UseDNSCache: false, // always resolve, don't cache UseDNSCache: false, // always resolve, don't cache
} }
@@ -67,6 +66,10 @@ func runNetcheck(ctx context.Context, args []string) error {
fmt.Fprintln(Stderr, "# Warning: this JSON format is not yet considered a stable interface") fmt.Fprintln(Stderr, "# Warning: this JSON format is not yet considered a stable interface")
} }
if err := c.Standalone(ctx, envknob.String("TS_DEBUG_NETCHECK_UDP_BIND")); err != nil {
fmt.Fprintln(Stderr, "netcheck: UDP test failure:", err)
}
dm, err := localClient.CurrentDERPMap(ctx) dm, err := localClient.CurrentDERPMap(ctx)
noRegions := dm != nil && len(dm.Regions) == 0 noRegions := dm != nil && len(dm.Regions) == 0
if noRegions { if noRegions {

View File

@@ -27,7 +27,6 @@ import (
"tailscale.com/envknob" "tailscale.com/envknob"
"tailscale.com/net/dnscache" "tailscale.com/net/dnscache"
"tailscale.com/net/interfaces" "tailscale.com/net/interfaces"
"tailscale.com/net/netaddr"
"tailscale.com/net/neterror" "tailscale.com/net/neterror"
"tailscale.com/net/netmon" "tailscale.com/net/netmon"
"tailscale.com/net/netns" "tailscale.com/net/netns"
@@ -40,7 +39,6 @@ import (
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/types/nettype" "tailscale.com/types/nettype"
"tailscale.com/types/opt" "tailscale.com/types/opt"
"tailscale.com/types/ptr"
"tailscale.com/types/views" "tailscale.com/types/views"
"tailscale.com/util/clientmetric" "tailscale.com/util/clientmetric"
"tailscale.com/util/cmpx" "tailscale.com/util/cmpx"
@@ -154,7 +152,12 @@ func cloneDurationMap(m map[int]time.Duration) map[int]time.Duration {
return m2 return m2
} }
// Client generates a netcheck Report. // Client generates Reports describing the result of both passive and active
// network configuration probing. It provides two different modes of report, a
// full report (see MakeNextReportFull) and a more lightweight incremental
// report. The client must be provided with SendPacket in order to perform
// active probes, and must receive STUN packet replies via ReceiveSTUNPacket.
// Client can be used in a standalone fashion via the Standalone method.
type Client struct { type Client struct {
// Verbose enables verbose logging. // Verbose enables verbose logging.
Verbose bool Verbose bool
@@ -173,23 +176,15 @@ type Client struct {
// TimeNow, if non-nil, is used instead of time.Now. // TimeNow, if non-nil, is used instead of time.Now.
TimeNow func() time.Time TimeNow func() time.Time
// GetSTUNConn4 optionally provides a func to return the // SendPacket is required to send a packet to the specified address. For
// connection to use for sending & receiving IPv4 packets. If // convenience it shares a signature with WriteToUDPAddrPort.
// nil, an ephemeral one is created as needed. SendPacket func([]byte, netip.AddrPort) (int, error)
GetSTUNConn4 func() STUNConn
// GetSTUNConn6 is like GetSTUNConn4, but for IPv6.
GetSTUNConn6 func() STUNConn
// SkipExternalNetwork controls whether the client should not try // SkipExternalNetwork controls whether the client should not try
// to reach things other than localhost. This is set to true // to reach things other than localhost. This is set to true
// in tests to avoid probing the local LAN's router, etc. // in tests to avoid probing the local LAN's router, etc.
SkipExternalNetwork bool SkipExternalNetwork bool
// UDPBindAddr, if non-empty, is the address to listen on for UDP.
// It defaults to ":0".
UDPBindAddr string
// PortMapper, if non-nil, is used for portmap queries. // PortMapper, if non-nil, is used for portmap queries.
// If nil, portmap discovery is not done. // If nil, portmap discovery is not done.
PortMapper *portmapper.Client // lazily initialized on first use PortMapper *portmapper.Client // lazily initialized on first use
@@ -216,13 +211,6 @@ type Client struct {
resolver *dnscache.Resolver // only set if UseDNSCache is true resolver *dnscache.Resolver // only set if UseDNSCache is true
} }
// STUNConn is the interface required by the netcheck Client when
// reusing an existing UDP connection.
type STUNConn interface {
WriteToUDPAddrPort([]byte, netip.AddrPort) (int, error)
ReadFromUDPAddrPort([]byte) (int, netip.AddrPort, error)
}
func (c *Client) enoughRegions() int { func (c *Client) enoughRegions() int {
if c.testEnoughRegions > 0 { if c.testEnoughRegions > 0 {
return c.testEnoughRegions return c.testEnoughRegions
@@ -282,6 +270,10 @@ func (c *Client) MakeNextReportFull() {
c.nextFull = true c.nextFull = true
} }
// ReceiveSTUNPacket must be called when a STUN packet is received as a reply to
// a packet the client sent using SendPacket. In Standalone mode this is done by
// the loop started by Standalone; in normal operation in tailscaled, incoming
// STUN replies are routed to this method.
func (c *Client) ReceiveSTUNPacket(pkt []byte, src netip.AddrPort) { func (c *Client) ReceiveSTUNPacket(pkt []byte, src netip.AddrPort) {
c.vlogf("received STUN packet from %s", src) c.vlogf("received STUN packet from %s", src)
@@ -528,53 +520,12 @@ func nodeMight4(n *tailcfg.DERPNode) bool {
return ip.Is4() return ip.Is4()
} }
type packetReaderFromCloser interface {
ReadFromUDPAddrPort([]byte) (int, netip.AddrPort, error)
io.Closer
}
// readPackets reads STUN packets from pc until there's an error or ctx is done.
// In either case, it closes pc.
func (c *Client) readPackets(ctx context.Context, pc packetReaderFromCloser) {
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-ctx.Done():
case <-done:
}
pc.Close()
}()
var buf [64 << 10]byte
for {
n, addr, err := pc.ReadFromUDPAddrPort(buf[:])
if err != nil {
if ctx.Err() != nil {
return
}
c.logf("ReadFrom: %v", err)
return
}
pkt := buf[:n]
if !stun.Is(pkt) {
continue
}
if ap := netaddr.Unmap(addr); ap.IsValid() {
c.ReceiveSTUNPacket(pkt, ap)
}
}
}
// reportState holds the state for a single invocation of Client.GetReport. // reportState holds the state for a single invocation of Client.GetReport.
type reportState struct { type reportState struct {
c *Client c *Client
hairTX stun.TxID hairTX stun.TxID
gotHairSTUN chan netip.AddrPort gotHairSTUN chan netip.AddrPort
hairTimeout chan struct{} // closed on timeout hairTimeout chan struct{} // closed on timeout
pc4 STUNConn
pc6 STUNConn
pc4Hair nettype.PacketConn pc4Hair nettype.PacketConn
incremental bool // doing a lite, follow-up netcheck incremental bool // doing a lite, follow-up netcheck
stopProbeCh chan struct{} stopProbeCh chan struct{}
@@ -785,13 +736,6 @@ func newReport() *Report {
} }
} }
func (c *Client) udpBindAddr() string {
if v := c.UDPBindAddr; v != "" {
return v
}
return ":0"
}
// GetReport gets a report. // GetReport gets a report.
// //
// It may not be called concurrently with itself. // It may not be called concurrently with itself.
@@ -924,42 +868,6 @@ func (c *Client) GetReport(ctx context.Context, dm *tailcfg.DERPMap) (_ *Report,
[]byte("tailscale netcheck; see https://github.com/tailscale/tailscale/issues/188"), []byte("tailscale netcheck; see https://github.com/tailscale/tailscale/issues/188"),
netip.AddrPortFrom(netip.MustParseAddr(documentationIP), 12345)) netip.AddrPortFrom(netip.MustParseAddr(documentationIP), 12345))
if f := c.GetSTUNConn4; f != nil {
rs.pc4 = f()
} else {
u4, err := nettype.MakePacketListenerWithNetIP(netns.Listener(c.logf, nil)).ListenPacket(ctx, "udp4", c.udpBindAddr())
if err != nil {
c.logf("udp4: %v", err)
return nil, err
}
rs.pc4 = u4
go c.readPackets(ctx, u4)
}
if ifState.HaveV6 {
if f := c.GetSTUNConn6; f != nil {
rs.pc6 = f()
} else {
u6, err := nettype.MakePacketListenerWithNetIP(netns.Listener(c.logf, nil)).ListenPacket(ctx, "udp6", c.udpBindAddr())
if err != nil {
c.logf("udp6: %v", err)
} else {
rs.pc6 = u6
go c.readPackets(ctx, u6)
}
}
// If our interfaces.State suggested we have IPv6 support but then we
// failed to get an IPv6 sending socket (as in
// https://github.com/tailscale/tailscale/issues/7949), then change
// ifState.HaveV6 before we make a probe plan that involves sending IPv6
// packets and thus assuming rs.pc6 is non-nil.
if rs.pc6 == nil {
ifState = ptr.To(*ifState) // shallow clone
ifState.HaveV6 = false
}
}
plan := makeProbePlan(dm, ifState, last) plan := makeProbePlan(dm, ifState, last)
// If we're doing a full probe, also check for a captive portal. We // If we're doing a full probe, also check for a captive portal. We
@@ -1614,26 +1522,35 @@ func (rs *reportState) runProbe(ctx context.Context, dm *tailcfg.DERPMap, probe
} }
rs.mu.Unlock() rs.mu.Unlock()
if rs.c.SendPacket == nil {
rs.mu.Lock()
rs.report.IPv4CanSend = false
rs.report.IPv6CanSend = false
rs.mu.Unlock()
return
}
switch probe.proto { switch probe.proto {
case probeIPv4: case probeIPv4:
metricSTUNSend4.Add(1) metricSTUNSend4.Add(1)
n, err := rs.pc4.WriteToUDPAddrPort(req, addr)
if n == len(req) && err == nil || neterror.TreatAsLostUDP(err) {
rs.mu.Lock()
rs.report.IPv4CanSend = true
rs.mu.Unlock()
}
case probeIPv6: case probeIPv6:
metricSTUNSend6.Add(1) metricSTUNSend6.Add(1)
n, err := rs.pc6.WriteToUDPAddrPort(req, addr)
if n == len(req) && err == nil || neterror.TreatAsLostUDP(err) {
rs.mu.Lock()
rs.report.IPv6CanSend = true
rs.mu.Unlock()
}
default: default:
panic("bad probe proto " + fmt.Sprint(probe.proto)) panic("bad probe proto " + fmt.Sprint(probe.proto))
} }
n, err := rs.c.SendPacket(req, addr)
if n == len(req) && err == nil || neterror.TreatAsLostUDP(err) {
rs.mu.Lock()
switch probe.proto {
case probeIPv4:
rs.report.IPv4CanSend = true
case probeIPv6:
rs.report.IPv6CanSend = true
}
rs.mu.Unlock()
}
c.vlogf("sent to %v", addr) c.vlogf("sent to %v", addr)
} }

View File

@@ -159,13 +159,16 @@ func TestBasic(t *testing.T) {
defer cleanup() defer cleanup()
c := &Client{ c := &Client{
Logf: t.Logf, Logf: t.Logf,
UDPBindAddr: "127.0.0.1:0",
} }
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel() defer cancel()
if err := c.Standalone(ctx, "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
r, err := c.GetReport(ctx, stuntest.DERPMapOf(stunAddr.String())) r, err := c.GetReport(ctx, stuntest.DERPMapOf(stunAddr.String()))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@@ -851,7 +854,6 @@ func TestNoCaptivePortalWhenUDP(t *testing.T) {
c := &Client{ c := &Client{
Logf: t.Logf, Logf: t.Logf,
UDPBindAddr: "127.0.0.1:0",
testEnoughRegions: 1, testEnoughRegions: 1,
// Set the delay long enough that we have time to cancel it // Set the delay long enough that we have time to cancel it
@@ -862,6 +864,10 @@ func TestNoCaptivePortalWhenUDP(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel() defer cancel()
if err := c.Standalone(ctx, "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
r, err := c.GetReport(ctx, stuntest.DERPMapOf(stunAddr.String())) r, err := c.GetReport(ctx, stuntest.DERPMapOf(stunAddr.String()))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@@ -885,7 +891,6 @@ func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
func TestNodeAddrResolve(t *testing.T) { func TestNodeAddrResolve(t *testing.T) {
c := &Client{ c := &Client{
Logf: t.Logf, Logf: t.Logf,
UDPBindAddr: "127.0.0.1:0",
UseDNSCache: true, UseDNSCache: true,
} }

View File

@@ -0,0 +1,99 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package netcheck
import (
"context"
"errors"
"net/netip"
"tailscale.com/net/netaddr"
"tailscale.com/net/netns"
"tailscale.com/net/stun"
"tailscale.com/types/logger"
"tailscale.com/types/nettype"
"tailscale.com/util/multierr"
)
// Standalone makes the Client self-sufficient for active probing: it binds a
// "udp4" and a "udp6" socket on bindAddr (":0" when bindAddr is empty), starts
// one read loop per successfully bound socket that hands STUN replies to
// ReceiveSTUNPacket, and installs a SendPacket implementation that writes to
// the socket matching the destination's address family. Callers therefore do
// not need to set SendPacket or call ReceiveSTUNPacket themselves.
//
// It must be called before any report is started. The sockets and their read
// loops are shut down when ctx is cancelled.
//
// Each bind failure is logged. An error is returned only when both the IPv4
// and the IPv6 bind fail; if at least one protocol binds, Standalone returns
// nil and sends for the missing protocol fail with "no UDP socket".
func (c *Client) Standalone(ctx context.Context, bindAddr string) error {
	if bindAddr == "" {
		bindAddr = ":0"
	}

	var bindErrs []error

	conn4, err4 := nettype.MakePacketListenerWithNetIP(netns.Listener(c.logf, nil)).ListenPacket(ctx, "udp4", bindAddr)
	if err4 == nil {
		go readPackets(ctx, c.logf, conn4, c.ReceiveSTUNPacket)
	} else {
		c.logf("udp4: %v", err4)
		bindErrs = append(bindErrs, err4)
	}

	conn6, err6 := nettype.MakePacketListenerWithNetIP(netns.Listener(c.logf, nil)).ListenPacket(ctx, "udp6", bindAddr)
	if err6 == nil {
		go readPackets(ctx, c.logf, conn6, c.ReceiveSTUNPacket)
	} else {
		c.logf("udp6: %v", err6)
		bindErrs = append(bindErrs, err6)
	}

	// Route each outgoing packet to the socket for its address family. A
	// socket that failed to bind is left nil and reported as a send error.
	c.SendPacket = func(pkt []byte, dst netip.AddrPort) (int, error) {
		var conn nettype.PacketConn
		if dst.Addr().Is6() {
			conn = conn6
		} else {
			conn = conn4
		}
		if conn == nil {
			return 0, errors.New("no UDP socket")
		}
		return conn.WriteToUDPAddrPort(pkt, dst)
	}

	// A single working protocol is enough; only a double failure is an error.
	if len(bindErrs) < 2 {
		return nil
	}
	return multierr.New(bindErrs...)
}
// readPackets is the receive loop for one UDP socket. It reads datagrams from
// pc, drops anything that is not a STUN packet, unmaps the source address and,
// if the result is valid, passes the packet to recv. The loop ends on the
// first read error or once ctx is done; pc is closed in both cases. A read
// error is logged via logf unless ctx has already ended.
func readPackets(ctx context.Context, logf logger.Logf, pc nettype.PacketConn, recv func([]byte, netip.AddrPort)) {
	stopped := make(chan struct{})
	defer close(stopped)

	// Close pc as soon as either the context ends or the read loop exits;
	// closing also unblocks a read that is still pending.
	go func() {
		defer pc.Close()
		select {
		case <-ctx.Done():
		case <-stopped:
		}
	}()

	var scratch [64 << 10]byte
	for {
		n, from, err := pc.ReadFromUDPAddrPort(scratch[:])
		if err != nil {
			// An error after cancellation is the expected result of the
			// close above, so it is not worth logging.
			if ctx.Err() == nil {
				logf("ReadFrom: %v", err)
			}
			return
		}
		if pkt := scratch[:n]; stun.Is(pkt) {
			if src := netaddr.Unmap(from); src.IsValid() {
				recv(pkt, src)
			}
		}
	}
}

View File

@@ -118,10 +118,6 @@ type Conn struct {
// port mappings from NAT devices. // port mappings from NAT devices.
portMapper *portmapper.Client portMapper *portmapper.Client
// stunReceiveFunc holds the current STUN packet processing func.
// Its Loaded value is always non-nil.
stunReceiveFunc syncs.AtomicValue[func(p []byte, fromAddr netip.AddrPort)]
// derpRecvCh is used by receiveDERP to read DERP messages. // derpRecvCh is used by receiveDERP to read DERP messages.
// It must have buffer size > 0; see issue 3736. // It must have buffer size > 0; see issue 3736.
derpRecvCh chan derpReadResult derpRecvCh chan derpReadResult
@@ -422,17 +418,20 @@ func NewConn(opts Options) (*Conn, error) {
c.connCtx, c.connCtxCancel = context.WithCancel(context.Background()) c.connCtx, c.connCtxCancel = context.WithCancel(context.Background())
c.donec = c.connCtx.Done() c.donec = c.connCtx.Done()
c.netChecker = &netcheck.Client{ c.netChecker = &netcheck.Client{
Logf: logger.WithPrefix(c.logf, "netcheck: "), Logf: logger.WithPrefix(c.logf, "netcheck: "),
NetMon: c.netMon, NetMon: c.netMon,
GetSTUNConn4: func() netcheck.STUNConn { return &c.pconn4 }, SendPacket: func(b []byte, ap netip.AddrPort) (int, error) {
GetSTUNConn6: func() netcheck.STUNConn { return &c.pconn6 }, ok, err := c.sendUDP(ap, b)
if !ok {
return 0, err
}
return len(b), err
},
SkipExternalNetwork: inTest(), SkipExternalNetwork: inTest(),
PortMapper: c.portMapper, PortMapper: c.portMapper,
UseDNSCache: true, UseDNSCache: true,
} }
c.ignoreSTUNPackets()
if d4, err := c.listenRawDisco("ip4"); err == nil { if d4, err := c.listenRawDisco("ip4"); err == nil {
c.logf("[v1] using BPF disco receiver for IPv4") c.logf("[v1] using BPF disco receiver for IPv4")
c.closeDisco4 = d4 c.closeDisco4 = d4
@@ -458,11 +457,6 @@ func (c *Conn) InstallCaptureHook(cb capture.Callback) {
c.captureHook.Store(cb) c.captureHook.Store(cb)
} }
// ignoreSTUNPackets sets a STUN packet processing func that does nothing.
func (c *Conn) ignoreSTUNPackets() {
c.stunReceiveFunc.Store(func([]byte, netip.AddrPort) {})
}
// doPeriodicSTUN is called (in a new goroutine) by // doPeriodicSTUN is called (in a new goroutine) by
// periodicReSTUNTimer when periodic STUNs are active. // periodicReSTUNTimer when periodic STUNs are active.
func (c *Conn) doPeriodicSTUN() { c.ReSTUN("periodic") } func (c *Conn) doPeriodicSTUN() { c.ReSTUN("periodic") }
@@ -608,9 +602,6 @@ func (c *Conn) updateNetInfo(ctx context.Context) (*netcheck.Report, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second) ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel() defer cancel()
c.stunReceiveFunc.Store(c.netChecker.ReceiveSTUNPacket)
defer c.ignoreSTUNPackets()
report, err := c.netChecker.GetReport(ctx, dm) report, err := c.netChecker.GetReport(ctx, dm)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -856,8 +847,6 @@ func (c *Conn) determineEndpoints(ctx context.Context) ([]tailcfg.Endpoint, erro
addAddr(ipp(nr.GlobalV6), tailcfg.EndpointSTUN) addAddr(ipp(nr.GlobalV6), tailcfg.EndpointSTUN)
} }
c.ignoreSTUNPackets()
// Update our set of endpoints by adding any endpoints that we // Update our set of endpoints by adding any endpoints that we
// previously found but haven't expired yet. This also updates the // previously found but haven't expired yet. This also updates the
// cache with the set of endpoints discovered in this function. // cache with the set of endpoints discovered in this function.
@@ -1173,7 +1162,7 @@ func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFu
// caller). // caller).
func (c *Conn) receiveIP(b []byte, ipp netip.AddrPort, cache *ippEndpointCache) (ep *endpoint, ok bool) { func (c *Conn) receiveIP(b []byte, ipp netip.AddrPort, cache *ippEndpointCache) (ep *endpoint, ok bool) {
if stun.Is(b) { if stun.Is(b) {
c.stunReceiveFunc.Load()(b, ipp) c.netChecker.ReceiveSTUNPacket(b, ipp)
return nil, false return nil, false
} }
if c.handleDiscoMessage(b, ipp, key.NodePublic{}, discoRXPathUDP) { if c.handleDiscoMessage(b, ipp, key.NodePublic{}, discoRXPathUDP) {