diff --git a/cmd/derper/derper.go b/cmd/derper/derper.go index 2ea77cbc3..b78603b71 100644 --- a/cmd/derper/derper.go +++ b/cmd/derper/derper.go @@ -171,6 +171,7 @@ func main() { io.WriteString(w, "derp.Server ConsistencyCheck okay") } })) + debug.Handle("traffic", "Traffic check", http.HandlerFunc(s.ServeDebugTraffic)) if *runSTUN { go serveSTUN() diff --git a/derp/derp_server.go b/derp/derp_server.go index 3885bbb93..38881bec6 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -23,7 +23,9 @@ "math" "math/big" "math/rand" + "net/http" "os" + "os/exec" "runtime" "strconv" "strings" @@ -34,6 +36,7 @@ "go4.org/mem" "golang.org/x/crypto/nacl/box" "golang.org/x/sync/errgroup" + "inet.af/netaddr" "tailscale.com/disco" "tailscale.com/metrics" "tailscale.com/types/key" @@ -141,6 +144,9 @@ type Server struct { // because it includes intra-region forwarded packets as the // src. sentTo map[key.Public]map[key.Public]int64 // src => dst => dst's latest sclient.connNum + + // maps from netaddr.IPPort to a client's public key + keyOfAddr map[netaddr.IPPort]key.Public } // PacketForwarder is something that can forward packets. @@ -186,6 +192,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server { watchers: map[*sclient]bool{}, sentTo: map[key.Public]map[key.Public]int64{}, avgQueueDuration: new(uint64), + keyOfAddr: map[netaddr.IPPort]key.Public{}, } s.initMetacert() s.packetsRecvDisco = s.packetsRecvByKind.Get("disco") @@ -343,6 +350,7 @@ func (s *Server) registerClient(c *sclient) { if _, ok := s.clientsMesh[c.key]; !ok { s.clientsMesh[c.key] = nil // just for varz of total users in cluster } + s.keyOfAddr[c.remoteIPPort] = c.key s.curClients.Add(1) s.broadcastPeerStateChangeLocked(c.key, true) } @@ -377,6 +385,8 @@ func (s *Server) unregisterClient(c *sclient) { delete(s.watchers, c) } + delete(s.keyOfAddr, c.remoteIPPort) + s.curClients.Add(-1) if c.preferred { s.curHomeClients.Add(-1) @@ -450,20 +460,23 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN ctx, cancel := context.WithCancel(context.Background()) defer cancel() + remoteIPPort, _ := netaddr.ParseIPPort(remoteAddr) + c := &sclient{ - connNum: connNum, - s: s, - key: clientKey, - nc: nc, - br: br, - bw: bw, - logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)), - done: ctx.Done(), - remoteAddr: remoteAddr, - connectedAt: time.Now(), - sendQueue: make(chan pkt, perClientSendQueueDepth), - peerGone: make(chan key.Public), - canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, + connNum: connNum, + s: s, + key: clientKey, + nc: nc, + br: br, + bw: bw, + logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)), + done: ctx.Done(), + remoteAddr: remoteAddr, + remoteIPPort: remoteIPPort, + connectedAt: time.Now(), + sendQueue: make(chan pkt, perClientSendQueueDepth), + peerGone: make(chan key.Public), + canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, } if c.canMesh { c.meshUpdate = make(chan struct{}) @@ -892,18 +905,19 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d // (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go) type sclient struct { // Static after construction. - connNum int64 // process-wide unique counter, incremented each Accept - s *Server - nc Conn - key key.Public - info clientInfo - logf logger.Logf - done <-chan struct{} // closed when connection closes - remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() - sendQueue chan pkt // packets queued to this client; never closed - peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers) - meshUpdate chan struct{} // write request to write peerStateChange - canMesh bool // clientInfo had correct mesh token for inter-region routing + connNum int64 // process-wide unique counter, incremented each Accept + s *Server + nc Conn + key key.Public + info clientInfo + logf logger.Logf + done <-chan struct{} // closed when connection closes + remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() + remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port. + sendQueue chan pkt // packets queued to this client; never closed + peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers) + meshUpdate chan struct{} // write request to write peerStateChange + canMesh bool // clientInfo had correct mesh token for inter-region routing // Owned by run, not thread-safe. br *bufio.Reader @@ -1398,3 +1412,91 @@ func writePublicKey(bw *bufio.Writer, key *key.Public) error { } return nil } + +const minTimeBetweenLogs = 2 * time.Second + +// BytesSentRecv records the number of bytes that have been sent since the last traffic check +// for a given process, as well as the public key of the process sending those bytes. +type BytesSentRecv struct { + Sent uint64 + Recv uint64 + // Key is the public key of the client which sent/received these bytes. + Key key.Public +} + +// parseSSOutput parses the output from the specific call to ss in ServeDebugTraffic. +// Separated out for ease of testing. +func parseSSOutput(raw string) map[netaddr.IPPort]BytesSentRecv { + newState := map[netaddr.IPPort]BytesSentRecv{} + // parse every 2 lines and get src and dst ips, and kv pairs + lines := strings.Split(raw, "\n") + for i := 0; i < len(lines); i += 2 { + ipInfo := strings.Fields(strings.TrimSpace(lines[i])) + if len(ipInfo) < 5 { + continue + } + src, err := netaddr.ParseIPPort(ipInfo[3]) + if err != nil { + continue + } + /* + TODO(jknodt) do we care about the full route or just the src? + dst, err := netaddr.ParseIPPort(string(ipInfo[4])) + if err != nil { + continue + } + */ + stats := strings.Fields(strings.TrimSpace(lines[i+1])) + stat := BytesSentRecv{} + for _, s := range stats { + if strings.Contains(s, "bytes_sent") { + sent, err := strconv.Atoi(s[strings.Index(s, ":")+1:]) + if err == nil { + stat.Sent = uint64(sent) + } + } else if strings.Contains(s, "bytes_received") { + recv, err := strconv.Atoi(s[strings.Index(s, ":")+1:]) + if err == nil { + stat.Recv = uint64(recv) + } + } + } + newState[src] = stat + } + return newState +} + +func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) { + prevState := map[netaddr.IPPort]BytesSentRecv{} + enc := json.NewEncoder(w) + for r.Context().Err() == nil { + output, err := exec.Command("ss", "-i", "-H", "-t").Output() + if err != nil { + fmt.Fprintf(w, "ss failed: %v", err) + return + } + newState := parseSSOutput(string(output)) + s.mu.Lock() + for k, next := range newState { + prev := prevState[k] + if prev.Sent < next.Sent || prev.Recv < next.Recv { + if pkey, ok := s.keyOfAddr[k]; ok { + next.Key = pkey + if err := enc.Encode(next); err != nil { + s.mu.Unlock() + return + } + } + } + } + s.mu.Unlock() + prevState = newState + if _, err := fmt.Fprintln(w); err != nil { + return + } + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + time.Sleep(minTimeBetweenLogs) + } +} diff --git a/derp/derp_test.go b/derp/derp_test.go index db3586170..a37dd0390 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -948,3 +948,14 @@ func waitConnect(t testing.TB, c *Client) { t.Fatalf("client first Recv was unexpected type %T", v) } } + +func TestParseSSOutput(t *testing.T) { + contents, err := ioutil.ReadFile("testdata/example_ss.txt") + if err != nil { + t.Errorf("ioutil.Readfile(example_ss.txt) failed: %v", err) + } + seen := parseSSOutput(string(contents)) + if len(seen) == 0 { + t.Errorf("parseSSOutput expected non-empty map") + } +} diff --git a/derp/testdata/example_ss.txt b/derp/testdata/example_ss.txt new file mode 100644 index 000000000..2885f1bc1 --- /dev/null +++ b/derp/testdata/example_ss.txt @@ -0,0 +1,8 @@ +ESTAB 0 0 10.255.1.11:35238 34.210.105.16:https + cubic wscale:7,7 rto:236 rtt:34.14/3.432 ato:40 mss:1448 pmtu:1500 rcvmss:1448 advmss:1448 cwnd:8 ssthresh:6 bytes_sent:38056577 bytes_retrans:2918 bytes_acked:38053660 bytes_received:6973211 segs_out:165090 segs_in:124227 data_segs_out:78018 data_segs_in:71645 send 2.71Mbps lastsnd:1156 lastrcv:1120 lastack:1120 pacing_rate 3.26Mbps delivery_rate 2.35Mbps delivered:78017 app_limited busy:2586132ms retrans:0/6 dsack_dups:4 reordering:5 reord_seen:15 rcv_rtt:126355 rcv_space:65780 rcv_ssthresh:541928 minrtt:26.632 +ESTAB 0 80 100.79.58.14:ssh 100.95.73.104:58145 + cubic wscale:6,7 rto:224 rtt:23.051/2.03 ato:172 mss:1228 pmtu:1280 rcvmss:1228 advmss:1228 cwnd:10 ssthresh:94 bytes_sent:1591815 bytes_retrans:944 bytes_acked:1590791 bytes_received:158925 segs_out:8070 segs_in:8858 data_segs_out:7452 data_segs_in:3789 send 4.26Mbps lastsnd:4 lastrcv:4 lastack:4 pacing_rate 8.52Mbps delivery_rate 10.9Mbps delivered:7451 app_limited busy:61656ms unacked:2 retrans:0/10 dsack_dups:10 rcv_rtt:174712 rcv_space:65025 rcv_ssthresh:64296 minrtt:16.186 +ESTAB 0 374 10.255.1.11:43254 167.172.206.31:https + cubic wscale:7,7 rto:224 rtt:22.55/1.941 ato:40 mss:1448 pmtu:1500 rcvmss:1448 advmss:1448 cwnd:6 ssthresh:4 bytes_sent:14594668 bytes_retrans:173314 bytes_acked:14420981 bytes_received:4207111 segs_out:80566 segs_in:70310 data_segs_out:24317 data_segs_in:20365 send 3.08Mbps lastsnd:4 lastrcv:4 lastack:4 pacing_rate 3.7Mbps delivery_rate 3.05Mbps delivered:24111 app_limited busy:184820ms unacked:2 retrans:0/185 dsack_dups:1 reord_seen:3 rcv_rtt:651.262 rcv_space:226657 rcv_ssthresh:1557136 minrtt:10.18 +ESTAB 0 0 10.255.1.11:33036 3.121.18.47:https + cubic wscale:7,7 rto:372 rtt:168.408/2.044 ato:40 mss:1448 pmtu:1500 rcvmss:1448 advmss:1448 cwnd:10 bytes_sent:27500 bytes_acked:27501 bytes_received:1386524 segs_out:10990 segs_in:11037 data_segs_out:303 data_segs_in:3414 send 688kbps lastsnd:125776 lastrcv:9640 lastack:22760 pacing_rate 1.38Mbps delivery_rate 482kbps delivered:304 app_limited busy:43024ms rcv_rtt:3345.12 rcv_space:62431 rcv_ssthresh:760472 minrtt:168.867