diff --git a/prober/derp.go b/prober/derp.go index e21c8ce76..c7a82317d 100644 --- a/prober/derp.go +++ b/prober/derp.go @@ -425,6 +425,24 @@ func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg. txRecords := make([]txRecord, 0, packetsPerSecond*int(packetTimeout.Seconds())) var txRecordsMu sync.Mutex + // applyTimeouts walks over txRecords and expires any records that are older + // than packetTimeout, recording in metrics that they were removed. + applyTimeouts := func() { + txRecordsMu.Lock() + defer txRecordsMu.Unlock() + + now := time.Now() + recs := txRecords[:0] + for _, r := range txRecords { + if now.Sub(r.at) > packetTimeout { + packetsDropped.Add(1) + } else { + recs = append(recs, r) + } + } + txRecords = recs + } + // Send the packets. sendErrC := make(chan error, 1) // TODO: construct a disco CallMeMaybe in the same fashion as magicsock, e.g. magic bytes, src pub, seal payload. @@ -445,10 +463,12 @@ func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg. case <-ctx.Done(): return case <-t.C: + applyTimeouts() txRecordsMu.Lock() if len(txRecords) == cap(txRecords) { txRecords = slices.Delete(txRecords, 0, 1) packetsDropped.Add(1) + log.Printf("unexpected: overflow in txRecords") } txRecords = append(txRecords, txRecord{time.Now(), seq}) txRecordsMu.Unlock()