mirror of
https://github.com/tailscale/tailscale.git
synced 2024-11-29 04:55:31 +00:00
88097b836a
prober uses NewRegionClient() to connect to a derper using a faked-up single-node region, but NewRegionClient() fails to connect if there is no non-STUN-only client in the region. Set the STUN-only flag to false before we call NewRegionClient() so we can monitor nodes marked as STUN-only in the default derpmap. Updates #11492. Signed-off-by: Val <valerie@tailscale.com>
413 lines
11 KiB
Go
413 lines
11 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package prober
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
crand "crypto/rand"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"tailscale.com/derp"
|
|
"tailscale.com/derp/derphttp"
|
|
"tailscale.com/net/stun"
|
|
"tailscale.com/tailcfg"
|
|
"tailscale.com/types/key"
|
|
"tailscale.com/types/logger"
|
|
)
|
|
|
|
// derpProber dynamically manages several probes for each DERP server
|
|
// based on the current DERPMap.
|
|
type derpProber struct {
|
|
p *Prober
|
|
derpMapURL string
|
|
udpInterval time.Duration
|
|
meshInterval time.Duration
|
|
tlsInterval time.Duration
|
|
|
|
// Probe functions that can be overridden for testing.
|
|
tlsProbeFn func(string) ProbeFunc
|
|
udpProbeFn func(string, int) ProbeFunc
|
|
meshProbeFn func(string, string) ProbeFunc
|
|
|
|
sync.Mutex
|
|
lastDERPMap *tailcfg.DERPMap
|
|
lastDERPMapAt time.Time
|
|
nodes map[string]*tailcfg.DERPNode
|
|
probes map[string]*Probe
|
|
}
|
|
|
|
// DERP creates a new derpProber.
|
|
func DERP(p *Prober, derpMapURL string, udpInterval, meshInterval, tlsInterval time.Duration) (*derpProber, error) {
|
|
d := &derpProber{
|
|
p: p,
|
|
derpMapURL: derpMapURL,
|
|
udpInterval: udpInterval,
|
|
meshInterval: meshInterval,
|
|
tlsInterval: tlsInterval,
|
|
tlsProbeFn: TLS,
|
|
nodes: make(map[string]*tailcfg.DERPNode),
|
|
probes: make(map[string]*Probe),
|
|
}
|
|
d.udpProbeFn = d.ProbeUDP
|
|
d.meshProbeFn = d.probeMesh
|
|
return d, nil
|
|
}
|
|
|
|
// ProbeMap fetches the DERPMap and creates/destroys probes for each
|
|
// DERP server as necessary. It should get regularly executed as a
|
|
// probe function itself.
|
|
func (d *derpProber) ProbeMap(ctx context.Context) error {
|
|
if err := d.updateMap(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
wantProbes := map[string]bool{}
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
for _, region := range d.lastDERPMap.Regions {
|
|
for _, server := range region.Nodes {
|
|
labels := map[string]string{
|
|
"region": region.RegionCode,
|
|
"region_id": strconv.Itoa(region.RegionID),
|
|
"hostname": server.HostName,
|
|
}
|
|
|
|
n := fmt.Sprintf("derp/%s/%s/tls", region.RegionCode, server.Name)
|
|
wantProbes[n] = true
|
|
if d.probes[n] == nil {
|
|
log.Printf("adding DERP TLS probe for %s (%s)", server.Name, region.RegionName)
|
|
d.probes[n] = d.p.Run(n, d.tlsInterval, labels, d.tlsProbeFn(server.HostName+":443"))
|
|
}
|
|
|
|
for idx, ipStr := range []string{server.IPv6, server.IPv4} {
|
|
n = fmt.Sprintf("derp/%s/%s/udp", region.RegionCode, server.Name)
|
|
if idx == 0 {
|
|
n = n + "6"
|
|
}
|
|
|
|
if ipStr == "" || server.STUNPort == -1 {
|
|
continue
|
|
}
|
|
|
|
wantProbes[n] = true
|
|
if d.probes[n] == nil {
|
|
log.Printf("adding DERP UDP probe for %s (%s)", server.Name, n)
|
|
d.probes[n] = d.p.Run(n, d.udpInterval, labels, d.udpProbeFn(ipStr, server.STUNPort))
|
|
}
|
|
}
|
|
|
|
for _, to := range region.Nodes {
|
|
n = fmt.Sprintf("derp/%s/%s/%s/mesh", region.RegionCode, server.Name, to.Name)
|
|
wantProbes[n] = true
|
|
if d.probes[n] == nil {
|
|
log.Printf("adding DERP mesh probe for %s->%s (%s)", server.Name, to.Name, region.RegionName)
|
|
d.probes[n] = d.p.Run(n, d.meshInterval, labels, d.meshProbeFn(server.HostName, to.HostName))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for n, probe := range d.probes {
|
|
if !wantProbes[n] {
|
|
log.Printf("removing DERP probe %s", n)
|
|
probe.Close()
|
|
delete(d.probes, n)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *derpProber) probeMesh(from, to string) ProbeFunc {
|
|
return func(ctx context.Context) error {
|
|
d.Lock()
|
|
dm := d.lastDERPMap
|
|
fromN, ok := d.nodes[from]
|
|
if !ok {
|
|
d.Unlock()
|
|
return fmt.Errorf("could not find derp node %s", from)
|
|
}
|
|
toN, ok := d.nodes[to]
|
|
if !ok {
|
|
d.Unlock()
|
|
return fmt.Errorf("could not find derp node %s", to)
|
|
}
|
|
d.Unlock()
|
|
|
|
// TODO: instead of ignoring latency, export it as a separate metric.
|
|
_, err := derpProbeNodePair(ctx, dm, fromN, toN)
|
|
return err
|
|
}
|
|
}
|
|
|
|
func (d *derpProber) updateMap(ctx context.Context) error {
|
|
req, err := http.NewRequestWithContext(ctx, "GET", d.derpMapURL, nil)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
res, err := httpOrFileClient.Do(req)
|
|
if err != nil {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
if d.lastDERPMap != nil && time.Since(d.lastDERPMapAt) < 10*time.Minute {
|
|
log.Printf("Error while fetching DERP map, using cached one: %s", err)
|
|
// Assume that control is restarting and use
|
|
// the same one for a bit.
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
defer res.Body.Close()
|
|
if res.StatusCode != 200 {
|
|
return fmt.Errorf("fetching %s: %s", d.derpMapURL, res.Status)
|
|
}
|
|
dm := new(tailcfg.DERPMap)
|
|
if err := json.NewDecoder(res.Body).Decode(dm); err != nil {
|
|
return fmt.Errorf("decoding %s JSON: %v", d.derpMapURL, err)
|
|
}
|
|
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
d.lastDERPMap = dm
|
|
d.lastDERPMapAt = time.Now()
|
|
d.nodes = make(map[string]*tailcfg.DERPNode)
|
|
for _, reg := range d.lastDERPMap.Regions {
|
|
for _, n := range reg.Nodes {
|
|
if existing, ok := d.nodes[n.HostName]; ok {
|
|
return fmt.Errorf("derpmap has duplicate nodes: %+v and %+v", existing, n)
|
|
}
|
|
d.nodes[n.HostName] = n
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *derpProber) ProbeUDP(ipaddr string, port int) ProbeFunc {
|
|
return func(ctx context.Context) error {
|
|
_, err := derpProbeUDP(ctx, ipaddr, port)
|
|
return err
|
|
}
|
|
}
|
|
|
|
func derpProbeUDP(ctx context.Context, ipStr string, port int) (latency time.Duration, err error) {
|
|
pc, err := net.ListenPacket("udp", ":0")
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer pc.Close()
|
|
uc := pc.(*net.UDPConn)
|
|
|
|
tx := stun.NewTxID()
|
|
req := stun.Request(tx)
|
|
|
|
if port == 0 {
|
|
port = 3478
|
|
}
|
|
for {
|
|
ip := net.ParseIP(ipStr)
|
|
_, err := uc.WriteToUDP(req, &net.UDPAddr{IP: ip, Port: port})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
// Binding requests and responses are fairly small (~40 bytes),
|
|
// but in practice a STUN response can be up to the size of the
|
|
// path MTU, so we use a jumbo frame size buffer here.
|
|
buf := make([]byte, 9000)
|
|
uc.SetReadDeadline(time.Now().Add(2 * time.Second))
|
|
t0 := time.Now()
|
|
n, _, err := uc.ReadFromUDP(buf)
|
|
d := time.Since(t0)
|
|
if err != nil {
|
|
if ctx.Err() != nil {
|
|
return 0, fmt.Errorf("timeout reading from %v: %v", ip, err)
|
|
}
|
|
if d < time.Second {
|
|
return 0, fmt.Errorf("error reading from %v: %v", ip, err)
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
continue
|
|
}
|
|
txBack, _, err := stun.ParseResponse(buf[:n])
|
|
if err != nil {
|
|
return 0, fmt.Errorf("parsing STUN response from %v: %v", ip, err)
|
|
}
|
|
if txBack != tx {
|
|
return 0, fmt.Errorf("read wrong tx back from %v", ip)
|
|
}
|
|
if latency == 0 || d < latency {
|
|
latency = d
|
|
}
|
|
break
|
|
}
|
|
return latency, nil
|
|
}
|
|
|
|
func derpProbeNodePair(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode) (latency time.Duration, err error) {
|
|
fromc, err := newConn(ctx, dm, from)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer fromc.Close()
|
|
toc, err := newConn(ctx, dm, to)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer toc.Close()
|
|
|
|
// Wait a bit for from's node to hear about to existing on the
|
|
// other node in the region, in the case where the two nodes
|
|
// are different.
|
|
if from.Name != to.Name {
|
|
time.Sleep(100 * time.Millisecond) // pretty arbitrary
|
|
}
|
|
|
|
latency, err = runDerpProbeNodePair(ctx, from, to, fromc, toc)
|
|
if err != nil {
|
|
// Record pubkeys on failed probes to aid investigation.
|
|
err = fmt.Errorf("%s -> %s: %w",
|
|
fromc.SelfPublicKey().ShortString(),
|
|
toc.SelfPublicKey().ShortString(), err)
|
|
}
|
|
return latency, err
|
|
}
|
|
|
|
func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client) (latency time.Duration, err error) {
|
|
// Make a random packet
|
|
pkt := make([]byte, 8)
|
|
crand.Read(pkt)
|
|
|
|
t0 := time.Now()
|
|
|
|
// Send the random packet.
|
|
sendc := make(chan error, 1)
|
|
go func() {
|
|
sendc <- fromc.Send(toc.SelfPublicKey(), pkt)
|
|
}()
|
|
select {
|
|
case <-ctx.Done():
|
|
return 0, fmt.Errorf("timeout sending via %q: %w", from.Name, ctx.Err())
|
|
case err := <-sendc:
|
|
if err != nil {
|
|
return 0, fmt.Errorf("error sending via %q: %w", from.Name, err)
|
|
}
|
|
}
|
|
|
|
// Receive the random packet.
|
|
recvc := make(chan any, 1) // either derp.ReceivedPacket or error
|
|
go func() {
|
|
for {
|
|
m, err := toc.Recv()
|
|
if err != nil {
|
|
recvc <- err
|
|
return
|
|
}
|
|
switch v := m.(type) {
|
|
case derp.ReceivedPacket:
|
|
recvc <- v
|
|
default:
|
|
log.Printf("%v: ignoring Recv frame type %T", to.Name, v)
|
|
// Loop.
|
|
}
|
|
}
|
|
}()
|
|
select {
|
|
case <-ctx.Done():
|
|
return 0, fmt.Errorf("timeout receiving from %q: %w", to.Name, ctx.Err())
|
|
case v := <-recvc:
|
|
if err, ok := v.(error); ok {
|
|
return 0, fmt.Errorf("error receiving from %q: %w", to.Name, err)
|
|
}
|
|
p := v.(derp.ReceivedPacket)
|
|
if p.Source != fromc.SelfPublicKey() {
|
|
return 0, fmt.Errorf("got data packet from unexpected source, %v", p.Source)
|
|
}
|
|
if !bytes.Equal(p.Data, pkt) {
|
|
return 0, fmt.Errorf("unexpected data packet %q", p.Data)
|
|
}
|
|
}
|
|
return time.Since(t0), nil
|
|
}
|
|
|
|
func newConn(ctx context.Context, dm *tailcfg.DERPMap, n *tailcfg.DERPNode) (*derphttp.Client, error) {
|
|
// To avoid spamming the log with regular connection messages.
|
|
l := logger.Filtered(log.Printf, func(s string) bool {
|
|
return !strings.Contains(s, "derphttp.Client.Connect: connecting to")
|
|
})
|
|
priv := key.NewNode()
|
|
dc := derphttp.NewRegionClient(priv, l, nil /* no netMon */, func() *tailcfg.DERPRegion {
|
|
rid := n.RegionID
|
|
// Allow the prober to monitor nodes marked as STUN only in the default map
|
|
n.STUNOnly = false
|
|
return &tailcfg.DERPRegion{
|
|
RegionID: rid,
|
|
RegionCode: fmt.Sprintf("%s-%s", dm.Regions[rid].RegionCode, n.Name),
|
|
RegionName: dm.Regions[rid].RegionName,
|
|
Nodes: []*tailcfg.DERPNode{n},
|
|
}
|
|
})
|
|
dc.IsProber = true
|
|
err := dc.Connect(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cs, ok := dc.TLSConnectionState()
|
|
if !ok {
|
|
dc.Close()
|
|
return nil, errors.New("no TLS state")
|
|
}
|
|
if len(cs.PeerCertificates) == 0 {
|
|
dc.Close()
|
|
return nil, errors.New("no peer certificates")
|
|
}
|
|
if cs.ServerName != n.HostName {
|
|
dc.Close()
|
|
return nil, fmt.Errorf("TLS server name %q != derp hostname %q", cs.ServerName, n.HostName)
|
|
}
|
|
|
|
errc := make(chan error, 1)
|
|
go func() {
|
|
m, err := dc.Recv()
|
|
if err != nil {
|
|
errc <- err
|
|
return
|
|
}
|
|
switch m.(type) {
|
|
case derp.ServerInfoMessage:
|
|
errc <- nil
|
|
default:
|
|
errc <- fmt.Errorf("unexpected first message type %T", errc)
|
|
}
|
|
}()
|
|
select {
|
|
case err := <-errc:
|
|
if err != nil {
|
|
go dc.Close()
|
|
return nil, err
|
|
}
|
|
case <-ctx.Done():
|
|
go dc.Close()
|
|
return nil, fmt.Errorf("timeout waiting for ServerInfoMessage: %w", ctx.Err())
|
|
}
|
|
return dc, nil
|
|
}
|
|
|
|
// httpOrFileClient is the HTTP client used to fetch the DERP map. Its
// transport is built by httpOrFileTransport.
var httpOrFileClient = &http.Client{Transport: httpOrFileTransport()}
|
|
|
|
// httpOrFileTransport returns a clone of http.DefaultTransport that also
// handles the "file" URL scheme, serving such requests from the local
// filesystem rooted at "/".
func httpOrFileTransport() http.RoundTripper {
	localFiles := http.NewFileTransport(http.Dir("/"))

	transport := http.DefaultTransport.(*http.Transport).Clone()
	transport.RegisterProtocol("file", localFiles)
	return transport
}
|