mirror of
https://github.com/tailscale/tailscale.git
synced 2024-11-29 13:05:46 +00:00
util/set: add new set package for SetHandle type
We use this pattern in a number of places (in this repo and elsewhere) and I was about to add a fourth to this repo which was crossing the line. Add this type instead so they're all the same. Also, we have another Set type (SliceSet, which tracks its keys in order) in another repo we can move to this package later. Change-Id: Ibbdcdba5443fae9b6956f63990bdb9e9443cefa9 Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
parent
5c8d2fa695
commit
ea25ef8236
@ -292,6 +292,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
|
|||||||
tailscale.com/util/osshare from tailscale.com/ipn/ipnlocal+
|
tailscale.com/util/osshare from tailscale.com/ipn/ipnlocal+
|
||||||
tailscale.com/util/pidowner from tailscale.com/ipn/ipnauth
|
tailscale.com/util/pidowner from tailscale.com/ipn/ipnauth
|
||||||
tailscale.com/util/racebuild from tailscale.com/logpolicy
|
tailscale.com/util/racebuild from tailscale.com/logpolicy
|
||||||
|
tailscale.com/util/set from tailscale.com/health+
|
||||||
tailscale.com/util/singleflight from tailscale.com/control/controlclient+
|
tailscale.com/util/singleflight from tailscale.com/control/controlclient+
|
||||||
tailscale.com/util/strs from tailscale.com/hostinfo+
|
tailscale.com/util/strs from tailscale.com/hostinfo+
|
||||||
tailscale.com/util/systemd from tailscale.com/control/controlclient+
|
tailscale.com/util/systemd from tailscale.com/control/controlclient+
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
"tailscale.com/envknob"
|
"tailscale.com/envknob"
|
||||||
"tailscale.com/tailcfg"
|
"tailscale.com/tailcfg"
|
||||||
"tailscale.com/util/multierr"
|
"tailscale.com/util/multierr"
|
||||||
|
"tailscale.com/util/set"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -26,7 +27,7 @@
|
|||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
sysErr = map[Subsystem]error{} // error key => err (or nil for no error)
|
sysErr = map[Subsystem]error{} // error key => err (or nil for no error)
|
||||||
watchers = map[*watchHandle]func(Subsystem, error){} // opt func to run if error state changes
|
watchers = set.HandleSet[func(Subsystem, error)]{} // opt func to run if error state changes
|
||||||
warnables = map[*Warnable]struct{}{} // set of warnables
|
warnables = map[*Warnable]struct{}{} // set of warnables
|
||||||
timer *time.Timer
|
timer *time.Timer
|
||||||
|
|
||||||
@ -148,8 +149,6 @@ func AppendWarnableDebugFlags(base []string) []string {
|
|||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
type watchHandle byte
|
|
||||||
|
|
||||||
// RegisterWatcher adds a function that will be called if an
|
// RegisterWatcher adds a function that will be called if an
|
||||||
// error changes state either to unhealthy or from unhealthy. It is
|
// error changes state either to unhealthy or from unhealthy. It is
|
||||||
// not called on transition from unknown to healthy. It must be non-nil
|
// not called on transition from unknown to healthy. It must be non-nil
|
||||||
@ -157,8 +156,7 @@ func AppendWarnableDebugFlags(base []string) []string {
|
|||||||
func RegisterWatcher(cb func(key Subsystem, err error)) (unregister func()) {
|
func RegisterWatcher(cb func(key Subsystem, err error)) (unregister func()) {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
handle := new(watchHandle)
|
handle := watchers.Add(cb)
|
||||||
watchers[handle] = cb
|
|
||||||
if timer == nil {
|
if timer == nil {
|
||||||
timer = time.AfterFunc(time.Minute, timerSelfCheck)
|
timer = time.AfterFunc(time.Minute, timerSelfCheck)
|
||||||
}
|
}
|
||||||
@ -174,23 +172,23 @@ func RegisterWatcher(cb func(key Subsystem, err error)) (unregister func()) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetRouterHealth sets the state of the wgengine/router.Router.
|
// SetRouterHealth sets the state of the wgengine/router.Router.
|
||||||
func SetRouterHealth(err error) { set(SysRouter, err) }
|
func SetRouterHealth(err error) { setErr(SysRouter, err) }
|
||||||
|
|
||||||
// RouterHealth returns the wgengine/router.Router error state.
|
// RouterHealth returns the wgengine/router.Router error state.
|
||||||
func RouterHealth() error { return get(SysRouter) }
|
func RouterHealth() error { return get(SysRouter) }
|
||||||
|
|
||||||
// SetDNSHealth sets the state of the net/dns.Manager
|
// SetDNSHealth sets the state of the net/dns.Manager
|
||||||
func SetDNSHealth(err error) { set(SysDNS, err) }
|
func SetDNSHealth(err error) { setErr(SysDNS, err) }
|
||||||
|
|
||||||
// DNSHealth returns the net/dns.Manager error state.
|
// DNSHealth returns the net/dns.Manager error state.
|
||||||
func DNSHealth() error { return get(SysDNS) }
|
func DNSHealth() error { return get(SysDNS) }
|
||||||
|
|
||||||
// SetDNSOSHealth sets the state of the net/dns.OSConfigurator
|
// SetDNSOSHealth sets the state of the net/dns.OSConfigurator
|
||||||
func SetDNSOSHealth(err error) { set(SysDNSOS, err) }
|
func SetDNSOSHealth(err error) { setErr(SysDNSOS, err) }
|
||||||
|
|
||||||
// SetDNSManagerHealth sets the state of the Linux net/dns manager's
|
// SetDNSManagerHealth sets the state of the Linux net/dns manager's
|
||||||
// discovery of the /etc/resolv.conf situation.
|
// discovery of the /etc/resolv.conf situation.
|
||||||
func SetDNSManagerHealth(err error) { set(SysDNSManager, err) }
|
func SetDNSManagerHealth(err error) { setErr(SysDNSManager, err) }
|
||||||
|
|
||||||
// DNSOSHealth returns the net/dns.OSConfigurator error state.
|
// DNSOSHealth returns the net/dns.OSConfigurator error state.
|
||||||
func DNSOSHealth() error { return get(SysDNSOS) }
|
func DNSOSHealth() error { return get(SysDNSOS) }
|
||||||
@ -213,7 +211,7 @@ func get(key Subsystem) error {
|
|||||||
return sysErr[key]
|
return sysErr[key]
|
||||||
}
|
}
|
||||||
|
|
||||||
func set(key Subsystem, err error) {
|
func setErr(key Subsystem, err error) {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
setLocked(key, err)
|
setLocked(key, err)
|
||||||
|
@ -65,6 +65,7 @@
|
|||||||
"tailscale.com/util/mak"
|
"tailscale.com/util/mak"
|
||||||
"tailscale.com/util/multierr"
|
"tailscale.com/util/multierr"
|
||||||
"tailscale.com/util/osshare"
|
"tailscale.com/util/osshare"
|
||||||
|
"tailscale.com/util/set"
|
||||||
"tailscale.com/util/systemd"
|
"tailscale.com/util/systemd"
|
||||||
"tailscale.com/util/uniq"
|
"tailscale.com/util/uniq"
|
||||||
"tailscale.com/version"
|
"tailscale.com/version"
|
||||||
@ -180,8 +181,8 @@ type LocalBackend struct {
|
|||||||
peerAPIListeners []*peerAPIListener
|
peerAPIListeners []*peerAPIListener
|
||||||
loginFlags controlclient.LoginFlags
|
loginFlags controlclient.LoginFlags
|
||||||
incomingFiles map[*incomingFile]bool
|
incomingFiles map[*incomingFile]bool
|
||||||
fileWaiters map[*mapSetHandle]context.CancelFunc // handle => func to call on file received
|
fileWaiters set.HandleSet[context.CancelFunc] // of wake-up funcs
|
||||||
notifyWatchers map[*mapSetHandle]chan *ipn.Notify
|
notifyWatchers set.HandleSet[chan *ipn.Notify]
|
||||||
lastStatusTime time.Time // status.AsOf value of the last processed status update
|
lastStatusTime time.Time // status.AsOf value of the last processed status update
|
||||||
// directFileRoot, if non-empty, means to write received files
|
// directFileRoot, if non-empty, means to write received files
|
||||||
// directly to this directory, without staging them in an
|
// directly to this directory, without staging them in an
|
||||||
@ -1703,11 +1704,10 @@ func (b *LocalBackend) readPoller() {
|
|||||||
// notifications. There is currently (2022-11-22) no mechanism provided to
|
// notifications. There is currently (2022-11-22) no mechanism provided to
|
||||||
// detect when a message has been dropped.
|
// detect when a message has been dropped.
|
||||||
func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWatchOpt, fn func(roNotify *ipn.Notify) (keepGoing bool)) {
|
func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWatchOpt, fn func(roNotify *ipn.Notify) (keepGoing bool)) {
|
||||||
handle := new(mapSetHandle)
|
|
||||||
ch := make(chan *ipn.Notify, 128)
|
ch := make(chan *ipn.Notify, 128)
|
||||||
|
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
mak.Set(&b.notifyWatchers, handle, ch)
|
handle := b.notifyWatchers.Add(ch)
|
||||||
b.mu.Unlock()
|
b.mu.Unlock()
|
||||||
defer func() {
|
defer func() {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
@ -3745,18 +3745,16 @@ func (b *LocalBackend) TestOnlyPublicKeys() (machineKey key.MachinePublic, nodeK
|
|||||||
return mk, nk
|
return mk, nk
|
||||||
}
|
}
|
||||||
|
|
||||||
// mapSetHandle is a minimal (but non-zero) value whose address serves as a map
|
func (b *LocalBackend) removeFileWaiter(handle set.Handle) {
|
||||||
// key for sets of non-comparable values that can't be map keys themselves.
|
|
||||||
type mapSetHandle byte
|
|
||||||
|
|
||||||
func (b *LocalBackend) setFileWaiter(handle *mapSetHandle, wakeWaiter context.CancelFunc) {
|
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
defer b.mu.Unlock()
|
defer b.mu.Unlock()
|
||||||
if wakeWaiter == nil {
|
|
||||||
delete(b.fileWaiters, handle)
|
delete(b.fileWaiters, handle)
|
||||||
} else {
|
|
||||||
mak.Set(&b.fileWaiters, handle, wakeWaiter)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *LocalBackend) addFileWaiter(wakeWaiter context.CancelFunc) set.Handle {
|
||||||
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
return b.fileWaiters.Add(wakeWaiter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *LocalBackend) WaitingFiles() ([]apitype.WaitingFile, error) {
|
func (b *LocalBackend) WaitingFiles() ([]apitype.WaitingFile, error) {
|
||||||
@ -3780,9 +3778,8 @@ func (b *LocalBackend) AwaitWaitingFiles(ctx context.Context) ([]apitype.Waiting
|
|||||||
gotFile, gotFileCancel := context.WithCancel(context.Background())
|
gotFile, gotFileCancel := context.WithCancel(context.Background())
|
||||||
defer gotFileCancel()
|
defer gotFileCancel()
|
||||||
|
|
||||||
handle := new(mapSetHandle)
|
handle := b.addFileWaiter(gotFileCancel)
|
||||||
b.setFileWaiter(handle, gotFileCancel)
|
defer b.removeFileWaiter(handle)
|
||||||
defer b.setFileWaiter(handle, nil)
|
|
||||||
|
|
||||||
// Now that we've registered ourselves, check again, in case
|
// Now that we've registered ourselves, check again, in case
|
||||||
// of race. Otherwise there's a small window where we could
|
// of race. Otherwise there's a small window where we could
|
||||||
|
30
util/set/set.go
Normal file
30
util/set/set.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// Package set contains set types.
|
||||||
|
package set
|
||||||
|
|
||||||
|
// HandleSet is a set of T.
|
||||||
|
//
|
||||||
|
// It is not safe for concurrent use.
|
||||||
|
type HandleSet[T any] map[Handle]T
|
||||||
|
|
||||||
|
// Handle is a opaque comparable value that's used as the map key
|
||||||
|
// in a HandleSet. The only way to get one is to call HandleSet.Add.
|
||||||
|
type Handle struct {
|
||||||
|
v *byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add adds the element (map value) e to the set.
|
||||||
|
//
|
||||||
|
// It returns the handle (map key) with which e can be removed, using a map
|
||||||
|
// delete.
|
||||||
|
func (s *HandleSet[T]) Add(e T) Handle {
|
||||||
|
h := Handle{new(byte)}
|
||||||
|
if *s == nil {
|
||||||
|
*s = make(HandleSet[T])
|
||||||
|
}
|
||||||
|
(*s)[h] = e
|
||||||
|
return h
|
||||||
|
}
|
@ -17,6 +17,7 @@
|
|||||||
|
|
||||||
"tailscale.com/net/interfaces"
|
"tailscale.com/net/interfaces"
|
||||||
"tailscale.com/types/logger"
|
"tailscale.com/types/logger"
|
||||||
|
"tailscale.com/util/set"
|
||||||
)
|
)
|
||||||
|
|
||||||
// pollWallTimeInterval is how often we check the time to check
|
// pollWallTimeInterval is how often we check the time to check
|
||||||
@ -54,9 +55,6 @@ type osMon interface {
|
|||||||
// callback.
|
// callback.
|
||||||
type ChangeFunc func(changed bool, state *interfaces.State)
|
type ChangeFunc func(changed bool, state *interfaces.State)
|
||||||
|
|
||||||
// An allocated callbackHandle's address is the Mon.cbs map key.
|
|
||||||
type callbackHandle byte
|
|
||||||
|
|
||||||
// Mon represents a monitoring instance.
|
// Mon represents a monitoring instance.
|
||||||
type Mon struct {
|
type Mon struct {
|
||||||
logf logger.Logf
|
logf logger.Logf
|
||||||
@ -65,8 +63,8 @@ type Mon struct {
|
|||||||
stop chan struct{} // closed on Stop
|
stop chan struct{} // closed on Stop
|
||||||
|
|
||||||
mu sync.Mutex // guards all following fields
|
mu sync.Mutex // guards all following fields
|
||||||
cbs map[*callbackHandle]ChangeFunc
|
cbs set.HandleSet[ChangeFunc]
|
||||||
ruleDelCB map[*callbackHandle]RuleDeleteCallback
|
ruleDelCB set.HandleSet[RuleDeleteCallback]
|
||||||
ifState *interfaces.State
|
ifState *interfaces.State
|
||||||
gwValid bool // whether gw and gwSelfIP are valid
|
gwValid bool // whether gw and gwSelfIP are valid
|
||||||
gw netip.Addr // our gateway's IP
|
gw netip.Addr // our gateway's IP
|
||||||
@ -86,7 +84,6 @@ func New(logf logger.Logf) (*Mon, error) {
|
|||||||
logf = logger.WithPrefix(logf, "monitor: ")
|
logf = logger.WithPrefix(logf, "monitor: ")
|
||||||
m := &Mon{
|
m := &Mon{
|
||||||
logf: logf,
|
logf: logf,
|
||||||
cbs: map[*callbackHandle]ChangeFunc{},
|
|
||||||
change: make(chan struct{}, 1),
|
change: make(chan struct{}, 1),
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
lastWall: wallTime(),
|
lastWall: wallTime(),
|
||||||
@ -144,10 +141,9 @@ func (m *Mon) GatewayAndSelfIP() (gw, myIP netip.Addr, ok bool) {
|
|||||||
// notified (in their own goroutine) when the network state changes.
|
// notified (in their own goroutine) when the network state changes.
|
||||||
// To remove this callback, call unregister (or close the monitor).
|
// To remove this callback, call unregister (or close the monitor).
|
||||||
func (m *Mon) RegisterChangeCallback(callback ChangeFunc) (unregister func()) {
|
func (m *Mon) RegisterChangeCallback(callback ChangeFunc) (unregister func()) {
|
||||||
handle := new(callbackHandle)
|
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
m.cbs[handle] = callback
|
handle := m.cbs.Add(callback)
|
||||||
return func() {
|
return func() {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
@ -165,13 +161,9 @@ func (m *Mon) RegisterChangeCallback(callback ChangeFunc) (unregister func()) {
|
|||||||
// notified (in their own goroutine) when a Linux ip rule is deleted.
|
// notified (in their own goroutine) when a Linux ip rule is deleted.
|
||||||
// To remove this callback, call unregister (or close the monitor).
|
// To remove this callback, call unregister (or close the monitor).
|
||||||
func (m *Mon) RegisterRuleDeleteCallback(callback RuleDeleteCallback) (unregister func()) {
|
func (m *Mon) RegisterRuleDeleteCallback(callback RuleDeleteCallback) (unregister func()) {
|
||||||
handle := new(callbackHandle)
|
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
if m.ruleDelCB == nil {
|
handle := m.ruleDelCB.Add(callback)
|
||||||
m.ruleDelCB = map[*callbackHandle]RuleDeleteCallback{}
|
|
||||||
}
|
|
||||||
m.ruleDelCB[handle] = callback
|
|
||||||
return func() {
|
return func() {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
|
Loading…
Reference in New Issue
Block a user