util/eventbus: make internal queue a generic type

In preparation for making the queues carry additional event metadata.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
This commit is contained in:
David Anderson 2025-03-05 10:33:35 -08:00 committed by Dave Anderson
parent 96202a7c0c
commit bf40bc4fa0
3 changed files with 18 additions and 16 deletions

View File

@ -78,7 +78,7 @@ func (b *Bus) Close() {
} }
func (b *Bus) pump(ctx context.Context) { func (b *Bus) pump(ctx context.Context) {
var vals queue var vals queue[any]
acceptCh := func() chan any { acceptCh := func() chan any {
if vals.Full() { if vals.Full() {
return nil return nil

View File

@ -10,32 +10,32 @@ import (
const maxQueuedItems = 16 const maxQueuedItems = 16
// queue is an ordered queue of length up to maxQueuedItems. // queue is an ordered queue of length up to maxQueuedItems.
type queue struct { type queue[T any] struct {
vals []any vals []T
start int start int
} }
// canAppend reports whether a value can be appended to q.vals without // canAppend reports whether a value can be appended to q.vals without
// shifting values around. // shifting values around.
func (q *queue) canAppend() bool { func (q *queue[T]) canAppend() bool {
return cap(q.vals) < maxQueuedItems || len(q.vals) < cap(q.vals) return cap(q.vals) < maxQueuedItems || len(q.vals) < cap(q.vals)
} }
func (q *queue) Full() bool { func (q *queue[T]) Full() bool {
return q.start == 0 && !q.canAppend() return q.start == 0 && !q.canAppend()
} }
func (q *queue) Empty() bool { func (q *queue[T]) Empty() bool {
return q.start == len(q.vals) return q.start == len(q.vals)
} }
func (q *queue) Len() int { func (q *queue[T]) Len() int {
return len(q.vals) - q.start return len(q.vals) - q.start
} }
// Add adds v to the end of the queue. Blocks until append can be // Add adds v to the end of the queue. Blocks until append can be
// done. // done.
func (q *queue) Add(v any) { func (q *queue[T]) Add(v T) {
if !q.canAppend() { if !q.canAppend() {
if q.start == 0 { if q.start == 0 {
panic("Add on a full queue") panic("Add on a full queue")
@ -54,21 +54,23 @@ func (q *queue) Add(v any) {
// Peek returns the first value in the queue, without removing it from // Peek returns the first value in the queue, without removing it from
// the queue, or nil if the queue is empty. // the queue, or nil if the queue is empty.
func (q *queue) Peek() any { func (q *queue[T]) Peek() T {
if q.Empty() { if q.Empty() {
return nil var zero T
return zero
} }
return q.vals[q.start] return q.vals[q.start]
} }
// Drop discards the first value in the queue, if any. // Drop discards the first value in the queue, if any.
func (q *queue) Drop() { func (q *queue[T]) Drop() {
if q.Empty() { if q.Empty() {
return return
} }
q.vals[q.start] = nil var zero T
q.vals[q.start] = zero
q.start++ q.start++
if q.Empty() { if q.Empty() {
// Reset cursor to start of array, it's free to do. // Reset cursor to start of array, it's free to do.
@ -78,6 +80,6 @@ func (q *queue) Drop() {
} }
// Snapshot returns a copy of the queue's contents. // Snapshot returns a copy of the queue's contents.
func (q *queue) Snapshot() []any { func (q *queue[T]) Snapshot() []T {
return slices.Clone(q.vals[q.start:]) return slices.Clone(q.vals[q.start:])
} }

View File

@ -27,7 +27,7 @@ type subscriber interface {
// processing other potential sources of wakeups, which is how we end // processing other potential sources of wakeups, which is how we end
// up at this awkward type signature and sharing of internal state // up at this awkward type signature and sharing of internal state
// through dispatch. // through dispatch.
dispatch(ctx context.Context, vals *queue, acceptCh func() chan any) bool dispatch(ctx context.Context, vals *queue[any], acceptCh func() chan any) bool
Close() Close()
} }
@ -55,7 +55,7 @@ func newSubscribeState(c *Client) *subscribeState {
} }
func (q *subscribeState) pump(ctx context.Context) { func (q *subscribeState) pump(ctx context.Context) {
var vals queue var vals queue[any]
acceptCh := func() chan any { acceptCh := func() chan any {
if vals.Full() { if vals.Full() {
return nil return nil
@ -155,7 +155,7 @@ func (s *Subscriber[T]) subscribeType() reflect.Type {
return reflect.TypeFor[T]() return reflect.TypeFor[T]()
} }
func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue, acceptCh func() chan any) bool { func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[any], acceptCh func() chan any) bool {
t := vals.Peek().(T) t := vals.Peek().(T)
for { for {
// Keep the cases in this select in sync with subscribeState.pump // Keep the cases in this select in sync with subscribeState.pump