From 81fd259133f510be35114c456cb927f7ef7537ec Mon Sep 17 00:00:00 2001 From: Joe Tsai Date: Thu, 27 Oct 2022 16:26:52 -0700 Subject: [PATCH] wgengine/magicsock: gather physical-layer statistics (#5925) There is utility in logging traffic statistics that occurs at the physical layer. That is, in order to send packets virtually to a particular tailscale IP address, what physical endpoints did we need to communicate with? This functionality logs IP addresses identical to what had always been logged in magicsock prior to #5823, so there is no increase in PII being logged. ExtractStatistics returns a mapping of connections to counts. The source is always a Tailscale IP address (without port), while the destination is some endpoint reachable on WAN or LAN. As a special case, traffic routed through DERP will use 127.3.3.40 as the destination address with the port being the DERP region. This entire feature is only enabled if data-plane audit logging is enabled on the tailnet (by default it is disabled). Example of type of information logged: ------------------------------------ Tx[P/s] Tx[B/s] Rx[P/s] Rx[B/s] PhysicalTraffic: 25.80 3.39Ki 38.80 5.57Ki 100.1.2.3 -> 143.11.22.33:41641 15.40 2.00Ki 23.20 3.37Ki 100.4.5.6 -> 192.168.0.100:41641 10.20 1.38Ki 15.60 2.20Ki 100.7.8.9 -> 127.3.3.40:2 0.20 6.40 0.00 0.00 Signed-off-by: Joe Tsai --- wgengine/magicsock/magicsock.go | 73 ++++++++++++++++++++++++++-- wgengine/magicsock/magicsock_test.go | 34 +++++++++++++ wgengine/netlog/logger.go | 3 +- wgengine/userspace.go | 2 +- 4 files changed, 106 insertions(+), 6 deletions(-) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 596b6d6b1..95b84cdcb 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -52,6 +52,7 @@ "tailscale.com/tstime/mono" "tailscale.com/types/key" "tailscale.com/types/logger" + "tailscale.com/types/netlogtype" "tailscale.com/types/netmap" "tailscale.com/types/nettype" "tailscale.com/util/clientmetric" @@ -337,6 +338,21 @@ type Conn struct { // port is the preferred port from opts.Port; 0 means auto. port atomic.Uint32 + // stats maintains per-connection counters. + // See SetStatisticsEnabled and ExtractStatistics for details. + stats struct { + enabled atomic.Bool + + // TODO(joetsai): A per-Conn map of connections is easiest to implement. + // Since every packet occurs within the context of an endpoint, + // we could track the counts within the endpoint itself, + // and then merge the results when ExtractStatistics is called. + // That would avoid a map lookup for every packet. + + mu sync.Mutex + m map[netlogtype.Connection]netlogtype.Counts + } + // ============================================================ // mu guards all following fields; see userspaceEngine lock // ordering rules against the engine. For derphttp, mu must @@ -1750,6 +1766,9 @@ func (c *Conn) receiveIP(b []byte, ipp netip.AddrPort, cache *ippEndpointCache, ep = de } ep.noteRecvActivity() + if c.stats.enabled.Load() { + c.updateStats(ep.nodeAddr, ipp, netlogtype.Counts{RxPackets: 1, RxBytes: uint64(len(b))}) + } return ep, true } @@ -1805,6 +1824,9 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *en } ep.noteRecvActivity() + if c.stats.enabled.Load() { + c.updateStats(ep.nodeAddr, ipp, netlogtype.Counts{RxPackets: 1, RxBytes: uint64(dm.n)}) + } return n, ep } @@ -2406,6 +2428,9 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) { endpointState: map[netip.AddrPort]*endpointState{}, heartbeatDisabled: heartbeatDisabled, } + if len(n.Addresses) > 0 { + ep.nodeAddr = n.Addresses[0].Addr() + } if !n.DiscoKey.IsZero() { ep.discoKey = n.DiscoKey ep.discoShort = n.DiscoKey.ShortString() @@ -3298,6 +3323,39 @@ func (c *Conn) UpdateStatus(sb *ipnstate.StatusBuilder) { }) } +// updateStats updates the statistics counters with the src, dst, and cnts. +// It is the caller's responsibility to check whether logging is enabled. +func (c *Conn) updateStats(src netip.Addr, dst netip.AddrPort, cnts netlogtype.Counts) { + conn := netlogtype.Connection{Src: netip.AddrPortFrom(src, 0), Dst: dst} + c.stats.mu.Lock() + defer c.stats.mu.Unlock() + mak.Set(&c.stats.m, conn, c.stats.m[conn].Add(cnts)) +} + +// SetStatisticsEnabled enables per-connection packet counters. +// Disabling statistics gathering does not reset the counters. +// ExtractStatistics must be called to reset the counters and +// be periodically called while enabled to avoid unbounded memory use. +func (c *Conn) SetStatisticsEnabled(enable bool) { + c.stats.enabled.Store(enable) +} + +// ExtractStatistics extracts and resets the counters for all active connections. +// It must be called periodically otherwise the memory used is unbounded. +// +// The source is always a peer's tailscale IP address, +// while the destination is the peer's physical IP address and port. +// As a special case, packets routed through DERP use a destination address +// of 127.3.3.40 with the port being the DERP region. +// This node's tailscale IP address never appears in the returned map. +func (c *Conn) ExtractStatistics() map[netlogtype.Connection]netlogtype.Counts { + c.stats.mu.Lock() + defer c.stats.mu.Unlock() + m := c.stats.m + c.stats.m = nil + return m +} + func ippDebugString(ua netip.AddrPort) string { if ua.Addr() == derpMagicIPAddr { return fmt.Sprintf("derp-%d", ua.Port()) @@ -3332,6 +3390,7 @@ type endpoint struct { publicKey key.NodePublic // peer public key (for WireGuard + DERP) publicKeyHex string // cached output of publicKey.UntypedHexString fakeWGAddr netip.AddrPort // the UDP address we tell wireguard-go we're using + nodeAddr netip.Addr // the node's first tailscale address (only used for logging) // mu protects all following fields. mu sync.Mutex // Lock ordering: Conn.mu, then endpoint.mu @@ -3676,11 +3735,19 @@ func (de *endpoint) send(b []byte) error { var err error if udpAddr.IsValid() { _, err = de.c.sendAddr(udpAddr, de.publicKey, b) + if err == nil && de.c.stats.enabled.Load() { + de.c.updateStats(de.nodeAddr, udpAddr, netlogtype.Counts{TxPackets: 1, TxBytes: uint64(len(b))}) + } } if derpAddr.IsValid() { - if ok, _ := de.c.sendAddr(derpAddr, de.publicKey, b); ok && err != nil { - // UDP failed but DERP worked, so good enough: - return nil + if ok, _ := de.c.sendAddr(derpAddr, de.publicKey, b); ok { + if de.c.stats.enabled.Load() { + de.c.updateStats(de.nodeAddr, derpAddr, netlogtype.Counts{TxPackets: 1, TxBytes: uint64(len(b))}) + } + if err != nil { + // UDP failed but DERP worked, so good enough: + return nil + } } } return err diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index 8e802af5c..50833a30f 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -28,6 +28,7 @@ "unsafe" "go4.org/mem" + "golang.org/x/exp/maps" "golang.zx2c4.com/wireguard/device" "golang.zx2c4.com/wireguard/tun/tuntest" "tailscale.com/derp" @@ -42,6 +43,7 @@ "tailscale.com/tstest/natlab" "tailscale.com/types/key" "tailscale.com/types/logger" + "tailscale.com/types/netlogtype" "tailscale.com/types/netmap" "tailscale.com/types/nettype" "tailscale.com/util/cibuild" @@ -1093,17 +1095,45 @@ func testTwoDevicePing(t *testing.T, d *devices) { } } + m1.conn.SetStatisticsEnabled(true) + m2.conn.SetStatisticsEnabled(true) + + checkStats := func(t *testing.T, m *magicStack, wantConns []netlogtype.Connection) { + stats := m.conn.ExtractStatistics() + for _, conn := range wantConns { + if _, ok := stats[conn]; ok { + return + } + } + t.Helper() + t.Errorf("missing any connection to %s from %s", wantConns, maps.Keys(stats)) + } + + addrPort := netip.MustParseAddrPort + m1Conns := []netlogtype.Connection{ + {Src: addrPort("1.0.0.2:0"), Dst: m2.conn.pconn4.LocalAddr().AddrPort()}, + {Src: addrPort("1.0.0.2:0"), Dst: addrPort("127.3.3.40:1")}, + } + m2Conns := []netlogtype.Connection{ + {Src: addrPort("1.0.0.1:0"), Dst: m1.conn.pconn4.LocalAddr().AddrPort()}, + {Src: addrPort("1.0.0.1:0"), Dst: addrPort("127.3.3.40:1")}, + } + outerT := t t.Run("ping 1.0.0.1", func(t *testing.T) { setT(t) defer setT(outerT) ping1(t) + checkStats(t, m1, m1Conns) + checkStats(t, m2, m2Conns) }) t.Run("ping 1.0.0.2", func(t *testing.T) { setT(t) defer setT(outerT) ping2(t) + checkStats(t, m1, m1Conns) + checkStats(t, m2, m2Conns) }) t.Run("ping 1.0.0.2 via SendPacket", func(t *testing.T) { @@ -1120,6 +1150,8 @@ func testTwoDevicePing(t *testing.T, d *devices) { if err := sendWithTimeout(msg1to2, in, send); err != nil { t.Error(err) } + checkStats(t, m1, m1Conns) + checkStats(t, m2, m2Conns) }) t.Run("no-op dev1 reconfig", func(t *testing.T) { @@ -1130,6 +1162,8 @@ func testTwoDevicePing(t *testing.T, d *devices) { } ping1(t) ping2(t) + checkStats(t, m1, m1Conns) + checkStats(t, m2, m2Conns) }) } diff --git a/wgengine/netlog/logger.go b/wgengine/netlog/logger.go index f0e6ee4f7..69534cc15 100644 --- a/wgengine/netlog/logger.go +++ b/wgengine/netlog/logger.go @@ -31,8 +31,7 @@ // Device is an abstraction over a tunnel device or a magic socket. // *tstun.Wrapper implements this interface. -// -// TODO(joetsai): Make *magicsock.Conn implement this interface. +// *magicsock.Conn implements this interface. type Device interface { SetStatisticsEnabled(bool) ExtractStatistics() map[netlogtype.Connection]netlogtype.Counts diff --git a/wgengine/userspace.go b/wgengine/userspace.go index 34f328513..ae03b5558 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -953,7 +953,7 @@ func (e *userspaceEngine) Reconfig(cfg *wgcfg.Config, routerCfg *router.Config, nid := cfg.NetworkLogging.NodeID tid := cfg.NetworkLogging.DomainID e.logf("wgengine: Reconfig: starting up network logger (node:%s tailnet:%s)", nid.Public(), tid.Public()) - if err := e.networkLogger.Startup(nid, tid, e.tundev, nil); err != nil { + if err := e.networkLogger.Startup(nid, tid, e.tundev, e.magicConn); err != nil { e.logf("wgengine: Reconfig: error starting up network logger: %v", err) } }