mirror of
https://github.com/tailscale/tailscale.git
synced 2025-12-02 18:11:59 +00:00
wgengine/magicsock: hook up discovery messages, upgrade to LAN works
Ping messages now go out somewhat regularly, pong replies are sent, and pong replies are now partially handled enough to upgrade off DERP to LAN. CallMeMaybe packets are sent & received over DERP, but aren't yet handled. That's next (and regular maintenance timers), and then WAN should work. Updates #483
This commit is contained in:
@@ -9,6 +9,7 @@ package magicsock
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
crand "crypto/rand"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -93,6 +94,7 @@ type Conn struct {
|
||||
peerSet map[key.Public]struct{}
|
||||
|
||||
discoPrivate key.Private
|
||||
discoPublic tailcfg.DiscoKey // public of discoPrivate
|
||||
nodeOfDisco map[tailcfg.DiscoKey]*tailcfg.Node
|
||||
discoOfNode map[tailcfg.NodeKey]tailcfg.DiscoKey
|
||||
|
||||
@@ -511,7 +513,8 @@ func (c *Conn) SetDiscoPrivateKey(k key.Private) {
|
||||
panic("unsupported")
|
||||
}
|
||||
c.discoPrivate = k
|
||||
c.logf("magicsock: disco key set; public: %x", k.Public())
|
||||
c.discoPublic = tailcfg.DiscoKey(k.Public())
|
||||
c.logf("magicsock: disco key set; public: %x", c.discoPublic)
|
||||
}
|
||||
|
||||
// c.mu must NOT be held.
|
||||
@@ -658,7 +661,9 @@ func shouldSprayPacket(b []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Debug knobs, read from the environment once at package initialization.
// strconv.ParseBool's error is deliberately discarded: an unset or
// unparseable variable simply leaves the knob false.
var (
	// logPacketDests is controlled by TS_DEBUG_LOG_PACKET_DESTS.
	logPacketDests, _ = strconv.ParseBool(os.Getenv("TS_DEBUG_LOG_PACKET_DESTS"))

	// logDisco is controlled by TS_DEBUG_DISCO. It gates the verbose
	// "magicsock: disco: ..." diagnostics on the discovery receive path.
	logDisco, _ = strconv.ParseBool(os.Getenv("TS_DEBUG_DISCO"))
)

// NOTE(review): presumably the window during which packets are sprayed to
// several destinations; its use is not visible in this hunk — confirm.
const sprayPeriod = 3 * time.Second
|
||||
|
||||
@@ -1367,6 +1372,25 @@ func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, *net.UDPAddr, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey key.Public, dstDisco tailcfg.DiscoKey, m disco.Message) error {
|
||||
c.mu.Lock()
|
||||
var nonce [disco.NonceLen]byte
|
||||
if _, err := crand.Read(nonce[:]); err != nil {
|
||||
panic(err) // worth dying for
|
||||
}
|
||||
pkt := make([]byte, 0, 512) // TODO: size it correctly? pool? if it matters.
|
||||
pkt = append(pkt, disco.Magic...)
|
||||
pkt = append(pkt, c.discoPublic[:]...)
|
||||
pkt = append(pkt, nonce[:]...)
|
||||
sharedKey := c.sharedDiscoKeyLocked(dstDisco)
|
||||
c.mu.Unlock()
|
||||
|
||||
pkt = box.SealAfterPrecomputation(pkt, m.AppendMarshal(nil), &nonce, sharedKey)
|
||||
err := c.sendAddr(dst, dstKey, pkt)
|
||||
c.logf("magicsock: disco: sent %T to %v; err=%v", m, dst, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// handleDiscoMessage reports whether msg was a Tailscale inter-node discovery message
|
||||
// that was handled.
|
||||
//
|
||||
@@ -1380,19 +1404,23 @@ func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, *net.UDPAddr, error) {
|
||||
// For messages received over DERP, the addr will be derpMagicIP (with
|
||||
// port being the region)
|
||||
func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) bool {
|
||||
const magic = "TS💬"
|
||||
const nonceLen = 24
|
||||
const headerLen = len(magic) + len(tailcfg.DiscoKey{}) + nonceLen
|
||||
if len(msg) < headerLen || string(msg[:len(magic)]) != magic {
|
||||
const headerLen = len(disco.Magic) + len(tailcfg.DiscoKey{}) + disco.NonceLen
|
||||
if len(msg) < headerLen || string(msg[:len(disco.Magic)]) != disco.Magic {
|
||||
return false
|
||||
}
|
||||
var sender tailcfg.DiscoKey
|
||||
copy(sender[:], msg[len(magic):])
|
||||
copy(sender[:], msg[len(disco.Magic):])
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if logDisco {
|
||||
c.logf("magicsock: disco: got disco-looking frame %v", sender)
|
||||
}
|
||||
if c.discoPrivate.IsZero() {
|
||||
if logDisco {
|
||||
c.logf("magicsock: disco: ignoring disco-looking frame, no local key")
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -1400,14 +1428,17 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) bool {
|
||||
if !ok {
|
||||
// Returning false keeps passing it down, to WireGuard.
|
||||
// WireGuard will almost surely reject it, but give it a chance.
|
||||
if logDisco {
|
||||
c.logf("magicsock: disco: ignoring disco-looking frame, don't know about %v", sender)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// First, do we even know (and thus care) about this sender? If not,
|
||||
// don't bother decrypting it.
|
||||
|
||||
var nonce [nonceLen]byte
|
||||
copy(nonce[:], msg[len(magic)+len(key.Public{}):])
|
||||
var nonce [disco.NonceLen]byte
|
||||
copy(nonce[:], msg[len(disco.Magic)+len(key.Public{}):])
|
||||
sealedBox := msg[headerLen:]
|
||||
payload, ok := box.OpenAfterPrecomputation(nil, sealedBox, &nonce, c.sharedDiscoKeyLocked(sender))
|
||||
if !ok {
|
||||
@@ -1416,14 +1447,20 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) bool {
|
||||
// and old packets might've still been in flight (or
|
||||
// scheduled). This is particularly the case for LANs
|
||||
// or non-NATed endpoints.
|
||||
// Not worth logging. Pass on to wireguard, in case
|
||||
// Don't log in normal case. Pass on to wireguard, in case
|
||||
// it's actually a wireguard packet (super unlikely,
|
||||
// but).
|
||||
if logDisco {
|
||||
c.logf("magicsock: disco: failed to open naclbox from %v (wrong rcpt?)", sender)
|
||||
}
|
||||
// TODO(bradfitz): add some counter for this that logs rarely
|
||||
return false
|
||||
}
|
||||
|
||||
dm, err := disco.Parse(payload)
|
||||
if logDisco {
|
||||
c.logf("magicsock: disco: disco.Parse = %T, %v", dm, err)
|
||||
}
|
||||
if err != nil {
|
||||
// Couldn't parse it, but it was inside a correctly
|
||||
// signed box, so just ignore it, assuming it's from a
|
||||
@@ -1452,17 +1489,20 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) bool {
|
||||
}
|
||||
|
||||
func (c *Conn) handlePongLocked(m *disco.Pong, n *tailcfg.Node, dk tailcfg.DiscoKey, from netaddr.IPPort) {
|
||||
de, ok := c.endpointOfDisco[dk]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
c.logf("magicsock: disco: got pong from %s, tx=%x, disco=%x, src=%v (they saw %v)", n.Key.ShortString(), m.TxID, dk[:8], from, m.Src)
|
||||
// TODO: implement
|
||||
go de.handlePong(m)
|
||||
}
|
||||
|
||||
func (c *Conn) handlePingLocked(m *disco.Ping, n *tailcfg.Node, dk tailcfg.DiscoKey, from netaddr.IPPort) {
|
||||
c.logf("magicsock: disco: got ping tx %x from %s/%x at %v", m.TxID, n.Key.ShortString(), dk[:8], from)
|
||||
reply := &disco.Pong{
|
||||
go c.sendDiscoMessage(from, key.Public(n.Key), dk, &disco.Pong{
|
||||
TxID: m.TxID,
|
||||
Src: from,
|
||||
}
|
||||
go c.sendAddr(from, key.Public(n.Key), reply.AppendMarshal(nil))
|
||||
})
|
||||
}
|
||||
|
||||
// handleCallMeMaybeLocked is called when a discovery message arrives
|
||||
@@ -1471,7 +1511,7 @@ func (c *Conn) handlePingLocked(m *disco.Ping, n *tailcfg.Node, dk tailcfg.Disco
|
||||
// stateful firewall should be open. Now we can Ping back and make it
|
||||
// through.
|
||||
func (c *Conn) handleCallMeMaybeLocked(n *tailcfg.Node, dk tailcfg.DiscoKey) {
|
||||
c.logf("magicsock: disco: got call-me-maybe packet from %s (disco=%x)", n.Key.ShortString, dk[:8])
|
||||
c.logf("magicsock: disco: got call-me-maybe packet from %s (disco=%x)", n.Key.ShortString(), dk[:8])
|
||||
// TODO: implement
|
||||
}
|
||||
|
||||
@@ -2491,22 +2531,24 @@ type discoEndpoint struct {
|
||||
lastSend time.Time
|
||||
derpAddr netaddr.IPPort // fallback/bootstrap path, if non-zero (non-zero for well-behaved clients)
|
||||
|
||||
bestAddr netaddr.IPPort // best non-DERP path; zero if none
|
||||
sentPing map[stun.TxID]sentPing
|
||||
endpointState map[netaddr.IPPort]*endpointState
|
||||
bestAddr netaddr.IPPort // best non-DERP path; zero if none
|
||||
bestAddrLatency time.Duration
|
||||
bestAddrAt time.Time // time best address re-confirmed
|
||||
sentPing map[stun.TxID]sentPing
|
||||
endpointState map[netaddr.IPPort]*endpointState
|
||||
|
||||
timers map[*time.Timer]bool
|
||||
}
|
||||
|
||||
// endpointState is the bookkeeping a discoEndpoint keeps for one candidate
// address of its peer; it is the value type of discoEndpoint.endpointState.
type endpointState struct {
	// lastPing is when a ping was last sent to this address.
	// The zero value means no ping has been sent yet.
	lastPing time.Time
	// TODO: lastPong time.Time
	index int // index in nodecfg.Node.Endpoints
}
|
||||
|
||||
type sentPing struct {
|
||||
// TODO: to netaddr.IPPort
|
||||
// TODO: at time.Time
|
||||
to netaddr.IPPort
|
||||
at time.Time
|
||||
}
|
||||
|
||||
// initFakeUDPAddr populates fakeWGAddr with a globally unique fake UDPAddr.
|
||||
@@ -2554,15 +2596,12 @@ func (de *discoEndpoint) UpdateDst(addr *net.UDPAddr) error {
|
||||
func (de *discoEndpoint) send(b []byte) error {
|
||||
now := time.Now()
|
||||
|
||||
// TODO: all the disco messaging & state tracking & spraying,
|
||||
// bringing over relevant AddrSet code. For now, just do DERP
|
||||
// as a crutch while I work on other bits.
|
||||
de.mu.Lock()
|
||||
de.lastSend = now
|
||||
derpAddr := de.derpAddr
|
||||
bestAddr := de.bestAddr
|
||||
if bestAddr.Port == 0 {
|
||||
de.sendPingsLocked()
|
||||
if bestAddr.IsZero() || de.bestAddrAt.Before(now.Add(-5*time.Second)) {
|
||||
de.sendPingsLocked(now)
|
||||
}
|
||||
de.mu.Unlock()
|
||||
|
||||
@@ -2575,8 +2614,36 @@ func (de *discoEndpoint) send(b []byte) error {
|
||||
return de.c.sendAddr(bestAddr, de.publicKey, b)
|
||||
}
|
||||
|
||||
func (de *discoEndpoint) sendPingsLocked() {
|
||||
// TODO
|
||||
func (de *discoEndpoint) sendPingsLocked(now time.Time) {
|
||||
sent := false
|
||||
for ep, st := range de.endpointState {
|
||||
if !st.lastPing.IsZero() && now.Sub(st.lastPing) < 5*time.Second {
|
||||
continue
|
||||
}
|
||||
st.lastPing = now
|
||||
|
||||
txid := stun.NewTxID()
|
||||
de.sentPing[txid] = sentPing{
|
||||
to: ep,
|
||||
at: now,
|
||||
}
|
||||
sent = true
|
||||
go de.sendDiscoMessage(ep, &disco.Ping{TxID: [12]byte(txid)})
|
||||
}
|
||||
derpAddr := de.derpAddr
|
||||
if sent && derpAddr.Port != 0 {
|
||||
// In just a bit of a time (for goroutines above to schedule and run),
|
||||
// send a message to peer via DERP informing them that we've sent
|
||||
// so our firewall ports are probably open and now would be a good time
|
||||
// for them to connect.
|
||||
time.AfterFunc(5*time.Millisecond, func() {
|
||||
de.sendDiscoMessage(derpAddr, disco.CallMeMaybe{})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (de *discoEndpoint) sendDiscoMessage(dst netaddr.IPPort, dm disco.Message) error {
|
||||
return de.c.sendDiscoMessage(dst, de.publicKey, de.discoKey, dm)
|
||||
}
|
||||
|
||||
func (de *discoEndpoint) updateFromNode(n *tailcfg.Node) {
|
||||
@@ -2629,6 +2696,37 @@ func (de *discoEndpoint) noteConnectivityChange() {
|
||||
// TODO: reset state
|
||||
}
|
||||
|
||||
func (de *discoEndpoint) handlePong(m *disco.Pong) {
|
||||
de.mu.Lock()
|
||||
defer de.mu.Unlock()
|
||||
|
||||
sp, ok := de.sentPing[m.TxID]
|
||||
if !ok {
|
||||
// This is not a pong for a ping we sent. Ignore.
|
||||
return
|
||||
}
|
||||
delete(de.sentPing, m.TxID)
|
||||
|
||||
now := time.Now()
|
||||
delay := now.Sub(sp.at)
|
||||
de.c.logf("magicsock: disco: got pong reply after %v", delay)
|
||||
|
||||
// Expire our best address if we haven't heard from it in awhile.
|
||||
tooOld := now.Add(-15 * time.Second)
|
||||
if !de.bestAddr.IsZero() && de.bestAddrAt.Before(tooOld) {
|
||||
de.bestAddr = netaddr.IPPort{}
|
||||
}
|
||||
|
||||
// Promote this pong response to our current best address if it's lower latency.
|
||||
// TODO(bradfitz): decide how latency vs. preference order affects decision
|
||||
if de.bestAddr.IsZero() || delay < de.bestAddrLatency {
|
||||
de.bestAddr = sp.to
|
||||
de.bestAddrLatency = delay
|
||||
de.bestAddrAt = now
|
||||
de.c.logf("magicsock: disco: promoted %v to best address for %v", sp.to, de.publicKey.ShortString())
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup is called when a discovery endpoint is no longer present in the NetworkMap.
|
||||
// This is where we can do cleanup such as closing goroutines or canceling timers.
|
||||
func (de *discoEndpoint) cleanup() {
|
||||
|
||||
Reference in New Issue
Block a user