From d7cedda67765ce69c534b7dd285931fa47e7886a Mon Sep 17 00:00:00 2001 From: Joe Tsai Date: Wed, 22 Oct 2025 11:27:32 -0700 Subject: [PATCH] wgengine/netlog: embed node information in network flow logs This rewrites the netlog package to support embedding node information in network flow logs. Some bit of complexity comes in trying to pre-compute the expected size of the log message after JSON serialization to ensure that we can respect maximum body limits in log uploading. We also fix a bug in tstun, where we were recording the IP address after SNAT, which was resulting in non-sensible connection flows being logged. Updates tailscale/corp#33352 Signed-off-by: Joe Tsai --- net/tstun/wrap.go | 10 +- types/netlogtype/netlogtype.go | 47 +++- wgengine/netlog/netlog.go | 455 ++++++++++++++++++++++++--------- wgengine/netlog/netlog_omit.go | 9 +- wgengine/netlog/netlog_test.go | 236 +++++++++++++++++ wgengine/netlog/record.go | 196 ++++++++++++++ wgengine/netlog/record_test.go | 255 ++++++++++++++++++ wgengine/netlog/stats.go | 222 ---------------- wgengine/netlog/stats_test.go | 235 ----------------- wgengine/userspace.go | 5 +- 10 files changed, 1077 insertions(+), 593 deletions(-) create mode 100644 wgengine/netlog/netlog_test.go create mode 100644 wgengine/netlog/record.go create mode 100644 wgengine/netlog/record_test.go delete mode 100644 wgengine/netlog/stats.go delete mode 100644 wgengine/netlog/stats_test.go diff --git a/net/tstun/wrap.go b/net/tstun/wrap.go index 70cc7118a..db4f689bf 100644 --- a/net/tstun/wrap.go +++ b/net/tstun/wrap.go @@ -967,6 +967,11 @@ func (t *Wrapper) Read(buffs [][]byte, sizes []int, offset int) (int, error) { continue } } + if buildfeatures.HasNetLog { + if update := t.connCounter.Load(); update != nil { + updateConnCounter(update, p.Buffer(), false) + } + } // Make sure to do SNAT after filtering, so that any flow tracking in // the filter sees the original source address. See #12133. @@ -976,11 +981,6 @@ func (t *Wrapper) Read(buffs [][]byte, sizes []int, offset int) (int, error) { panic(fmt.Sprintf("short copy: %d != %d", n, len(data)-res.dataOffset)) } sizes[buffsPos] = n - if buildfeatures.HasNetLog { - if update := t.connCounter.Load(); update != nil { - updateConnCounter(update, p.Buffer(), false) - } - } buffsPos++ } if buffsGRO != nil { diff --git a/types/netlogtype/netlogtype.go b/types/netlogtype/netlogtype.go index a29ea6f03..86d645b35 100644 --- a/types/netlogtype/netlogtype.go +++ b/types/netlogtype/netlogtype.go @@ -21,6 +21,9 @@ type Message struct { Start time.Time `json:"start"` // inclusive End time.Time `json:"end"` // inclusive + SrcNode Node `json:"srcNode,omitzero"` + DstNodes []Node `json:"dstNodes,omitempty"` + VirtualTraffic []ConnectionCounts `json:"virtualTraffic,omitempty"` SubnetTraffic []ConnectionCounts `json:"subnetTraffic,omitempty"` ExitTraffic []ConnectionCounts `json:"exitTraffic,omitempty"` @@ -28,14 +31,30 @@ type Message struct { } const ( - messageJSON = `{"nodeId":"n0123456789abcdefCNTRL",` + maxJSONTimeRange + `,` + minJSONTraffic + `}` + messageJSON = `{"nodeId":` + maxJSONStableID + `,` + minJSONNodes + `,` + maxJSONTimeRange + `,` + minJSONTraffic + `}` + maxJSONStableID = `"n0123456789abcdefCNTRL"` + minJSONNodes = `"srcNode":{},"dstNodes":[]` maxJSONTimeRange = `"start":` + maxJSONRFC3339 + `,"end":` + maxJSONRFC3339 maxJSONRFC3339 = `"0001-01-01T00:00:00.000000000Z"` minJSONTraffic = `"virtualTraffic":{},"subnetTraffic":{},"exitTraffic":{},"physicalTraffic":{}` - // MaxMessageJSONSize is the overhead size of Message when it is - // serialized as JSON assuming that each traffic map is populated. - MaxMessageJSONSize = len(messageJSON) + // MinMessageJSONSize is the overhead size of Message when it is + // serialized as JSON assuming that each field is minimally populated. + // Each [Node] occupies at least [MinNodeJSONSize]. + // Each [ConnectionCounts] occupies at most [MaxConnectionCountsJSONSize]. + MinMessageJSONSize = len(messageJSON) + + nodeJSON = `{"nodeId":` + maxJSONStableID + `,"name":"","addresses":` + maxJSONAddrs + `,"user":"","tags":[]}` + maxJSONAddrV4 = `"255.255.255.255"` + maxJSONAddrV6 = `"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"` + maxJSONAddrs = `[` + maxJSONAddrV4 + `,` + maxJSONAddrV6 + `]` + + // MinNodeJSONSize is the overhead size of Node when it is + // serialized as JSON assuming that each field is minimally populated. + // It does not account for bytes occupied by + // [Node.Name], [Node.User], or [Node.Tags]. The [Node.Addresses] + // is assumed to contain a pair of IPv4 and IPv6 address. + MinNodeJSONSize = len(nodeJSON) maxJSONConnCounts = `{` + maxJSONConn + `,` + maxJSONCounts + `}` maxJSONConn = `"proto":` + maxJSONProto + `,"src":` + maxJSONAddrPort + `,"dst":` + maxJSONAddrPort @@ -52,6 +71,26 @@ const ( MaxConnectionCountsJSONSize = len(maxJSONConnCounts) ) +// Node is information about a node. +type Node struct { + // NodeID is the stable ID of the node. + NodeID tailcfg.StableNodeID `json:"nodeId"` + + // Name is the fully-qualified name of the node. + Name string `json:"name,omitzero"` // e.g., "carbonite.example.ts.net" + + // Addresses are the Tailscale IP addresses of the node. + Addresses []netip.Addr `json:"addresses,omitempty"` + + // User is the user that owns the node. + // It is not populated if the node is tagged. + User string `json:"user,omitzero"` // e.g., "johndoe@example.com" + + // Tags are the tags of the node. + // It is not populated if the node is owned by a user. + Tags []string `json:"tags,omitempty"` // e.g., ["tag:prod","tag:logs"] +} + // ConnectionCounts is a flattened struct of both a connection and counts. type ConnectionCounts struct { Connection diff --git a/wgengine/netlog/netlog.go b/wgengine/netlog/netlog.go index 2984df994..b5b0892ca 100644 --- a/wgengine/netlog/netlog.go +++ b/wgengine/netlog/netlog.go @@ -10,8 +10,6 @@ package netlog import ( "cmp" "context" - "encoding/json" - "errors" "fmt" "io" "log" @@ -26,12 +24,17 @@ import ( "tailscale.com/net/netmon" "tailscale.com/net/sockstats" "tailscale.com/net/tsaddr" - "tailscale.com/tailcfg" + "tailscale.com/types/ipproto" "tailscale.com/types/logid" "tailscale.com/types/netlogfunc" "tailscale.com/types/netlogtype" + "tailscale.com/types/netmap" "tailscale.com/util/eventbus" + "tailscale.com/util/set" "tailscale.com/wgengine/router" + + jsonv2 "github.com/go-json-experiment/json" + "github.com/go-json-experiment/json/jsontext" ) // pollPeriod specifies how often to poll for network traffic. @@ -49,25 +52,37 @@ func (noopDevice) SetConnectionCounter(netlogfunc.ConnectionCounter) {} // Logger logs statistics about every connection. // At present, it only logs connections within a tailscale network. -// Exit node traffic is not logged for privacy reasons. +// By default, exit node traffic is not logged for privacy reasons +// unless the Tailnet administrator opts-into explicit logging. // The zero value is ready for use. type Logger struct { mu sync.Mutex // protects all fields below - logger *logtail.Logger - stats *statistics - tun Device - sock Device + // shutdownLocked shuts down the logger. + // The mutex must be held when calling. + shutdownLocked func(context.Context) error - addrs map[netip.Addr]bool - prefixes map[netip.Prefix]bool + record record // the current record of network connection flows + recordLen int // upper bound on JSON length of record + recordsChan chan record // set to nil when shutdown + flushTimer *time.Timer // fires when record should flush to recordsChan + + // Information about Tailscale nodes. + // These are read-only once updated by ReconfigNetworkMap. + selfNode nodeUser + allNodes map[netip.Addr]nodeUser // includes selfNode; nodeUser values are always valid + + // Information about routes. + // These are read-only once updated by ReconfigRoutes. + routeAddrs set.Set[netip.Addr] + routePrefixes []netip.Prefix } // Running reports whether the logger is running. func (nl *Logger) Running() bool { nl.mu.Lock() defer nl.mu.Unlock() - return nl.logger != nil + return nl.shutdownLocked != nil } var testClient *http.Client @@ -75,9 +90,9 @@ var testClient *http.Client // Startup starts an asynchronous network logger that monitors // statistics for the provided tun and/or sock device. // -// The tun Device captures packets within the tailscale network, -// where at least one address is a tailscale IP address. -// The source is always from the perspective of the current node. +// The tun [Device] captures packets within the tailscale network, +// where at least one address is usually a tailscale IP address. +// The source is usually from the perspective of the current node. // If one of the other endpoint is not a tailscale IP address, // then it suggests the use of a subnet router or exit node. // For example, when using a subnet router, the source address is @@ -89,20 +104,23 @@ var testClient *http.Client // In this case, the node acting as a subnet router is acting on behalf // of some remote endpoint within the subnet range. // The tun is used to populate the VirtualTraffic, SubnetTraffic, -// and ExitTraffic fields in Message. +// and ExitTraffic fields in [netlogtype.Message]. // -// The sock Device captures packets at the magicsock layer. +// The sock [Device] captures packets at the magicsock layer. // The source is always a tailscale IP address and the destination // is a non-tailscale IP address to contact for that particular tailscale node. // The IP protocol and source port are always zero. -// The sock is used to populated the PhysicalTraffic field in Message. +// The sock is used to populated the PhysicalTraffic field in [netlogtype.Message]. +// // The netMon parameter is optional; if non-nil it's used to do faster interface lookups. -func (nl *Logger) Startup(nodeID tailcfg.StableNodeID, nodeLogID, domainLogID logid.PrivateID, tun, sock Device, netMon *netmon.Monitor, health *health.Tracker, bus *eventbus.Bus, logExitFlowEnabledEnabled bool) error { +func (nl *Logger) Startup(nm *netmap.NetworkMap, nodeLogID, domainLogID logid.PrivateID, tun, sock Device, netMon *netmon.Monitor, health *health.Tracker, bus *eventbus.Bus, logExitFlowEnabledEnabled bool) error { nl.mu.Lock() defer nl.mu.Unlock() - if nl.logger != nil { - return fmt.Errorf("network logger already running for %v", nl.logger.PrivateID().Public()) + + if nl.shutdownLocked != nil { + return fmt.Errorf("network logger already running") } + nl.selfNode, nl.allNodes = makeNodeMaps(nm) // Startup a log stream to Tailscale's logging service. logf := log.Printf @@ -110,7 +128,7 @@ func (nl *Logger) Startup(nodeID tailcfg.StableNodeID, nodeLogID, domainLogID lo if testClient != nil { httpc = testClient } - nl.logger = logtail.NewLogger(logtail.Config{ + logger := logtail.NewLogger(logtail.Config{ Collection: "tailtraffic.log.tailscale.io", PrivateID: nodeLogID, CopyPrivateID: domainLogID, @@ -124,108 +142,305 @@ func (nl *Logger) Startup(nodeID tailcfg.StableNodeID, nodeLogID, domainLogID lo IncludeProcID: true, IncludeProcSequence: true, }, logf) - nl.logger.SetSockstatsLabel(sockstats.LabelNetlogLogger) - - // Startup a data structure to track per-connection statistics. - // There is a maximum size for individual log messages that logtail - // can upload to the Tailscale log service, so stay below this limit. - const maxLogSize = 256 << 10 - const maxConns = (maxLogSize - netlogtype.MaxMessageJSONSize) / netlogtype.MaxConnectionCountsJSONSize - nl.stats = newStatistics(pollPeriod, maxConns, func(start, end time.Time, virtual, physical map[netlogtype.Connection]netlogtype.Counts) { - nl.mu.Lock() - addrs := nl.addrs - prefixes := nl.prefixes - nl.mu.Unlock() - recordStatistics(nl.logger, nodeID, start, end, virtual, physical, addrs, prefixes, logExitFlowEnabledEnabled) - }) + logger.SetSockstatsLabel(sockstats.LabelNetlogLogger) // Register the connection tracker into the TUN device. - nl.tun = cmp.Or[Device](tun, noopDevice{}) - nl.tun.SetConnectionCounter(nl.stats.UpdateVirtual) + tun = cmp.Or[Device](tun, noopDevice{}) + tun.SetConnectionCounter(nl.updateVirtConn) // Register the connection tracker into magicsock. - nl.sock = cmp.Or[Device](sock, noopDevice{}) - nl.sock.SetConnectionCounter(nl.stats.UpdatePhysical) + sock = cmp.Or[Device](sock, noopDevice{}) + sock.SetConnectionCounter(nl.updatePhysConn) + + // Startup a goroutine to record log messages. + // This is done asynchronously so that the cost of serializing + // the network flow log message never stalls processing of packets. + nl.record = record{} + nl.recordLen = 0 + nl.recordsChan = make(chan record, 100) + recorderDone := make(chan struct{}) + go func(recordsChan chan record) { + defer close(recorderDone) + for rec := range recordsChan { + msg := rec.toMessage(false, !logExitFlowEnabledEnabled) + if b, err := jsonv2.Marshal(msg, jsontext.AllowInvalidUTF8(true)); err != nil { + logger.Logf("json.Marshal error: %v", err) + } else { + logger.Logf("%s", b) + } + } + }(nl.recordsChan) + + // Register the mechanism for shutting down. + nl.shutdownLocked = func(ctx context.Context) error { + tun.SetConnectionCounter(nil) + sock.SetConnectionCounter(nil) + + // Flush and process all pending records. + nl.flushRecordLocked() + close(nl.recordsChan) + nl.recordsChan = nil + <-recorderDone + recorderDone = nil + + // Try to upload all pending records. + err := logger.Shutdown(ctx) + + // Purge state. + nl.shutdownLocked = nil + nl.selfNode = nodeUser{} + nl.allNodes = nil + nl.routeAddrs = nil + nl.routePrefixes = nil + + return err + } return nil } -func recordStatistics(logger *logtail.Logger, nodeID tailcfg.StableNodeID, start, end time.Time, connStats, sockStats map[netlogtype.Connection]netlogtype.Counts, addrs map[netip.Addr]bool, prefixes map[netip.Prefix]bool, logExitFlowEnabled bool) { - m := netlogtype.Message{NodeID: nodeID, Start: start.UTC(), End: end.UTC()} +var ( + tailscaleServiceIPv4 = tsaddr.TailscaleServiceIP() + tailscaleServiceIPv6 = tsaddr.TailscaleServiceIPv6() +) - classifyAddr := func(a netip.Addr) (isTailscale, withinRoute bool) { - // NOTE: There could be mis-classifications where an address is treated - // as a Tailscale IP address because the subnet range overlaps with - // the subnet range that Tailscale IP addresses are allocated from. - // This should never happen for IPv6, but could happen for IPv4. - withinRoute = addrs[a] - for p := range prefixes { - if p.Contains(a) && p.Bits() > 0 { - withinRoute = true - break - } - } - return withinRoute && tsaddr.IsTailscaleIP(a), withinRoute && !tsaddr.IsTailscaleIP(a) +func (nl *Logger) updateVirtConn(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, recv bool) { + // Network logging is defined as traffic between two Tailscale nodes. + // Traffic with the internal Tailscale service is not with another node + // and should not be logged. It also happens to be a high volume + // amount of discrete traffic flows (e.g., DNS lookups). + switch dst.Addr() { + case tailscaleServiceIPv4, tailscaleServiceIPv6: + return } - exitTraffic := make(map[netlogtype.Connection]netlogtype.Counts) - for conn, cnts := range connStats { - srcIsTailscaleIP, srcWithinSubnet := classifyAddr(conn.Src.Addr()) - dstIsTailscaleIP, dstWithinSubnet := classifyAddr(conn.Dst.Addr()) - switch { - case srcIsTailscaleIP && dstIsTailscaleIP: - m.VirtualTraffic = append(m.VirtualTraffic, netlogtype.ConnectionCounts{Connection: conn, Counts: cnts}) - case srcWithinSubnet || dstWithinSubnet: - m.SubnetTraffic = append(m.SubnetTraffic, netlogtype.ConnectionCounts{Connection: conn, Counts: cnts}) - default: - const anonymize = true - if anonymize && !logExitFlowEnabled { - // Only preserve the address if it is a Tailscale IP address. - srcOrig, dstOrig := conn.Src, conn.Dst - conn = netlogtype.Connection{} // scrub everything by default - if srcIsTailscaleIP { - conn.Src = netip.AddrPortFrom(srcOrig.Addr(), 0) - } - if dstIsTailscaleIP { - conn.Dst = netip.AddrPortFrom(dstOrig.Addr(), 0) - } - } - exitTraffic[conn] = exitTraffic[conn].Add(cnts) + nl.mu.Lock() + defer nl.mu.Unlock() + + // Lookup the connection and increment the counts. + nl.initRecordLocked() + conn := netlogtype.Connection{Proto: proto, Src: src, Dst: dst} + cnts, found := nl.record.virtConns[conn] + if !found { + cnts.connType = nl.addNewVirtConnLocked(conn) + } + if recv { + cnts.RxPackets += uint64(packets) + cnts.RxBytes += uint64(bytes) + } else { + cnts.TxPackets += uint64(packets) + cnts.TxBytes += uint64(bytes) + } + nl.record.virtConns[conn] = cnts +} + +// addNewVirtConnLocked adds the first insertion of a physical connection. +// The [Logger.mu] must be held. +func (nl *Logger) addNewVirtConnLocked(c netlogtype.Connection) connType { + // Check whether this is the first insertion of the src and dst node. + // If so, compute the additional JSON bytes that would be added + // to the record for the node information. + var srcNodeLen, dstNodeLen int + srcNode, srcSeen := nl.record.seenNodes[c.Src.Addr()] + if !srcSeen { + srcNode = nl.allNodes[c.Src.Addr()] + if srcNode.Valid() { + srcNodeLen = srcNode.jsonLen() } } - for conn, cnts := range exitTraffic { - m.ExitTraffic = append(m.ExitTraffic, netlogtype.ConnectionCounts{Connection: conn, Counts: cnts}) - } - for conn, cnts := range sockStats { - m.PhysicalTraffic = append(m.PhysicalTraffic, netlogtype.ConnectionCounts{Connection: conn, Counts: cnts}) + dstNode, dstSeen := nl.record.seenNodes[c.Dst.Addr()] + if !dstSeen { + dstNode = nl.allNodes[c.Dst.Addr()] + if dstNode.Valid() { + dstNodeLen = dstNode.jsonLen() + } } - if len(m.VirtualTraffic)+len(m.SubnetTraffic)+len(m.ExitTraffic)+len(m.PhysicalTraffic) > 0 { - if b, err := json.Marshal(m); err != nil { - logger.Logf("json.Marshal error: %v", err) + // Check whether the additional [netlogtype.ConnectionCounts] + // and [netlogtype.Node] information would exceed [maxLogSize]. + if nl.recordLen+netlogtype.MaxConnectionCountsJSONSize+srcNodeLen+dstNodeLen > maxLogSize { + nl.flushRecordLocked() + nl.initRecordLocked() + } + + // Insert newly seen src and/or dst nodes. + if !srcSeen && srcNode.Valid() { + nl.record.seenNodes[c.Src.Addr()] = srcNode + } + if !dstSeen && dstNode.Valid() { + nl.record.seenNodes[c.Dst.Addr()] = dstNode + } + nl.recordLen += netlogtype.MaxConnectionCountsJSONSize + srcNodeLen + dstNodeLen + + // Classify the traffic type. + var srcIsSelfNode bool + if nl.selfNode.Valid() { + srcIsSelfNode = nl.selfNode.Addresses().ContainsFunc(func(p netip.Prefix) bool { + return c.Src.Addr() == p.Addr() && p.IsSingleIP() + }) + } + switch { + case srcIsSelfNode && dstNode.Valid(): + return virtualTraffic + case srcIsSelfNode: + // TODO: Should we swap src for the node serving as the proxy? + // It is relatively useless always using the self IP address. + if nl.withinRoutesLocked(c.Dst.Addr()) { + return subnetTraffic // a client using another subnet router } else { - logger.Logf("%s", b) + return exitTraffic // a client using exit an exit node } + case dstNode.Valid(): + if nl.withinRoutesLocked(c.Src.Addr()) { + return subnetTraffic // serving as a subnet router + } else { + return exitTraffic // serving as an exit node + } + default: + return unknownTraffic } } -func makeRouteMaps(cfg *router.Config) (addrs map[netip.Addr]bool, prefixes map[netip.Prefix]bool) { - addrs = make(map[netip.Addr]bool) - for _, p := range cfg.LocalAddrs { - if p.IsSingleIP() { - addrs[p.Addr()] = true +func (nl *Logger) updatePhysConn(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, recv bool) { + nl.mu.Lock() + defer nl.mu.Unlock() + + // Lookup the connection and increment the counts. + nl.initRecordLocked() + conn := netlogtype.Connection{Proto: proto, Src: src, Dst: dst} + cnts, found := nl.record.physConns[conn] + if !found { + nl.addNewPhysConnLocked(conn) + } + if recv { + cnts.RxPackets += uint64(packets) + cnts.RxBytes += uint64(bytes) + } else { + cnts.TxPackets += uint64(packets) + cnts.TxBytes += uint64(bytes) + } + nl.record.physConns[conn] = cnts +} + +// addNewPhysConnLocked adds the first insertion of a physical connection. +// The [Logger.mu] must be held. +func (nl *Logger) addNewPhysConnLocked(c netlogtype.Connection) { + // Check whether this is the first insertion of the src node. + var srcNodeLen int + srcNode, srcSeen := nl.record.seenNodes[c.Src.Addr()] + if !srcSeen { + srcNode = nl.allNodes[c.Src.Addr()] + if srcNode.Valid() { + srcNodeLen = srcNode.jsonLen() } } - prefixes = make(map[netip.Prefix]bool) - insertPrefixes := func(rs []netip.Prefix) { - for _, p := range rs { - if p.IsSingleIP() { - addrs[p.Addr()] = true - } else { - prefixes[p] = true + + // Check whether the additional [netlogtype.ConnectionCounts] + // and [netlogtype.Node] information would exceed [maxLogSize]. + if nl.recordLen+netlogtype.MaxConnectionCountsJSONSize+srcNodeLen > maxLogSize { + nl.flushRecordLocked() + nl.initRecordLocked() + } + + // Insert newly seen src and/or dst nodes. + if !srcSeen && srcNode.Valid() { + nl.record.seenNodes[c.Src.Addr()] = srcNode + } + nl.recordLen += netlogtype.MaxConnectionCountsJSONSize + srcNodeLen +} + +// initRecordLocked initialize the current record if uninitialized. +// The [Logger.mu] must be held. +func (nl *Logger) initRecordLocked() { + if nl.recordLen != 0 { + return + } + nl.record.selfNode = nl.selfNode + nl.record.start = time.Now().UTC() + nl.record.seenNodes = make(map[netip.Addr]nodeUser) + nl.record.virtConns = make(map[netlogtype.Connection]countsType) + nl.record.physConns = make(map[netlogtype.Connection]netlogtype.Counts) + nl.recordLen = netlogtype.MinMessageJSONSize + nl.selfNode.jsonLen() + + // Start a time to auto-flush the record. + // Avoid tickers since continually waking up a goroutine + // is expensive on battery powered devices. + nl.flushTimer = time.AfterFunc(pollPeriod, func() { + nl.mu.Lock() + defer nl.mu.Unlock() + if !nl.record.start.IsZero() && time.Since(nl.record.start) > pollPeriod/2 { + nl.flushRecordLocked() + } + }) +} + +// flushRecordLocked flushes the current record if initialized. +// The [Logger.mu] must be held. +func (nl *Logger) flushRecordLocked() { + if nl.recordLen == 0 { + return + } + nl.record.end = time.Now().UTC() + if nl.recordsChan != nil { + select { + case nl.recordsChan <- nl.record: + default: + // TODO: Buffered channel is full, drop the message. Should we log this? + } + } + if nl.flushTimer != nil { + nl.flushTimer.Stop() + nl.flushTimer = nil + } + nl.record = record{} + nl.recordLen = 0 +} + +func makeNodeMaps(nm *netmap.NetworkMap) (selfNode nodeUser, allNodes map[netip.Addr]nodeUser) { + if nm == nil { + return + } + allNodes = make(map[netip.Addr]nodeUser) + if nm.SelfNode.Valid() { + selfNode = nodeUser{nm.SelfNode, nm.UserProfiles[nm.SelfNode.User()]} + for _, addr := range nm.SelfNode.Addresses().All() { + if addr.IsSingleIP() { + allNodes[addr.Addr()] = selfNode } } } + for _, peer := range nm.Peers { + if peer.Valid() { + for _, addr := range peer.Addresses().All() { + if addr.IsSingleIP() { + allNodes[addr.Addr()] = nodeUser{peer, nm.UserProfiles[peer.User()]} + } + } + } + } + return selfNode, allNodes +} + +// ReconfigNetworkMap configures the network logger with an updated netmap. +func (nl *Logger) ReconfigNetworkMap(nm *netmap.NetworkMap) { + selfNode, allNodes := makeNodeMaps(nm) // avoid holding lock while making maps + nl.mu.Lock() + nl.selfNode, nl.allNodes = selfNode, allNodes + nl.mu.Unlock() +} + +func makeRouteMaps(cfg *router.Config) (addrs set.Set[netip.Addr], prefixes []netip.Prefix) { + addrs = make(set.Set[netip.Addr]) + insertPrefixes := func(rs []netip.Prefix) { + for _, p := range rs { + if p.IsSingleIP() { + addrs.Add(p.Addr()) + } else { + prefixes = append(prefixes, p) + } + } + } + insertPrefixes(cfg.LocalAddrs) insertPrefixes(cfg.Routes) insertPrefixes(cfg.SubnetRoutes) return addrs, prefixes @@ -235,11 +450,25 @@ func makeRouteMaps(cfg *router.Config) (addrs map[netip.Addr]bool, prefixes map[ // The cfg is used to classify the types of connections captured by // the tun Device passed to Startup. func (nl *Logger) ReconfigRoutes(cfg *router.Config) { + addrs, prefixes := makeRouteMaps(cfg) // avoid holding lock while making maps nl.mu.Lock() - defer nl.mu.Unlock() - // TODO(joetsai): There is a race where deleted routes are not known at - // the time of extraction. We need to keep old routes around for a bit. - nl.addrs, nl.prefixes = makeRouteMaps(cfg) + nl.routeAddrs, nl.routePrefixes = addrs, prefixes + nl.mu.Unlock() +} + +// withinRoutesLocked reports whether a is within the configured routes, +// which should only contain Tailscale addresses and subnet routes. +// The [Logger.mu] must be held. +func (nl *Logger) withinRoutesLocked(a netip.Addr) bool { + if nl.routeAddrs.Contains(a) { + return true + } + for _, p := range nl.routePrefixes { + if p.Contains(a) && p.Bits() > 0 { + return true + } + } + return false } // Shutdown shuts down the network logger. @@ -248,26 +477,8 @@ func (nl *Logger) ReconfigRoutes(cfg *router.Config) { func (nl *Logger) Shutdown(ctx context.Context) error { nl.mu.Lock() defer nl.mu.Unlock() - if nl.logger == nil { + if nl.shutdownLocked == nil { return nil } - - // Shutdown in reverse order of Startup. - // Do not hold lock while shutting down since this may flush one last time. - nl.mu.Unlock() - nl.sock.SetConnectionCounter(nil) - nl.tun.SetConnectionCounter(nil) - err1 := nl.stats.Shutdown(ctx) - err2 := nl.logger.Shutdown(ctx) - nl.mu.Lock() - - // Purge state. - nl.logger = nil - nl.stats = nil - nl.tun = nil - nl.sock = nil - nl.addrs = nil - nl.prefixes = nil - - return errors.Join(err1, err2) + return nl.shutdownLocked(ctx) } diff --git a/wgengine/netlog/netlog_omit.go b/wgengine/netlog/netlog_omit.go index 43209df91..03610a1ef 100644 --- a/wgengine/netlog/netlog_omit.go +++ b/wgengine/netlog/netlog_omit.go @@ -7,7 +7,8 @@ package netlog type Logger struct{} -func (*Logger) Startup(...any) error { return nil } -func (*Logger) Running() bool { return false } -func (*Logger) Shutdown(any) error { return nil } -func (*Logger) ReconfigRoutes(any) {} +func (*Logger) Startup(...any) error { return nil } +func (*Logger) Running() bool { return false } +func (*Logger) Shutdown(any) error { return nil } +func (*Logger) ReconfigNetworkMap(any) {} +func (*Logger) ReconfigRoutes(any) {} diff --git a/wgengine/netlog/netlog_test.go b/wgengine/netlog/netlog_test.go new file mode 100644 index 000000000..ed9f672bf --- /dev/null +++ b/wgengine/netlog/netlog_test.go @@ -0,0 +1,236 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !ts_omit_netlog && !ts_omit_logtail + +package netlog + +import ( + "encoding/binary" + "math/rand/v2" + "net/netip" + "sync" + "testing" + "testing/synctest" + "time" + + jsonv2 "github.com/go-json-experiment/json" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "tailscale.com/tailcfg" + "tailscale.com/types/bools" + "tailscale.com/types/ipproto" + "tailscale.com/types/netlogtype" + "tailscale.com/types/netmap" + "tailscale.com/wgengine/router" +) + +func TestEmbedNodeInfo(t *testing.T) { + // Initialize the logger with a particular view of the netmap. + var logger Logger + logger.ReconfigNetworkMap(&netmap.NetworkMap{ + SelfNode: (&tailcfg.Node{ + StableID: "n123456CNTL", + ID: 123456, + Name: "test.tail123456.ts.net", + Addresses: []netip.Prefix{prefix("100.1.2.3")}, + Tags: []string{"tag:foo", "tag:bar"}, + }).View(), + Peers: []tailcfg.NodeView{ + (&tailcfg.Node{ + StableID: "n123457CNTL", + ID: 123457, + Name: "peer1.tail123456.ts.net", + Addresses: []netip.Prefix{prefix("100.1.2.4")}, + Tags: []string{"tag:peer"}, + }).View(), + (&tailcfg.Node{ + StableID: "n123458CNTL", + ID: 123458, + Name: "peer2.tail123456.ts.net", + Addresses: []netip.Prefix{prefix("100.1.2.5")}, + User: 54321, + }).View(), + }, + UserProfiles: map[tailcfg.UserID]tailcfg.UserProfileView{ + 54321: (&tailcfg.UserProfile{ID: 54321, LoginName: "peer@example.com"}).View(), + }, + }) + logger.ReconfigRoutes(&router.Config{ + SubnetRoutes: []netip.Prefix{ + prefix("172.16.1.1/16"), + prefix("192.168.1.1/24"), + }, + }) + + // Update the counters for a few connections. + var group sync.WaitGroup + defer group.Wait() + conns := []struct { + virt bool + proto ipproto.Proto + src, dst netip.AddrPort + txP, txB, rxP, rxB int + }{ + {true, 0x6, addrPort("100.1.2.3:80"), addrPort("100.1.2.4:1812"), 88, 278, 34, 887}, + {true, 0x6, addrPort("100.1.2.3:443"), addrPort("100.1.2.5:1742"), 96, 635, 23, 790}, + {true, 0x6, addrPort("100.1.2.3:443"), addrPort("100.1.2.6:1175"), 48, 94, 86, 618}, // unknown peer (in Tailscale IP space, but not a known peer) + {true, 0x6, addrPort("100.1.2.3:80"), addrPort("192.168.1.241:713"), 43, 154, 66, 883}, + {true, 0x6, addrPort("100.1.2.3:80"), addrPort("192.168.2.241:713"), 43, 154, 66, 883}, // not in the subnet, must be exit traffic + {true, 0x6, addrPort("100.1.2.3:80"), addrPort("172.16.5.18:713"), 7, 243, 40, 59}, + {true, 0x6, addrPort("100.1.2.3:80"), addrPort("172.20.5.18:713"), 61, 753, 42, 492}, // not in the subnet, must be exit traffic + {true, 0x6, addrPort("192.168.1.241:713"), addrPort("100.1.2.3:80"), 43, 154, 66, 883}, + {true, 0x6, addrPort("192.168.2.241:713"), addrPort("100.1.2.3:80"), 43, 154, 66, 883}, // not in the subnet, must be exit traffic + {true, 0x6, addrPort("172.16.5.18:713"), addrPort("100.1.2.3:80"), 7, 243, 40, 59}, + {true, 0x6, addrPort("172.20.5.18:713"), addrPort("100.1.2.3:80"), 61, 753, 42, 492}, // not in the subnet, must be exit traffic + {true, 0x6, addrPort("14.255.192.128:39230"), addrPort("243.42.106.193:48206"), 81, 791, 79, 316}, // unknown connection + {false, 0x6, addrPort("100.1.2.4:0"), addrPort("35.92.180.165:9743"), 63, 136, 61, 409}, // physical traffic with peer1 + {false, 0x6, addrPort("100.1.2.5:0"), addrPort("131.19.35.17:9743"), 88, 452, 2, 716}, // physical traffic with peer2 + } + for range 10 { + for _, conn := range conns { + update := bools.IfElse(conn.virt, logger.updateVirtConn, logger.updatePhysConn) + group.Go(func() { update(conn.proto, conn.src, conn.dst, conn.txP, conn.txB, false) }) + group.Go(func() { update(conn.proto, conn.src, conn.dst, conn.rxP, conn.rxB, true) }) + } + } + group.Wait() + + // Verify that the counters match. + got := logger.record.toMessage(false, false) + got.Start = time.Time{} // avoid flakiness + want := netlogtype.Message{ + NodeID: "n123456CNTL", + SrcNode: netlogtype.Node{ + NodeID: "n123456CNTL", + Name: "test.tail123456.ts.net", + Addresses: []netip.Addr{addr("100.1.2.3")}, + Tags: []string{"tag:bar", "tag:foo"}, + }, + DstNodes: []netlogtype.Node{{ + NodeID: "n123457CNTL", + Name: "peer1.tail123456.ts.net", + Addresses: []netip.Addr{addr("100.1.2.4")}, + Tags: []string{"tag:peer"}, + }, { + NodeID: "n123458CNTL", + Name: "peer2.tail123456.ts.net", + Addresses: []netip.Addr{addr("100.1.2.5")}, + User: "peer@example.com", + }}, + VirtualTraffic: []netlogtype.ConnectionCounts{ + {Connection: conn(0x6, "100.1.2.3:80", "100.1.2.4:1812"), Counts: counts(880, 2780, 340, 8870)}, + {Connection: conn(0x6, "100.1.2.3:443", "100.1.2.5:1742"), Counts: counts(960, 6350, 230, 7900)}, + }, + SubnetTraffic: []netlogtype.ConnectionCounts{ + {Connection: conn(0x6, "100.1.2.3:80", "172.16.5.18:713"), Counts: counts(70, 2430, 400, 590)}, + {Connection: conn(0x6, "100.1.2.3:80", "192.168.1.241:713"), Counts: counts(430, 1540, 660, 8830)}, + {Connection: conn(0x6, "172.16.5.18:713", "100.1.2.3:80"), Counts: counts(70, 2430, 400, 590)}, + {Connection: conn(0x6, "192.168.1.241:713", "100.1.2.3:80"), Counts: counts(430, 1540, 660, 8830)}, + }, + ExitTraffic: []netlogtype.ConnectionCounts{ + {Connection: conn(0x6, "14.255.192.128:39230", "243.42.106.193:48206"), Counts: counts(810, 7910, 790, 3160)}, + {Connection: conn(0x6, "100.1.2.3:80", "172.20.5.18:713"), Counts: counts(610, 7530, 420, 4920)}, + {Connection: conn(0x6, "100.1.2.3:80", "192.168.2.241:713"), Counts: counts(430, 1540, 660, 8830)}, + {Connection: conn(0x6, "100.1.2.3:443", "100.1.2.6:1175"), Counts: counts(480, 940, 860, 6180)}, + {Connection: conn(0x6, "172.20.5.18:713", "100.1.2.3:80"), Counts: counts(610, 7530, 420, 4920)}, + {Connection: conn(0x6, "192.168.2.241:713", "100.1.2.3:80"), Counts: counts(430, 1540, 660, 8830)}, + }, + PhysicalTraffic: []netlogtype.ConnectionCounts{ + {Connection: conn(0x6, "100.1.2.4:0", "35.92.180.165:9743"), Counts: counts(630, 1360, 610, 4090)}, + {Connection: conn(0x6, "100.1.2.5:0", "131.19.35.17:9743"), Counts: counts(880, 4520, 20, 7160)}, + }, + } + if d := cmp.Diff(got, want, cmpopts.EquateComparable(netip.Addr{}, netip.AddrPort{})); d != "" { + t.Errorf("Message (-got +want):\n%s", d) + } +} + +func TestUpdateRace(t *testing.T) { + var logger Logger + logger.recordsChan = make(chan record, 1) + go func(recordsChan chan record) { + for range recordsChan { + } + }(logger.recordsChan) + + var group sync.WaitGroup + defer group.Wait() + for i := range 1000 { + group.Go(func() { + src, dst := randAddrPort(), randAddrPort() + for j := range 1000 { + if i%2 == 0 { + logger.updateVirtConn(0x1, src, dst, rand.IntN(10), rand.IntN(1000), j%2 == 0) + } else { + logger.updatePhysConn(0x1, src, dst, rand.IntN(10), rand.IntN(1000), j%2 == 0) + } + } + }) + group.Go(func() { + for range 1000 { + logger.ReconfigNetworkMap(new(netmap.NetworkMap)) + } + }) + group.Go(func() { + for range 1000 { + logger.ReconfigRoutes(new(router.Config)) + } + }) + } + + group.Wait() + logger.mu.Lock() + close(logger.recordsChan) + logger.mu.Unlock() +} + +func randAddrPort() netip.AddrPort { + var b [4]uint8 + binary.LittleEndian.PutUint32(b[:], rand.Uint32()) + return netip.AddrPortFrom(netip.AddrFrom4(b), uint16(rand.Uint32())) +} + +func TestAutoFlushMaxConns(t *testing.T) { + var logger Logger + logger.recordsChan = make(chan record, 1) + for i := 0; len(logger.recordsChan) == 0; i++ { + logger.updateVirtConn(0, netip.AddrPortFrom(netip.Addr{}, uint16(i)), netip.AddrPort{}, 1, 1, false) + } + b, _ := jsonv2.Marshal(logger.recordsChan) + if len(b) > maxLogSize { + t.Errorf("len(Message) = %v, want <= %d", len(b), maxLogSize) + } +} + +func TestAutoFlushTimeout(t *testing.T) { + var logger Logger + logger.recordsChan = make(chan record, 1) + synctest.Test(t, func(t *testing.T) { + logger.updateVirtConn(0, netip.AddrPort{}, netip.AddrPort{}, 1, 1, false) + time.Sleep(pollPeriod) + }) + rec := <-logger.recordsChan + if d := rec.end.Sub(rec.start); d != pollPeriod { + t.Errorf("window = %v, want %v", d, pollPeriod) + } + if len(rec.virtConns) != 1 { + t.Errorf("len(virtConns) = %d, want 1", len(rec.virtConns)) + } +} + +func BenchmarkUpdateSameConn(b *testing.B) { + var logger Logger + b.ReportAllocs() + for range b.N { + logger.updateVirtConn(0, netip.AddrPort{}, netip.AddrPort{}, 1, 1, false) + } +} + +func BenchmarkUpdateNewConns(b *testing.B) { + var logger Logger + b.ReportAllocs() + for i := range b.N { + logger.updateVirtConn(0, netip.AddrPortFrom(netip.Addr{}, uint16(i)), netip.AddrPort{}, 1, 1, false) + } +} diff --git a/wgengine/netlog/record.go b/wgengine/netlog/record.go new file mode 100644 index 000000000..b8db26fc5 --- /dev/null +++ b/wgengine/netlog/record.go @@ -0,0 +1,196 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !ts_omit_netlog && !ts_omit_logtail + +package netlog + +import ( + "cmp" + "net/netip" + "slices" + "time" + "unicode/utf8" + + "tailscale.com/tailcfg" + "tailscale.com/types/netlogtype" + "tailscale.com/util/set" +) + +// maxLogSize is the maximum number of bytes for a log message. +const maxLogSize = 256 << 10 + +// record is the in-memory representation of a [netlogtype.Message]. +// It uses maps to efficiently look-up addresses and connections. +// In contrast, [netlogtype.Message] is designed to be JSON serializable, +// where complex keys types are not well support in JSON objects. +type record struct { + selfNode nodeUser + + start time.Time + end time.Time + + seenNodes map[netip.Addr]nodeUser + + virtConns map[netlogtype.Connection]countsType + physConns map[netlogtype.Connection]netlogtype.Counts +} + +// nodeUser is a node with additional user profile information. +type nodeUser struct { + tailcfg.NodeView + user tailcfg.UserProfileView // UserProfileView for NodeView.User +} + +// countsType is a counts with classification information about the connection. +type countsType struct { + netlogtype.Counts + connType connType +} + +type connType uint8 + +const ( + unknownTraffic connType = iota + virtualTraffic + subnetTraffic + exitTraffic +) + +// toMessage converts a [record] into a [netlogtype.Message]. +func (r record) toMessage(excludeNodeInfo, anonymizeExitTraffic bool) netlogtype.Message { + if !r.selfNode.Valid() { + return netlogtype.Message{} + } + + m := netlogtype.Message{ + NodeID: r.selfNode.StableID(), + Start: r.start.UTC(), + End: r.end.UTC(), + } + + // Convert node fields. + if !excludeNodeInfo { + m.SrcNode = r.selfNode.toNode() + seenIDs := set.Of(r.selfNode.ID()) + for _, node := range r.seenNodes { + if _, ok := seenIDs[node.ID()]; !ok && node.Valid() { + m.DstNodes = append(m.DstNodes, node.toNode()) + seenIDs.Add(node.ID()) + } + } + slices.SortFunc(m.DstNodes, func(x, y netlogtype.Node) int { + return cmp.Compare(x.NodeID, y.NodeID) + }) + } + + // Converter traffic fields. + anonymizedExitTraffic := make(map[netlogtype.Connection]netlogtype.Counts) + for conn, cnts := range r.virtConns { + switch cnts.connType { + case virtualTraffic: + m.VirtualTraffic = append(m.VirtualTraffic, netlogtype.ConnectionCounts{Connection: conn, Counts: cnts.Counts}) + case subnetTraffic: + m.SubnetTraffic = append(m.SubnetTraffic, netlogtype.ConnectionCounts{Connection: conn, Counts: cnts.Counts}) + default: + if anonymizeExitTraffic { + conn = netlogtype.Connection{ // scrub the IP protocol type + Src: netip.AddrPortFrom(conn.Src.Addr(), 0), // scrub the port number + Dst: netip.AddrPortFrom(conn.Dst.Addr(), 0), // scrub the port number + } + if !r.seenNodes[conn.Src.Addr()].Valid() { + conn.Src = netip.AddrPort{} // not a Tailscale node, so scrub the address + } + if !r.seenNodes[conn.Dst.Addr()].Valid() { + conn.Dst = netip.AddrPort{} // not a Tailscale node, so scrub the address + } + anonymizedExitTraffic[conn] = anonymizedExitTraffic[conn].Add(cnts.Counts) + continue + } + m.ExitTraffic = append(m.ExitTraffic, netlogtype.ConnectionCounts{Connection: conn, Counts: cnts.Counts}) + } + } + for conn, cnts := range anonymizedExitTraffic { + m.ExitTraffic = append(m.ExitTraffic, netlogtype.ConnectionCounts{Connection: conn, Counts: cnts}) + } + for conn, cnts := range r.physConns { + m.PhysicalTraffic = append(m.PhysicalTraffic, netlogtype.ConnectionCounts{Connection: conn, Counts: cnts}) + } + + // Sort the connections for deterministic results. + slices.SortFunc(m.VirtualTraffic, compareConnCnts) + slices.SortFunc(m.SubnetTraffic, compareConnCnts) + slices.SortFunc(m.ExitTraffic, compareConnCnts) + slices.SortFunc(m.PhysicalTraffic, compareConnCnts) + + return m +} + +func compareConnCnts(x, y netlogtype.ConnectionCounts) int { + return cmp.Or( + netip.AddrPort.Compare(x.Src, y.Src), + netip.AddrPort.Compare(x.Dst, y.Dst), + cmp.Compare(x.Proto, y.Proto)) +} + +// jsonLen computes an upper-bound on the size of the JSON representation. +func (nu nodeUser) jsonLen() int { + if !nu.Valid() { + return len(`{"nodeId":""}`) + } + n := netlogtype.MinNodeJSONSize + jsonQuotedLen(nu.Name()) + if nu.Tags().Len() > 0 { + for _, tag := range nu.Tags().All() { + n += jsonQuotedLen(tag) + len(",") + } + } else if nu.user.Valid() && nu.user.ID() == nu.User() { + n += jsonQuotedLen(nu.user.LoginName()) + } + return n +} + +// toNode converts the [nodeUser] into a [netlogtype.Node]. +func (nu nodeUser) toNode() netlogtype.Node { + if !nu.Valid() { + return netlogtype.Node{} + } + n := netlogtype.Node{NodeID: nu.StableID(), Name: nu.Name()} + var ipv4, ipv6 netip.Addr + for _, addr := range nu.Addresses().All() { + switch { + case addr.IsSingleIP() && addr.Addr().Is4(): + ipv4 = addr.Addr() + case addr.IsSingleIP() && addr.Addr().Is6(): + ipv6 = addr.Addr() + } + } + n.Addresses = []netip.Addr{ipv4, ipv6} + n.Addresses = slices.DeleteFunc(n.Addresses, func(a netip.Addr) bool { return !a.IsValid() }) + if nu.Tags().Len() > 0 { + n.Tags = nu.Tags().AsSlice() + slices.Sort(n.Tags) + n.Tags = slices.Compact(n.Tags) + } else if nu.user.Valid() && nu.user.ID() == nu.User() { + n.User = nu.user.LoginName() + } + return n +} + +// jsonQuotedLen computes the length of the JSON serialization of s +// according to [jsontext.AppendQuote]. +func jsonQuotedLen(s string) int { + n := len(`"`) + len(s) + len(`"`) + for i, r := range s { + switch { + case r == '\b', r == '\t', r == '\n', r == '\f', r == '\r', r == '"', r == '\\': + n += len(`\X`) - 1 + case r < ' ': + n += len(`\uXXXX`) - 1 + case r == utf8.RuneError: + if _, m := utf8.DecodeRuneInString(s[i:]); m == 1 { // exactly an invalid byte + n += len("�") - 1 + } + } + } + return n +} diff --git a/wgengine/netlog/record_test.go b/wgengine/netlog/record_test.go new file mode 100644 index 000000000..d3ab8b86c --- /dev/null +++ b/wgengine/netlog/record_test.go @@ -0,0 +1,255 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !ts_omit_netlog && !ts_omit_logtail + +package netlog + +import ( + "net/netip" + "testing" + "time" + + jsonv2 "github.com/go-json-experiment/json" + "github.com/go-json-experiment/json/jsontext" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "tailscale.com/tailcfg" + "tailscale.com/types/ipproto" + "tailscale.com/types/netlogtype" + "tailscale.com/util/must" +) + +func addr(s string) netip.Addr { + if s == "" { + return netip.Addr{} + } + return must.Get(netip.ParseAddr(s)) +} +func addrPort(s string) netip.AddrPort { + if s == "" { + return netip.AddrPort{} + } + return must.Get(netip.ParseAddrPort(s)) +} +func prefix(s string) netip.Prefix { + if p, err := netip.ParsePrefix(s); err == nil { + return p + } + a := addr(s) + return netip.PrefixFrom(a, a.BitLen()) +} + +func conn(proto ipproto.Proto, src, dst string) netlogtype.Connection { + return netlogtype.Connection{Proto: proto, Src: addrPort(src), Dst: addrPort(dst)} +} + +func counts(txP, txB, rxP, rxB uint64) netlogtype.Counts { + return netlogtype.Counts{TxPackets: txP, TxBytes: txB, RxPackets: rxP, RxBytes: rxB} +} + +func TestToMessage(t *testing.T) { + rec := record{ + selfNode: nodeUser{NodeView: (&tailcfg.Node{ + ID: 123456, + StableID: "n123456CNTL", + Name: "src.tail123456.ts.net", + Addresses: []netip.Prefix{prefix("100.1.2.3")}, + Tags: []string{"tag:src"}, + }).View()}, + start: time.Now(), + end: time.Now().Add(5 * time.Second), + + seenNodes: map[netip.Addr]nodeUser{ + addr("100.1.2.4"): {NodeView: (&tailcfg.Node{ + ID: 123457, + StableID: "n123457CNTL", + Name: "dst1.tail123456.ts.net", + Addresses: []netip.Prefix{prefix("100.1.2.4")}, + Tags: []string{"tag:dst1"}, + }).View()}, + addr("100.1.2.5"): {NodeView: (&tailcfg.Node{ + ID: 123458, + StableID: "n123458CNTL", + Name: "dst2.tail123456.ts.net", + Addresses: []netip.Prefix{prefix("100.1.2.5")}, + Tags: []string{"tag:dst2"}, + }).View()}, + }, + + virtConns: map[netlogtype.Connection]countsType{ + conn(0x1, "100.1.2.3:1234", "100.1.2.4:80"): {Counts: counts(12, 34, 56, 78), connType: virtualTraffic}, + conn(0x1, "100.1.2.3:1234", "100.1.2.5:80"): {Counts: counts(23, 45, 78, 790), connType: virtualTraffic}, + conn(0x6, "172.16.1.1:80", "100.1.2.4:1234"): {Counts: counts(91, 54, 723, 621), connType: subnetTraffic}, + conn(0x6, "172.16.1.2:443", "100.1.2.5:1234"): {Counts: counts(42, 813, 3, 1823), connType: subnetTraffic}, + conn(0x6, "172.16.1.3:80", "100.1.2.6:1234"): {Counts: counts(34, 52, 78, 790), connType: subnetTraffic}, + conn(0x6, "100.1.2.3:1234", "12.34.56.78:80"): {Counts: counts(11, 110, 10, 100), connType: exitTraffic}, + conn(0x6, "100.1.2.4:1234", "23.34.56.78:80"): {Counts: counts(423, 1, 6, 123), connType: exitTraffic}, + conn(0x6, "100.1.2.4:1234", "23.34.56.78:443"): {Counts: counts(22, 220, 20, 200), connType: exitTraffic}, + conn(0x6, "100.1.2.5:1234", "45.34.56.78:80"): {Counts: counts(33, 330, 30, 300), connType: exitTraffic}, + conn(0x6, "100.1.2.6:1234", "67.34.56.78:80"): {Counts: counts(44, 440, 40, 400), connType: exitTraffic}, + conn(0x6, "42.54.72.42:555", "18.42.7.1:777"): {Counts: counts(44, 440, 40, 400)}, + }, + + physConns: map[netlogtype.Connection]netlogtype.Counts{ + conn(0, "100.1.2.4:0", "4.3.2.1:1234"): counts(12, 34, 56, 78), + conn(0, "100.1.2.5:0", "4.3.2.10:1234"): counts(78, 56, 34, 12), + }, + } + rec.seenNodes[rec.selfNode.toNode().Addresses[0]] = rec.selfNode + + got := rec.toMessage(false, false) + want := netlogtype.Message{ + NodeID: rec.selfNode.StableID(), + Start: rec.start, + End: rec.end, + SrcNode: rec.selfNode.toNode(), + DstNodes: []netlogtype.Node{ + rec.seenNodes[addr("100.1.2.4")].toNode(), + rec.seenNodes[addr("100.1.2.5")].toNode(), + }, + VirtualTraffic: []netlogtype.ConnectionCounts{ + {Connection: conn(0x1, "100.1.2.3:1234", "100.1.2.4:80"), Counts: counts(12, 34, 56, 78)}, + {Connection: conn(0x1, "100.1.2.3:1234", "100.1.2.5:80"), Counts: counts(23, 45, 78, 790)}, + }, + SubnetTraffic: []netlogtype.ConnectionCounts{ + {Connection: conn(0x6, "172.16.1.1:80", "100.1.2.4:1234"), Counts: counts(91, 54, 723, 621)}, + {Connection: conn(0x6, "172.16.1.2:443", "100.1.2.5:1234"), Counts: counts(42, 813, 3, 1823)}, + {Connection: conn(0x6, "172.16.1.3:80", "100.1.2.6:1234"), Counts: counts(34, 52, 78, 790)}, + }, + ExitTraffic: []netlogtype.ConnectionCounts{ + {Connection: conn(0x6, "42.54.72.42:555", "18.42.7.1:777"), Counts: counts(44, 440, 40, 400)}, + {Connection: conn(0x6, "100.1.2.3:1234", "12.34.56.78:80"), Counts: counts(11, 110, 10, 100)}, + {Connection: conn(0x6, "100.1.2.4:1234", "23.34.56.78:80"), Counts: counts(423, 1, 6, 123)}, + {Connection: conn(0x6, "100.1.2.4:1234", "23.34.56.78:443"), Counts: counts(22, 220, 20, 200)}, + {Connection: conn(0x6, "100.1.2.5:1234", "45.34.56.78:80"), Counts: counts(33, 330, 30, 300)}, + {Connection: conn(0x6, "100.1.2.6:1234", "67.34.56.78:80"), Counts: counts(44, 440, 40, 400)}, + }, + PhysicalTraffic: []netlogtype.ConnectionCounts{ + {Connection: conn(0, "100.1.2.4:0", "4.3.2.1:1234"), Counts: counts(12, 34, 56, 78)}, + {Connection: conn(0, "100.1.2.5:0", "4.3.2.10:1234"), Counts: counts(78, 56, 34, 12)}, + }, + } + if d := cmp.Diff(got, want, cmpopts.EquateComparable(netip.Addr{}, netip.AddrPort{})); d != "" { + t.Errorf("toMessage(false, false) mismatch (-got +want):\n%s", d) + } + + got = rec.toMessage(true, false) + want.SrcNode = netlogtype.Node{} + want.DstNodes = nil + if d := cmp.Diff(got, want, cmpopts.EquateComparable(netip.Addr{}, netip.AddrPort{})); d != "" { + t.Errorf("toMessage(true, false) mismatch (-got +want):\n%s", d) + } + + got = rec.toMessage(true, true) + want.ExitTraffic = []netlogtype.ConnectionCounts{ + {Connection: conn(0, "", ""), Counts: counts(44+44, 440+440, 40+40, 400+400)}, + {Connection: conn(0, "100.1.2.3:0", ""), Counts: counts(11, 110, 10, 100)}, + {Connection: conn(0, "100.1.2.4:0", ""), Counts: counts(423+22, 1+220, 6+20, 123+200)}, + {Connection: conn(0, "100.1.2.5:0", ""), Counts: counts(33, 330, 30, 300)}, + } + if d := cmp.Diff(got, want, cmpopts.EquateComparable(netip.Addr{}, netip.AddrPort{})); d != "" { + t.Errorf("toMessage(true, true) mismatch (-got +want):\n%s", d) + } +} + +func TestToNode(t *testing.T) { + tests := []struct { + node *tailcfg.Node + user *tailcfg.UserProfile + want netlogtype.Node + }{ + {}, + { + node: &tailcfg.Node{ + StableID: "n123456CNTL", + Name: "test.tail123456.ts.net", + Addresses: []netip.Prefix{prefix("100.1.2.3")}, + Tags: []string{"tag:dupe", "tag:test", "tag:dupe"}, + User: 12345, // should be ignored + }, + want: netlogtype.Node{ + NodeID: "n123456CNTL", + Name: "test.tail123456.ts.net", + Addresses: []netip.Addr{addr("100.1.2.3")}, + Tags: []string{"tag:dupe", "tag:test"}, + }, + }, + { + node: &tailcfg.Node{ + StableID: "n123456CNTL", + Addresses: []netip.Prefix{prefix("100.1.2.3")}, + User: 12345, + }, + want: netlogtype.Node{ + NodeID: "n123456CNTL", + Addresses: []netip.Addr{addr("100.1.2.3")}, + }, + }, + { + node: &tailcfg.Node{ + StableID: "n123456CNTL", + Addresses: []netip.Prefix{prefix("100.1.2.3")}, + User: 12345, + }, + user: &tailcfg.UserProfile{ + ID: 12345, + LoginName: "user@domain", + }, + want: netlogtype.Node{ + NodeID: "n123456CNTL", + Addresses: []netip.Addr{addr("100.1.2.3")}, + User: "user@domain", + }, + }, + } + for _, tt := range tests { + nu := nodeUser{tt.node.View(), tt.user.View()} + got := nu.toNode() + b := must.Get(jsonv2.Marshal(got)) + if len(b) > nu.jsonLen() { + t.Errorf("jsonLen = %v, want >= %d", nu.jsonLen(), len(b)) + } + if d := cmp.Diff(got, tt.want, cmpopts.EquateComparable(netip.Addr{})); d != "" { + t.Errorf("toNode mismatch (-got +want):\n%s", d) + } + } +} + +func FuzzQuotedLen(f *testing.F) { + for _, s := range quotedLenTestdata { + f.Add(s) + } + f.Fuzz(func(t *testing.T, s string) { + testQuotedLen(t, s) + }) +} + +func TestQuotedLen(t *testing.T) { + for _, s := range quotedLenTestdata { + testQuotedLen(t, s) + } +} + +var quotedLenTestdata = []string{ + "", // empty string + func() string { + b := make([]byte, 128) + for i := range b { + b[i] = byte(i) + } + return string(b) + }(), // all ASCII + "�", // replacement rune + "\xff", // invalid UTF-8 + "ʕ◔ϖ◔ʔ", // Unicode gopher +} + +func testQuotedLen(t *testing.T, in string) { + got := jsonQuotedLen(in) + b, _ := jsontext.AppendQuote(nil, in) + want := len(b) + if got != want { + t.Errorf("jsonQuotedLen(%q) = %v, want %v", in, got, want) + } +} diff --git a/wgengine/netlog/stats.go b/wgengine/netlog/stats.go deleted file mode 100644 index c06068803..000000000 --- a/wgengine/netlog/stats.go +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright (c) Tailscale Inc & AUTHORS -// SPDX-License-Identifier: BSD-3-Clause - -//go:build !ts_omit_netlog && !ts_omit_logtail - -package netlog - -import ( - "context" - "net/netip" - "sync" - "time" - - "golang.org/x/sync/errgroup" - "tailscale.com/net/packet" - "tailscale.com/net/tsaddr" - "tailscale.com/types/ipproto" - "tailscale.com/types/netlogtype" -) - -// statistics maintains counters for every connection. -// All methods are safe for concurrent use. -// The zero value is ready for use. -type statistics struct { - maxConns int // immutable once set - - mu sync.Mutex - connCnts - - connCntsCh chan connCnts - shutdownCtx context.Context - shutdown context.CancelFunc - group errgroup.Group -} - -type connCnts struct { - start time.Time - end time.Time - virtual map[netlogtype.Connection]netlogtype.Counts - physical map[netlogtype.Connection]netlogtype.Counts -} - -// newStatistics creates a data structure for tracking connection statistics -// that periodically dumps the virtual and physical connection counts -// depending on whether the maxPeriod or maxConns is exceeded. -// The dump function is called from a single goroutine. -// Shutdown must be called to cleanup resources. -func newStatistics(maxPeriod time.Duration, maxConns int, dump func(start, end time.Time, virtual, physical map[netlogtype.Connection]netlogtype.Counts)) *statistics { - s := &statistics{maxConns: maxConns} - s.connCntsCh = make(chan connCnts, 256) - s.shutdownCtx, s.shutdown = context.WithCancel(context.Background()) - s.group.Go(func() error { - // TODO(joetsai): Using a ticker is problematic on mobile platforms - // where waking up a process every maxPeriod when there is no activity - // is a drain on battery life. Switch this instead to instead use - // a time.Timer that is triggered upon network activity. - ticker := new(time.Ticker) - if maxPeriod > 0 { - ticker = time.NewTicker(maxPeriod) - defer ticker.Stop() - } - - for { - var cc connCnts - select { - case cc = <-s.connCntsCh: - case <-ticker.C: - cc = s.extract() - case <-s.shutdownCtx.Done(): - cc = s.extract() - } - if len(cc.virtual)+len(cc.physical) > 0 && dump != nil { - dump(cc.start, cc.end, cc.virtual, cc.physical) - } - if s.shutdownCtx.Err() != nil { - return nil - } - } - }) - return s -} - -// UpdateTxVirtual updates the counters for a transmitted IP packet -// The source and destination of the packet directly correspond with -// the source and destination in netlogtype.Connection. -func (s *statistics) UpdateTxVirtual(b []byte) { - var p packet.Parsed - p.Decode(b) - s.UpdateVirtual(p.IPProto, p.Src, p.Dst, 1, len(b), false) -} - -// UpdateRxVirtual updates the counters for a received IP packet. -// The source and destination of the packet are inverted with respect to -// the source and destination in netlogtype.Connection. -func (s *statistics) UpdateRxVirtual(b []byte) { - var p packet.Parsed - p.Decode(b) - s.UpdateVirtual(p.IPProto, p.Dst, p.Src, 1, len(b), true) -} - -var ( - tailscaleServiceIPv4 = tsaddr.TailscaleServiceIP() - tailscaleServiceIPv6 = tsaddr.TailscaleServiceIPv6() -) - -func (s *statistics) UpdateVirtual(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, receive bool) { - // Network logging is defined as traffic between two Tailscale nodes. - // Traffic with the internal Tailscale service is not with another node - // and should not be logged. It also happens to be a high volume - // amount of discrete traffic flows (e.g., DNS lookups). - switch dst.Addr() { - case tailscaleServiceIPv4, tailscaleServiceIPv6: - return - } - - conn := netlogtype.Connection{Proto: proto, Src: src, Dst: dst} - - s.mu.Lock() - defer s.mu.Unlock() - cnts, found := s.virtual[conn] - if !found && !s.preInsertConn() { - return - } - if receive { - cnts.RxPackets += uint64(packets) - cnts.RxBytes += uint64(bytes) - } else { - cnts.TxPackets += uint64(packets) - cnts.TxBytes += uint64(bytes) - } - s.virtual[conn] = cnts -} - -// UpdateTxPhysical updates the counters for zero or more transmitted wireguard packets. -// The src is always a Tailscale IP address, representing some remote peer. -// The dst is a remote IP address and port that corresponds -// with some physical peer backing the Tailscale IP address. -func (s *statistics) UpdateTxPhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int) { - s.UpdatePhysical(0, netip.AddrPortFrom(src, 0), dst, packets, bytes, false) -} - -// UpdateRxPhysical updates the counters for zero or more received wireguard packets. -// The src is always a Tailscale IP address, representing some remote peer. -// The dst is a remote IP address and port that corresponds -// with some physical peer backing the Tailscale IP address. -func (s *statistics) UpdateRxPhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int) { - s.UpdatePhysical(0, netip.AddrPortFrom(src, 0), dst, packets, bytes, true) -} - -func (s *statistics) UpdatePhysical(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, receive bool) { - conn := netlogtype.Connection{Proto: proto, Src: src, Dst: dst} - - s.mu.Lock() - defer s.mu.Unlock() - cnts, found := s.physical[conn] - if !found && !s.preInsertConn() { - return - } - if receive { - cnts.RxPackets += uint64(packets) - cnts.RxBytes += uint64(bytes) - } else { - cnts.TxPackets += uint64(packets) - cnts.TxBytes += uint64(bytes) - } - s.physical[conn] = cnts -} - -// preInsertConn updates the maps to handle insertion of a new connection. -// It reports false if insertion is not allowed (i.e., after shutdown). -func (s *statistics) preInsertConn() bool { - // Check whether insertion of a new connection will exceed maxConns. - if len(s.virtual)+len(s.physical) == s.maxConns && s.maxConns > 0 { - // Extract the current statistics and send it to the serializer. - // Avoid blocking the network packet handling path. - select { - case s.connCntsCh <- s.extractLocked(): - default: - // TODO(joetsai): Log that we are dropping an entire connCounts. - } - } - - // Initialize the maps if nil. - if s.virtual == nil && s.physical == nil { - s.start = time.Now().UTC() - s.virtual = make(map[netlogtype.Connection]netlogtype.Counts) - s.physical = make(map[netlogtype.Connection]netlogtype.Counts) - } - - return s.shutdownCtx.Err() == nil -} - -func (s *statistics) extract() connCnts { - s.mu.Lock() - defer s.mu.Unlock() - return s.extractLocked() -} - -func (s *statistics) extractLocked() connCnts { - if len(s.virtual)+len(s.physical) == 0 { - return connCnts{} - } - s.end = time.Now().UTC() - cc := s.connCnts - s.connCnts = connCnts{} - return cc -} - -// TestExtract synchronously extracts the current network statistics map -// and resets the counters. This should only be used for testing purposes. -func (s *statistics) TestExtract() (virtual, physical map[netlogtype.Connection]netlogtype.Counts) { - cc := s.extract() - return cc.virtual, cc.physical -} - -// Shutdown performs a final flush of statistics. -// Statistics for any subsequent calls to Update will be dropped. -// It is safe to call Shutdown concurrently and repeatedly. -func (s *statistics) Shutdown(context.Context) error { - s.shutdown() - return s.group.Wait() -} diff --git a/wgengine/netlog/stats_test.go b/wgengine/netlog/stats_test.go deleted file mode 100644 index 6cf7eb998..000000000 --- a/wgengine/netlog/stats_test.go +++ /dev/null @@ -1,235 +0,0 @@ -// Copyright (c) Tailscale Inc & AUTHORS -// SPDX-License-Identifier: BSD-3-Clause - -package netlog - -import ( - "context" - "encoding/binary" - "fmt" - "math/rand" - "net/netip" - "runtime" - "sync" - "testing" - "time" - - qt "github.com/frankban/quicktest" - "tailscale.com/cmd/testwrapper/flakytest" - "tailscale.com/types/ipproto" - "tailscale.com/types/netlogtype" -) - -func testPacketV4(proto ipproto.Proto, srcAddr, dstAddr [4]byte, srcPort, dstPort, size uint16) (out []byte) { - var ipHdr [20]byte - ipHdr[0] = 4<<4 | 5 - binary.BigEndian.PutUint16(ipHdr[2:], size) - ipHdr[9] = byte(proto) - *(*[4]byte)(ipHdr[12:]) = srcAddr - *(*[4]byte)(ipHdr[16:]) = dstAddr - out = append(out, ipHdr[:]...) - switch proto { - case ipproto.TCP: - var tcpHdr [20]byte - binary.BigEndian.PutUint16(tcpHdr[0:], srcPort) - binary.BigEndian.PutUint16(tcpHdr[2:], dstPort) - out = append(out, tcpHdr[:]...) - case ipproto.UDP: - var udpHdr [8]byte - binary.BigEndian.PutUint16(udpHdr[0:], srcPort) - binary.BigEndian.PutUint16(udpHdr[2:], dstPort) - out = append(out, udpHdr[:]...) - default: - panic(fmt.Sprintf("unknown proto: %d", proto)) - } - return append(out, make([]byte, int(size)-len(out))...) -} - -// TestInterval ensures that we receive at least one call to `dump` using only -// maxPeriod. -func TestInterval(t *testing.T) { - c := qt.New(t) - - const maxPeriod = 10 * time.Millisecond - const maxConns = 2048 - - gotDump := make(chan struct{}, 1) - stats := newStatistics(maxPeriod, maxConns, func(_, _ time.Time, _, _ map[netlogtype.Connection]netlogtype.Counts) { - select { - case gotDump <- struct{}{}: - default: - } - }) - defer stats.Shutdown(context.Background()) - - srcAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))}) - dstAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))}) - srcPort := uint16(rand.Intn(16)) - dstPort := uint16(rand.Intn(16)) - size := uint16(64 + rand.Intn(1024)) - p := testPacketV4(ipproto.TCP, srcAddr.As4(), dstAddr.As4(), srcPort, dstPort, size) - stats.UpdateRxVirtual(p) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - select { - case <-ctx.Done(): - c.Fatal("didn't receive dump within context deadline") - case <-gotDump: - } -} - -func TestConcurrent(t *testing.T) { - flakytest.Mark(t, "https://github.com/tailscale/tailscale/issues/7030") - c := qt.New(t) - - const maxPeriod = 10 * time.Millisecond - const maxConns = 10 - virtualAggregate := make(map[netlogtype.Connection]netlogtype.Counts) - stats := newStatistics(maxPeriod, maxConns, func(start, end time.Time, virtual, physical map[netlogtype.Connection]netlogtype.Counts) { - c.Assert(start.IsZero(), qt.IsFalse) - c.Assert(end.IsZero(), qt.IsFalse) - c.Assert(end.Before(start), qt.IsFalse) - c.Assert(len(virtual) > 0 && len(virtual) <= maxConns, qt.IsTrue) - c.Assert(len(physical) == 0, qt.IsTrue) - for conn, cnts := range virtual { - virtualAggregate[conn] = virtualAggregate[conn].Add(cnts) - } - }) - defer stats.Shutdown(context.Background()) - var wants []map[netlogtype.Connection]netlogtype.Counts - gots := make([]map[netlogtype.Connection]netlogtype.Counts, runtime.NumCPU()) - var group sync.WaitGroup - for i := range gots { - group.Add(1) - go func(i int) { - defer group.Done() - gots[i] = make(map[netlogtype.Connection]netlogtype.Counts) - rn := rand.New(rand.NewSource(time.Now().UnixNano())) - var p []byte - var t netlogtype.Connection - for j := 0; j < 1000; j++ { - delay := rn.Intn(10000) - if p == nil || rn.Intn(64) == 0 { - proto := ipproto.TCP - if rn.Intn(2) == 0 { - proto = ipproto.UDP - } - srcAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))}) - dstAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))}) - srcPort := uint16(rand.Intn(16)) - dstPort := uint16(rand.Intn(16)) - size := uint16(64 + rand.Intn(1024)) - p = testPacketV4(proto, srcAddr.As4(), dstAddr.As4(), srcPort, dstPort, size) - t = netlogtype.Connection{Proto: proto, Src: netip.AddrPortFrom(srcAddr, srcPort), Dst: netip.AddrPortFrom(dstAddr, dstPort)} - } - t2 := t - receive := rn.Intn(2) == 0 - if receive { - t2.Src, t2.Dst = t2.Dst, t2.Src - } - - cnts := gots[i][t2] - if receive { - stats.UpdateRxVirtual(p) - cnts.RxPackets++ - cnts.RxBytes += uint64(len(p)) - } else { - cnts.TxPackets++ - cnts.TxBytes += uint64(len(p)) - stats.UpdateTxVirtual(p) - } - gots[i][t2] = cnts - time.Sleep(time.Duration(rn.Intn(1 + delay))) - } - }(i) - } - group.Wait() - c.Assert(stats.Shutdown(context.Background()), qt.IsNil) - wants = append(wants, virtualAggregate) - - got := make(map[netlogtype.Connection]netlogtype.Counts) - want := make(map[netlogtype.Connection]netlogtype.Counts) - mergeMaps(got, gots...) - mergeMaps(want, wants...) - c.Assert(got, qt.DeepEquals, want) -} - -func mergeMaps(dst map[netlogtype.Connection]netlogtype.Counts, srcs ...map[netlogtype.Connection]netlogtype.Counts) { - for _, src := range srcs { - for conn, cnts := range src { - dst[conn] = dst[conn].Add(cnts) - } - } -} - -func Benchmark(b *testing.B) { - // TODO: Test IPv6 packets? - b.Run("SingleRoutine/SameConn", func(b *testing.B) { - p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789) - b.ResetTimer() - b.ReportAllocs() - for range b.N { - s := newStatistics(0, 0, nil) - for j := 0; j < 1e3; j++ { - s.UpdateTxVirtual(p) - } - } - }) - b.Run("SingleRoutine/UniqueConns", func(b *testing.B) { - p := testPacketV4(ipproto.UDP, [4]byte{}, [4]byte{}, 0, 0, 789) - b.ResetTimer() - b.ReportAllocs() - for range b.N { - s := newStatistics(0, 0, nil) - for j := 0; j < 1e3; j++ { - binary.BigEndian.PutUint32(p[20:], uint32(j)) // unique port combination - s.UpdateTxVirtual(p) - } - } - }) - b.Run("MultiRoutine/SameConn", func(b *testing.B) { - p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789) - b.ResetTimer() - b.ReportAllocs() - for range b.N { - s := newStatistics(0, 0, nil) - var group sync.WaitGroup - for j := 0; j < runtime.NumCPU(); j++ { - group.Add(1) - go func() { - defer group.Done() - for k := 0; k < 1e3; k++ { - s.UpdateTxVirtual(p) - } - }() - } - group.Wait() - } - }) - b.Run("MultiRoutine/UniqueConns", func(b *testing.B) { - ps := make([][]byte, runtime.NumCPU()) - for i := range ps { - ps[i] = testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 0, 0, 789) - } - b.ResetTimer() - b.ReportAllocs() - for range b.N { - s := newStatistics(0, 0, nil) - var group sync.WaitGroup - for j := 0; j < runtime.NumCPU(); j++ { - group.Add(1) - go func(j int) { - defer group.Done() - p := ps[j] - j *= 1e3 - for k := 0; k < 1e3; k++ { - binary.BigEndian.PutUint32(p[20:], uint32(j+k)) // unique port combination - s.UpdateTxVirtual(p) - } - }(j) - } - group.Wait() - } - }) -} diff --git a/wgengine/userspace.go b/wgengine/userspace.go index 8856a3eaf..30999a7be 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -1055,7 +1055,7 @@ func (e *userspaceEngine) Reconfig(cfg *wgcfg.Config, routerCfg *router.Config, tid := cfg.NetworkLogging.DomainID logExitFlowEnabled := cfg.NetworkLogging.LogExitFlowEnabled e.logf("wgengine: Reconfig: starting up network logger (node:%s tailnet:%s)", nid.Public(), tid.Public()) - if err := e.networkLogger.Startup(cfg.NodeID, nid, tid, e.tundev, e.magicConn, e.netMon, e.health, e.eventBus, logExitFlowEnabled); err != nil { + if err := e.networkLogger.Startup(nm, nid, tid, e.tundev, e.magicConn, e.netMon, e.health, e.eventBus, logExitFlowEnabled); err != nil { e.logf("wgengine: Reconfig: error starting up network logger: %v", err) } e.networkLogger.ReconfigRoutes(routerCfg) @@ -1352,6 +1352,9 @@ func (e *userspaceEngine) SetNetworkMap(nm *netmap.NetworkMap) { e.mu.Lock() e.netMap = nm e.mu.Unlock() + if e.networkLogger.Running() { + e.networkLogger.ReconfigNetworkMap(nm) + } } func (e *userspaceEngine) UpdateStatus(sb *ipnstate.StatusBuilder) {