2023-01-27 13:37:20 -08:00
|
|
|
// Copyright (c) Tailscale Inc & AUTHORS
|
|
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
2020-03-12 11:13:33 -07:00
|
|
|
|
2020-09-08 15:55:18 -07:00
|
|
|
// Package syncs contains additional sync types and functionality.
|
2020-03-12 11:13:33 -07:00
|
|
|
package syncs
|
|
|
|
|
2021-03-23 12:15:08 -07:00
|
|
|
import (
|
|
|
|
"context"
|
2024-10-09 10:28:12 -07:00
|
|
|
"iter"
|
2022-11-10 10:55:26 -08:00
|
|
|
"sync"
|
2021-03-23 12:15:08 -07:00
|
|
|
"sync/atomic"
|
2022-11-10 10:55:26 -08:00
|
|
|
|
|
|
|
"tailscale.com/util/mak"
|
2021-03-23 12:15:08 -07:00
|
|
|
)
|
2020-03-12 11:13:33 -07:00
|
|
|
|
|
|
|
// ClosedChan returns a channel that's already closed.
|
|
|
|
func ClosedChan() <-chan struct{} { return closedChan }
|
|
|
|
|
|
|
|
var closedChan = initClosedChan()
|
|
|
|
|
|
|
|
func initClosedChan() <-chan struct{} {
|
|
|
|
ch := make(chan struct{})
|
|
|
|
close(ch)
|
|
|
|
return ch
|
|
|
|
}
|
|
|
|
|
2024-04-30 14:27:58 -07:00
|
|
|
// AtomicValue is the generic version of [atomic.Value].
|
2024-12-18 17:11:22 -08:00
|
|
|
// See [MutexValue] for guidance on whether to use this type.
|
2022-08-04 10:51:21 -07:00
|
|
|
type AtomicValue[T any] struct {
|
|
|
|
v atomic.Value
|
|
|
|
}
|
|
|
|
|
2024-04-30 14:27:58 -07:00
|
|
|
// wrappedValue is used to wrap a value T in a concrete type,
|
|
|
|
// otherwise atomic.Value.Store may panic due to mismatching types in interfaces.
|
|
|
|
// This wrapping is not necessary for non-interface kinds of T,
|
|
|
|
// but there is no harm in wrapping anyways.
|
|
|
|
// See https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/sync/atomic/value.go;l=78
|
|
|
|
type wrappedValue[T any] struct{ v T }
|
|
|
|
|
2022-08-04 10:51:21 -07:00
|
|
|
// Load returns the value set by the most recent Store.
|
|
|
|
// It returns the zero value for T if the value is empty.
|
|
|
|
func (v *AtomicValue[T]) Load() T {
|
|
|
|
x, _ := v.LoadOk()
|
|
|
|
return x
|
|
|
|
}
|
|
|
|
|
|
|
|
// LoadOk is like Load but returns a boolean indicating whether the value was
|
|
|
|
// loaded.
|
|
|
|
func (v *AtomicValue[T]) LoadOk() (_ T, ok bool) {
|
|
|
|
x := v.v.Load()
|
|
|
|
if x != nil {
|
2024-04-30 14:27:58 -07:00
|
|
|
return x.(wrappedValue[T]).v, true
|
2022-08-04 10:51:21 -07:00
|
|
|
}
|
|
|
|
var zero T
|
|
|
|
return zero, false
|
|
|
|
}
|
|
|
|
|
|
|
|
// Store sets the value of the Value to x.
|
|
|
|
func (v *AtomicValue[T]) Store(x T) {
|
2024-04-30 14:27:58 -07:00
|
|
|
v.v.Store(wrappedValue[T]{x})
|
2022-08-04 10:51:21 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// Swap stores new into Value and returns the previous value.
|
|
|
|
// It returns the zero value for T if the value is empty.
|
|
|
|
func (v *AtomicValue[T]) Swap(x T) (old T) {
|
2024-04-30 14:27:58 -07:00
|
|
|
oldV := v.v.Swap(wrappedValue[T]{x})
|
2022-08-04 10:51:21 -07:00
|
|
|
if oldV != nil {
|
2024-04-30 14:27:58 -07:00
|
|
|
return oldV.(wrappedValue[T]).v
|
2022-08-04 10:51:21 -07:00
|
|
|
}
|
|
|
|
return old
|
|
|
|
}
|
|
|
|
|
|
|
|
// CompareAndSwap executes the compare-and-swap operation for the Value.
|
|
|
|
func (v *AtomicValue[T]) CompareAndSwap(oldV, newV T) (swapped bool) {
|
2024-04-30 14:27:58 -07:00
|
|
|
return v.v.CompareAndSwap(wrappedValue[T]{oldV}, wrappedValue[T]{newV})
|
2022-08-04 10:51:21 -07:00
|
|
|
}
|
|
|
|
|
2024-12-18 17:11:22 -08:00
|
|
|
// MutexValue is a value protected by a mutex.
|
|
|
|
//
|
|
|
|
// AtomicValue, [MutexValue], [atomic.Pointer] are similar and
|
|
|
|
// overlap in their use cases.
|
|
|
|
//
|
|
|
|
// - Use [atomic.Pointer] if the value being stored is a pointer and
|
|
|
|
// you only ever need load and store operations.
|
|
|
|
// An atomic pointer only occupies 1 word of memory.
|
|
|
|
//
|
|
|
|
// - Use [MutexValue] if the value being stored is not a pointer or
|
|
|
|
// you need the ability for a mutex to protect a set of operations
|
|
|
|
// performed on the value.
|
|
|
|
// A mutex-guarded value occupies 1 word of memory plus
|
|
|
|
// the memory representation of T.
|
|
|
|
//
|
|
|
|
// - AtomicValue is useful for non-pointer types that happen to
|
|
|
|
// have the memory layout of a single pointer.
|
|
|
|
// Examples include a map, channel, func, or a single field struct
|
|
|
|
// that contains any prior types.
|
|
|
|
// An atomic value occupies 2 words of memory.
|
|
|
|
// Consequently, Storing of non-pointer types always allocates.
|
|
|
|
//
|
|
|
|
// Note that [AtomicValue] has the ability to report whether it was set
|
|
|
|
// while [MutexValue] lacks the ability to detect if the value was set
|
|
|
|
// and it happens to be the zero value of T. If such a use case is
|
|
|
|
// necessary, then you could consider wrapping T in [opt.Value].
|
|
|
|
type MutexValue[T any] struct {
|
|
|
|
mu sync.Mutex
|
|
|
|
v T
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithLock calls f with a pointer to the value while holding the lock.
|
|
|
|
// The provided pointer must not leak beyond the scope of the call.
|
|
|
|
func (m *MutexValue[T]) WithLock(f func(p *T)) {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
f(&m.v)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Load returns a shallow copy of the underlying value.
|
|
|
|
func (m *MutexValue[T]) Load() T {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
return m.v
|
|
|
|
}
|
|
|
|
|
|
|
|
// Store stores a shallow copy of the provided value.
|
|
|
|
func (m *MutexValue[T]) Store(v T) {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
m.v = v
|
|
|
|
}
|
|
|
|
|
|
|
|
// Swap stores new into m and returns the previous value.
|
|
|
|
func (m *MutexValue[T]) Swap(new T) (old T) {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
old, m.v = m.v, new
|
|
|
|
return old
|
|
|
|
}
|
|
|
|
|
2020-03-12 11:13:33 -07:00
|
|
|
// WaitGroupChan is like a sync.WaitGroup, but has a chan that closes
|
|
|
|
// on completion that you can wait on. (This, you can only use the
|
|
|
|
// value once)
|
|
|
|
// Also, its zero value is not usable. Use the constructor.
|
|
|
|
type WaitGroupChan struct {
|
|
|
|
n int64 // atomic
|
|
|
|
done chan struct{} // closed on transition to zero
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewWaitGroupChan returns a new single-use WaitGroupChan.
|
|
|
|
func NewWaitGroupChan() *WaitGroupChan {
|
|
|
|
return &WaitGroupChan{done: make(chan struct{})}
|
|
|
|
}
|
|
|
|
|
|
|
|
// DoneChan returns a channel that's closed on completion.
|
2020-11-19 23:50:26 +02:00
|
|
|
func (wg *WaitGroupChan) DoneChan() <-chan struct{} { return wg.done }
|
2020-03-12 11:13:33 -07:00
|
|
|
|
|
|
|
// Add adds delta, which may be negative, to the WaitGroupChan
|
|
|
|
// counter. If the counter becomes zero, all goroutines blocked on
|
|
|
|
// Wait or the Done chan are released. If the counter goes negative,
|
|
|
|
// Add panics.
|
|
|
|
//
|
|
|
|
// Note that calls with a positive delta that occur when the counter
|
|
|
|
// is zero must happen before a Wait. Calls with a negative delta, or
|
|
|
|
// calls with a positive delta that start when the counter is greater
|
|
|
|
// than zero, may happen at any time. Typically this means the calls
|
|
|
|
// to Add should execute before the statement creating the goroutine
|
|
|
|
// or other event to be waited for.
|
2020-11-19 23:50:26 +02:00
|
|
|
func (wg *WaitGroupChan) Add(delta int) {
|
|
|
|
n := atomic.AddInt64(&wg.n, int64(delta))
|
2020-03-12 11:13:33 -07:00
|
|
|
if n == 0 {
|
2020-11-19 23:50:26 +02:00
|
|
|
close(wg.done)
|
2020-03-12 11:13:33 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Decr decrements the WaitGroup counter by one.
|
|
|
|
//
|
|
|
|
// (It is like sync.WaitGroup's Done method, but we don't use Done in
|
|
|
|
// this type, because it's ambiguous between Context.Done and
|
|
|
|
// WaitGroup.Done. So we use DoneChan and Decr instead.)
|
|
|
|
func (wg *WaitGroupChan) Decr() {
|
|
|
|
wg.Add(-1)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait blocks until the WaitGroupChan counter is zero.
|
|
|
|
func (wg *WaitGroupChan) Wait() { <-wg.done }
|
2020-05-29 12:34:01 -07:00
|
|
|
|
2021-03-23 12:15:08 -07:00
|
|
|
// Semaphore is a counting semaphore.
|
|
|
|
//
|
|
|
|
// Use NewSemaphore to create one.
|
|
|
|
type Semaphore struct {
|
|
|
|
c chan struct{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewSemaphore returns a semaphore with resource count n.
|
|
|
|
func NewSemaphore(n int) Semaphore {
|
|
|
|
return Semaphore{c: make(chan struct{}, n)}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Acquire blocks until a resource is acquired.
|
|
|
|
func (s Semaphore) Acquire() {
|
|
|
|
s.c <- struct{}{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// AcquireContext reports whether the resource was acquired before the ctx was done.
|
|
|
|
func (s Semaphore) AcquireContext(ctx context.Context) bool {
|
|
|
|
select {
|
|
|
|
case s.c <- struct{}{}:
|
|
|
|
return true
|
|
|
|
case <-ctx.Done():
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// TryAcquire reports, without blocking, whether the resource was acquired.
|
|
|
|
func (s Semaphore) TryAcquire() bool {
|
|
|
|
select {
|
|
|
|
case s.c <- struct{}{}:
|
|
|
|
return true
|
|
|
|
default:
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Release releases a resource.
|
|
|
|
func (s Semaphore) Release() {
|
|
|
|
<-s.c
|
|
|
|
}
|
2022-11-10 10:55:26 -08:00
|
|
|
|
|
|
|
// Map is a Go map protected by a [sync.RWMutex].
|
|
|
|
// It is preferred over [sync.Map] for maps with entries that change
|
|
|
|
// at a relatively high frequency.
|
|
|
|
// This must not be shallow copied.
|
|
|
|
type Map[K comparable, V any] struct {
|
|
|
|
mu sync.RWMutex
|
|
|
|
m map[K]V
|
|
|
|
}
|
|
|
|
|
2023-10-18 15:02:45 -07:00
|
|
|
// Load loads the value for the provided key and whether it was found.
|
|
|
|
func (m *Map[K, V]) Load(key K) (value V, loaded bool) {
|
2022-11-10 10:55:26 -08:00
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
2023-10-18 15:02:45 -07:00
|
|
|
value, loaded = m.m[key]
|
|
|
|
return value, loaded
|
|
|
|
}
|
|
|
|
|
|
|
|
// LoadFunc calls f with the value for the provided key
|
|
|
|
// regardless of whether the entry exists or not.
|
|
|
|
// The lock is held for the duration of the call to f.
|
|
|
|
func (m *Map[K, V]) LoadFunc(key K, f func(value V, loaded bool)) {
|
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
value, loaded := m.m[key]
|
|
|
|
f(value, loaded)
|
2022-11-10 10:55:26 -08:00
|
|
|
}
|
|
|
|
|
2023-10-18 15:02:45 -07:00
|
|
|
// Store stores the value for the provided key.
|
2022-11-10 10:55:26 -08:00
|
|
|
func (m *Map[K, V]) Store(key K, value V) {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
mak.Set(&m.m, key, value)
|
|
|
|
}
|
|
|
|
|
2023-10-18 15:02:45 -07:00
|
|
|
// LoadOrStore returns the value for the given key if it exists
|
|
|
|
// otherwise it stores value.
|
2022-11-10 10:55:26 -08:00
|
|
|
func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
|
|
|
|
if actual, loaded = m.Load(key); loaded {
|
|
|
|
return actual, loaded
|
|
|
|
}
|
|
|
|
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
actual, loaded = m.m[key]
|
|
|
|
if !loaded {
|
|
|
|
actual = value
|
|
|
|
mak.Set(&m.m, key, value)
|
|
|
|
}
|
|
|
|
return actual, loaded
|
|
|
|
}
|
|
|
|
|
2023-10-06 10:24:21 -07:00
|
|
|
// LoadOrInit returns the value for the given key if it exists
|
|
|
|
// otherwise f is called to construct the value to be set.
|
|
|
|
// The lock is held for the duration to prevent duplicate initialization.
|
|
|
|
func (m *Map[K, V]) LoadOrInit(key K, f func() V) (actual V, loaded bool) {
|
|
|
|
if actual, loaded := m.Load(key); loaded {
|
|
|
|
return actual, loaded
|
|
|
|
}
|
|
|
|
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
if actual, loaded = m.m[key]; loaded {
|
|
|
|
return actual, loaded
|
|
|
|
}
|
|
|
|
|
|
|
|
loaded = false
|
|
|
|
actual = f()
|
|
|
|
mak.Set(&m.m, key, actual)
|
|
|
|
return actual, loaded
|
|
|
|
}
|
|
|
|
|
2023-10-18 15:02:45 -07:00
|
|
|
// LoadAndDelete returns the value for the given key if it exists.
|
|
|
|
// It ensures that the map is cleared of any entry for the key.
|
2022-11-10 10:55:26 -08:00
|
|
|
func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
value, loaded = m.m[key]
|
|
|
|
if loaded {
|
|
|
|
delete(m.m, key)
|
|
|
|
}
|
|
|
|
return value, loaded
|
|
|
|
}
|
|
|
|
|
2023-10-18 15:02:45 -07:00
|
|
|
// Delete deletes the entry identified by key.
|
2022-11-10 10:55:26 -08:00
|
|
|
func (m *Map[K, V]) Delete(key K) {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
delete(m.m, key)
|
|
|
|
}
|
|
|
|
|
2024-10-09 10:28:12 -07:00
|
|
|
// Keys iterates over all keys in the map in an undefined order.
|
|
|
|
// A read lock is held for the entire duration of the iteration.
|
|
|
|
// Use the [WithLock] method instead to mutate the map during iteration.
|
|
|
|
func (m *Map[K, V]) Keys() iter.Seq[K] {
|
|
|
|
return func(yield func(K) bool) {
|
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
for k := range m.m {
|
|
|
|
if !yield(k) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Values iterates over all values in the map in an undefined order.
|
|
|
|
// A read lock is held for the entire duration of the iteration.
|
|
|
|
// Use the [WithLock] method instead to mutate the map during iteration.
|
|
|
|
func (m *Map[K, V]) Values() iter.Seq[V] {
|
|
|
|
return func(yield func(V) bool) {
|
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
for _, v := range m.m {
|
|
|
|
if !yield(v) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// All iterates over all entries in the map in an undefined order.
|
|
|
|
// A read lock is held for the entire duration of the iteration.
|
|
|
|
// Use the [WithLock] method instead to mutate the map during iteration.
|
|
|
|
func (m *Map[K, V]) All() iter.Seq2[K, V] {
|
|
|
|
return func(yield func(K, V) bool) {
|
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
for k, v := range m.m {
|
|
|
|
if !yield(k, v) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-07-11 16:16:30 -07:00
|
|
|
// WithLock calls f with the underlying map.
|
|
|
|
// Use of m2 must not escape the duration of this call.
|
|
|
|
// The write-lock is held for the entire duration of this call.
|
|
|
|
func (m *Map[K, V]) WithLock(f func(m2 map[K]V)) {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
2024-10-09 14:03:37 -07:00
|
|
|
if m.m == nil {
|
|
|
|
m.m = make(map[K]V)
|
|
|
|
}
|
2024-07-11 16:16:30 -07:00
|
|
|
f(m.m)
|
|
|
|
}
|
|
|
|
|
2023-04-27 17:09:30 -07:00
|
|
|
// Len returns the length of the map.
|
|
|
|
func (m *Map[K, V]) Len() int {
|
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
return len(m.m)
|
|
|
|
}
|
|
|
|
|
2023-08-17 05:23:26 -07:00
|
|
|
// Clear removes all entries from the map.
|
|
|
|
func (m *Map[K, V]) Clear() {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
2023-08-16 22:09:53 -07:00
|
|
|
clear(m.m)
|
2023-08-17 05:23:26 -07:00
|
|
|
}
|
|
|
|
|
2024-03-19 18:22:42 -04:00
|
|
|
// Swap stores the value for the provided key, and returns the previous value
|
|
|
|
// (if any). If there was no previous value set, a zero value will be returned.
|
|
|
|
func (m *Map[K, V]) Swap(key K, value V) (oldValue V) {
|
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
|
|
|
|
oldValue = m.m[key]
|
|
|
|
mak.Set(&m.m, key, value)
|
|
|
|
return oldValue
|
|
|
|
}
|
|
|
|
|
syncs: add WaitGroup wrapper (#7481)
The addition of WaitGroup.Go in the standard library has been
repeatedly proposed and rejected.
See golang/go#18022, golang/go#23538, and golang/go#39863
In summary, the argument for WaitGroup.Go is that it avoids bugs like:
go func() {
wg.Add(1)
defer wg.Done()
...
}()
where the increment happens after execution (not before)
and also (to a lesser degree) because:
wg.Go(func() {
...
})
is shorter and more readble.
The argument against WaitGroup.Go is that the provided function
takes no arguments and so inputs and outputs must closed over
by the provided function. The most common race bug for goroutines
is that the caller forgot to capture the loop iteration variable,
so this pattern may make it easier to be accidentally racy.
However, that is changing with golang/go#57969.
In my experience the probability of race bugs due to the former
still outwighs the latter, but I have no concrete evidence to prove it.
The existence of errgroup.Group.Go and frequent utility of the method
at least proves that this is a workable pattern and
the possibility of accidental races do not appear to
manifest as frequently as feared.
A reason *not* to use errgroup.Group everywhere is that there are many
situations where it doesn't make sense for the goroutine to return an error
since the error is handled in a different mechanism
(e.g., logged and ignored, formatted and printed to the frontend, etc.).
While you can use errgroup.Group by always returning nil,
the fact that you *can* return nil makes it easy to accidentally return
an error when nothing is checking the return of group.Wait.
This is not a hypothetical problem, but something that has bitten us
in usages that was only using errgroup.Group without intending to use
the error reporting part of it.
Thus, add a (yet another) variant of WaitGroup here that
is identical to sync.WaitGroup, but with an extra method.
Signed-off-by: Joe Tsai <joetsai@digital-static.net>
2023-03-09 12:04:38 -08:00
|
|
|
// WaitGroup is identical to [sync.WaitGroup],
|
|
|
|
// but provides a Go method to start a goroutine.
|
|
|
|
type WaitGroup struct{ sync.WaitGroup }
|
|
|
|
|
|
|
|
// Go calls the given function in a new goroutine.
|
|
|
|
// It automatically increments the counter before execution and
|
|
|
|
// automatically decrements the counter after execution.
|
|
|
|
// It must not be called concurrently with Wait.
|
|
|
|
func (wg *WaitGroup) Go(f func()) {
|
|
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
|
|
|
f()
|
|
|
|
}()
|
|
|
|
}
|