From dda03a911ee52b27864ecab87e4a913840fc4bde Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sat, 27 Feb 2021 19:33:21 -0800 Subject: [PATCH] wgengine/monitor: change API to permit multiple independent callbakcks Currently it assumes exactly 1 registered callback. This changes it to support 0, 1, or more than 1. This is a step towards plumbing wgengine/monitor into more places (and moving some of wgengine's interface state fetching into monitor in a later step) Signed-off-by: Brad Fitzpatrick --- cmd/tailscaled/debug.go | 9 +++++---- wgengine/monitor/monitor.go | 36 ++++++++++++++++++++++++++++++------ wgengine/userspace.go | 10 ++++++---- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/cmd/tailscaled/debug.go b/cmd/tailscaled/debug.go index d852ad893..e13f361bd 100644 --- a/cmd/tailscaled/debug.go +++ b/cmd/tailscaled/debug.go @@ -69,13 +69,14 @@ func runMonitor(ctx context.Context) error { j, _ := json.MarshalIndent(st, "", " ") os.Stderr.Write(j) } - mon, err := monitor.New(log.Printf, func() { - log.Printf("Link monitor fired. State:") - dump() - }) + mon, err := monitor.New(log.Printf) if err != nil { return err } + mon.RegisterChangeCallback(func() { + log.Printf("Link monitor fired. State:") + dump() + }) log.Printf("Starting link change monitor; initial state:") dump() mon.Start() diff --git a/wgengine/monitor/monitor.go b/wgengine/monitor/monitor.go index bb0c4e853..5c15f3df3 100644 --- a/wgengine/monitor/monitor.go +++ b/wgengine/monitor/monitor.go @@ -36,23 +36,28 @@ type osMon interface { // an interface status changes. type ChangeFunc func() +// An allocated callbackHandle's address is the Mon.cbs map key. +type callbackHandle byte + // Mon represents a monitoring instance. type Mon struct { logf logger.Logf - cb ChangeFunc om osMon // nil means not supported on this platform change chan struct{} stop chan struct{} + mu sync.Mutex // guards cbs + cbs map[*callbackHandle]ChangeFunc + onceStart sync.Once started bool goroutines sync.WaitGroup } -// New instantiates and starts a monitoring instance. Change notifications -// are propagated to the callback function. +// New instantiates and starts a monitoring instance. // The returned monitor is inactive until it's started by the Start method. -func New(logf logger.Logf, callback ChangeFunc) (*Mon, error) { +// Use RegisterChangeCallback to get notified of network changes. +func New(logf logger.Logf) (*Mon, error) { logf = logger.WithPrefix(logf, "monitor: ") om, err := newOSMon(logf) if err != nil { @@ -60,13 +65,28 @@ func New(logf logger.Logf, callback ChangeFunc) (*Mon, error) { } return &Mon{ logf: logf, - cb: callback, + cbs: map[*callbackHandle]ChangeFunc{}, om: om, change: make(chan struct{}, 1), stop: make(chan struct{}), }, nil } +// RegisterChangeCallback adds callback to the set of parties to be +// notified (in their own goroutine) when the network state changes. +// To remove this callback, call unregister (or close the monitor). +func (m *Mon) RegisterChangeCallback(callback ChangeFunc) (unregister func()) { + handle := new(callbackHandle) + m.mu.Lock() + defer m.mu.Unlock() + m.cbs[handle] = callback + return func() { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.cbs, handle) + } +} + // Start starts the monitor. // A monitor can only be started & closed once. func (m *Mon) Start() { @@ -136,7 +156,11 @@ func (m *Mon) debounce() { case <-m.change: } - m.cb() + m.mu.Lock() + for _, cb := range m.cbs { + go cb() + } + m.mu.Unlock() select { case <-m.stop: diff --git a/wgengine/userspace.go b/wgengine/userspace.go index 79512392e..e7f8dc034 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -256,14 +256,16 @@ func newUserspaceEngineAdvanced(conf EngineConfig) (_ Engine, reterr error) { e.linkState, _ = getLinkState() logf("link state: %+v", e.linkState) - mon, err := monitor.New(logf, func() { - e.LinkChange(false) - tshttpproxy.InvalidateCache() - }) + mon, err := monitor.New(logf) if err != nil { return nil, err } closePool.add(mon) + unregisterMonWatch := mon.RegisterChangeCallback(func() { + e.LinkChange(false) + tshttpproxy.InvalidateCache() + }) + closePool.addFunc(unregisterMonWatch) e.linkMon = mon endpointsFn := func(endpoints []string) {