wgengine/magicsock: deflake tests, Close deadlock again

Better fix than 37903a9056d664ddbc52cce3324dc0610d81862b

Fixes tailscale/corp#533
This commit is contained in:
Brad Fitzpatrick 2020-08-04 09:36:38 -07:00
parent c3467fbadb
commit bfcb0aa0be
2 changed files with 35 additions and 15 deletions

View File

@ -125,7 +125,8 @@ type Conn struct {
packetListener nettype.PacketListener packetListener nettype.PacketListener
// ============================================================ // ============================================================
mu sync.Mutex // guards all following fields mu sync.Mutex // guards all following fields
muCond *sync.Cond
// canCreateEPUnlocked tracks at one place whether mu is // canCreateEPUnlocked tracks at one place whether mu is
// already held. It's then checked in CreateEndpoint to avoid // already held. It's then checked in CreateEndpoint to avoid
@ -137,7 +138,6 @@ type Conn struct {
started bool // Start was called started bool // Start was called
closed bool // Close was called closed bool // Close was called
endpointsUpdateWaiter *sync.Cond
endpointsUpdateActive bool endpointsUpdateActive bool
wantEndpointsUpdate string // true if non-empty; string is reason wantEndpointsUpdate string // true if non-empty; string is reason
lastEndpoints []string lastEndpoints []string
@ -323,7 +323,7 @@ func newConn() *Conn {
sharedDiscoKey: make(map[tailcfg.DiscoKey]*[32]byte), sharedDiscoKey: make(map[tailcfg.DiscoKey]*[32]byte),
discoOfAddr: make(map[netaddr.IPPort]tailcfg.DiscoKey), discoOfAddr: make(map[netaddr.IPPort]tailcfg.DiscoKey),
} }
c.endpointsUpdateWaiter = sync.NewCond(&c.mu) c.muCond = sync.NewCond(&c.mu)
return c return c
} }
@ -395,7 +395,7 @@ func (c *Conn) updateEndpoints(why string) {
go c.updateEndpoints(why) go c.updateEndpoints(why)
} else { } else {
c.endpointsUpdateActive = false c.endpointsUpdateActive = false
c.endpointsUpdateWaiter.Broadcast() c.muCond.Broadcast()
} }
}() }()
@ -1079,6 +1079,7 @@ func (c *Conn) derpWriteChanOfAddr(addr netaddr.IPPort, peer key.Public) chan<-
go func() { go func() {
dc.Connect(ctx) dc.Connect(ctx)
close(c.derpStarted) close(c.derpStarted)
c.muCond.Broadcast()
}() }()
} }
@ -1825,7 +1826,9 @@ func (c *Conn) SetPrivateKey(privateKey wgcfg.PrivateKey) error {
if oldKey.IsZero() { if oldKey.IsZero() {
c.everHadKey = true c.everHadKey = true
c.logf("magicsock: SetPrivateKey called (init)") c.logf("magicsock: SetPrivateKey called (init)")
go c.ReSTUN("set-private-key") if c.started {
go c.ReSTUN("set-private-key")
}
} else if newKey.IsZero() { } else if newKey.IsZero() {
c.logf("magicsock: SetPrivateKey called (zeroed)") c.logf("magicsock: SetPrivateKey called (zeroed)")
c.closeAllDerpLocked("zero-private-key") c.closeAllDerpLocked("zero-private-key")
@ -2087,6 +2090,21 @@ func (c *Conn) Close() error {
c.pconn6.Close() c.pconn6.Close()
} }
err := c.pconn4.Close() err := c.pconn4.Close()
// Wait on goroutines updating right at the end, once everything is
// already closed. We want everything else in the Conn to be
// consistently in the closed state before we release mu to wait
// on the endpoint updater & derphttp.Connect.
for c.goroutinesRunningLocked() {
c.muCond.Wait()
}
return err
}
func (c *Conn) goroutinesRunningLocked() bool {
if c.endpointsUpdateActive {
return true
}
// The goroutine running dc.Connect in derpWriteChanOfAddr may linger // The goroutine running dc.Connect in derpWriteChanOfAddr may linger
// and appear to leak, as observed in https://github.com/tailscale/tailscale/issues/554. // and appear to leak, as observed in https://github.com/tailscale/tailscale/issues/554.
// This is despite the underlying context being cancelled by connCtxCancel above. // This is despite the underlying context being cancelled by connCtxCancel above.
@ -2096,16 +2114,14 @@ func (c *Conn) Close() error {
// on the first run, it sets firstDerp := true and spawns the aforementioned goroutine. // on the first run, it sets firstDerp := true and spawns the aforementioned goroutine.
// To detect this, we check activeDerp, which is initialized to non-nil on the first run. // To detect this, we check activeDerp, which is initialized to non-nil on the first run.
if c.activeDerp != nil { if c.activeDerp != nil {
<-c.derpStarted select {
case <-c.derpStarted:
break
default:
return true
}
} }
// Wait on endpoints updating right at the end, once everything is return false
// already closed. We want everything else in the Conn to be
// consistently in the closed state before we release mu to wait
// on the endpoint updater.
for c.endpointsUpdateActive {
c.endpointsUpdateWaiter.Wait()
}
return err
} }
func maxIdleBeforeSTUNShutdown() time.Duration { func maxIdleBeforeSTUNShutdown() time.Duration {

View File

@ -807,13 +807,17 @@ func testActiveDiscovery(t *testing.T, d *devices) {
// from DERP. // from DERP.
mustDirect := func(m1, m2 *magicStack) { mustDirect := func(m1, m2 *magicStack) {
lastLog := time.Now().Add(-time.Minute)
for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) { for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
pst := m1.Status().Peer[m2.Public()] pst := m1.Status().Peer[m2.Public()]
if pst.CurAddr != "" { if pst.CurAddr != "" {
logf("direct link %s->%s found with addr %s", m1, m2, pst.CurAddr) logf("direct link %s->%s found with addr %s", m1, m2, pst.CurAddr)
return return
} }
logf("no direct path %s->%s yet, addrs %v", m1, m2, pst.Addrs) if now := time.Now(); now.Sub(lastLog) > time.Second {
logf("no direct path %s->%s yet, addrs %v", m1, m2, pst.Addrs)
lastLog = now
}
} }
t.Errorf("magicsock did not find a direct path from %s to %s", m1, m2) t.Errorf("magicsock did not find a direct path from %s to %s", m1, m2)
} }