mirror of
https://github.com/tailscale/tailscale.git
synced 2024-11-29 13:05:46 +00:00
7330aa593e
On some platforms (notably macOS and iOS) we look up the default interface to bind outgoing connections to. This is both duplicated work and results in logspam when the default interface is not available (i.e. when a phone has no connectivity, we log an error and thus cause more things that we will try to upload and fail). Fixed by passing around a netmon.Monitor to more places, so that we can use its cached interface state. Fixes #7850 Updates #7621 Signed-off-by: Mihai Parparita <mihai@tailscale.com>
411 lines
10 KiB
Go
411 lines
10 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package prober
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
crand "crypto/rand"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"tailscale.com/derp"
|
|
"tailscale.com/derp/derphttp"
|
|
"tailscale.com/net/stun"
|
|
"tailscale.com/tailcfg"
|
|
"tailscale.com/types/key"
|
|
"tailscale.com/types/logger"
|
|
)
|
|
|
|
// derpProber dynamically manages several probes for each DERP server
|
|
// based on the current DERPMap.
|
|
type derpProber struct {
|
|
p *Prober
|
|
derpMapURL string
|
|
udpInterval time.Duration
|
|
meshInterval time.Duration
|
|
tlsInterval time.Duration
|
|
|
|
// Probe functions that can be overridden for testing.
|
|
tlsProbeFn func(string) ProbeFunc
|
|
udpProbeFn func(string, int) ProbeFunc
|
|
meshProbeFn func(string, string) ProbeFunc
|
|
|
|
sync.Mutex
|
|
lastDERPMap *tailcfg.DERPMap
|
|
lastDERPMapAt time.Time
|
|
nodes map[string]*tailcfg.DERPNode
|
|
probes map[string]*Probe
|
|
}
|
|
|
|
// DERP creates a new derpProber.
|
|
func DERP(p *Prober, derpMapURL string, udpInterval, meshInterval, tlsInterval time.Duration) (*derpProber, error) {
|
|
d := &derpProber{
|
|
p: p,
|
|
derpMapURL: derpMapURL,
|
|
udpInterval: udpInterval,
|
|
meshInterval: meshInterval,
|
|
tlsInterval: tlsInterval,
|
|
tlsProbeFn: TLS,
|
|
nodes: make(map[string]*tailcfg.DERPNode),
|
|
probes: make(map[string]*Probe),
|
|
}
|
|
d.udpProbeFn = d.ProbeUDP
|
|
d.meshProbeFn = d.probeMesh
|
|
return d, nil
|
|
}
|
|
|
|
// ProbeMap fetches the DERPMap and creates/destroys probes for each
|
|
// DERP server as necessary. It should get regularly executed as a
|
|
// probe function itself.
|
|
func (d *derpProber) ProbeMap(ctx context.Context) error {
|
|
if err := d.updateMap(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
wantProbes := map[string]bool{}
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
for _, region := range d.lastDERPMap.Regions {
|
|
for _, server := range region.Nodes {
|
|
labels := map[string]string{
|
|
"region": region.RegionCode,
|
|
"region_id": strconv.Itoa(region.RegionID),
|
|
"hostname": server.HostName,
|
|
}
|
|
|
|
n := fmt.Sprintf("derp/%s/%s/tls", region.RegionCode, server.Name)
|
|
wantProbes[n] = true
|
|
if d.probes[n] == nil {
|
|
log.Printf("adding DERP TLS probe for %s (%s)", server.Name, region.RegionName)
|
|
d.probes[n] = d.p.Run(n, d.tlsInterval, labels, d.tlsProbeFn(server.HostName+":443"))
|
|
}
|
|
|
|
for idx, ipStr := range []string{server.IPv6, server.IPv4} {
|
|
n = fmt.Sprintf("derp/%s/%s/udp", region.RegionCode, server.Name)
|
|
if idx == 0 {
|
|
n = n + "6"
|
|
}
|
|
|
|
if ipStr == "" || server.STUNPort == -1 {
|
|
continue
|
|
}
|
|
|
|
wantProbes[n] = true
|
|
if d.probes[n] == nil {
|
|
log.Printf("adding DERP UDP probe for %s (%s)", server.Name, n)
|
|
d.probes[n] = d.p.Run(n, d.udpInterval, labels, d.udpProbeFn(ipStr, server.STUNPort))
|
|
}
|
|
}
|
|
|
|
for _, to := range region.Nodes {
|
|
n = fmt.Sprintf("derp/%s/%s/%s/mesh", region.RegionCode, server.Name, to.Name)
|
|
wantProbes[n] = true
|
|
if d.probes[n] == nil {
|
|
log.Printf("adding DERP mesh probe for %s->%s (%s)", server.Name, to.Name, region.RegionName)
|
|
d.probes[n] = d.p.Run(n, d.meshInterval, labels, d.meshProbeFn(server.HostName, to.HostName))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for n, probe := range d.probes {
|
|
if !wantProbes[n] {
|
|
log.Printf("removing DERP probe %s", n)
|
|
probe.Close()
|
|
delete(d.probes, n)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *derpProber) probeMesh(from, to string) ProbeFunc {
|
|
return func(ctx context.Context) error {
|
|
d.Lock()
|
|
dm := d.lastDERPMap
|
|
fromN, ok := d.nodes[from]
|
|
if !ok {
|
|
d.Unlock()
|
|
return fmt.Errorf("could not find derp node %s", from)
|
|
}
|
|
toN, ok := d.nodes[to]
|
|
if !ok {
|
|
d.Unlock()
|
|
return fmt.Errorf("could not find derp node %s", to)
|
|
}
|
|
d.Unlock()
|
|
|
|
// TODO: instead of ignoring latency, export it as a separate metric.
|
|
_, err := derpProbeNodePair(ctx, dm, fromN, toN)
|
|
return err
|
|
}
|
|
}
|
|
|
|
func (d *derpProber) updateMap(ctx context.Context) error {
|
|
req, err := http.NewRequestWithContext(ctx, "GET", d.derpMapURL, nil)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
res, err := httpOrFileClient.Do(req)
|
|
if err != nil {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
if d.lastDERPMap != nil && time.Since(d.lastDERPMapAt) < 10*time.Minute {
|
|
log.Printf("Error while fetching DERP map, using cached one: %s", err)
|
|
// Assume that control is restarting and use
|
|
// the same one for a bit.
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
defer res.Body.Close()
|
|
if res.StatusCode != 200 {
|
|
return fmt.Errorf("fetching %s: %s", d.derpMapURL, res.Status)
|
|
}
|
|
dm := new(tailcfg.DERPMap)
|
|
if err := json.NewDecoder(res.Body).Decode(dm); err != nil {
|
|
return fmt.Errorf("decoding %s JSON: %v", d.derpMapURL, err)
|
|
}
|
|
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
d.lastDERPMap = dm
|
|
d.lastDERPMapAt = time.Now()
|
|
d.nodes = make(map[string]*tailcfg.DERPNode)
|
|
for _, reg := range d.lastDERPMap.Regions {
|
|
for _, n := range reg.Nodes {
|
|
if existing, ok := d.nodes[n.HostName]; ok {
|
|
return fmt.Errorf("derpmap has duplicate nodes: %+v and %+v", existing, n)
|
|
}
|
|
d.nodes[n.HostName] = n
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *derpProber) ProbeUDP(ipaddr string, port int) ProbeFunc {
|
|
return func(ctx context.Context) error {
|
|
_, err := derpProbeUDP(ctx, ipaddr, port)
|
|
return err
|
|
}
|
|
}
|
|
|
|
func derpProbeUDP(ctx context.Context, ipStr string, port int) (latency time.Duration, err error) {
|
|
pc, err := net.ListenPacket("udp", ":0")
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer pc.Close()
|
|
uc := pc.(*net.UDPConn)
|
|
|
|
tx := stun.NewTxID()
|
|
req := stun.Request(tx)
|
|
|
|
if port == 0 {
|
|
port = 3478
|
|
}
|
|
for {
|
|
ip := net.ParseIP(ipStr)
|
|
_, err := uc.WriteToUDP(req, &net.UDPAddr{IP: ip, Port: port})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
// Binding requests and responses are fairly small (~40 bytes),
|
|
// but in practice a STUN response can be up to the size of the
|
|
// path MTU, so we use a jumbo frame size buffer here.
|
|
buf := make([]byte, 9000)
|
|
uc.SetReadDeadline(time.Now().Add(2 * time.Second))
|
|
t0 := time.Now()
|
|
n, _, err := uc.ReadFromUDP(buf)
|
|
d := time.Since(t0)
|
|
if err != nil {
|
|
if ctx.Err() != nil {
|
|
return 0, fmt.Errorf("timeout reading from %v: %v", ip, err)
|
|
}
|
|
if d < time.Second {
|
|
return 0, fmt.Errorf("error reading from %v: %v", ip, err)
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
continue
|
|
}
|
|
txBack, _, err := stun.ParseResponse(buf[:n])
|
|
if err != nil {
|
|
return 0, fmt.Errorf("parsing STUN response from %v: %v", ip, err)
|
|
}
|
|
if txBack != tx {
|
|
return 0, fmt.Errorf("read wrong tx back from %v", ip)
|
|
}
|
|
if latency == 0 || d < latency {
|
|
latency = d
|
|
}
|
|
break
|
|
}
|
|
return latency, nil
|
|
}
|
|
|
|
func derpProbeNodePair(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode) (latency time.Duration, err error) {
|
|
fromc, err := newConn(ctx, dm, from)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer fromc.Close()
|
|
toc, err := newConn(ctx, dm, to)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer toc.Close()
|
|
|
|
// Wait a bit for from's node to hear about to existing on the
|
|
// other node in the region, in the case where the two nodes
|
|
// are different.
|
|
if from.Name != to.Name {
|
|
time.Sleep(100 * time.Millisecond) // pretty arbitrary
|
|
}
|
|
|
|
latency, err = runDerpProbeNodePair(ctx, from, to, fromc, toc)
|
|
if err != nil {
|
|
// Record pubkeys on failed probes to aid investigation.
|
|
err = fmt.Errorf("%s -> %s: %w",
|
|
fromc.SelfPublicKey().ShortString(),
|
|
toc.SelfPublicKey().ShortString(), err)
|
|
}
|
|
return latency, err
|
|
}
|
|
|
|
func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client) (latency time.Duration, err error) {
|
|
// Make a random packet
|
|
pkt := make([]byte, 8)
|
|
crand.Read(pkt)
|
|
|
|
t0 := time.Now()
|
|
|
|
// Send the random packet.
|
|
sendc := make(chan error, 1)
|
|
go func() {
|
|
sendc <- fromc.Send(toc.SelfPublicKey(), pkt)
|
|
}()
|
|
select {
|
|
case <-ctx.Done():
|
|
return 0, fmt.Errorf("timeout sending via %q: %w", from.Name, ctx.Err())
|
|
case err := <-sendc:
|
|
if err != nil {
|
|
return 0, fmt.Errorf("error sending via %q: %w", from.Name, err)
|
|
}
|
|
}
|
|
|
|
// Receive the random packet.
|
|
recvc := make(chan any, 1) // either derp.ReceivedPacket or error
|
|
go func() {
|
|
for {
|
|
m, err := toc.Recv()
|
|
if err != nil {
|
|
recvc <- err
|
|
return
|
|
}
|
|
switch v := m.(type) {
|
|
case derp.ReceivedPacket:
|
|
recvc <- v
|
|
default:
|
|
log.Printf("%v: ignoring Recv frame type %T", to.Name, v)
|
|
// Loop.
|
|
}
|
|
}
|
|
}()
|
|
select {
|
|
case <-ctx.Done():
|
|
return 0, fmt.Errorf("timeout receiving from %q: %w", to.Name, ctx.Err())
|
|
case v := <-recvc:
|
|
if err, ok := v.(error); ok {
|
|
return 0, fmt.Errorf("error receiving from %q: %w", to.Name, err)
|
|
}
|
|
p := v.(derp.ReceivedPacket)
|
|
if p.Source != fromc.SelfPublicKey() {
|
|
return 0, fmt.Errorf("got data packet from unexpected source, %v", p.Source)
|
|
}
|
|
if !bytes.Equal(p.Data, pkt) {
|
|
return 0, fmt.Errorf("unexpected data packet %q", p.Data)
|
|
}
|
|
}
|
|
return time.Since(t0), nil
|
|
}
|
|
|
|
func newConn(ctx context.Context, dm *tailcfg.DERPMap, n *tailcfg.DERPNode) (*derphttp.Client, error) {
|
|
// To avoid spamming the log with regular connection messages.
|
|
l := logger.Filtered(log.Printf, func(s string) bool {
|
|
return !strings.Contains(s, "derphttp.Client.Connect: connecting to")
|
|
})
|
|
priv := key.NewNode()
|
|
dc := derphttp.NewRegionClient(priv, l, nil /* no netMon */, func() *tailcfg.DERPRegion {
|
|
rid := n.RegionID
|
|
return &tailcfg.DERPRegion{
|
|
RegionID: rid,
|
|
RegionCode: fmt.Sprintf("%s-%s", dm.Regions[rid].RegionCode, n.Name),
|
|
RegionName: dm.Regions[rid].RegionName,
|
|
Nodes: []*tailcfg.DERPNode{n},
|
|
}
|
|
})
|
|
dc.IsProber = true
|
|
err := dc.Connect(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cs, ok := dc.TLSConnectionState()
|
|
if !ok {
|
|
dc.Close()
|
|
return nil, errors.New("no TLS state")
|
|
}
|
|
if len(cs.PeerCertificates) == 0 {
|
|
dc.Close()
|
|
return nil, errors.New("no peer certificates")
|
|
}
|
|
if cs.ServerName != n.HostName {
|
|
dc.Close()
|
|
return nil, fmt.Errorf("TLS server name %q != derp hostname %q", cs.ServerName, n.HostName)
|
|
}
|
|
|
|
errc := make(chan error, 1)
|
|
go func() {
|
|
m, err := dc.Recv()
|
|
if err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
switch m.(type) {
|
|
case derp.ServerInfoMessage:
|
|
errc <- nil
|
|
default:
|
|
errc <- fmt.Errorf("unexpected first message type %T", errc)
|
|
}
|
|
}()
|
|
select {
|
|
case err := <-errc:
|
|
if err != nil {
|
|
go dc.Close()
|
|
return nil, err
|
|
}
|
|
case <-ctx.Done():
|
|
go dc.Close()
|
|
return nil, fmt.Errorf("timeout waiting for ServerInfoMessage: %w", ctx.Err())
|
|
}
|
|
return dc, nil
|
|
}
|
|
|
|
// httpOrFileClient is the HTTP client used to fetch the DERPMap. Its
// transport also serves "file" URLs.
var httpOrFileClient = &http.Client{Transport: httpOrFileTransport()}

// httpOrFileTransport returns a clone of http.DefaultTransport on which
// the "file" scheme is additionally registered, served from the root
// of the local file system.
func httpOrFileTransport() http.RoundTripper {
	localFiles := http.NewFileTransport(http.Dir("/"))

	transport := http.DefaultTransport.(*http.Transport).Clone()
	transport.RegisterProtocol("file", localFiles)
	return transport
}
|