mirror of
https://github.com/tailscale/tailscale.git
synced 2025-08-13 06:07:34 +00:00
.bencher
.github
appc
atomicfile
chirp
client
clientupdate
cmd
control
derp
disco
docs
doctor
drive
envknob
health
hostinfo
internal
ipn
jsondb
k8s-operator
kube
licenses
log
logpolicy
logtail
metrics
net
packages
paths
portlist
posture
prober
proxymap
release
safesocket
safeweb
scripts
smallzstd
ssh
syncs
tailcfg
taildrop
tempfork
tka
tool
tsconst
tsd
tsnet
tstest
tstime
tsweb
types
util
cache
cibuild
clientmetric
cloudenv
cmpver
codegen
cstruct
ctxkey
deephash
dirwalk
dnsname
execqueue
execqueue.go
execqueue_test.go
expvarx
goroutines
groupmember
hashx
httphdr
httpm
jsonutil
limiter
lineread
linuxfw
lru
mak
multierr
must
nocasemaps
osdiag
osshare
osuser
pidowner
precompress
progresstracking
quarantine
race
racebuild
rands
reload
ringbuffer
set
singleflight
slicesx
syspolicy
sysresources
systemd
testenv
topk
truncate
uniq
vizerror
winutil
zstdframe
version
wf
wgengine
words
.gitattributes
.gitignore
.golangci.yml
ALPINE.txt
AUTHORS
CODEOWNERS
CODE_OF_CONDUCT.md
Dockerfile
Dockerfile.base
LICENSE
Makefile
PATENTS
README.md
SECURITY.md
VERSION.txt
api.md
build_dist.sh
build_docker.sh
flake.lock
flake.nix
go.mod
go.mod.sri
go.sum
go.toolchain.branch
go.toolchain.rev
gomod_test.go
header.txt
pull-toolchain.sh
shell.nix
staticcheck.conf
update-flake.sh
version-embed.go
version_test.go

This is a useful primitive for asynchronous execution of ordered work that I want to use in another change.

Updates tailscale/corp#16833

Signed-off-by: James Tucker <james@tailscale.com>
105 lines
1.9 KiB
Go
105 lines
1.9 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

// Package execqueue implements an ordered asynchronous queue for executing functions.
package execqueue
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
)
|
|
|
|
// ExecQueue executes functions asynchronously, one at a time, in the order
// in which they were added. The zero value is ready to use. Because it
// contains a sync.Mutex, an ExecQueue must not be copied after first use.
type ExecQueue struct {
	// mu guards all of the fields below.
	mu sync.Mutex

	// closed is set by Shutdown. Once set, newly added functions are
	// dropped and functions still waiting in queue are discarded.
	closed bool

	// inFlight reports whether a goroutine is running q.run.
	inFlight bool

	// doneWaiter is non-nil while a waiter is waiting for the queue to
	// drain; it is closed (and reset to nil) once the queue goes idle.
	doneWaiter chan struct{}

	// queue holds the functions waiting behind the one currently running.
	queue []func()
}
|
|
|
|
func (q *ExecQueue) Add(f func()) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
if q.closed {
|
|
return
|
|
}
|
|
if q.inFlight {
|
|
q.queue = append(q.queue, f)
|
|
} else {
|
|
q.inFlight = true
|
|
go q.run(f)
|
|
}
|
|
}
|
|
|
|
// RunSync waits for the queue to be drained and then synchronously runs f.
|
|
// It returns an error if the queue is closed before f is run or ctx expires.
|
|
func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
|
|
for {
|
|
if err := q.Wait(ctx); err != nil {
|
|
return err
|
|
}
|
|
q.mu.Lock()
|
|
if q.inFlight {
|
|
q.mu.Unlock()
|
|
continue
|
|
}
|
|
defer q.mu.Unlock()
|
|
if q.closed {
|
|
return errors.New("closed")
|
|
}
|
|
f()
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (q *ExecQueue) run(f func()) {
|
|
f()
|
|
|
|
q.mu.Lock()
|
|
for len(q.queue) > 0 && !q.closed {
|
|
f := q.queue[0]
|
|
q.queue[0] = nil
|
|
q.queue = q.queue[1:]
|
|
q.mu.Unlock()
|
|
f()
|
|
q.mu.Lock()
|
|
}
|
|
q.inFlight = false
|
|
q.queue = nil
|
|
if q.doneWaiter != nil {
|
|
close(q.doneWaiter)
|
|
q.doneWaiter = nil
|
|
}
|
|
q.mu.Unlock()
|
|
}
|
|
|
|
// Shutdown asynchronously signals the queue to stop.
|
|
func (q *ExecQueue) Shutdown() {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
q.closed = true
|
|
}
|
|
|
|
// Wait waits for the queue to be empty.
|
|
func (q *ExecQueue) Wait(ctx context.Context) error {
|
|
q.mu.Lock()
|
|
waitCh := q.doneWaiter
|
|
if q.inFlight && waitCh == nil {
|
|
waitCh = make(chan struct{})
|
|
q.doneWaiter = waitCh
|
|
}
|
|
q.mu.Unlock()
|
|
|
|
if waitCh == nil {
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-waitCh:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|