derp/derpmap: new DERP config package, merge netcheck into magicsock more

Fixes #153
Updates #162
Updates #163

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick
2020-03-09 15:20:33 -07:00
committed by Brad Fitzpatrick
parent bd0e20f351
commit 39c0ae1dba
9 changed files with 453 additions and 254 deletions

View File

@@ -30,11 +30,11 @@ import (
"golang.org/x/time/rate"
"tailscale.com/derp"
"tailscale.com/derp/derphttp"
"tailscale.com/derp/derpmap"
"tailscale.com/interfaces"
"tailscale.com/net/dnscache"
"tailscale.com/netcheck"
"tailscale.com/stun"
"tailscale.com/stunner"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"tailscale.com/types/logger"
@@ -46,11 +46,13 @@ import (
type Conn struct {
pconn *RebindingUDPConn
pconnPort uint16
stunServers []string
startEpUpdate chan struct{} // send to trigger endpoint update
epFunc func(endpoints []string)
logf logger.Logf
sendLogLimit *rate.Limiter
derps *derpmap.World
netChecker *netcheck.Client
goroutines sync.WaitGroup
// bufferedIPv4From and bufferedIPv4Packet are owned by
// ReceiveIPv4, and used when both a DERP and IPv4 packet arrive
@@ -95,6 +97,15 @@ type Conn struct {
derpTLSConfig *tls.Config // normally nil; used by tests
}
// DerpMagicIP is a fake WireGuard endpoint IP address that means
// to use DERP. When used, the port number of the WireGuard endpoint
// is the DERP server number to use.
//
// Mnemonic: 3.3.40 are numbers above the keys D, E, R, P.
const DerpMagicIP = "127.3.3.40"
var derpMagicIP = net.ParseIP(DerpMagicIP).To4()
// activeDerp contains fields for an active DERP connection.
type activeDerp struct {
c *derphttp.Client
@@ -114,10 +125,7 @@ type udpAddr struct {
// The current default (zero) means to auto-select a random free port.
const DefaultPort = 0
var DefaultSTUN = []string{
"stun.l.google.com:19302",
"stun3.l.google.com:19302",
}
var DisableSTUNForTesting bool
// Options contains options for Listen.
type Options struct {
@@ -127,6 +135,8 @@ type Options struct {
// Zero means to pick one automatically.
Port uint16
// STUN, if non-empty, specifies alternate STUN servers for testing.
// If empty, the production DERP servers are used.
STUN []string
// EndpointsFunc optionally provides a func to be called when
@@ -178,7 +188,6 @@ func Listen(opts Options) (*Conn, error) {
pconn: new(RebindingUDPConn),
pconnPort: opts.Port,
sendLogLimit: rate.NewLimiter(rate.Every(1*time.Minute), 1),
stunServers: append([]string{}, opts.STUN...),
startEpUpdate: make(chan struct{}, 1),
connCtx: connCtx,
connCtxCancel: connCtxCancel,
@@ -190,11 +199,27 @@ func Listen(opts Options) (*Conn, error) {
derpRecvCh: make(chan derpReadResult),
udpRecvCh: make(chan udpReadResult),
derpTLSConfig: opts.derpTLSConfig,
derps: derpmap.Prod(),
}
if len(opts.STUN) > 0 {
c.derps = derpmap.NewTestWorld(opts.STUN...)
}
c.netChecker = &netcheck.Client{
DERP: c.derps,
Logf: logger.WithPrefix(c.logf, "netcheck: "),
GetSTUNConn4: func() netcheck.STUNConn { return c.pconn },
// TODO: add GetSTUNConn6 once Conn has a pconn6
}
c.ignoreSTUNPackets()
c.pconn.Reset(packetConn.(*net.UDPConn))
c.reSTUN()
go c.epUpdate(connCtx)
c.goroutines.Add(1)
go func() {
defer c.goroutines.Done()
c.epUpdate(connCtx)
}()
return c, nil
}
@@ -244,10 +269,14 @@ func (c *Conn) epUpdate(ctx context.Context) {
go func() {
defer close(lastDone)
c.updateNetInfo() // best effort
c.cleanStaleDerp()
endpoints, err := c.determineEndpoints(epCtx)
netReport, err := c.updateNetInfo(epCtx)
if err != nil {
c.logf("magicsock.Conn: updateNetInfo failed: %v", err)
return
}
endpoints, err := c.determineEndpoints(epCtx, netReport)
if err != nil {
c.logf("magicsock.Conn: endpoint update failed: %v", err)
// TODO(crawshaw): are there any conditions under which
@@ -258,36 +287,25 @@ func (c *Conn) epUpdate(ctx context.Context) {
return
}
lastEndpoints = endpoints
// TODO(bradfiz): get nearestDerp back to ipn for a HostInfo update
c.epFunc(endpoints)
}()
}
}
func (c *Conn) hasExternalSTUN() bool {
for _, hp := range c.stunServers {
if strings.Contains(hp, ".com:") {
// matches stun.l.google.com:19302 or derp\d+.tailscale.com:nnnn
return true
}
}
return false
}
func (c *Conn) updateNetInfo() {
logf := logger.WithPrefix(c.logf, "updateNetInfo: ")
if !c.hasExternalSTUN() {
logf("skipping in non-production mode")
return
func (c *Conn) updateNetInfo(ctx context.Context) (*netcheck.Report, error) {
if DisableSTUNForTesting {
return nil, errors.New("STUN disabled for testing")
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
report, err := netcheck.GetReport(ctx, logf)
c.stunReceiveFunc.Store(c.netChecker.ReceiveSTUNPacket)
defer c.ignoreSTUNPackets()
report, err := c.netChecker.GetReport(ctx)
if err != nil {
logf("GetReport: %v", err)
return
return nil, err
}
ni := &tailcfg.NetInfo{
@@ -314,6 +332,7 @@ func (c *Conn) updateNetInfo() {
// TODO: set link type
c.callNetInfoCallback(ni)
return report, nil
}
var processStartUnixNano = time.Now().UnixNano()
@@ -331,14 +350,15 @@ func (c *Conn) pickDERPFallback() int {
return c.myDerp
}
if len(derpNodeID) == 0 {
ids := c.derps.IDs()
if len(ids) == 0 {
// No DERP nodes registered.
return 0
}
h := fnv.New64()
h.Write([]byte(fmt.Sprintf("%p/%d", c, processStartUnixNano))) // arbitrary
return derpNodeID[rand.New(rand.NewSource(int64(h.Sum64()))).Intn(len(derpNodeID))]
return ids[rand.New(rand.NewSource(int64(h.Sum64()))).Intn(len(ids))]
}
// callNetInfoCallback calls the NetInfo callback (if previously
@@ -385,7 +405,7 @@ func (c *Conn) setNearestDERP(derpNum int) (wantDERP bool) {
return true
}
c.myDerp = derpNum
c.logf("home DERP server is now %s", derpHost(derpNum))
c.logf("home DERP server is now %v, %v", derpNum, c.derps.ServerByID(derpNum))
for i, ad := range c.activeDerp {
go ad.c.NotePreferred(i == c.myDerp)
}
@@ -398,35 +418,24 @@ func (c *Conn) setNearestDERP(derpNum int) (wantDERP bool) {
// determineEndpoints returns the machine's endpoint addresses. It
// does a STUN lookup to determine its public address.
func (c *Conn) determineEndpoints(ctx context.Context) (ipPorts []string, err error) {
var (
alreadyMu sync.Mutex
already = make(map[string]bool) // endpoint -> true
)
var eps []string // unique endpoints
func (c *Conn) determineEndpoints(ctx context.Context, nr *netcheck.Report) (ipPorts []string, err error) {
already := make(map[string]bool) // endpoint -> true
var eps []string // unique endpoints
addAddr := func(s, reason string) {
c.logf("magicsock: found local %s (%s)\n", s, reason)
alreadyMu.Lock()
defer alreadyMu.Unlock()
if !already[s] {
already[s] = true
eps = append(eps, s)
}
}
s := &stunner.Stunner{
Send: c.pconn.WriteTo,
Endpoint: func(server, endpoint string, d time.Duration) { addAddr(endpoint, "stun") },
Servers: c.stunServers,
Logf: c.logf,
if nr.GlobalV4 != "" {
addAddr(nr.GlobalV4, "stun")
}
c.stunReceiveFunc.Store(s.Receive)
if err := s.Run(ctx); err != nil {
return nil, err
const tailControlDoesIPv6 = false // TODO: when IPv6 filtering/splitting is enabled in tailcontrol
if nr.GlobalV6 != "" && tailControlDoesIPv6 {
addAddr(nr.GlobalV6, "stun")
}
c.ignoreSTUNPackets()
@@ -615,7 +624,9 @@ func (c *Conn) Send(b []byte, ep conn.Endpoint) error {
ret = err
}
if err != nil && addr != roamAddr && c.sendLogLimit.Allow() {
c.logf("magicsock: Conn.Send(%v): %v", addr, err)
if c.connCtx.Err() == nil { // don't log if we're closed
c.logf("magicsock: Conn.Send(%v): %v", addr, err)
}
}
}
if success {
@@ -686,14 +697,15 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest {
if c.activeDerp == nil {
c.activeDerp = make(map[int]activeDerp)
}
host := derpHost(addr.Port)
if host == "" {
derpSrv := c.derps.ServerByID(addr.Port)
if derpSrv == nil || derpSrv.HostHTTPS == "" {
return nil
}
// TODO(bradfitz): don't hold derpMu here. It's slow. Release first and use singleflight to dial+re-lock to add.
dc, err := derphttp.NewClient(c.privateKey, "https://"+host+"/derp", c.logf)
dc, err := derphttp.NewClient(c.privateKey, "https://"+derpSrv.HostHTTPS+"/derp", c.logf)
if err != nil {
c.logf("derphttp.NewClient: port %d, host %q invalid? err: %v", addr.Port, host, err)
c.logf("derphttp.NewClient: port %d, host %q invalid? err: %v", addr.Port, derpSrv.HostHTTPS, err)
return nil
}
dc.NotePreferred(c.myDerp == addr.Port)
@@ -852,6 +864,34 @@ type udpReadResult struct {
// immediate cancellation of network operations.
var aLongTimeAgo = time.Unix(233431200, 0)
// awaitUDP4 reads a single IPv4 UDP packet (or an error) and sends it
// to c.udpRecvCh, skipping over (but handling) any STUN replies.
func (c *Conn) awaitUDP4(b []byte) {
for {
n, pAddr, err := c.pconn.ReadFrom(b)
if err != nil {
select {
case c.udpRecvCh <- udpReadResult{err: err}:
case <-c.donec():
}
return
}
addr := pAddr.(*net.UDPAddr)
if stun.Is(b[:n]) {
c.stunReceiveFunc.Load().(func([]byte, *net.UDPAddr))(b, addr)
continue
}
addr.IP = addr.IP.To4()
select {
case c.udpRecvCh <- udpReadResult{n: n, addr: addr}:
case <-c.donec():
}
return
}
}
func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, addr *net.UDPAddr, err error) {
// First, process any buffered packet from earlier.
if addr := c.bufferedIPv4From; addr != nil {
@@ -859,31 +899,7 @@ func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, addr *net.UDPAddr
return copy(b, c.bufferedIPv4Packet), c.findEndpoint(addr), addr, nil
}
go func() {
// Read a packet, and process any STUN packets before returning.
for {
n, pAddr, err := c.pconn.ReadFrom(b)
if err != nil {
select {
case c.udpRecvCh <- udpReadResult{err: err}:
case <-c.donec():
}
return
}
if stun.Is(b[:n]) {
c.stunReceiveFunc.Load().(func([]byte, *net.UDPAddr))(b, addr)
continue
}
addr := pAddr.(*net.UDPAddr)
addr.IP = addr.IP.To4()
select {
case c.udpRecvCh <- udpReadResult{n: n, addr: addr}:
case <-c.donec():
}
return
}
}()
go c.awaitUDP4(b)
// Once the above goroutine has started, it owns b until it writes
// to udpRecvCh. The code below must not access b until it's
@@ -1052,7 +1068,9 @@ func (c *Conn) Close() error {
c.closeAllDerpLocked()
c.derpMu.Unlock()
return c.pconn.Close()
err := c.pconn.Close()
c.goroutines.Wait()
return err
}
func (c *Conn) reSTUN() {