wgengine/magicsock: add timeout on discovery pings, clean up state

Updates #483
This commit is contained in:
Brad Fitzpatrick 2020-07-01 14:39:21 -07:00
parent 77d3ef36f4
commit 710ee88e94

View File

@ -771,7 +771,8 @@ func (c *Conn) Send(b []byte, ep conn.Endpoint) error {
c.logf("magicsock: [unexpected] DERP BUG: attempting to send packet to DERP address %v", addr)
return nil
}
return c.sendUDPStd(addr, b)
_, err := c.sendUDPStd(addr, b)
return err
case *AddrSet:
as = v
}
@ -786,8 +787,8 @@ func (c *Conn) Send(b []byte, ep conn.Endpoint) error {
var success bool
var ret error
for _, addr := range dsts {
err := c.sendAddr(addr, as.publicKey, b)
if err == nil {
sent, err := c.sendAddr(addr, as.publicKey, b)
if sent {
success = true
} else if ret == nil {
ret = err
@ -809,45 +810,55 @@ func (c *Conn) Send(b []byte, ep conn.Endpoint) error {
var errDropDerpPacket = errors.New("too many DERP packets queued; dropping")
// sendUDP sends UDP packet b to ipp.
func (c *Conn) sendUDP(ipp netaddr.IPPort, b []byte) error {
// See sendAddr's docs on the return value meanings.
func (c *Conn) sendUDP(ipp netaddr.IPPort, b []byte) (sent bool, err error) {
ua := ipp.UDPAddr()
defer netaddr.PutUDPAddr(ua)
return c.sendUDPStd(ua, b)
}
func (c *Conn) sendUDPStd(addr *net.UDPAddr, b []byte) (err error) {
// sendUDP sends UDP packet b to addr.
// See sendAddr's docs on the return value meanings.
func (c *Conn) sendUDPStd(addr *net.UDPAddr, b []byte) (sent bool, err error) {
switch {
case addr.IP.To4() != nil:
_, err = c.pconn4.WriteTo(b, addr)
if err != nil && c.noV4.Get() {
return nil
return false, nil
}
case len(addr.IP) == net.IPv6len:
if c.pconn6 == nil {
// ignore IPv6 dest if we don't have an IPv6 address.
return nil
return false, nil
}
_, err = c.pconn6.WriteTo(b, addr)
if err != nil && c.noV6.Get() {
return nil
return false, nil
}
default:
return errors.New("bogus sendUDPStd addr type")
panic("bogus sendUDPStd addr type")
}
return err
return err == nil, err
}
// sendAddr sends packet b to addr, which is either a real UDP address
// or a fake UDP address representing a DERP server (see derpmap.go).
// The provided public key identifies the recipient.
func (c *Conn) sendAddr(addr netaddr.IPPort, pubKey key.Public, b []byte) error {
//
// The returned err is whether there was an error writing when it
// should've worked.
// The returned sent is whether a packet went out at all.
// An example of when they might be different: sending to an
// IPv6 address when the local machine doesn't have IPv6 support
// returns (false, nil); it's not an error, but nothing was sent.
func (c *Conn) sendAddr(addr netaddr.IPPort, pubKey key.Public, b []byte) (sent bool, err error) {
if addr.IP != derpMagicIPAddr {
return c.sendUDP(addr, b)
}
ch := c.derpWriteChanOfAddr(addr, pubKey)
if ch == nil {
return nil
return false, nil
}
// TODO(bradfitz): this makes garbage for now; we could use a
@ -860,12 +871,12 @@ func (c *Conn) sendAddr(addr netaddr.IPPort, pubKey key.Public, b []byte) error
select {
case <-c.donec():
return errConnClosed
return false, errConnClosed
case ch <- derpWriteRequest{addr, pubKey, pkt}:
return nil
return true, nil
default:
// Too many writes queued. Drop packet.
return errDropDerpPacket
return false, errDropDerpPacket
}
}
@ -1372,7 +1383,7 @@ func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, *net.UDPAddr, error) {
}
}
func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey key.Public, dstDisco tailcfg.DiscoKey, m disco.Message) error {
func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey key.Public, dstDisco tailcfg.DiscoKey, m disco.Message) (sent bool, err error) {
c.mu.Lock()
var nonce [disco.NonceLen]byte
if _, err := crand.Read(nonce[:]); err != nil {
@ -1386,9 +1397,15 @@ func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey key.Public, dstDisco
c.mu.Unlock()
pkt = box.SealAfterPrecomputation(pkt, m.AppendMarshal(nil), &nonce, sharedKey)
err := c.sendAddr(dst, dstKey, pkt)
c.logf("magicsock: disco: sent %T to %v; err=%v", m, dst, err)
return err
sent, err = c.sendAddr(dst, dstKey, pkt)
if sent {
c.logf("magicsock: disco: sent %T to %v", m, dst)
} else if err == nil {
c.logf("magicsock: disco: can't send %T to %v", m, dst)
} else {
c.logf("magicsock: disco: failed to send %T to %v: %v", m, dst, err)
}
return sent, err
}
// handleDiscoMessage reports whether msg was a Tailscale inter-node discovery message
@ -2528,7 +2545,7 @@ type discoEndpoint struct {
// mu protects all following fields.
mu sync.Mutex // Lock ordering: Conn.mu, then discoEndpoint.mu
lastSend time.Time
lastSend time.Time // last time there was outgoing packets sent to this peer (from wireguard-go)
derpAddr netaddr.IPPort // fallback/bootstrap path, if non-zero (non-zero for well-behaved clients)
bestAddr netaddr.IPPort // best non-DERP path; zero if none
@ -2537,6 +2554,9 @@ type discoEndpoint struct {
sentPing map[stun.TxID]sentPing
endpointState map[netaddr.IPPort]*endpointState
// timers are all outstanding timers. They're all stopped on
// cleanup if the discovery endpoint is removed from the
// network map.
timers map[*time.Timer]bool
}
@ -2549,6 +2569,7 @@ type endpointState struct {
type sentPing struct {
to netaddr.IPPort
at time.Time
timer *time.Timer // timeout timer
}
// initFakeUDPAddr populates fakeWGAddr with a globally unique fake UDPAddr.
@ -2611,24 +2632,72 @@ func (de *discoEndpoint) send(b []byte) error {
return errors.New("no DERP addr")
}
}
return de.c.sendAddr(bestAddr, de.publicKey, b)
_, err := de.c.sendAddr(bestAddr, de.publicKey, b)
return err
}
// newTimerLocked creates a new AfterFunc(d, f) and returns it after
// registering it with de.timers.
//
// The timer will unregister itself from de.timers after it fires and after f runs.
func (de *discoEndpoint) newTimerLocked(d time.Duration, f func()) *time.Timer {
var t *time.Timer
t = time.AfterFunc(d, func() {
f()
de.mu.Lock()
delete(de.timers, t)
de.mu.Unlock()
})
de.timers[t] = true
return t
}
// forgetPing is called by a timer when a ping either fails to send or
// has taken too long to get a pong reply.
func (de *discoEndpoint) forgetPing(txid stun.TxID) {
de.mu.Lock()
defer de.mu.Unlock()
if sp, ok := de.sentPing[txid]; ok {
// Stop the timer for the case where sendPing failed to write to UDP.
// In the case of a timer already having fired, this is a no-op:
sp.timer.Stop()
delete(de.sentPing, txid)
delete(de.timers, sp.timer)
}
}
// sendPing sends a ping with the provided txid to ep.
// The caller should've already been recorded the ping in sentPing
// and set up the timer.
func (de *discoEndpoint) sendPing(ep netaddr.IPPort, txid stun.TxID) {
sent, _ := de.sendDiscoMessage(ep, &disco.Ping{TxID: [12]byte(txid)})
if !sent {
de.forgetPing(txid)
}
}
func (de *discoEndpoint) sendPingsLocked(now time.Time) {
sent := false
for ep, st := range de.endpointState {
ep := ep
if !st.lastPing.IsZero() && now.Sub(st.lastPing) < 5*time.Second {
continue
}
st.lastPing = now
txid := stun.NewTxID()
t := de.newTimerLocked(5*time.Second, func() {
de.c.logf("magicsock: disco: timeout waiting for ping %x from %v", txid, ep)
de.forgetPing(txid)
})
de.sentPing[txid] = sentPing{
to: ep,
at: now,
timer: t,
}
sent = true
go de.sendDiscoMessage(ep, &disco.Ping{TxID: [12]byte(txid)})
go de.sendPing(ep, txid)
}
derpAddr := de.derpAddr
if sent && derpAddr.Port != 0 {
@ -2642,7 +2711,7 @@ func (de *discoEndpoint) sendPingsLocked(now time.Time) {
}
}
func (de *discoEndpoint) sendDiscoMessage(dst netaddr.IPPort, dm disco.Message) error {
func (de *discoEndpoint) sendDiscoMessage(dst netaddr.IPPort, dm disco.Message) (sent bool, err error) {
return de.c.sendDiscoMessage(dst, de.publicKey, de.discoKey, dm)
}