mirror of
https://github.com/tailscale/tailscale.git
synced 2025-04-03 23:05:50 +00:00
health, wgenegine: fix receive func health checks yet again
The existing implementation was completely, embarrassingly conceptually broken. We aren't able to see whether wireguard-go's receive function goroutines are running or not. All we can do is model that based on what we have done. This commit fixes that model. Fixes #1781 Signed-off-by: Josh Bleecher Snyder <josharian@gmail.com>
This commit is contained in:
parent
30f5d706a1
commit
8d7f7fc7ce
@ -37,12 +37,9 @@ var (
|
|||||||
ipnWantRunning bool
|
ipnWantRunning bool
|
||||||
anyInterfaceUp = true // until told otherwise
|
anyInterfaceUp = true // until told otherwise
|
||||||
|
|
||||||
receiveIPv4Started bool
|
ReceiveIPv4 = ReceiveFuncState{name: "IPv4"}
|
||||||
receiveIPv4Running bool
|
ReceiveIPv6 = ReceiveFuncState{name: "IPv6"}
|
||||||
receiveIPv6Started bool
|
ReceiveDERP = ReceiveFuncState{name: "DERP"}
|
||||||
receiveIPv6Running bool
|
|
||||||
receiveDERPStarted bool
|
|
||||||
receiveDERPRunning bool
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Subsystem is the name of a subsystem whose health can be monitored.
|
// Subsystem is the name of a subsystem whose health can be monitored.
|
||||||
@ -220,17 +217,65 @@ func SetAnyInterfaceUp(up bool) {
|
|||||||
selfCheckLocked()
|
selfCheckLocked()
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetReceiveIPv4Started(running bool) { setHealthBool(&receiveIPv4Started, running) }
|
// ReceiveFuncState tracks the state of a wireguard-go conn.ReceiveFunc.
|
||||||
func SetReceiveIPv4Running(running bool) { setHealthBool(&receiveIPv4Running, running) }
|
type ReceiveFuncState struct {
|
||||||
func SetReceiveIPv6Started(running bool) { setHealthBool(&receiveIPv6Started, running) }
|
// name is a mnemonic for the receive func, used in error messages.
|
||||||
func SetReceiveIPv6Running(running bool) { setHealthBool(&receiveIPv6Running, running) }
|
name string
|
||||||
func SetReceiveDERPStarted(running bool) { setHealthBool(&receiveDERPStarted, running) }
|
// started indicates whether magicsock.connBind.Open
|
||||||
func SetReceiveDERPRunning(running bool) { setHealthBool(&receiveDERPRunning, running) }
|
// has requested that wireguard-go start its receive func
|
||||||
|
// goroutine (without a corresponding connBind.Close).
|
||||||
|
started bool
|
||||||
|
// running models whether wireguard-go's receive func
|
||||||
|
// goroutine is actually running. We cannot easily introspect that,
|
||||||
|
// so it is based on our knowledge of wireguard-go's internals.
|
||||||
|
running bool
|
||||||
|
}
|
||||||
|
|
||||||
func setHealthBool(dst *bool, b bool) {
|
// err returns the error state (if any) that s represents.
|
||||||
|
func (s ReceiveFuncState) err() error {
|
||||||
|
// Possible states:
|
||||||
|
// | started | running | notes
|
||||||
|
// | ------- | ------- | -----
|
||||||
|
// | true | true | normal operation
|
||||||
|
// | true | false | we prematurely returned a permanent error from this receive func
|
||||||
|
// | false | true | we have told package health that we're closing the bind, but the receive funcs haven't closed yet (transient)
|
||||||
|
// | false | false | not running
|
||||||
|
|
||||||
|
// The problematic case is started && !running.
|
||||||
|
// If that happens, wireguard-go will no longer request packets,
|
||||||
|
// and we'll lose an entire communication channel.
|
||||||
|
if s.started && !s.running {
|
||||||
|
return fmt.Errorf("receive%s started but not running", s.name)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open tells r that connBind.Open has requested wireguard-go open a conn.Bind that includes r.
|
||||||
|
func (r *ReceiveFuncState) Open() {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
*dst = b
|
r.started = true
|
||||||
|
r.running = true
|
||||||
|
selfCheckLocked()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop tells r that we have returned a permanent error to wireguard-go.
|
||||||
|
// wireguard-go's receive func goroutine for r will soon stop.
|
||||||
|
func (r *ReceiveFuncState) Stop() {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
r.running = false
|
||||||
|
selfCheckLocked()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close tells r that connBind.Close has requested wireguard-go close the bind for r.
|
||||||
|
// This will stop the corresponding receive func goroutine.
|
||||||
|
// Close must be called before actually closing the underlying connection,
|
||||||
|
// to avoid a small window of false positives.
|
||||||
|
func (r *ReceiveFuncState) Close() {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
r.started = false
|
||||||
selfCheckLocked()
|
selfCheckLocked()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,14 +329,10 @@ func overallErrorLocked() error {
|
|||||||
_ = lastMapRequestHeard
|
_ = lastMapRequestHeard
|
||||||
|
|
||||||
var errs []error
|
var errs []error
|
||||||
if receiveIPv4Started && !receiveIPv4Running {
|
for _, recv := range []ReceiveFuncState{ReceiveIPv4, ReceiveIPv6, ReceiveDERP} {
|
||||||
errs = append(errs, errors.New("receiveIPv4 not running"))
|
if err := recv.err(); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
}
|
}
|
||||||
if receiveDERPStarted && !receiveDERPRunning {
|
|
||||||
errs = append(errs, errors.New("receiveDERP not running"))
|
|
||||||
}
|
|
||||||
if receiveIPv6Started && !receiveIPv6Running {
|
|
||||||
errs = append(errs, errors.New("receiveIPv6 not running"))
|
|
||||||
}
|
}
|
||||||
for sys, err := range sysErr {
|
for sys, err := range sysErr {
|
||||||
if err == nil || sys == SysOverall {
|
if err == nil || sys == SysOverall {
|
||||||
|
@ -1581,12 +1581,12 @@ func (c *Conn) noteRecvActivityFromEndpoint(e conn.Endpoint) {
|
|||||||
|
|
||||||
// receiveIPv6 receives a UDP IPv6 packet. It is called by wireguard-go.
|
// receiveIPv6 receives a UDP IPv6 packet. It is called by wireguard-go.
|
||||||
func (c *Conn) receiveIPv6(b []byte) (int, conn.Endpoint, error) {
|
func (c *Conn) receiveIPv6(b []byte) (int, conn.Endpoint, error) {
|
||||||
health.SetReceiveIPv6Running(true)
|
|
||||||
health.SetReceiveIPv6Started(true)
|
|
||||||
defer health.SetReceiveIPv6Running(false)
|
|
||||||
for {
|
for {
|
||||||
n, ipp, err := c.pconn6.ReadFromNetaddr(b)
|
n, ipp, err := c.pconn6.ReadFromNetaddr(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if isPermanentNetError(err) {
|
||||||
|
health.ReceiveIPv6.Stop()
|
||||||
|
}
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
if ep, ok := c.receiveIP(b[:n], ipp, &c.ippEndpoint6); ok {
|
if ep, ok := c.receiveIP(b[:n], ipp, &c.ippEndpoint6); ok {
|
||||||
@ -1597,12 +1597,12 @@ func (c *Conn) receiveIPv6(b []byte) (int, conn.Endpoint, error) {
|
|||||||
|
|
||||||
// receiveIPv4 receives a UDP IPv4 packet. It is called by wireguard-go.
|
// receiveIPv4 receives a UDP IPv4 packet. It is called by wireguard-go.
|
||||||
func (c *Conn) receiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) {
|
func (c *Conn) receiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) {
|
||||||
health.SetReceiveIPv4Running(true)
|
|
||||||
health.SetReceiveIPv4Started(true)
|
|
||||||
defer health.SetReceiveIPv4Running(false)
|
|
||||||
for {
|
for {
|
||||||
n, ipp, err := c.pconn4.ReadFromNetaddr(b)
|
n, ipp, err := c.pconn4.ReadFromNetaddr(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if isPermanentNetError(err) {
|
||||||
|
health.ReceiveIPv4.Stop()
|
||||||
|
}
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
if ep, ok := c.receiveIP(b[:n], ipp, &c.ippEndpoint4); ok {
|
if ep, ok := c.receiveIP(b[:n], ipp, &c.ippEndpoint4); ok {
|
||||||
@ -1652,9 +1652,6 @@ func (c *Conn) receiveIP(b []byte, ipp netaddr.IPPort, cache *ippEndpointCache)
|
|||||||
// If the packet was a disco message or the peer endpoint wasn't
|
// If the packet was a disco message or the peer endpoint wasn't
|
||||||
// found, the returned error is errLoopAgain.
|
// found, the returned error is errLoopAgain.
|
||||||
func (c *connBind) receiveDERP(b []byte) (n int, ep conn.Endpoint, err error) {
|
func (c *connBind) receiveDERP(b []byte) (n int, ep conn.Endpoint, err error) {
|
||||||
health.SetReceiveDERPRunning(true)
|
|
||||||
health.SetReceiveDERPStarted(true)
|
|
||||||
defer health.SetReceiveDERPRunning(false)
|
|
||||||
for dm := range c.derpRecvCh {
|
for dm := range c.derpRecvCh {
|
||||||
if c.Closed() {
|
if c.Closed() {
|
||||||
break
|
break
|
||||||
@ -1666,6 +1663,7 @@ func (c *connBind) receiveDERP(b []byte) (n int, ep conn.Endpoint, err error) {
|
|||||||
}
|
}
|
||||||
return n, ep, nil
|
return n, ep, nil
|
||||||
}
|
}
|
||||||
|
health.ReceiveDERP.Stop()
|
||||||
return 0, nil, net.ErrClosed
|
return 0, nil, net.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1736,6 +1734,18 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep con
|
|||||||
return n, ep
|
return n, ep
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isPermanentNetError reports whether err is permanent.
|
||||||
|
// It matches an equivalent check in wireguard-go's RoutineReceiveIncoming.
|
||||||
|
func isPermanentNetError(err error) bool {
|
||||||
|
// Once this module requires Go 1.17, the comparison to net.ErrClosed can be removed.
|
||||||
|
// See https://github.com/golang/go/issues/45357.
|
||||||
|
if errors.Is(err, net.ErrClosed) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
neterr, ok := err.(net.Error)
|
||||||
|
return ok && !neterr.Temporary()
|
||||||
|
}
|
||||||
|
|
||||||
// discoLogLevel controls the verbosity of discovery log messages.
|
// discoLogLevel controls the verbosity of discovery log messages.
|
||||||
type discoLogLevel int
|
type discoLogLevel int
|
||||||
|
|
||||||
@ -2420,8 +2430,11 @@ func (c *connBind) Open(ignoredPort uint16) ([]conn.ReceiveFunc, uint16, error)
|
|||||||
}
|
}
|
||||||
c.closed = false
|
c.closed = false
|
||||||
fns := []conn.ReceiveFunc{c.receiveIPv4, c.receiveDERP}
|
fns := []conn.ReceiveFunc{c.receiveIPv4, c.receiveDERP}
|
||||||
|
health.ReceiveIPv4.Open()
|
||||||
|
health.ReceiveDERP.Open()
|
||||||
if c.pconn6 != nil {
|
if c.pconn6 != nil {
|
||||||
fns = append(fns, c.receiveIPv6)
|
fns = append(fns, c.receiveIPv6)
|
||||||
|
health.ReceiveIPv6.Open()
|
||||||
}
|
}
|
||||||
// TODO: Combine receiveIPv4 and receiveIPv6 and receiveIP into a single
|
// TODO: Combine receiveIPv4 and receiveIPv6 and receiveIP into a single
|
||||||
// closure that closes over a *RebindingUDPConn?
|
// closure that closes over a *RebindingUDPConn?
|
||||||
@ -2443,17 +2456,15 @@ func (c *connBind) Close() error {
|
|||||||
}
|
}
|
||||||
c.closed = true
|
c.closed = true
|
||||||
// Unblock all outstanding receives.
|
// Unblock all outstanding receives.
|
||||||
// Tell the health checker that we're closing the connections
|
health.ReceiveIPv4.Close()
|
||||||
// before actually closing them to avoid false positives.
|
|
||||||
health.SetReceiveIPv4Started(false)
|
|
||||||
c.pconn4.Close()
|
c.pconn4.Close()
|
||||||
if c.pconn6 != nil {
|
if c.pconn6 != nil {
|
||||||
health.SetReceiveIPv6Started(false)
|
health.ReceiveIPv6.Close()
|
||||||
c.pconn6.Close()
|
c.pconn6.Close()
|
||||||
}
|
}
|
||||||
// Send an empty read result to unblock receiveDERP,
|
// Send an empty read result to unblock receiveDERP,
|
||||||
// which will then check connBind.Closed.
|
// which will then check connBind.Closed.
|
||||||
health.SetReceiveDERPStarted(false)
|
health.ReceiveDERP.Close()
|
||||||
c.derpRecvCh <- derpReadResult{}
|
c.derpRecvCh <- derpReadResult{}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user