prober: record total bytes transferred in DERP bandwidth probes

This will enable Prometheus queries to look at the bandwidth over time windows,
for example 'increase(derp_bw_bytes_total)[1h] / increase(derp_bw_transfer_time_seconds_total)[1h]'.

Fixes commit a51672cafd8b6c4e87915a55bda1491eb7cbee84.

Updates 

Signed-off-by: Percy Wegmann <percy@tailscale.com>
This commit is contained in:
Percy Wegmann 2025-01-09 11:50:11 -06:00 committed by Percy Wegmann
parent 377127c20c
commit 6ccde369ff

@ -301,13 +301,14 @@ func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass {
derpPath = "single"
}
var transferTimeSeconds expvar.Float
var totalBytesTransferred expvar.Float
return ProbeClass{
Probe: func(ctx context.Context) error {
fromN, toN, err := d.getNodePair(from, to)
if err != nil {
return err
}
return derpProbeBandwidth(ctx, d.lastDERPMap, fromN, toN, size, &transferTimeSeconds, d.bwTUNIPv4Prefix)
return derpProbeBandwidth(ctx, d.lastDERPMap, fromN, toN, size, &transferTimeSeconds, &totalBytesTransferred, d.bwTUNIPv4Prefix)
},
Class: "derp_bw",
Labels: Labels{
@ -315,11 +316,15 @@ func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass {
"tcp_in_tcp": strconv.FormatBool(d.bwTUNIPv4Prefix != nil),
},
Metrics: func(l prometheus.Labels) []prometheus.Metric {
return []prometheus.Metric{
metrics := []prometheus.Metric{
prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_probe_size_bytes", "Payload size of the bandwidth prober", nil, l), prometheus.GaugeValue, float64(size)),
prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_transfer_time_seconds_total", "Time it took to transfer data", nil, l), prometheus.CounterValue, transferTimeSeconds.Value()),
prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_bytes_total", "Amount of data transferred", nil, l), prometheus.CounterValue, float64(size)),
}
if d.bwTUNIPv4Prefix != nil {
// For TCP-in-TCP probes, also record cumulative bytes transferred.
metrics = append(metrics, prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_bytes_total", "Amount of data transferred", nil, l), prometheus.CounterValue, totalBytesTransferred.Value()))
}
return metrics
},
}
}
@ -655,7 +660,7 @@ func derpProbeUDP(ctx context.Context, ipStr string, port int) error {
// DERP clients connected to two DERP servers.If tunIPv4Address is specified,
// probes will use a TCP connection over a TUN device at this address in order
// to exercise TCP-in-TCP in similar fashion to TCP over Tailscale via DERP.
func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, transferTimeSeconds *expvar.Float, tunIPv4Prefix *netip.Prefix) (err error) {
func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, transferTimeSeconds, totalBytesTransferred *expvar.Float, tunIPv4Prefix *netip.Prefix) (err error) {
// This probe uses clients with isProber=false to avoid spamming the derper logs with every packet
// sent by the bandwidth probe.
fromc, err := newConn(ctx, dm, from, false)
@ -677,7 +682,7 @@ func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tail
}
if tunIPv4Prefix != nil {
err = derpProbeBandwidthTUN(ctx, transferTimeSeconds, from, to, fromc, toc, size, tunIPv4Prefix)
err = derpProbeBandwidthTUN(ctx, transferTimeSeconds, totalBytesTransferred, from, to, fromc, toc, size, tunIPv4Prefix)
} else {
err = derpProbeBandwidthDirect(ctx, transferTimeSeconds, from, to, fromc, toc, size)
}
@ -848,7 +853,7 @@ var derpProbeBandwidthTUNMu sync.Mutex
// to another over a TUN device at an address at the start of the usable host IP
// range that the given tunAddress lives in. The time taken to finish the transfer
// is recorded in `transferTimeSeconds`.
func derpProbeBandwidthTUN(ctx context.Context, transferTimeSeconds *expvar.Float, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, size int64, prefix *netip.Prefix) error {
func derpProbeBandwidthTUN(ctx context.Context, transferTimeSeconds, totalBytesTransferred *expvar.Float, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, size int64, prefix *netip.Prefix) error {
// Make sure all goroutines have finished.
var wg sync.WaitGroup
defer wg.Wait()
@ -1046,9 +1051,10 @@ func derpProbeBandwidthTUN(ctx context.Context, transferTimeSeconds *expvar.Floa
readFinishedC <- fmt.Errorf("unable to set read deadline: %w", err)
}
}
_, err = io.CopyN(io.Discard, readConn, size)
// Measure transfer time irrespective of whether it succeeded or failed.
n, err := io.CopyN(io.Discard, readConn, size)
// Measure transfer time and bytes transferred irrespective of whether it succeeded or failed.
transferTimeSeconds.Add(time.Since(start).Seconds())
totalBytesTransferred.Add(float64(n))
readFinishedC <- err
}()