From 9c37c14039c356f4c3acff820acd6b74a60a0d53 Mon Sep 17 00:00:00 2001 From: julianknodt Date: Mon, 21 Jun 2021 11:18:03 -0700 Subject: [PATCH] Create logger for derp bandwidth This is a small logger which reports the number of bytes received and sent between derp users. It currently doesn't do anything fancy, it just keeps a running log of all the processes and prints the top n every 2 seconds. I also added a change for reporting deltas and not the full amt of bytes written. Signed-off-by: julianknodt --- derp/derp_server.go | 8 +++- derp/derpvis/vis.go | 92 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 derp/derpvis/vis.go diff --git a/derp/derp_server.go b/derp/derp_server.go index 4ac578724..3e43b2111 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -1475,7 +1475,13 @@ func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) { if prev.Sent < next.Sent || prev.Recv < next.Recv { if pkey, ok := s.keyOfAddr[k]; ok { next.Key = pkey - if err := enc.Encode(next); err != nil { + // only write the delta + written := BytesSentRecv{ + Recv: next.Recv - prev.Recv, + Sent: next.Sent - prev.Sent, + Key: pkey, + } + if err := enc.Encode(written); err != nil { s.mu.Unlock() return } diff --git a/derp/derpvis/vis.go b/derp/derpvis/vis.go new file mode 100644 index 000000000..922ab3e02 --- /dev/null +++ b/derp/derpvis/vis.go @@ -0,0 +1,92 @@ +// Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "net/http" + "sort" + "sync" + "time" + + "inet.af/netaddr" + "tailscale.com/derp" + "tailscale.com/types/key" +) + +var DERPIP = flag.String("derp-ip", "", "IP address of DERP server to visualize") +var topN = flag.Int("n", 50, "How many processes to show at once") + +func main() { + flag.Parse() + if *DERPIP == "" { + log.Fatalf("Usage: derp-ip required") + } + + derpIP := netaddr.MustParseIP(*DERPIP) + url := fmt.Sprintf("http://%s/debug/traffic", derpIP.String()) + resp, err := http.Get(url) + if err != nil { + log.Fatalf("Request failed: %v", err) + } + defer resp.Body.Close() + var mu sync.Mutex + sent := &prioQueue{ + vals: map[key.Public]uint64{}, + } + recv := &prioQueue{ + vals: map[key.Public]uint64{}, + } + + go func() { + for { + mu.Lock() + sort.Sort(sent) + sort.Sort(recv) + for i := 0; i < min(*topN, recv.Len()); i++ { + fmt.Printf("%d=(%s): recv=%d\n", i, recv.ord[i].ShortString(), recv.vals[recv.ord[i]]) + } + mu.Unlock() + time.Sleep(2 * time.Second) + } + }() + + dec := json.NewDecoder(resp.Body) + var tmp derp.BytesSentRecv + for err = dec.Decode(&tmp); err == nil; { + mu.Lock() + sent.Add(tmp.Key, tmp.Sent) + recv.Add(tmp.Key, tmp.Recv) + mu.Unlock() + } + log.Fatalf("Error decoding: %v", err) +} + +type prioQueue struct { + ord []key.Public + vals map[key.Public]uint64 +} + +func (p *prioQueue) Len() int { return len(p.ord) } +func (p *prioQueue) Less(i, j int) bool { return p.vals[p.ord[i]] < p.vals[p.ord[j]] } +func (p *prioQueue) Swap(i, j int) { p.ord[i], p.ord[j] = p.ord[j], p.ord[i] } +func (p *prioQueue) Add(key key.Public, val uint64) { + if _, exists := p.vals[key]; exists { + p.vals[key] += val + return + } + p.vals[key] = val + p.ord = append(p.ord, key) +} + +func min(a, b int) int { + if a < b { + return a + } + return b +}