wgengine/magicsock: add new endpoint type used for discovery-supporting peers

This adds a new magicsock endpoint type only used when both sides
support discovery (that is, are advertising a discovery
key). Otherwise the old code is used.

So far the new code only communicates over DERP as proof that the new
code paths are wired up. None of the actually discovery messaging is
implemented yet.

Support for discovery (generating and advertising a key) are still
behind an environment variable for now.

Updates #483
This commit is contained in:
Brad Fitzpatrick 2020-06-28 11:53:37 -07:00
parent 72bfea2ece
commit a975e86bb8
5 changed files with 282 additions and 50 deletions

View File

@ -451,8 +451,6 @@ func (c *Direct) SetEndpoints(localPort uint16, endpoints []string) (changed boo
return c.newEndpoints(localPort, endpoints)
}
var debugNetmap, _ = strconv.ParseBool(os.Getenv("TS_DEBUG_NETMAP"))
func (c *Direct) PollNetMap(ctx context.Context, maxPolls int, cb func(*NetworkMap)) error {
c.mu.Lock()
persist := c.persist
@ -472,7 +470,7 @@ func (c *Direct) PollNetMap(ctx context.Context, maxPolls int, cb func(*NetworkM
c.logf("PollNetMap: stream=%v :%v %v", maxPolls, localPort, ep)
vlogf := logger.Discard
if debugNetmap {
if Debug.NetMap {
vlogf = c.logf
}
@ -603,6 +601,18 @@ func (c *Direct) PollNetMap(ctx context.Context, maxPolls int, cb func(*NetworkM
if resp.Debug != nil && resp.Debug.LogHeapPprof {
go logheap.LogHeap(resp.Debug.LogHeapURL)
}
// Temporarily (2020-06-29) support removing all but
// discovery-supporting nodes during development, for
// less noise.
if Debug.OnlyDisco {
filtered := resp.Peers[:0]
for _, p := range resp.Peers {
if !p.DiscoKey.IsZero() {
filtered = append(filtered, p)
}
}
resp.Peers = filtered
}
nm := &NetworkMap{
NodeKey: tailcfg.NodeKey(persist.PrivateNodeKey.Public()),
@ -766,3 +776,33 @@ func loadServerKey(ctx context.Context, httpc *http.Client, serverURL string) (w
}
return key, nil
}
// Debug contains temporary internal-only debug knobs.
// They're unexported to not draw attention to them.
var Debug = initDebug()
type debug struct {
NetMap bool
OnlyDisco bool
Disco bool
}
func initDebug() debug {
return debug{
NetMap: envBool("TS_DEBUG_NETMAP"),
OnlyDisco: os.Getenv("TS_DEBUG_USE_DISCO") == "only",
Disco: os.Getenv("TS_DEBUG_USE_DISCO") == "only" || envBool("TS_DEBUG_USE_DISCO"),
}
}
func envBool(k string) bool {
e := os.Getenv(k)
if e == "" {
return false
}
v, err := strconv.ParseBool(os.Getenv("TS_DEBUG_NETMAP"))
if err != nil {
panic(fmt.Sprintf("invalid non-bool %q for env var %q", e, k))
}
return v
}

View File

@ -204,6 +204,11 @@ func (nm *NetworkMap) WGCfg(logf logger.Logf, uflags int, dnsOverride []wgcfg.IP
return wgcfg.FromWgQuick(s, "tailscale")
}
// EndpointDiscoSuffix is appended to the hex representation of a peer's discovery key
// and is then the sole wireguard endpoint for peers with a non-zero discovery key.
// This form is then recognize by magicsock's CreateEndpoint.
const EndpointDiscoSuffix = ".disco.tailscale:12345"
func (nm *NetworkMap) _WireGuardConfig(logf logger.Logf, uflags int, dnsOverride []wgcfg.IP, allEndpoints bool) string {
buf := new(strings.Builder)
fmt.Fprintf(buf, "[Interface]\n")
@ -229,6 +234,9 @@ func (nm *NetworkMap) _WireGuardConfig(logf logger.Logf, uflags int, dnsOverride
fmt.Fprintf(buf, "\n")
for i, peer := range nm.Peers {
if Debug.OnlyDisco && peer.DiscoKey.IsZero() {
continue
}
if (uflags&UAllowSingleHosts) == 0 && len(peer.AllowedIPs) < 2 {
logf("wgcfg: %v skipping a single-host peer.\n", peer.Key.ShortString())
continue
@ -239,6 +247,9 @@ func (nm *NetworkMap) _WireGuardConfig(logf logger.Logf, uflags int, dnsOverride
fmt.Fprintf(buf, "[Peer]\n")
fmt.Fprintf(buf, "PublicKey = %s\n", base64.StdEncoding.EncodeToString(peer.Key[:]))
var endpoints []string
if !peer.DiscoKey.IsZero() {
fmt.Fprintf(buf, "Endpoint = %x%s\n", peer.DiscoKey[:], EndpointDiscoSuffix)
} else {
if peer.DERP != "" {
endpoints = append(endpoints, peer.DERP)
}
@ -250,8 +261,7 @@ func (nm *NetworkMap) _WireGuardConfig(logf logger.Logf, uflags int, dnsOverride
// TODO(apenwarr): This mode is incompatible.
// Normal wireguard clients don't know how to
// parse it (yet?)
fmt.Fprintf(buf, "Endpoint = %s",
strings.Join(endpoints, ","))
fmt.Fprintf(buf, "Endpoint = %s", strings.Join(endpoints, ","))
} else {
fmt.Fprintf(buf, "Endpoint = %s # other endpoints: %s",
endpoints[0],
@ -259,6 +269,7 @@ func (nm *NetworkMap) _WireGuardConfig(logf logger.Logf, uflags int, dnsOverride
}
buf.WriteByte('\n')
}
}
var aips []string
for _, allowedIP := range peer.AllowedIPs {
aip := allowedIP.String()

View File

@ -8,8 +8,6 @@
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"
@ -362,7 +360,7 @@ func (b *LocalBackend) Start(opts Options) error {
b.updateFilter(nil)
var discoPublic tailcfg.DiscoKey
if useDisco, _ := strconv.ParseBool(os.Getenv("TS_DEBUG_USE_DISCO")); useDisco {
if controlclient.Debug.Disco {
discoPrivate := key.NewPrivate()
b.e.SetDiscoPrivateKey(discoPrivate)
discoPublic = tailcfg.DiscoKey(discoPrivate.Public())

View File

@ -28,6 +28,7 @@
"github.com/tailscale/wireguard-go/conn"
"github.com/tailscale/wireguard-go/device"
"github.com/tailscale/wireguard-go/wgcfg"
"go4.org/mem"
"golang.org/x/crypto/nacl/box"
"golang.org/x/time/rate"
"inet.af/netaddr"
@ -80,8 +81,8 @@ type Conn struct {
// ============================================================
mu sync.Mutex // guards all following fields
started bool
closed bool
started bool // Start was called
closed bool // Close was called
endpointsUpdateWaiter *sync.Cond
endpointsUpdateActive bool
@ -90,9 +91,11 @@ type Conn struct {
peerSet map[key.Public]struct{}
discoPrivate key.Private
nodeOfDisco map[tailcfg.DiscoKey]tailcfg.NodeKey
nodeOfDisco map[tailcfg.DiscoKey]*tailcfg.Node
discoOfNode map[tailcfg.NodeKey]tailcfg.DiscoKey
endpointOfDisco map[tailcfg.DiscoKey]*discoEndpoint
// addrsByUDP is a map of every remote ip:port to a priority
// list of endpoint addresses for a peer.
// The priority list is provided by wgengine configuration.
@ -246,6 +249,7 @@ func newConn() *Conn {
udpRecvCh: make(chan udpReadResult),
derpStarted: make(chan struct{}),
peerLastDerp: make(map[key.Public]int),
endpointOfDisco: make(map[tailcfg.DiscoKey]*discoEndpoint),
}
c.endpointsUpdateWaiter = sync.NewCond(&c.mu)
return c
@ -732,6 +736,8 @@ func (c *Conn) Send(b []byte, ep conn.Endpoint) error {
switch v := ep.(type) {
default:
panic(fmt.Sprintf("[unexpected] Endpoint type %T", v))
case *discoEndpoint:
return v.send(b)
case *singleEndpoint:
addr := (*net.UDPAddr)(v)
if addr.IP.Equal(derpMagicIP) {
@ -1179,14 +1185,25 @@ func (c *Conn) awaitUDP4(b []byte) {
}
return
}
}
// wgRecvAddr conditionally alters the returned UDPAddr we tell
// wireguard-go we received a packet from. For peers with discovery
// keys, we always use the same one, a unique synthetic value created
// per peer.
func wgRecvAddr(e conn.Endpoint, addr *net.UDPAddr) *net.UDPAddr {
if de, ok := e.(*discoEndpoint); ok {
return de.fakeWGAddr
}
return addr
}
func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, addr *net.UDPAddr, err error) {
// First, process any buffered packet from earlier.
if addr := c.bufferedIPv4From; addr != nil {
c.bufferedIPv4From = nil
return copy(b, c.bufferedIPv4Packet), c.findEndpoint(addr), addr, nil
ep := c.findEndpoint(addr)
return copy(b, c.bufferedIPv4Packet), ep, wgRecvAddr(ep, addr), nil
}
go c.awaitUDP4(b)
@ -1196,6 +1213,7 @@ func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, addr *net.UDPAddr
// completed a successful receive on udpRecvCh.
var addrSet *AddrSet
var discoEp *discoEndpoint
select {
case dm := <-c.derpRecvCh:
@ -1229,10 +1247,15 @@ func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, addr *net.UDPAddr
}
c.mu.Lock()
if dk, ok := c.discoOfNode[tailcfg.NodeKey(dm.src)]; ok {
discoEp = c.endpointOfDisco[dk]
}
if discoEp == nil {
addrSet = c.addrsByKey[dm.src]
}
c.mu.Unlock()
if addrSet == nil {
if addrSet == nil && discoEp == nil {
key := wgcfg.Key(dm.src)
c.logf("magicsock: DERP packet from unknown key: %s", key.ShortString())
}
@ -1259,10 +1282,12 @@ func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, addr *net.UDPAddr
if addrSet != nil {
ep = addrSet
} else if discoEp != nil {
ep = discoEp
} else {
ep = c.findEndpoint(addr)
}
return n, ep, addr, nil
return n, ep, wgRecvAddr(ep, addr), nil
}
func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, *net.UDPAddr, error) {
@ -1280,7 +1305,7 @@ func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, *net.UDPAddr, error) {
continue
}
ep := c.findEndpoint(addr)
return n, ep, addr, nil
return n, ep, wgRecvAddr(ep, addr), nil
}
}
@ -1310,7 +1335,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, addr *net.UDPAddr) bool {
return false
}
senderNodeKey, ok := c.nodeOfDisco[sender]
senderNode, ok := c.nodeOfDisco[sender]
if !ok {
// Returning false keeps passing it down, to WireGuard.
// WireGuard will almost surely reject it, but give it a chance.
@ -1325,11 +1350,11 @@ func (c *Conn) handleDiscoMessage(msg []byte, addr *net.UDPAddr) bool {
sealedBox := msg[headerLen:]
payload, ok := box.Open(nil, sealedBox, &nonce, key.Public(sender).B32(), c.discoPrivate.B32())
if !ok {
c.logf("magicsock: failed to open disco message box purportedly from %s (disco key %x)", senderNodeKey.ShortString(), sender[:])
c.logf("magicsock: failed to open disco message box purportedly from %s (disco key %x)", senderNode.Key.ShortString(), sender[:])
return false
}
c.logf("magicsock: got disco message from %s: %x (%q)", senderNodeKey.ShortString(), payload, payload)
c.logf("magicsock: got disco message from %s: %x (%q)", senderNode.Key.ShortString(), payload, payload)
return true
}
@ -1427,8 +1452,12 @@ func (c *Conn) SetNetworkMap(nm *controlclient.NetworkMap) {
numDisco := 0
for _, n := range nm.Peers {
if !n.DiscoKey.IsZero() {
if n.DiscoKey.IsZero() {
continue
}
numDisco++
if ep, ok := c.endpointOfDisco[n.DiscoKey]; ok {
ep.updateFromNode(n)
}
}
@ -1439,12 +1468,12 @@ func (c *Conn) SetNetworkMap(nm *controlclient.NetworkMap) {
// the set of discokeys changed.
for pass := 1; pass <= 2; pass++ {
if c.nodeOfDisco == nil || pass == 2 {
c.nodeOfDisco = map[tailcfg.DiscoKey]tailcfg.NodeKey{}
c.nodeOfDisco = map[tailcfg.DiscoKey]*tailcfg.Node{}
c.discoOfNode = map[tailcfg.NodeKey]tailcfg.DiscoKey{}
}
for _, n := range nm.Peers {
if !n.DiscoKey.IsZero() {
c.nodeOfDisco[n.DiscoKey] = n.Key
c.nodeOfDisco[n.DiscoKey] = n
if old, ok := c.discoOfNode[n.Key]; ok && old != n.DiscoKey {
c.logf("magicsock: node %s changed discovery key from %x to %x", n.Key.ShortString(), old[:8], n.DiscoKey[:8])
// TODO: reset AddrSet states, reset wireguard session key, etc.
@ -1457,6 +1486,14 @@ func (c *Conn) SetNetworkMap(nm *controlclient.NetworkMap) {
}
}
// Clean c.endpointOfDisco for discovery keys that are no longer present.
for dk, de := range c.endpointOfDisco {
if _, ok := c.nodeOfDisco[dk]; !ok {
de.cleanup()
delete(c.endpointOfDisco, dk)
}
}
}
func (c *Conn) wantDerpLocked() bool { return c.derpMap != nil }
@ -1776,9 +1813,15 @@ func (c *Conn) resetAddrSetStates() {
as.curAddr = -1
as.stopSpray = as.timeNow().Add(sprayPeriod)
}
for _, de := range c.endpointOfDisco {
de.noteConnectivityChange()
}
}
// AddrSet is a set of UDP addresses that implements wireguard/conn.Endpoint.
//
// This is the legacy endpoint for peers that don't support discovery;
// it predates discoEndpoint.
type AddrSet struct {
publicKey key.Public // peer public key used for DERP communication
@ -2018,11 +2061,39 @@ func (c *Conn) CreateBind(uint16) (conn.Bind, uint16, error) {
}
// CreateEndpoint is called by WireGuard to connect to an endpoint.
// The key is the public key of the peer and addrs is a
// comma-separated list of UDP ip:ports.
//
// The key is the public key of the peer and addrs is either:
//
// 1) a comma-separated list of UDP ip:ports (the the peer doesn't have a discovery key)
// 2) "<hex-discovery-key>.disco.tailscale:12345", a magic value that means the peer
// is running code that supports active discovery, so CreateEndpoint returns
// a discoEndpoint.
//
func (c *Conn) CreateEndpoint(pubKey [32]byte, addrs string) (conn.Endpoint, error) {
pk := key.Public(pubKey)
c.logf("magicsock: CreateEndpoint: key=%s: %s", pk.ShortString(), strings.ReplaceAll(addrs, "127.3.3.40:", "derp-"))
if strings.HasSuffix(addrs, controlclient.EndpointDiscoSuffix) {
discoHex := strings.TrimSuffix(addrs, controlclient.EndpointDiscoSuffix)
discoKey, err := key.NewPublicFromHexMem(mem.S(discoHex))
if err != nil {
return nil, fmt.Errorf("magicsock: invalid discokey endpoint %q for %v: %w", addrs, pk.ShortString(), err)
}
c.mu.Lock()
defer c.mu.Unlock()
de := &discoEndpoint{
c: c,
publicKey: pk, // peer public key (for WireGuard + DERP)
discoKey: tailcfg.DiscoKey(discoKey), // for discovery mesages
wgEndpointHostPort: addrs,
}
de.initFakeUDPAddr()
de.updateFromNode(c.nodeOfDisco[de.discoKey])
c.endpointOfDisco[de.discoKey] = de
return de, nil
}
a := &AddrSet{
Logf: c.logf,
publicKey: pk,
@ -2075,6 +2146,9 @@ func (c *Conn) CreateEndpoint(pubKey [32]byte, addrs string) (conn.Endpoint, err
return a, nil
}
// singleEndpoint is a wireguard-go/conn.Endpoint used for "roaming
// addressed" in releases of Tailscale that predate discovery
// messages. New peers use discoEndpoint.
type singleEndpoint net.UDPAddr
func (e *singleEndpoint) ClearSrc() {}
@ -2250,3 +2324,112 @@ func udpAddrDebugString(ua net.UDPAddr) string {
}
return ua.String()
}
// discoEndpoint is a wireguard/conn.Endpoint for new-style peers that
// advertise a DiscoKey and participate in active discovery.
type discoEndpoint struct {
c *Conn
publicKey key.Public // peer public key (for WireGuard + DERP)
discoKey tailcfg.DiscoKey // for discovery mesages
fakeWGAddr *net.UDPAddr // the UDPAddr we tell wireguard-go we're using
wgEndpointHostPort string // string from CreateEndpoint: "<hex-discovery-key>.disco.tailscale:12345"
mu sync.Mutex // Lock ordering: Conn.mu, then discoEndpoint.mu
derpAddr *net.UDPAddr
}
// initFakeUDPAddr populates fakeWGAddr with a globally unique fake UDPAddr.
// The current implementation just uses the pointer value of de jammed into an IPv6
// address, but it could also be, say, a counter.
func (de *discoEndpoint) initFakeUDPAddr() {
var addr [16]byte
addr[0] = 0xfd
addr[1] = 0x00
binary.BigEndian.PutUint64(addr[2:], uint64(reflect.ValueOf(de).Pointer()))
ipp := netaddr.IPPort{
IP: netaddr.IPFrom16(addr),
Port: 12345,
}
de.fakeWGAddr = ipp.UDPAddr()
}
func (de *discoEndpoint) Addrs() []wgcfg.Endpoint {
// This has to be the same string that was passed to
// CreateEndpoint, otherwise Reconfig will end up recreating
// Endpoints and losing state over time.
host, portStr, err := net.SplitHostPort(de.wgEndpointHostPort)
if err != nil {
panic(err)
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
panic(err)
}
return []wgcfg.Endpoint{{host, uint16(port)}}
}
func (de *discoEndpoint) ClearSrc() {}
func (de *discoEndpoint) SrcToString() string { panic("unused") } // unused by wireguard-go
func (de *discoEndpoint) SrcIP() net.IP { panic("unused") } // unused by wireguard-go
func (de *discoEndpoint) DstToString() string { return de.wgEndpointHostPort }
func (de *discoEndpoint) DstIP() net.IP { panic("unused") }
func (de *discoEndpoint) DstToBytes() []byte { return de.fakeWGAddr.IP[:] }
func (de *discoEndpoint) UpdateDst(addr *net.UDPAddr) error {
// This is called ~per packet (and requiring a mutex acquisition inside wireguard-go).
// TODO(bradfitz): make that cheaper and/or remove it. We don't need it.
return nil
}
func (de *discoEndpoint) send(b []byte) error {
// TODO: all the disco messaging & state tracking & spraying,
// bringing over relevant AddrSet code. For now, just do DERP
// as a crutch while I work on other bits.
de.mu.Lock()
derpAddr := de.derpAddr
de.mu.Unlock()
if derpAddr == nil {
return errors.New("no DERP addr")
}
return de.c.sendAddr(derpAddr, de.publicKey, b)
}
func (de *discoEndpoint) updateFromNode(n *tailcfg.Node) {
if n == nil {
// TODO: log, error, count? if this even happens.
return
}
de.mu.Lock()
defer de.mu.Unlock()
if n.DERP == "" {
de.derpAddr = nil
} else {
// TODO: add ParseIPPort to netaddr package; only safe to use ResolveUDPAddr
// here because we know no DNS lookups are involved
ua, _ := net.ResolveUDPAddr("udp", n.DERP)
de.derpAddr = ua
}
// TODO: parse all the endpoints, not just DERP
}
// noteConnectivityChange is called when connectivity changes enough
// that we should question our earlier assumptions about which paths
// work.
func (de *discoEndpoint) noteConnectivityChange() {
de.mu.Lock()
defer de.mu.Unlock()
// TODO: reset state
}
// cleanup is called when a discovery endpoint is no longer present in the NetworkMap.
// This is where we can do cleanup such as closing goroutines or canceling timers.
func (de *discoEndpoint) cleanup() {
de.mu.Lock()
defer de.mu.Unlock()
// TODO: real work later, when there's stuff to do
de.c.logf("magicsock: doing cleanup for discovery key %x", de.discoKey[:])
}

View File

@ -844,8 +844,8 @@ func TestDiscoMessage(t *testing.T) {
c := &Conn{
logf: t.Logf,
discoPrivate: key.NewPrivate(),
nodeOfDisco: map[tailcfg.DiscoKey]tailcfg.NodeKey{
tailcfg.DiscoKey(peer1Pub): tailcfg.NodeKey{1: 1},
nodeOfDisco: map[tailcfg.DiscoKey]*tailcfg.Node{
tailcfg.DiscoKey(peer1Pub): &tailcfg.Node{Key: tailcfg.NodeKey{1: 1}},
},
}