diff --git a/disco/disco.go b/disco/disco.go index 01aafda99..8fa63cfae 100644 --- a/disco/disco.go +++ b/disco/disco.go @@ -157,9 +157,9 @@ func parsePong(ver uint8, p []byte) (m *Pong, err error) { func MessageSummary(m Message) string { switch m := m.(type) { case *Ping: - return fmt.Sprintf("ping tx=%x", m.TxID) + return fmt.Sprintf("ping tx=%x", m.TxID[:6]) case *Pong: - return fmt.Sprintf("pong tx=%x", m.TxID) + return fmt.Sprintf("pong tx=%x", m.TxID[:6]) case CallMeMaybe: return "call-me-maybe" default: diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index c09c82941..110099879 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -14,6 +14,7 @@ "errors" "fmt" "hash/fnv" + "math" "math/rand" "net" "os" @@ -1411,7 +1412,7 @@ func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey key.Public, dstDisco pkt = box.SealAfterPrecomputation(pkt, m.AppendMarshal(nil), &nonce, sharedKey) sent, err = c.sendAddr(dst, dstKey, pkt) if sent { - c.logf("magicsock: disco: %v->%v (%v, %v): %v", c.discoShort, dstDisco.ShortString(), dstKey.ShortString(), derpStr(dst.String()), disco.MessageSummary(m)) + c.logf("magicsock: disco: %v->%v (%v, %v) sent %v", c.discoShort, dstDisco.ShortString(), dstKey.ShortString(), derpStr(dst.String()), disco.MessageSummary(m)) } else if err == nil { // Can't send. (e.g. no IPv6 locally) } else { @@ -1511,6 +1512,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) bool { c.logf("[unexpected] CallMeMaybe packets should only come via DERP") return true } + c.logf("magicsock: disco: %v<-%v (%v, %v) got call-me-maybe", c.discoShort, de.discoShort, de.publicKey.ShortString(), derpStr(src.String())) go de.handleCallMeMaybe() } @@ -1518,7 +1520,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) bool { } func (c *Conn) handlePingLocked(dm *disco.Ping, de *discoEndpoint, src netaddr.IPPort) { - c.logf("magicsock: disco: %v<-%v (%v, %v) ping tx=%x", c.discoShort, de.discoKey.ShortString(), de.publicKey.ShortString(), src, dm.TxID) + c.logf("magicsock: disco: %v<-%v (%v, %v) got ping tx=%x", c.discoShort, de.discoShort, de.publicKey.ShortString(), src, dm.TxID[:6]) // Remember this this route if not present. c.setAddrToDiscoLocked(src, de.discoKey, nil) @@ -2568,6 +2570,15 @@ func (c *Conn) UpdateStatus(sb *ipnstate.StatusBuilder) { c.mu.Lock() defer c.mu.Unlock() + for dk, de := range c.endpointOfDisco { + ps := &ipnstate.PeerStatus{InMagicSock: true} + if node, ok := c.nodeOfDisco[dk]; ok { + ps.Addrs = append(ps.Addrs, node.Endpoints...) + } + de.populatePeerStatus(ps) + sb.AddPeer(de.publicKey, ps) + } + // Old-style (pre-disco) peers: for k, as := range c.addrsByKey { ps := &ipnstate.PeerStatus{ InMagicSock: true, @@ -2629,10 +2640,41 @@ type discoEndpoint struct { timers map[*time.Timer]bool } +const ( + // discoPingInterval is the minimum time between pings + // to an endpoint. (Except in the case of CallMeMaybe frames + // resetting the counter, as the first pings likely didn't through + // the firewall) + discoPingInterval = 5 * time.Second + + // pingTimeoutDuration is how long we wait for a pong reply before + // assuming it's never coming. + pingTimeoutDuration = 5 * time.Second + + // trustUDPAddrDuration is how long we trust a UDP address as the exclusive + // path (without using DERP) without having heard a Pong reply. + trustUDPAddrDuration = 5 * time.Second +) + +// endpointState is some state and history for a specific endpoint of +// a discoEndpoint. (The subject is the discoEndpoint.endpointState +// map key) type endpointState struct { - lastPing time.Time - // TODO: lastPong time.Time - index int // index in nodecfg.Node.Endpoints + // all fields guarded by discoEndpoint.mu: + lastPing time.Time + recentPongs []pongReply // ring buffer up to pongHistoryCount entries + recentPong uint16 // index into recentPongs of most recent; older , wrapped + index int16 // index in nodecfg.Node.Endpoints +} + +// pongHistoryCount is how many pongReply values we keep per endpointState +const pongHistoryCount = 64 + +type pongReply struct { + latency time.Duration + pongAt time.Time // when we received the pong + from netaddr.IPPort // the pong's src (usually same as endpoint map key) + pongSrc netaddr.IPPort // what they reported they heard } type sentPing struct { @@ -2683,35 +2725,44 @@ func (de *discoEndpoint) UpdateDst(addr *net.UDPAddr) error { return nil } +// addrForSendLocked returns the address(es) that should be used for +// sending the next packet. Zero, one, or both of UDP address and DERP +// addr may be non-zero. +// +// de.mu must be held. +func (de *discoEndpoint) addrForSendLocked(now time.Time) (udpAddr, derpAddr netaddr.IPPort) { + udpAddr = de.bestAddr + if udpAddr.IsZero() || now.After(de.trustBestAddrUntil) { + // We had a bestAddr but it expired so send both to it + // and DERP. + derpAddr = de.derpAddr + } + return +} + func (de *discoEndpoint) send(b []byte) error { now := time.Now() de.mu.Lock() de.lastSend = now - derpAddr := de.derpAddr - haveDerp := !derpAddr.IsZero() - bestAddr := de.bestAddr - bestOld := now.After(de.trustBestAddrUntil) - if bestAddr.IsZero() || bestOld { + udpAddr, derpAddr := de.addrForSendLocked(now) + if udpAddr.IsZero() || now.After(de.trustBestAddrUntil) { de.sendPingsLocked(now, true) } de.mu.Unlock() - var didDerp bool - if bestAddr.IsZero() { - if !haveDerp { - return errors.New("no DERP addr") - } - bestAddr = derpAddr - } else if bestOld && haveDerp { - // We have a bestAddr, but it hasn't been confirmed in a while, - // so let's not entirely trust it and also send via DERP. - didDerp, _ = de.c.sendAddr(derpAddr, de.publicKey, b) + if udpAddr.IsZero() && derpAddr.IsZero() { + return errors.New("no UDP or DERP addr") } - - _, err := de.c.sendAddr(bestAddr, de.publicKey, b) - if didDerp { - return nil + var err error + if !udpAddr.IsZero() { + _, err = de.c.sendAddr(udpAddr, de.publicKey, b) + } + if !derpAddr.IsZero() { + if ok, _ := de.c.sendAddr(derpAddr, de.publicKey, b); ok && err != nil { + // UDP failed but DERP worked, so good enough: + return nil + } } return err } @@ -2765,14 +2816,14 @@ func (de *discoEndpoint) sendPingsLocked(now time.Time, sendCallMeMaybe bool) { var sentAny bool for ep, st := range de.endpointState { ep := ep - if !st.lastPing.IsZero() && now.Sub(st.lastPing) < 5*time.Second { + if !st.lastPing.IsZero() && now.Sub(st.lastPing) < discoPingInterval { continue } st.lastPing = now txid := stun.NewTxID() - t := de.newTimerLocked(5*time.Second, func() { - de.c.logf("magicsock: disco: timeout waiting for pong %x from %v (%v, %v)", txid, ep, de.publicKey.ShortString(), de.discoShort) + t := de.newTimerLocked(pingTimeoutDuration, func() { + de.c.logf("magicsock: disco: timeout waiting for pong %x from %v (%v, %v)", txid[:6], ep, de.publicKey.ShortString(), de.discoShort) de.forgetPing(txid) }) de.sentPing[txid] = sentPing{ @@ -2824,15 +2875,19 @@ func (de *discoEndpoint) updateFromNode(n *tailcfg.Node) { st.index = -1 // assume deleted until updated in next loop } for i, epStr := range n.Endpoints { + if i > math.MaxInt16 { + // Seems unlikely. + continue + } ipp, err := netaddr.ParseIPPort(epStr) if err != nil { de.c.logf("magicsock: bogus netmap endpoint %q", epStr) continue } if st, ok := de.endpointState[ipp]; ok { - st.index = i + st.index = int16(i) } else { - de.endpointState[ipp] = &endpointState{index: i} + de.endpointState[ipp] = &endpointState{index: int16(i)} } } // Now delete anything that wasn't updated. @@ -2878,38 +2933,60 @@ func (de *discoEndpoint) handlePongConnLocked(m *disco.Pong, src netaddr.IPPort) } de.removeSentPingLocked(m.TxID, sp) + st, ok := de.endpointState[sp.to] + if !ok { + // This is no longer an endpoint we care about. + return + } + de.c.setAddrToDiscoLocked(src, de.discoKey, de) now := time.Now() - delay := now.Sub(sp.at) + latency := now.Sub(sp.at) - de.c.logf("magicsock: disco: %v<-%v (%v, %v) pong tx=%x latency=%v pong.src=%v%v", de.c.discoShort, de.discoShort, de.publicKey.ShortString(), src, m.TxID, delay.Round(time.Millisecond), m.Src, logger.ArgWriter(func(bw *bufio.Writer) { + st.addPongReplyLocked(pongReply{ + latency: latency, + pongAt: now, + from: src, + pongSrc: m.Src, + }) + + de.c.logf("magicsock: disco: %v<-%v (%v, %v) got pong tx=%x latency=%v pong.src=%v%v", de.c.discoShort, de.discoShort, de.publicKey.ShortString(), src, m.TxID[:6], latency.Round(time.Millisecond), m.Src, logger.ArgWriter(func(bw *bufio.Writer) { if sp.to != src { fmt.Fprintf(bw, " ping.to=%v", sp.to) } })) - // Expire our best address if we haven't heard from it in awhile. - tooOld := now.Add(-15 * time.Second) - if !de.bestAddr.IsZero() && de.bestAddrAt.Before(tooOld) { - de.bestAddr = netaddr.IPPort{} - } - // Promote this pong response to our current best address if it's lower latency. // TODO(bradfitz): decide how latency vs. preference order affects decision - if de.bestAddr.IsZero() || delay < de.bestAddrLatency { + if de.bestAddr.IsZero() || latency < de.bestAddrLatency { if de.bestAddr != sp.to { de.c.logf("magicsock: disco: node %v %v now using %v", de.publicKey.ShortString(), de.discoShort, sp.to) de.bestAddr = sp.to } } if de.bestAddr == sp.to { - de.bestAddrLatency = delay + de.bestAddrLatency = latency de.bestAddrAt = now - de.trustBestAddrUntil = now.Add(5 * time.Second) + de.trustBestAddrUntil = now.Add(trustUDPAddrDuration) } } +// discoEndpoint.mu must be held. +func (st *endpointState) addPongReplyLocked(r pongReply) { + if n := len(st.recentPongs); n < pongHistoryCount { + st.recentPong = uint16(n) + st.recentPongs = append(st.recentPongs, r) + return + } + i := st.recentPong + 1 + if i == pongHistoryCount { + i = 0 + } + st.recentPongs[i] = r + st.recentPong = i +} + // handleCallMeMaybe handles a CallMeMaybe discovery message via // DERP. The contract for use of this message is that the peer has // already sent to us via UDP, so their stateful firewall should be @@ -2926,6 +3003,20 @@ func (de *discoEndpoint) handleCallMeMaybe() { de.sendPingsLocked(time.Now(), false) } +func (de *discoEndpoint) populatePeerStatus(ps *ipnstate.PeerStatus) { + de.mu.Lock() + defer de.mu.Unlock() + + if de.lastSend.IsZero() { + return + } + + now := time.Now() + if udpAddr, _ := de.addrForSendLocked(now); !udpAddr.IsZero() { + ps.CurAddr = udpAddr.String() + } +} + // cleanup is called when a discovery endpoint is no longer present in the NetworkMap. // This is where we can do cleanup such as closing goroutines or canceling timers. func (de *discoEndpoint) cleanup() {