tailscale/wgengine/netlog/logger.go
Joe Tsai f9120eee57
wgengine: start network logger in Userspace.Reconfig (#5908)
If the wgcfg.Config is specified with network logging arguments,
then Userspace.Reconfig starts up an asynchronous network logger,
which is shutdown either upon Userspace.Close or when Userspace.Reconfig
is called again without network logging or route arguments.

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
2022-10-12 15:05:21 -07:00

246 lines
6.7 KiB
Go

// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package netlog provides a logger that monitors a TUN device and
// periodically records any traffic into a log stream.
package netlog
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/netip"
"sync"
"time"
"golang.org/x/sync/errgroup"
"tailscale.com/logpolicy"
"tailscale.com/logtail"
"tailscale.com/net/flowtrack"
"tailscale.com/net/tsaddr"
"tailscale.com/net/tunstats"
"tailscale.com/smallzstd"
"tailscale.com/wgengine/router"
)
// pollPeriod specifies how often to poll for network traffic.
const pollPeriod = 5 * time.Second
// Device is an abstraction over a tunnel device.
// *tstun.Wrapper implements this interface.
type Device interface {
SetStatisticsEnabled(bool)
ExtractStatistics() map[flowtrack.Tuple]tunstats.Counts
}
// Logger logs statistics about every connection.
// At present, it only logs connections within a tailscale network.
// Exit node traffic is not logged for privacy reasons.
// The zero value is ready for use.
type Logger struct {
mu sync.Mutex
logger *logtail.Logger
addrs map[netip.Addr]bool
prefixes map[netip.Prefix]bool
group errgroup.Group
cancel context.CancelFunc
}
// Running reports whether the logger is running.
func (nl *Logger) Running() bool {
nl.mu.Lock()
defer nl.mu.Unlock()
return nl.logger != nil
}
var testClient *http.Client
// Startup starts an asynchronous network logger that monitors
// statistics for the provided tun device.
// The provided cfg is used to classify the types of connections.
func (nl *Logger) Startup(nodeID, domainID logtail.PrivateID, tun Device) error {
nl.mu.Lock()
defer nl.mu.Unlock()
if nl.logger != nil {
return fmt.Errorf("network logger already running for %v", nl.logger.PrivateID().Public())
}
httpc := &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost)}
if testClient != nil {
httpc = testClient
}
logger := logtail.NewLogger(logtail.Config{
Collection: "tailtraffic.log.tailscale.io",
PrivateID: nodeID,
CopyPrivateID: domainID,
Stderr: io.Discard,
// TODO(joetsai): Set Buffer? Use an in-memory buffer for now.
NewZstdEncoder: func() logtail.Encoder {
w, err := smallzstd.NewEncoder(nil)
if err != nil {
panic(err)
}
return w
},
HTTPC: httpc,
// Include process sequence numbers to identify missing samples.
IncludeProcID: true,
IncludeProcSequence: true,
}, log.Printf)
nl.logger = logger
ctx, cancel := context.WithCancel(context.Background())
nl.cancel = cancel
nl.group.Go(func() error {
tun.SetStatisticsEnabled(true)
defer tun.SetStatisticsEnabled(false)
tun.ExtractStatistics() // clear out any stale statistics
start := time.Now()
ticker := time.NewTicker(pollPeriod)
for {
var end time.Time
select {
case <-ctx.Done():
tun.SetStatisticsEnabled(false)
end = time.Now()
case end = <-ticker.C:
}
tunStats := tun.ExtractStatistics()
if len(tunStats) > 0 {
nl.mu.Lock()
addrs := nl.addrs
prefixes := nl.prefixes
nl.mu.Unlock()
recordStatistics(logger, start, end, tunStats, addrs, prefixes)
}
if ctx.Err() != nil {
break
}
start = end.Add(time.Nanosecond)
}
return nil
})
return nil
}
func recordStatistics(logger *logtail.Logger, start, end time.Time, tunStats map[flowtrack.Tuple]tunstats.Counts, addrs map[netip.Addr]bool, prefixes map[netip.Prefix]bool) {
classifyAddr := func(a netip.Addr) (isTailscale, withinRoute bool) {
// NOTE: There could be mis-classifications where an address is treated
// as a Tailscale IP address because the subnet range overlaps with
// the subnet range that Tailscale IP addresses are allocated from.
withinRoute = addrs[a]
for p := range prefixes {
if p.Contains(a) && p.Bits() > 0 {
withinRoute = true
}
}
return withinRoute && tsaddr.IsTailscaleIP(a), withinRoute && !tsaddr.IsTailscaleIP(a)
}
type tupleCounts struct {
flowtrack.Tuple
tunstats.Counts
}
var virtualTraffic, subnetTraffic, exitTraffic []tupleCounts
for conn, cnts := range tunStats {
srcIsTailscaleIP, srcWithinSubnet := classifyAddr(conn.Src.Addr())
dstIsTailscaleIP, dstWithinSubnet := classifyAddr(conn.Dst.Addr())
switch {
case srcIsTailscaleIP && dstIsTailscaleIP:
virtualTraffic = append(virtualTraffic, tupleCounts{conn, cnts})
case srcWithinSubnet || dstWithinSubnet:
subnetTraffic = append(subnetTraffic, tupleCounts{conn, cnts})
default:
const anonymize = true
if anonymize {
if len(exitTraffic) == 0 {
exitTraffic = []tupleCounts{{}}
}
exitTraffic[0].Counts = exitTraffic[0].Counts.Add(cnts)
} else {
exitTraffic = append(exitTraffic, tupleCounts{conn, cnts})
}
}
}
if len(virtualTraffic)+len(subnetTraffic)+len(exitTraffic) == 0 {
return // nothing to report
}
if b, err := json.Marshal(struct {
Start time.Time `json:"start"`
End time.Time `json:"end"`
VirtualTraffic []tupleCounts `json:"virtualTraffic,omitempty"`
SubnetTraffic []tupleCounts `json:"subnetTraffic,omitempty"`
ExitTraffic []tupleCounts `json:"exitTraffic,omitempty"`
}{start.UTC(), end.UTC(), virtualTraffic, subnetTraffic, exitTraffic}); err != nil {
logger.Logf("json.Marshal error: %v", err)
} else {
logger.Logf("%s", b)
}
}
func makeRouteMaps(cfg *router.Config) (addrs map[netip.Addr]bool, prefixes map[netip.Prefix]bool) {
addrs = make(map[netip.Addr]bool)
for _, p := range cfg.LocalAddrs {
if p.IsSingleIP() {
addrs[p.Addr()] = true
}
}
prefixes = make(map[netip.Prefix]bool)
insertPrefixes := func(rs []netip.Prefix) {
for _, p := range rs {
if p.IsSingleIP() {
addrs[p.Addr()] = true
} else {
prefixes[p] = true
}
}
}
insertPrefixes(cfg.Routes)
insertPrefixes(cfg.SubnetRoutes)
return addrs, prefixes
}
// ReconfigRoutes configures the network logger with updated routes.
func (nl *Logger) ReconfigRoutes(cfg *router.Config) {
nl.mu.Lock()
defer nl.mu.Unlock()
// TODO(joetsai): There is a race where deleted routes are not known at
// the time of extraction. We need to keep old routes around for a bit.
nl.addrs, nl.prefixes = makeRouteMaps(cfg)
}
// Shutdown shuts down the network logger.
// This attempts to flush out all pending log messages.
// Even if an error is returned, the logger is still shut down.
func (nl *Logger) Shutdown(ctx context.Context) error {
nl.mu.Lock()
defer nl.mu.Unlock()
if nl.logger == nil {
return nil
}
nl.cancel()
nl.mu.Unlock()
nl.group.Wait() // do not hold lock while waiting
nl.mu.Lock()
err := nl.logger.Shutdown(ctx)
nl.logger = nil
nl.addrs = nil
nl.prefixes = nil
nl.cancel = nil
return err
}