Files
tailscale/wgengine/netlog/netlog.go
Joe Tsai d7cedda677 wgengine/netlog: embed node information in network flow logs
This rewrites the netlog package to support embedding node information in network flow logs.
Some bit of complexity comes in trying to pre-compute the expected size of the log message
after JSON serialization to ensure that we can respect maximum body limits in log uploading.

We also fix a bug in tstun, where we were recording the IP address after SNAT,
which was resulting in non-sensible connection flows being logged.

Updates tailscale/corp#33352

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
2025-10-27 14:57:12 -07:00

485 lines
15 KiB
Go

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !ts_omit_netlog && !ts_omit_logtail
// Package netlog provides a logger that monitors a TUN device and
// periodically records any traffic into a log stream.
package netlog
import (
"cmp"
"context"
"fmt"
"io"
"log"
"net/http"
"net/netip"
"sync"
"time"
"tailscale.com/health"
"tailscale.com/logpolicy"
"tailscale.com/logtail"
"tailscale.com/net/netmon"
"tailscale.com/net/sockstats"
"tailscale.com/net/tsaddr"
"tailscale.com/types/ipproto"
"tailscale.com/types/logid"
"tailscale.com/types/netlogfunc"
"tailscale.com/types/netlogtype"
"tailscale.com/types/netmap"
"tailscale.com/util/eventbus"
"tailscale.com/util/set"
"tailscale.com/wgengine/router"
jsonv2 "github.com/go-json-experiment/json"
"github.com/go-json-experiment/json/jsontext"
)
// pollPeriod specifies how often to poll for network traffic.
const pollPeriod = 5 * time.Second
// Device is an abstraction over a tunnel device or a magic socket.
// Both *tstun.Wrapper and *magicsock.Conn implement this interface.
type Device interface {
SetConnectionCounter(netlogfunc.ConnectionCounter)
}
type noopDevice struct{}
func (noopDevice) SetConnectionCounter(netlogfunc.ConnectionCounter) {}
// Logger logs statistics about every connection.
// At present, it only logs connections within a tailscale network.
// By default, exit node traffic is not logged for privacy reasons
// unless the Tailnet administrator opts-into explicit logging.
// The zero value is ready for use.
type Logger struct {
mu sync.Mutex // protects all fields below
// shutdownLocked shuts down the logger.
// The mutex must be held when calling.
shutdownLocked func(context.Context) error
record record // the current record of network connection flows
recordLen int // upper bound on JSON length of record
recordsChan chan record // set to nil when shutdown
flushTimer *time.Timer // fires when record should flush to recordsChan
// Information about Tailscale nodes.
// These are read-only once updated by ReconfigNetworkMap.
selfNode nodeUser
allNodes map[netip.Addr]nodeUser // includes selfNode; nodeUser values are always valid
// Information about routes.
// These are read-only once updated by ReconfigRoutes.
routeAddrs set.Set[netip.Addr]
routePrefixes []netip.Prefix
}
// Running reports whether the logger is running.
func (nl *Logger) Running() bool {
nl.mu.Lock()
defer nl.mu.Unlock()
return nl.shutdownLocked != nil
}
var testClient *http.Client
// Startup starts an asynchronous network logger that monitors
// statistics for the provided tun and/or sock device.
//
// The tun [Device] captures packets within the tailscale network,
// where at least one address is usually a tailscale IP address.
// The source is usually from the perspective of the current node.
// If one of the other endpoint is not a tailscale IP address,
// then it suggests the use of a subnet router or exit node.
// For example, when using a subnet router, the source address is
// the tailscale IP address of the current node, and
// the destination address is an IP address within the subnet range.
// In contrast, when acting as a subnet router, the source address is
// an IP address within the subnet range, and the destination is a
// tailscale IP address that initiated the subnet proxy connection.
// In this case, the node acting as a subnet router is acting on behalf
// of some remote endpoint within the subnet range.
// The tun is used to populate the VirtualTraffic, SubnetTraffic,
// and ExitTraffic fields in [netlogtype.Message].
//
// The sock [Device] captures packets at the magicsock layer.
// The source is always a tailscale IP address and the destination
// is a non-tailscale IP address to contact for that particular tailscale node.
// The IP protocol and source port are always zero.
// The sock is used to populated the PhysicalTraffic field in [netlogtype.Message].
//
// The netMon parameter is optional; if non-nil it's used to do faster interface lookups.
func (nl *Logger) Startup(nm *netmap.NetworkMap, nodeLogID, domainLogID logid.PrivateID, tun, sock Device, netMon *netmon.Monitor, health *health.Tracker, bus *eventbus.Bus, logExitFlowEnabledEnabled bool) error {
nl.mu.Lock()
defer nl.mu.Unlock()
if nl.shutdownLocked != nil {
return fmt.Errorf("network logger already running")
}
nl.selfNode, nl.allNodes = makeNodeMaps(nm)
// Startup a log stream to Tailscale's logging service.
logf := log.Printf
httpc := &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost, netMon, health, logf)}
if testClient != nil {
httpc = testClient
}
logger := logtail.NewLogger(logtail.Config{
Collection: "tailtraffic.log.tailscale.io",
PrivateID: nodeLogID,
CopyPrivateID: domainLogID,
Bus: bus,
Stderr: io.Discard,
CompressLogs: true,
HTTPC: httpc,
// TODO(joetsai): Set Buffer? Use an in-memory buffer for now.
// Include process sequence numbers to identify missing samples.
IncludeProcID: true,
IncludeProcSequence: true,
}, logf)
logger.SetSockstatsLabel(sockstats.LabelNetlogLogger)
// Register the connection tracker into the TUN device.
tun = cmp.Or[Device](tun, noopDevice{})
tun.SetConnectionCounter(nl.updateVirtConn)
// Register the connection tracker into magicsock.
sock = cmp.Or[Device](sock, noopDevice{})
sock.SetConnectionCounter(nl.updatePhysConn)
// Startup a goroutine to record log messages.
// This is done asynchronously so that the cost of serializing
// the network flow log message never stalls processing of packets.
nl.record = record{}
nl.recordLen = 0
nl.recordsChan = make(chan record, 100)
recorderDone := make(chan struct{})
go func(recordsChan chan record) {
defer close(recorderDone)
for rec := range recordsChan {
msg := rec.toMessage(false, !logExitFlowEnabledEnabled)
if b, err := jsonv2.Marshal(msg, jsontext.AllowInvalidUTF8(true)); err != nil {
logger.Logf("json.Marshal error: %v", err)
} else {
logger.Logf("%s", b)
}
}
}(nl.recordsChan)
// Register the mechanism for shutting down.
nl.shutdownLocked = func(ctx context.Context) error {
tun.SetConnectionCounter(nil)
sock.SetConnectionCounter(nil)
// Flush and process all pending records.
nl.flushRecordLocked()
close(nl.recordsChan)
nl.recordsChan = nil
<-recorderDone
recorderDone = nil
// Try to upload all pending records.
err := logger.Shutdown(ctx)
// Purge state.
nl.shutdownLocked = nil
nl.selfNode = nodeUser{}
nl.allNodes = nil
nl.routeAddrs = nil
nl.routePrefixes = nil
return err
}
return nil
}
var (
tailscaleServiceIPv4 = tsaddr.TailscaleServiceIP()
tailscaleServiceIPv6 = tsaddr.TailscaleServiceIPv6()
)
func (nl *Logger) updateVirtConn(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, recv bool) {
// Network logging is defined as traffic between two Tailscale nodes.
// Traffic with the internal Tailscale service is not with another node
// and should not be logged. It also happens to be a high volume
// amount of discrete traffic flows (e.g., DNS lookups).
switch dst.Addr() {
case tailscaleServiceIPv4, tailscaleServiceIPv6:
return
}
nl.mu.Lock()
defer nl.mu.Unlock()
// Lookup the connection and increment the counts.
nl.initRecordLocked()
conn := netlogtype.Connection{Proto: proto, Src: src, Dst: dst}
cnts, found := nl.record.virtConns[conn]
if !found {
cnts.connType = nl.addNewVirtConnLocked(conn)
}
if recv {
cnts.RxPackets += uint64(packets)
cnts.RxBytes += uint64(bytes)
} else {
cnts.TxPackets += uint64(packets)
cnts.TxBytes += uint64(bytes)
}
nl.record.virtConns[conn] = cnts
}
// addNewVirtConnLocked adds the first insertion of a physical connection.
// The [Logger.mu] must be held.
func (nl *Logger) addNewVirtConnLocked(c netlogtype.Connection) connType {
// Check whether this is the first insertion of the src and dst node.
// If so, compute the additional JSON bytes that would be added
// to the record for the node information.
var srcNodeLen, dstNodeLen int
srcNode, srcSeen := nl.record.seenNodes[c.Src.Addr()]
if !srcSeen {
srcNode = nl.allNodes[c.Src.Addr()]
if srcNode.Valid() {
srcNodeLen = srcNode.jsonLen()
}
}
dstNode, dstSeen := nl.record.seenNodes[c.Dst.Addr()]
if !dstSeen {
dstNode = nl.allNodes[c.Dst.Addr()]
if dstNode.Valid() {
dstNodeLen = dstNode.jsonLen()
}
}
// Check whether the additional [netlogtype.ConnectionCounts]
// and [netlogtype.Node] information would exceed [maxLogSize].
if nl.recordLen+netlogtype.MaxConnectionCountsJSONSize+srcNodeLen+dstNodeLen > maxLogSize {
nl.flushRecordLocked()
nl.initRecordLocked()
}
// Insert newly seen src and/or dst nodes.
if !srcSeen && srcNode.Valid() {
nl.record.seenNodes[c.Src.Addr()] = srcNode
}
if !dstSeen && dstNode.Valid() {
nl.record.seenNodes[c.Dst.Addr()] = dstNode
}
nl.recordLen += netlogtype.MaxConnectionCountsJSONSize + srcNodeLen + dstNodeLen
// Classify the traffic type.
var srcIsSelfNode bool
if nl.selfNode.Valid() {
srcIsSelfNode = nl.selfNode.Addresses().ContainsFunc(func(p netip.Prefix) bool {
return c.Src.Addr() == p.Addr() && p.IsSingleIP()
})
}
switch {
case srcIsSelfNode && dstNode.Valid():
return virtualTraffic
case srcIsSelfNode:
// TODO: Should we swap src for the node serving as the proxy?
// It is relatively useless always using the self IP address.
if nl.withinRoutesLocked(c.Dst.Addr()) {
return subnetTraffic // a client using another subnet router
} else {
return exitTraffic // a client using exit an exit node
}
case dstNode.Valid():
if nl.withinRoutesLocked(c.Src.Addr()) {
return subnetTraffic // serving as a subnet router
} else {
return exitTraffic // serving as an exit node
}
default:
return unknownTraffic
}
}
func (nl *Logger) updatePhysConn(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, recv bool) {
nl.mu.Lock()
defer nl.mu.Unlock()
// Lookup the connection and increment the counts.
nl.initRecordLocked()
conn := netlogtype.Connection{Proto: proto, Src: src, Dst: dst}
cnts, found := nl.record.physConns[conn]
if !found {
nl.addNewPhysConnLocked(conn)
}
if recv {
cnts.RxPackets += uint64(packets)
cnts.RxBytes += uint64(bytes)
} else {
cnts.TxPackets += uint64(packets)
cnts.TxBytes += uint64(bytes)
}
nl.record.physConns[conn] = cnts
}
// addNewPhysConnLocked adds the first insertion of a physical connection.
// The [Logger.mu] must be held.
func (nl *Logger) addNewPhysConnLocked(c netlogtype.Connection) {
// Check whether this is the first insertion of the src node.
var srcNodeLen int
srcNode, srcSeen := nl.record.seenNodes[c.Src.Addr()]
if !srcSeen {
srcNode = nl.allNodes[c.Src.Addr()]
if srcNode.Valid() {
srcNodeLen = srcNode.jsonLen()
}
}
// Check whether the additional [netlogtype.ConnectionCounts]
// and [netlogtype.Node] information would exceed [maxLogSize].
if nl.recordLen+netlogtype.MaxConnectionCountsJSONSize+srcNodeLen > maxLogSize {
nl.flushRecordLocked()
nl.initRecordLocked()
}
// Insert newly seen src and/or dst nodes.
if !srcSeen && srcNode.Valid() {
nl.record.seenNodes[c.Src.Addr()] = srcNode
}
nl.recordLen += netlogtype.MaxConnectionCountsJSONSize + srcNodeLen
}
// initRecordLocked initialize the current record if uninitialized.
// The [Logger.mu] must be held.
func (nl *Logger) initRecordLocked() {
if nl.recordLen != 0 {
return
}
nl.record.selfNode = nl.selfNode
nl.record.start = time.Now().UTC()
nl.record.seenNodes = make(map[netip.Addr]nodeUser)
nl.record.virtConns = make(map[netlogtype.Connection]countsType)
nl.record.physConns = make(map[netlogtype.Connection]netlogtype.Counts)
nl.recordLen = netlogtype.MinMessageJSONSize + nl.selfNode.jsonLen()
// Start a time to auto-flush the record.
// Avoid tickers since continually waking up a goroutine
// is expensive on battery powered devices.
nl.flushTimer = time.AfterFunc(pollPeriod, func() {
nl.mu.Lock()
defer nl.mu.Unlock()
if !nl.record.start.IsZero() && time.Since(nl.record.start) > pollPeriod/2 {
nl.flushRecordLocked()
}
})
}
// flushRecordLocked flushes the current record if initialized.
// The [Logger.mu] must be held.
func (nl *Logger) flushRecordLocked() {
if nl.recordLen == 0 {
return
}
nl.record.end = time.Now().UTC()
if nl.recordsChan != nil {
select {
case nl.recordsChan <- nl.record:
default:
// TODO: Buffered channel is full, drop the message. Should we log this?
}
}
if nl.flushTimer != nil {
nl.flushTimer.Stop()
nl.flushTimer = nil
}
nl.record = record{}
nl.recordLen = 0
}
func makeNodeMaps(nm *netmap.NetworkMap) (selfNode nodeUser, allNodes map[netip.Addr]nodeUser) {
if nm == nil {
return
}
allNodes = make(map[netip.Addr]nodeUser)
if nm.SelfNode.Valid() {
selfNode = nodeUser{nm.SelfNode, nm.UserProfiles[nm.SelfNode.User()]}
for _, addr := range nm.SelfNode.Addresses().All() {
if addr.IsSingleIP() {
allNodes[addr.Addr()] = selfNode
}
}
}
for _, peer := range nm.Peers {
if peer.Valid() {
for _, addr := range peer.Addresses().All() {
if addr.IsSingleIP() {
allNodes[addr.Addr()] = nodeUser{peer, nm.UserProfiles[peer.User()]}
}
}
}
}
return selfNode, allNodes
}
// ReconfigNetworkMap configures the network logger with an updated netmap.
func (nl *Logger) ReconfigNetworkMap(nm *netmap.NetworkMap) {
selfNode, allNodes := makeNodeMaps(nm) // avoid holding lock while making maps
nl.mu.Lock()
nl.selfNode, nl.allNodes = selfNode, allNodes
nl.mu.Unlock()
}
func makeRouteMaps(cfg *router.Config) (addrs set.Set[netip.Addr], prefixes []netip.Prefix) {
addrs = make(set.Set[netip.Addr])
insertPrefixes := func(rs []netip.Prefix) {
for _, p := range rs {
if p.IsSingleIP() {
addrs.Add(p.Addr())
} else {
prefixes = append(prefixes, p)
}
}
}
insertPrefixes(cfg.LocalAddrs)
insertPrefixes(cfg.Routes)
insertPrefixes(cfg.SubnetRoutes)
return addrs, prefixes
}
// ReconfigRoutes configures the network logger with updated routes.
// The cfg is used to classify the types of connections captured by
// the tun Device passed to Startup.
func (nl *Logger) ReconfigRoutes(cfg *router.Config) {
addrs, prefixes := makeRouteMaps(cfg) // avoid holding lock while making maps
nl.mu.Lock()
nl.routeAddrs, nl.routePrefixes = addrs, prefixes
nl.mu.Unlock()
}
// withinRoutesLocked reports whether a is within the configured routes,
// which should only contain Tailscale addresses and subnet routes.
// The [Logger.mu] must be held.
func (nl *Logger) withinRoutesLocked(a netip.Addr) bool {
if nl.routeAddrs.Contains(a) {
return true
}
for _, p := range nl.routePrefixes {
if p.Contains(a) && p.Bits() > 0 {
return true
}
}
return false
}
// Shutdown shuts down the network logger.
// This attempts to flush out all pending log messages.
// Even if an error is returned, the logger is still shut down.
func (nl *Logger) Shutdown(ctx context.Context) error {
nl.mu.Lock()
defer nl.mu.Unlock()
if nl.shutdownLocked == nil {
return nil
}
return nl.shutdownLocked(ctx)
}