wgengine{,/monitor}: move interface state fetching/comparing to monitor

Gets it out of wgengine so the Engine isn't responsible for being a
callback registration hub for it.

This also removes the Engine.LinkChange method, as it's no longer
necessary.  The monitor tells us about changes; it doesn't seem to
need any help. (Currently it was only used by Swift, but as of
14dc79013754fe we just do the same from Go)

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick 2021-03-01 12:56:03 -08:00
parent a038e8690c
commit e3df29d488
6 changed files with 79 additions and 106 deletions

View File

@ -60,12 +60,7 @@ func debugMode(args []string) error {
}
func runMonitor(ctx context.Context) error {
dump := func() {
st, err := interfaces.GetState()
if err != nil {
log.Printf("error getting state: %v", err)
return
}
dump := func(st *interfaces.State) {
j, _ := json.MarshalIndent(st, "", " ")
os.Stderr.Write(j)
}
@ -73,12 +68,16 @@ func runMonitor(ctx context.Context) error {
if err != nil {
return err
}
mon.RegisterChangeCallback(func() {
log.Printf("Link monitor fired. State:")
dump()
mon.RegisterChangeCallback(func(changed bool, st *interfaces.State) {
if changed {
log.Printf("Link monitor fired; no change")
return
}
log.Printf("Link monitor fired. New state:")
dump(st)
})
log.Printf("Starting link change monitor; initial state:")
dump()
dump(mon.InterfaceState())
mon.Start()
log.Printf("Started link change monitor; waiting...")
select {}

View File

@ -64,19 +64,20 @@ func getControlDebugFlags() []string {
// state machine generates events back out to zero or more components.
type LocalBackend struct {
// Elements that are thread-safe or constant after construction.
ctx context.Context // canceled by Close
ctxCancel context.CancelFunc // cancels ctx
logf logger.Logf // general logging
keyLogf logger.Logf // for printing list of peers on change
statsLogf logger.Logf // for printing peers stats on change
e wgengine.Engine
store ipn.StateStore
backendLogID string
portpoll *portlist.Poller // may be nil
portpollOnce sync.Once // guards starting readPoller
gotPortPollRes chan struct{} // closed upon first readPoller result
serverURL string // tailcontrol URL
newDecompressor func() (controlclient.Decompressor, error)
ctx context.Context // canceled by Close
ctxCancel context.CancelFunc // cancels ctx
logf logger.Logf // general logging
keyLogf logger.Logf // for printing list of peers on change
statsLogf logger.Logf // for printing peers stats on change
e wgengine.Engine
store ipn.StateStore
backendLogID string
unregisterLinkMon func()
portpoll *portlist.Poller // may be nil
portpollOnce sync.Once // guards starting readPoller
gotPortPollRes chan struct{} // closed upon first readPoller result
serverURL string // tailcontrol URL
newDecompressor func() (controlclient.Decompressor, error)
filterHash string
@ -138,7 +139,7 @@ func NewLocalBackend(logf logger.Logf, logid string, store ipn.StateStore, e wge
portpoll: portpoll,
gotPortPollRes: make(chan struct{}),
}
e.SetLinkChangeCallback(b.linkChange)
b.unregisterLinkMon = e.GetLinkMonitor().RegisterChangeCallback(b.linkChange)
b.statusChanged = sync.NewCond(&b.statusLock)
return b, nil
@ -178,6 +179,7 @@ func (b *LocalBackend) Shutdown() {
cli := b.c
b.mu.Unlock()
b.unregisterLinkMon()
if cli != nil {
cli.Shutdown()
}

View File

@ -33,9 +33,11 @@ type osMon interface {
Receive() (message, error)
}
// ChangeFunc is a callback function that's called when
// an interface status changes.
type ChangeFunc func()
// ChangeFunc is a callback function that's called when the network
// changed. The changed parameter is whether the network changed
// enough for interfaces.State to have changed since the last
// callback.
type ChangeFunc func(changed bool, state *interfaces.State)
// An allocated callbackHandle's address is the Mon.cbs map key.
type callbackHandle byte
@ -47,8 +49,9 @@ type Mon struct {
change chan struct{}
stop chan struct{}
mu sync.Mutex // guards cbs
cbs map[*callbackHandle]ChangeFunc
mu sync.Mutex // guards cbs
cbs map[*callbackHandle]ChangeFunc
ifState *interfaces.State
onceStart sync.Once
started bool
@ -64,18 +67,30 @@ func New(logf logger.Logf) (*Mon, error) {
if err != nil {
return nil, err
}
return &Mon{
m := &Mon{
logf: logf,
cbs: map[*callbackHandle]ChangeFunc{},
om: om,
change: make(chan struct{}, 1),
stop: make(chan struct{}),
}, nil
}
st, err := m.interfaceStateUncached()
if err != nil {
return nil, err
}
m.ifState = st
return m, nil
}
// InterfaceState returns the state of the machine's network interfaces,
// without any Tailscale ones.
func (m *Mon) InterfaceState() (*interfaces.State, error) {
func (m *Mon) InterfaceState() *interfaces.State {
m.mu.Lock()
defer m.mu.Unlock()
return m.ifState
}
func (m *Mon) interfaceStateUncached() (*interfaces.State, error) {
s, err := interfaces.GetState()
if s != nil {
s.RemoveTailscaleInterfaces()
@ -167,11 +182,19 @@ func (m *Mon) debounce() {
case <-m.change:
}
m.mu.Lock()
for _, cb := range m.cbs {
go cb()
if curState, err := m.interfaceStateUncached(); err != nil {
m.logf("interfaces.State: %v", err)
} else {
m.mu.Lock()
changed := !curState.Equal(m.ifState)
if changed {
m.ifState = curState
}
for _, cb := range m.cbs {
go cb(changed, m.ifState)
}
m.mu.Unlock()
}
m.mu.Unlock()
select {
case <-m.stop:

View File

@ -120,11 +120,9 @@ type userspaceEngine struct {
mu sync.Mutex // guards following; see lock order comment below
closing bool // Close was called (even if we're still closing)
statusCallback StatusCallback
linkChangeCallback func(major bool, newState *interfaces.State)
peerSequence []wgkey.Key
endpoints []string
pingers map[wgkey.Key]*pinger // legacy pingers for pre-discovery peers
linkState *interfaces.State
pingers map[wgkey.Key]*pinger // legacy pingers for pre-discovery peers
pendOpen map[flowtrack.Tuple]*pendingOpenFlow // see pendopen.go
networkMapCallbacks map[*someHandle]NetworkMapCallback
@ -248,12 +246,11 @@ func newUserspaceEngine(logf logger.Logf, rawTUNDev tun.Device, conf Config) (_
e.linkMonOwned = true
}
e.linkState, _ = e.linkMon.InterfaceState()
logf("link state: %+v", e.linkState)
logf("link state: %+v", e.linkMon.InterfaceState())
unregisterMonWatch := e.linkMon.RegisterChangeCallback(func() {
e.LinkChange(false)
unregisterMonWatch := e.linkMon.RegisterChangeCallback(func(changed bool, st *interfaces.State) {
tshttpproxy.InvalidateCache()
e.linkChange(false, st)
})
closePool.addFunc(unregisterMonWatch)
e.linkMonUnregister = unregisterMonWatch
@ -279,7 +276,7 @@ func newUserspaceEngine(logf logger.Logf, rawTUNDev tun.Device, conf Config) (_
return nil, fmt.Errorf("wgengine: %v", err)
}
closePool.add(e.magicConn)
e.magicConn.SetNetworkUp(e.linkState.AnyInterfaceUp())
e.magicConn.SetNetworkUp(e.linkMon.InterfaceState().AnyInterfaceUp())
// Respond to all pings only in fake mode.
if conf.Fake {
@ -1250,30 +1247,15 @@ func (e *userspaceEngine) Wait() {
<-e.waitCh
}
func (e *userspaceEngine) setLinkState(st *interfaces.State) (changed bool, cb func(major bool, newState *interfaces.State)) {
if st == nil {
return false, nil
}
e.mu.Lock()
defer e.mu.Unlock()
changed = e.linkState == nil || !st.Equal(e.linkState)
e.linkState = st
return changed, e.linkChangeCallback
func (e *userspaceEngine) GetLinkMonitor() *monitor.Mon {
return e.linkMon
}
func (e *userspaceEngine) LinkChange(isExpensive bool) {
cur, err := e.linkMon.InterfaceState()
if err != nil {
e.logf("LinkChange: interfaces.GetState: %v", err)
return
}
cur.IsExpensive = isExpensive
needRebind, linkChangeCallback := e.setLinkState(cur)
func (e *userspaceEngine) linkChange(changed bool, cur *interfaces.State) {
up := cur.AnyInterfaceUp()
if !up {
e.logf("LinkChange: all links down; pausing: %v", cur)
} else if needRebind {
} else if changed {
e.logf("LinkChange: major, rebinding. New state: %v", cur)
} else {
e.logf("[v1] LinkChange: minor")
@ -1282,23 +1264,11 @@ func (e *userspaceEngine) LinkChange(isExpensive bool) {
e.magicConn.SetNetworkUp(up)
why := "link-change-minor"
if needRebind {
if changed {
why = "link-change-major"
e.magicConn.Rebind()
}
e.magicConn.ReSTUN(why)
if linkChangeCallback != nil {
go linkChangeCallback(needRebind, cur)
}
}
func (e *userspaceEngine) SetLinkChangeCallback(cb func(major bool, newState *interfaces.State)) {
e.mu.Lock()
defer e.mu.Unlock()
e.linkChangeCallback = cb
if e.linkState != nil {
go cb(false, e.linkState)
}
}
func (e *userspaceEngine) AddNetworkMapCallback(cb NetworkMapCallback) func() {

View File

@ -14,10 +14,10 @@
"inet.af/netaddr"
"tailscale.com/ipn/ipnstate"
"tailscale.com/net/interfaces"
"tailscale.com/tailcfg"
"tailscale.com/types/netmap"
"tailscale.com/wgengine/filter"
"tailscale.com/wgengine/monitor"
"tailscale.com/wgengine/router"
"tailscale.com/wgengine/tsdns"
"tailscale.com/wgengine/wgcfg"
@ -75,10 +75,11 @@ func (e *watchdogEngine) watchdog(name string, fn func()) {
func (e *watchdogEngine) Reconfig(cfg *wgcfg.Config, routerCfg *router.Config) error {
return e.watchdogErr("Reconfig", func() error { return e.wrap.Reconfig(cfg, routerCfg) })
}
func (e *watchdogEngine) GetLinkMonitor() *monitor.Mon {
return e.wrap.GetLinkMonitor()
}
func (e *watchdogEngine) GetFilter() *filter.Filter {
var x *filter.Filter
e.watchdog("GetFilter", func() { x = e.wrap.GetFilter() })
return x
return e.wrap.GetFilter()
}
func (e *watchdogEngine) SetFilter(filt *filter.Filter) {
e.watchdog("SetFilter", func() { e.wrap.SetFilter(filt) })
@ -98,12 +99,6 @@ func (e *watchdogEngine) SetNetInfoCallback(cb NetInfoCallback) {
func (e *watchdogEngine) RequestStatus() {
e.watchdog("RequestStatus", func() { e.wrap.RequestStatus() })
}
func (e *watchdogEngine) LinkChange(isExpensive bool) {
e.watchdog("LinkChange", func() { e.wrap.LinkChange(isExpensive) })
}
func (e *watchdogEngine) SetLinkChangeCallback(cb func(major bool, newState *interfaces.State)) {
e.watchdog("SetLinkChangeCallback", func() { e.wrap.SetLinkChangeCallback(cb) })
}
func (e *watchdogEngine) SetDERPMap(m *tailcfg.DERPMap) {
e.watchdog("SetDERPMap", func() { e.wrap.SetDERPMap(m) })
}

View File

@ -9,10 +9,10 @@
"inet.af/netaddr"
"tailscale.com/ipn/ipnstate"
"tailscale.com/net/interfaces"
"tailscale.com/tailcfg"
"tailscale.com/types/netmap"
"tailscale.com/wgengine/filter"
"tailscale.com/wgengine/monitor"
"tailscale.com/wgengine/router"
"tailscale.com/wgengine/tsdns"
"tailscale.com/wgengine/wgcfg"
@ -72,6 +72,9 @@ type Engine interface {
// WireGuard status changes.
SetStatusCallback(StatusCallback)
// GetLinkMonitor returns the link monitor.
GetLinkMonitor() *monitor.Mon
// RequestStatus requests a WireGuard status update right
// away, sent to the callback registered via SetStatusCallback.
RequestStatus()
@ -86,18 +89,6 @@ type Engine interface {
// TODO: return an error?
Wait()
// LinkChange informs the engine that the system network
// link has changed. The isExpensive parameter is set on links
// where sending packets uses substantial power or money,
// such as mobile data on a phone.
//
// LinkChange should be called whenever something changed with
// the network, no matter how minor. The implementation should
// look at the state of the network and decide whether the
// change from before is interesting enough to warrant taking
// action on.
LinkChange(isExpensive bool)
// SetDERPMap controls which (if any) DERP servers are used.
// If nil, DERP is disabled. It starts disabled until a DERP map
// is configured.
@ -120,13 +111,6 @@ type Engine interface {
// new NetInfo summary is available.
SetNetInfoCallback(NetInfoCallback)
// SetLinkChangeCallback sets the function to call when the
// link state changes.
// The provided function is run in a new goroutine once upon
// initial call (if the engine has a known link state) and
// upon any change.
SetLinkChangeCallback(func(major bool, newState *interfaces.State))
// DiscoPublicKey gets the public key used for path discovery
// messages.
DiscoPublicKey() tailcfg.DiscoKey