diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index 3520be828..9f6adbfb7 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -78,7 +78,7 @@ func (b *Bus) Close() { } func (b *Bus) pump(ctx context.Context) { - var vals queue + var vals queue[any] acceptCh := func() chan any { if vals.Full() { return nil diff --git a/util/eventbus/queue.go b/util/eventbus/queue.go index 8f6bda748..a62bf3c62 100644 --- a/util/eventbus/queue.go +++ b/util/eventbus/queue.go @@ -10,32 +10,32 @@ import ( const maxQueuedItems = 16 // queue is an ordered queue of length up to maxQueuedItems. -type queue struct { - vals []any +type queue[T any] struct { + vals []T start int } // canAppend reports whether a value can be appended to q.vals without // shifting values around. -func (q *queue) canAppend() bool { +func (q *queue[T]) canAppend() bool { return cap(q.vals) < maxQueuedItems || len(q.vals) < cap(q.vals) } -func (q *queue) Full() bool { +func (q *queue[T]) Full() bool { return q.start == 0 && !q.canAppend() } -func (q *queue) Empty() bool { +func (q *queue[T]) Empty() bool { return q.start == len(q.vals) } -func (q *queue) Len() int { +func (q *queue[T]) Len() int { return len(q.vals) - q.start } // Add adds v to the end of the queue. Blocks until append can be // done. -func (q *queue) Add(v any) { +func (q *queue[T]) Add(v T) { if !q.canAppend() { if q.start == 0 { panic("Add on a full queue") @@ -54,21 +54,23 @@ func (q *queue) Add(v any) { // Peek returns the first value in the queue, without removing it from // the queue, or nil if the queue is empty. -func (q *queue) Peek() any { +func (q *queue[T]) Peek() T { if q.Empty() { - return nil + var zero T + return zero } return q.vals[q.start] } // Drop discards the first value in the queue, if any. -func (q *queue) Drop() { +func (q *queue[T]) Drop() { if q.Empty() { return } - q.vals[q.start] = nil + var zero T + q.vals[q.start] = zero q.start++ if q.Empty() { // Reset cursor to start of array, it's free to do. @@ -78,6 +80,6 @@ func (q *queue) Drop() { } // Snapshot returns a copy of the queue's contents. -func (q *queue) Snapshot() []any { +func (q *queue[T]) Snapshot() []T { return slices.Clone(q.vals[q.start:]) } diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 606410c8e..85aa1ff6a 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -27,7 +27,7 @@ type subscriber interface { // processing other potential sources of wakeups, which is how we end // up at this awkward type signature and sharing of internal state // through dispatch. - dispatch(ctx context.Context, vals *queue, acceptCh func() chan any) bool + dispatch(ctx context.Context, vals *queue[any], acceptCh func() chan any) bool Close() } @@ -55,7 +55,7 @@ func newSubscribeState(c *Client) *subscribeState { } func (q *subscribeState) pump(ctx context.Context) { - var vals queue + var vals queue[any] acceptCh := func() chan any { if vals.Full() { return nil @@ -155,7 +155,7 @@ func (s *Subscriber[T]) subscribeType() reflect.Type { return reflect.TypeFor[T]() } -func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue, acceptCh func() chan any) bool { +func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[any], acceptCh func() chan any) bool { t := vals.Peek().(T) for { // Keep the cases in this select in sync with subscribeState.pump