diff --git a/util/execqueue/execqueue.go b/util/execqueue/execqueue.go index 889cea255..d3f4f4cca 100644 --- a/util/execqueue/execqueue.go +++ b/util/execqueue/execqueue.go @@ -7,7 +7,11 @@ package execqueue import ( "context" "errors" + "expvar" + "fmt" "sync" + "sync/atomic" + "time" ) type ExecQueue struct { @@ -16,9 +20,36 @@ type ExecQueue struct { inFlight bool // whether a goroutine is running q.run doneWaiter chan struct{} // non-nil if waiter is waiting, then closed queue []func() + + // metrics follow + metricsRegisterOnce sync.Once + metricInserts expvar.Int + metricRemovals expvar.Int + metricQueueLastDrain expvar.Int // unix millis +} + +// This is extremely silly but is for debugging +var metricsCounter atomic.Int64 + +// registerMetrics registers the queue's metrics with expvar, using a unique name. +func (q *ExecQueue) registerMetrics() { + q.metricsRegisterOnce.Do(func() { + m := new(expvar.Map).Init() + m.Set("inserts", &q.metricInserts) + m.Set("removals", &q.metricRemovals) + m.Set("length", expvar.Func(func() any { + return q.metricInserts.Value() - q.metricRemovals.Value() + })) + m.Set("last_drain", &q.metricQueueLastDrain) + + name := fmt.Sprintf("execqueue-%d", metricsCounter.Add(1)) + expvar.Publish(name, m) + }) } func (q *ExecQueue) Add(f func()) { + q.registerMetrics() + q.mu.Lock() defer q.mu.Unlock() if q.closed { @@ -26,6 +57,7 @@ func (q *ExecQueue) Add(f func()) { } if q.inFlight { q.queue = append(q.queue, f) + q.metricInserts.Add(1) } else { q.inFlight = true go q.run(f) @@ -35,6 +67,8 @@ func (q *ExecQueue) Add(f func()) { // RunSync waits for the queue to be drained and then synchronously runs f. // It returns an error if the queue is closed before f is run or ctx expires. func (q *ExecQueue) RunSync(ctx context.Context, f func()) error { + q.registerMetrics() + for { if err := q.Wait(ctx); err != nil { return err @@ -61,11 +95,13 @@ func (q *ExecQueue) run(f func()) { f := q.queue[0] q.queue[0] = nil q.queue = q.queue[1:] + q.metricRemovals.Add(1) q.mu.Unlock() f() q.mu.Lock() } q.inFlight = false + q.metricQueueLastDrain.Set(int64(time.Now().UnixMilli())) q.queue = nil if q.doneWaiter != nil { close(q.doneWaiter) @@ -76,6 +112,8 @@ func (q *ExecQueue) run(f func()) { // Shutdown asynchronously signals the queue to stop. func (q *ExecQueue) Shutdown() { + q.registerMetrics() + q.mu.Lock() defer q.mu.Unlock() q.closed = true @@ -83,6 +121,8 @@ func (q *ExecQueue) Shutdown() { // Wait waits for the queue to be empty. func (q *ExecQueue) Wait(ctx context.Context) error { + q.registerMetrics() + q.mu.Lock() waitCh := q.doneWaiter if q.inFlight && waitCh == nil {