wgengine/magicsock: clean up, add, improve DERP logs

This commit is contained in:
Brad Fitzpatrick 2020-03-23 14:12:23 -07:00
parent e749377a56
commit c473927558

View File

@ -7,6 +7,7 @@
package magicsock
import (
"bytes"
"context"
"crypto/tls"
"encoding/binary"
@ -17,6 +18,7 @@
"math/rand"
"net"
"os"
"sort"
"strconv"
"strings"
"sync"
@ -104,7 +106,19 @@ type Conn struct {
activeDerp map[int]activeDerp
prevDerp map[int]*syncs.WaitGroupChan
derpTLSConfig *tls.Config // normally nil; used by tests
derpRoute map[key.Public]derpRoute
// derpRoute contains optional alternate routes to use as an
// optimization instead of contacting a peer via their home
// DERP connection. If they sent us a message on a different
// DERP connection (which should really only be on our DERP
// home connection, or what was once our home), then we
// remember that route here to optimistically use instead of
// creating a new DERP connection back to their home.
derpRoute map[key.Public]derpRoute // TODO: clean up this map sometime?
// peerLastDerp tracks which DERP node we last used to speak with a
// peer. It's only used to quiet logging, so we only log on change.
peerLastDerp map[key.Public]int // TODO: clean up this map sometime?
}
// derpRoute is a route entry for a public key, saying that a certain
@ -135,7 +149,8 @@ func (c *Conn) addDerpPeerRoute(peer key.Public, derpID int, dc *derphttp.Client
if c.derpRoute == nil {
c.derpRoute = make(map[key.Public]derpRoute)
}
c.derpRoute[peer] = derpRoute{derpID, dc}
r := derpRoute{derpID, dc}
c.derpRoute[peer] = r
}
// DerpMagicIP is a fake WireGuard endpoint IP address that means
@ -149,10 +164,14 @@ func (c *Conn) addDerpPeerRoute(peer key.Public, derpID int, dc *derphttp.Client
// activeDerp contains fields for an active DERP connection.
type activeDerp struct {
c *derphttp.Client
cancel context.CancelFunc
writeCh chan<- derpWriteRequest
lastWrite *time.Time
c *derphttp.Client
cancel context.CancelFunc
writeCh chan<- derpWriteRequest
// lastWrite is the time of the last request for its write
// channel (currently even if there was no write).
// It is always non-nil and initialized to a non-zero Time[
lastWrite *time.Time
createTime time.Time
}
// udpAddr is the key in the addrsByUDP map.
@ -218,6 +237,7 @@ func Listen(opts Options) (*Conn, error) {
udpRecvCh: make(chan udpReadResult),
derpTLSConfig: opts.derpTLSConfig,
derps: opts.DERPs,
peerLastDerp: make(map[key.Public]int),
}
if err := c.initialBind(); err != nil {
@ -275,11 +295,11 @@ func (c *Conn) updateEndpoints(why string) {
}
}()
c.logf("magicsock.Conn: starting endpoint update (%s)", why)
c.logf("magicsock: starting endpoint update (%s)", why)
endpoints, err := c.determineEndpoints(c.connCtx)
if err != nil {
c.logf("magicsock.Conn: endpoint update (%s) failed: %v", why, err)
c.logf("magicsock: endpoint update (%s) failed: %v", why, err)
// TODO(crawshaw): are there any conditions under which
// we should trigger a retry based on the error here?
return
@ -422,7 +442,7 @@ func (c *Conn) setNearestDERP(derpNum int) (wantDERP bool) {
// On change, notify all currently connected DERP servers and
// start connecting to our home DERP if we are not already.
c.myDerp = derpNum
c.logf("home DERP server is now %v, %v", derpNum, c.derps.ServerByID(derpNum))
c.logf("magicsock: home DERP server is now %v (%v)", derpNum, c.derps.ServerByID(derpNum))
for i, ad := range c.activeDerp {
go ad.c.NotePreferred(i == c.myDerp)
}
@ -747,6 +767,7 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr, peer key.Public) chan<- de
ad, ok := c.activeDerp[nodeID]
if ok {
*ad.lastWrite = time.Now()
c.setPeerLastDerpLocked(peer, nodeID, nodeID)
return ad.writeCh
}
@ -759,12 +780,15 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr, peer key.Public) chan<- de
if !peer.IsZero() && debugUseDerpRoute {
if r, ok := c.derpRoute[peer]; ok {
if ad, ok := c.activeDerp[r.derpID]; ok && ad.c == r.dc {
c.setPeerLastDerpLocked(peer, r.derpID, nodeID)
*ad.lastWrite = time.Now()
return ad.writeCh
}
}
}
c.logf("magicsock: adding connection to derp%v", nodeID)
if c.activeDerp == nil {
c.activeDerp = make(map[int]activeDerp)
c.prevDerp = make(map[int]*syncs.WaitGroupChan)
@ -778,7 +802,7 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr, peer key.Public) chan<- de
// so it is safe to do under the mu lock.
dc, err := derphttp.NewClient(c.privateKey, "https://"+derpSrv.HostHTTPS+"/derp", c.logf)
if err != nil {
c.logf("derphttp.NewClient: port %d, host %q invalid? err: %v", nodeID, derpSrv.HostHTTPS, err)
c.logf("derphttp.NewClient: node %d, host %q invalid? err: %v", nodeID, derpSrv.HostHTTPS, err)
return nil
}
@ -793,7 +817,11 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr, peer key.Public) chan<- de
ad.writeCh = ch
ad.cancel = cancel
ad.lastWrite = new(time.Time)
*ad.lastWrite = time.Now()
ad.createTime = time.Now()
c.activeDerp[nodeID] = ad
c.logActiveDerpLocked()
c.setPeerLastDerpLocked(peer, nodeID, nodeID)
// Build a startGate for the derp reader+writer
// goroutines, so they don't start running until any
@ -810,10 +838,24 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr, peer key.Public) chan<- de
go c.runDerpReader(ctx, addr, dc, wg, startGate)
go c.runDerpWriter(ctx, addr, dc, ch, wg, startGate)
*ad.lastWrite = time.Now()
return ad.writeCh
}
// c.mu must be held.
func (c *Conn) setPeerLastDerpLocked(peer key.Public, nodeID, homeID int) {
if peer.IsZero() {
return
}
old := c.peerLastDerp[peer]
if old == nodeID {
return
}
c.peerLastDerp[peer] = nodeID
pubKey := wgcfg.Key(peer)
c.logf("magicsock: derp route for %x (home derp %v) changed from derp %d => %d", pubKey.ShortString(), homeID, old, nodeID)
}
// derpReadResult is the type sent by runDerpClient to ReceiveIPv4
// when a DERP packet is available.
//
@ -1122,8 +1164,8 @@ func (c *Conn) SetPrivateKey(privateKey wgcfg.PrivateKey) error {
// Key changed. Close existing DERP connections and reconnect to home.
myDerp := c.myDerp
c.myDerp = 0
c.logf("magicsock private key set, rebooting connection to home DERP %d", myDerp)
c.closeAllDerpLocked()
c.logf("magicsock: private key set, rebooting connection to home DERP %d", myDerp)
c.closeAllDerpLocked("new-private-key")
go c.setNearestDERP(myDerp)
return nil
@ -1137,41 +1179,87 @@ func (c *Conn) SetDERPEnabled(wantDerp bool) {
c.wantDerp = wantDerp
if !wantDerp {
c.closeAllDerpLocked()
c.closeAllDerpLocked("derp-disabled")
}
}
// c.mu must be held.
func (c *Conn) closeAllDerpLocked() {
func (c *Conn) closeAllDerpLocked(why string) {
if len(c.activeDerp) == 0 {
return // without the useless log statement
}
for i := range c.activeDerp {
c.closeDerpLocked(i)
c.closeDerpLocked(i, why)
}
c.logActiveDerpLocked()
}
// c.mu must be held.
func (c *Conn) closeDerpLocked(node int) {
// It is the responsibility of the caller to call logActiveDerpLocked after any set of closes.
func (c *Conn) closeDerpLocked(node int, why string) {
if ad, ok := c.activeDerp[node]; ok {
c.logf("closing connection to derp%v", node)
c.logf("magicsock: closing connection to derp%v (%v), age %v", node, why, time.Since(ad.createTime).Round(time.Second))
go ad.c.Close()
ad.cancel()
delete(c.activeDerp, node)
}
}
var bufPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
// c.mu must be held.
func (c *Conn) logActiveDerpLocked() {
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
now := time.Now()
buf.Reset()
buf.WriteString(": ")
c.foreachActiveDerpSortedLocked(func(node int, ad activeDerp) {
fmt.Fprintf(buf, "derp%d=cr%v,wr%v ", node, simpleDur(now.Sub(ad.createTime)), simpleDur(now.Sub(*ad.lastWrite)))
})
var details []byte
if buf.Len() > len(": ") {
details = bytes.TrimSpace(buf.Bytes())
}
c.logf("magicsock: %v active derp conns%s", len(c.activeDerp), details)
}
// c.mu must be held.
func (c *Conn) foreachActiveDerpSortedLocked(fn func(nodeID int, ad activeDerp)) {
if len(c.activeDerp) < 2 {
for id, ad := range c.activeDerp {
fn(id, ad)
}
return
}
ids := make([]int, 0, len(c.activeDerp))
for id := range c.activeDerp {
ids = append(ids, id)
}
sort.Ints(ids)
for _, id := range ids {
fn(id, c.activeDerp[id])
}
}
func (c *Conn) cleanStaleDerp() {
c.mu.Lock()
defer c.mu.Unlock()
const inactivityTime = 60 * time.Second
tooOld := time.Now().Add(-inactivityTime)
dirty := false
for i, ad := range c.activeDerp {
if i == c.myDerp {
continue
}
if ad.lastWrite.Before(tooOld) {
c.logf("closing stale DERP connection to derp%v", i)
c.closeDerpLocked(i)
c.closeDerpLocked(i, "idle")
dirty = true
}
}
if dirty {
c.logActiveDerpLocked()
}
}
// DERPs reports the number of active DERP connections.
@ -1198,7 +1286,7 @@ func (c *Conn) Close() error {
c.closed = true
c.connCtxCancel()
c.closeAllDerpLocked()
c.closeAllDerpLocked("conn-close")
if c.pconn6 != nil {
c.pconn6.Close()
}
@ -1687,3 +1775,14 @@ func (c *RebindingUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
return n, err
}
}
// simpleDur rounds d such that it stringifies to something short.
func simpleDur(d time.Duration) time.Duration {
if d < time.Second {
return d.Round(time.Millisecond)
}
if d < time.Minute {
return d.Round(time.Second)
}
return d.Round(time.Minute)
}