mirror of
https://github.com/tailscale/tailscale.git
synced 2025-01-10 01:53:49 +00:00
c76a6e5167
Some checks are pending
checklocks / checklocks (push) Waiting to run
CodeQL / Analyze (go) (push) Waiting to run
Dockerfile build / deploy (push) Waiting to run
CI / race-root-integration (1/4) (push) Waiting to run
CI / race-root-integration (2/4) (push) Waiting to run
CI / race-root-integration (3/4) (push) Waiting to run
CI / race-root-integration (4/4) (push) Waiting to run
CI / test (-coverprofile=/tmp/coverage.out, amd64) (push) Waiting to run
CI / test (-race, amd64, 1/3) (push) Waiting to run
CI / test (-race, amd64, 2/3) (push) Waiting to run
CI / test (-race, amd64, 3/3) (push) Waiting to run
CI / test (386) (push) Waiting to run
CI / windows (push) Waiting to run
CI / privileged (push) Waiting to run
CI / vm (push) Waiting to run
CI / race-build (push) Waiting to run
CI / cross (386, linux) (push) Waiting to run
CI / cross (amd64, darwin) (push) Waiting to run
CI / cross (amd64, freebsd) (push) Waiting to run
CI / fuzz (push) Waiting to run
CI / cross (amd64, openbsd) (push) Waiting to run
CI / cross (amd64, windows) (push) Waiting to run
CI / cross (arm, 5, linux) (push) Waiting to run
CI / cross (arm, 7, linux) (push) Waiting to run
CI / cross (arm64, darwin) (push) Waiting to run
CI / cross (arm64, linux) (push) Waiting to run
CI / cross (arm64, windows) (push) Waiting to run
CI / cross (loong64, linux) (push) Waiting to run
CI / ios (push) Waiting to run
CI / depaware (push) Waiting to run
CI / crossmin (amd64, plan9) (push) Waiting to run
CI / crossmin (ppc64, aix) (push) Waiting to run
CI / android (push) Waiting to run
CI / wasm (push) Waiting to run
CI / tailscale_go (push) Waiting to run
CI / go_generate (push) Waiting to run
CI / go_mod_tidy (push) Waiting to run
CI / licenses (push) Waiting to run
CI / staticcheck (386, windows) (push) Waiting to run
CI / staticcheck (amd64, darwin) (push) Waiting to run
CI / staticcheck (amd64, linux) (push) Waiting to run
CI / staticcheck (amd64, windows) (push) Waiting to run
CI / notify_slack (push) Blocked by required conditions
CI / check_mergeability (push) Blocked by required conditions
In f77821fd63 (released in v1.72.0), we made the client tell a DERP server when the connection was not its ideal choice (the first node in its region). But we didn't do anything with that information until now. This adds a metric about how many such connections are on a given derper, and also adds a bit to the PeerPresentFlags bitmask so watchers can identify (and rebalance) them. Updates tailscale/corp#372 Change-Id: Ief8af448750aa6d598e5939a57c062f4e55962be Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2303 lines
68 KiB
Go
2303 lines
68 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package derp
|
|
|
|
// TODO(crawshaw): with predefined serverKey in clients and HMAC on packets we could skip TLS
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"crypto/ed25519"
|
|
crand "crypto/rand"
|
|
"crypto/x509"
|
|
"crypto/x509/pkix"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"errors"
|
|
"expvar"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"math"
|
|
"math/big"
|
|
"math/rand/v2"
|
|
"net"
|
|
"net/http"
|
|
"net/netip"
|
|
"os"
|
|
"os/exec"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go4.org/mem"
|
|
"golang.org/x/sync/errgroup"
|
|
"tailscale.com/client/tailscale"
|
|
"tailscale.com/disco"
|
|
"tailscale.com/envknob"
|
|
"tailscale.com/metrics"
|
|
"tailscale.com/syncs"
|
|
"tailscale.com/tailcfg"
|
|
"tailscale.com/tstime"
|
|
"tailscale.com/tstime/rate"
|
|
"tailscale.com/types/key"
|
|
"tailscale.com/types/logger"
|
|
"tailscale.com/util/ctxkey"
|
|
"tailscale.com/util/mak"
|
|
"tailscale.com/util/set"
|
|
"tailscale.com/util/slicesx"
|
|
"tailscale.com/version"
|
|
)
|
|
|
|
// verboseDropKeys is the set of destination public keys that should
// verbosely log whenever DERP drops a packet.
//
// It is filled in by this file's init function from the
// TS_DEBUG_VERBOSE_DROPS envknob.
var verboseDropKeys = map[key.NodePublic]bool{}
|
|
|
|
// IdealNodeHeader is the HTTP request header sent on DERP HTTP client requests
// to indicate that they're not connected to their ideal (Region.Nodes[0]) node.
// The HTTP header value is the name of the node they wish they were connected
// to. This is an optional header; the server treats a non-empty value as
// marking the connection as not ideal.
const IdealNodeHeader = "Ideal-Node"
|
|
|
|
// IdealNodeContextKey is the context key used to pass the IdealNodeHeader value
// from the HTTP handler to the DERP server's Accept method.
//
// The server's accept path marks a connection as not ideal when the value
// stored under this key is non-empty.
var IdealNodeContextKey = ctxkey.New[string]("ideal-node", "")
|
|
|
|
func init() {
|
|
keys := envknob.String("TS_DEBUG_VERBOSE_DROPS")
|
|
if keys == "" {
|
|
return
|
|
}
|
|
for _, keyStr := range strings.Split(keys, ",") {
|
|
k, err := key.ParseNodePublicUntyped(mem.S(keyStr))
|
|
if err != nil {
|
|
log.Printf("ignoring invalid debug key %q: %v", keyStr, err)
|
|
} else {
|
|
verboseDropKeys[k] = true
|
|
}
|
|
}
|
|
}
|
|
|
|
// Per-client send queue and write timeout tuning.
const (
	// perClientSendQueueDepth is the capacity of each client's sendQueue
	// and discoSendQueue channels.
	perClientSendQueueDepth = 32 // packets buffered for sending
	// NOTE(review): writeTimeout is presumably the write timeout for
	// clients without the mesh key; confirm at its use site.
	writeTimeout           = 2 * time.Second
	privilegedWriteTimeout = 30 * time.Second // for clients with the mesh key
)
|
|
|
|
// dupPolicy is a temporary (2021-08-30) mechanism to change the policy
// of how duplicate connections for the same key are handled.
type dupPolicy int8
|
|
|
|
const (
	// lastWriterIsActive is a dupPolicy where the last connection
	// to send traffic for a peer is the active one.
	lastWriterIsActive dupPolicy = iota

	// disableFighters is a dupPolicy that detects if peers
	// are trying to send interleaved with each other and
	// then disables all of them.
	disableFighters
)
|
|
|
|
// align64 is a zero-length array type. It is embedded as a blank field in a
// struct purely to force 64-bit alignment at that position.
type align64 [0]atomic.Int64 // for side effect of its 64-bit alignment
|
|
|
|
// Server is a DERP server.
//
// Create one with NewServer. It doesn't listen on its own; connections are
// handed to it via Accept.
type Server struct {
	// WriteTimeout, if non-zero, specifies how long to wait
	// before failing when writing to a client.
	WriteTimeout time.Duration

	privateKey  key.NodePrivate
	publicKey   key.NodePublic
	logf        logger.Logf
	memSys0     uint64 // runtime.MemStats.Sys at start (or early-ish)
	meshKey     string // pre-shared mesh key; set via SetMeshKey
	limitedLogf logger.Logf
	metaCert    []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate
	dupPolicy   dupPolicy
	debug       bool // when set, Server.debugLogf logs via logf

	// Counters:
	packetsSent, bytesSent       expvar.Int
	packetsRecv, bytesRecv       expvar.Int
	packetsRecvByKind            metrics.LabelMap
	packetsRecvDisco             *expvar.Int
	packetsRecvOther             *expvar.Int
	_                            align64
	packetsDropped               expvar.Int
	packetsDroppedReason         metrics.LabelMap
	packetsDroppedReasonCounters []*expvar.Int // indexed by dropReason
	packetsDroppedType           metrics.LabelMap
	packetsDroppedTypeDisco      *expvar.Int
	packetsDroppedTypeOther      *expvar.Int
	_                            align64
	packetsForwardedOut          expvar.Int
	packetsForwardedIn           expvar.Int
	peerGoneDisconnectedFrames   expvar.Int // number of peer disconnected frames sent
	peerGoneNotHereFrames        expvar.Int // number of peer not here frames sent
	gotPing                      expvar.Int // number of ping frames from client
	sentPong                     expvar.Int // number of pong frames enqueued to client
	accepts                      expvar.Int // number of connections passed to Accept; also the source of connNum
	curClients                   expvar.Int // current number of registered clients
	curClientsNotIdeal           expvar.Int // current registered clients whose isNotIdealConn is set
	curHomeClients               expvar.Int // ones with preferred
	dupClientKeys                expvar.Int // current number of public keys we have 2+ connections for
	dupClientConns               expvar.Int // current number of connections sharing a public key
	dupClientConnTotal           expvar.Int // total number of accepted connections when a dup key existed
	unknownFrames                expvar.Int
	homeMovesIn                  expvar.Int // established clients announce home server moves in
	homeMovesOut                 expvar.Int // established clients announce home server moves out
	multiForwarderCreated        expvar.Int
	multiForwarderDeleted        expvar.Int
	removePktForwardOther        expvar.Int
	sclientWriteTimeouts         expvar.Int       // sendLoop failures matching os.ErrDeadlineExceeded
	avgQueueDuration             *uint64          // In milliseconds; accessed atomically
	tcpRtt                       metrics.LabelMap // histogram
	meshUpdateBatchSize          *metrics.Histogram
	meshUpdateLoopCount          *metrics.Histogram
	bufferedWriteFrames          *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush

	// verifyClientsLocalTailscaled only accepts client connections to the DERP
	// server if the clientKey is a known peer in the network, as specified by a
	// running tailscaled's client's LocalAPI.
	verifyClientsLocalTailscaled bool

	// verifyClientsURL is the admission controller URL used for verifying
	// clients; see SetVerifyClientURL.
	verifyClientsURL string
	// verifyClientsURLFailOpen is whether clients may connect when the
	// admission controller URL is unreachable; see SetVerifyClientURLFailOpen.
	verifyClientsURLFailOpen bool

	// mu is held by the code in this file whenever it touches closed,
	// netConns, clients, watchers, clientsMesh, peerGoneWatchers or keyOfAddr.
	mu       sync.Mutex
	closed   bool
	netConns map[Conn]chan struct{} // chan is closed when conn closes
	clients  map[key.NodePublic]*clientSet
	watchers set.Set[*sclient] // mesh peers
	// clientsMesh tracks all clients in the cluster, both locally
	// and to mesh peers.  If the value is nil, that means the
	// peer is only local (and thus in the clients Map, but not
	// remote). If the value is non-nil, it's remote (+ maybe also
	// local).
	clientsMesh map[key.NodePublic]PacketForwarder
	// peerGoneWatchers is the set of watchers that subscribed to a
	// peer disconnecting from the region overall. When a peer
	// is gone from the region, we notify all of these watchers,
	// calling their funcs in a new goroutine.
	peerGoneWatchers map[key.NodePublic]set.HandleSet[func(key.NodePublic)]

	// maps from netip.AddrPort to a client's public key
	keyOfAddr map[netip.AddrPort]key.NodePublic

	clock tstime.Clock
}
|
|
|
|
// clientSet represents 1 or more *sclients.
//
// In the common case, a client should only have one connection to the
// DERP server for a given key. When they're connected multiple times,
// we record their set of connections in dupClientSet and keep their
// connections open to make them happy (to keep them from spinning,
// etc) and keep track of which is the latest connection. If only the last
// is sending traffic, that last one is the active connection and it
// gets traffic. Otherwise, in the case of a cloned node key, the
// whole set of dups doesn't receive data frames.
//
// All methods should only be called while holding Server.mu.
//
// TODO(bradfitz): Issue 2746: in the future we'll send some sort of
// "health_error" frame to them that'll communicate to the end users
// that they cloned a device key, and we'll also surface it in the
// admin panel, etc.
type clientSet struct {
	// activeClient holds the currently active connection for the set. It's nil
	// if there are no connections or the connection is disabled.
	//
	// A pointer to a clientSet can be held by peers for long periods of time
	// without holding Server.mu to avoid mutex contention on Server.mu, only
	// re-acquiring the mutex and checking the clients map if activeClient is
	// nil.
	activeClient atomic.Pointer[sclient]

	// dup is non-nil if there are multiple connections for the
	// public key. It's nil in the common case of only one
	// client being connected.
	//
	// dup is guarded by Server.mu.
	dup *dupClientSet
}
|
|
|
|
// Len returns the number of clients in s, which can be
|
|
// 0, 1 (the common case), or more (for buggy or transiently
|
|
// reconnecting clients).
|
|
func (s *clientSet) Len() int {
|
|
if s.dup != nil {
|
|
return len(s.dup.set)
|
|
}
|
|
if s.activeClient.Load() != nil {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// ForeachClient calls f for each client in the set.
|
|
//
|
|
// The Server.mu must be held.
|
|
func (s *clientSet) ForeachClient(f func(*sclient)) {
|
|
if s.dup != nil {
|
|
for c := range s.dup.set {
|
|
f(c)
|
|
}
|
|
} else if c := s.activeClient.Load(); c != nil {
|
|
f(c)
|
|
}
|
|
}
|
|
|
|
// A dupClientSet is a clientSet of more than 1 connection.
//
// This can occur in some reasonable cases (temporarily while users
// are changing networks) or in the case of a cloned key. In the
// cloned key case, both peers are speaking and the clients get
// disabled.
//
// All fields are guarded by Server.mu.
type dupClientSet struct {
	// set is the set of connected clients for sclient.key,
	// including the clientSet's active one.
	set set.Set[*sclient]

	// last is the most recent addition to set, or nil if the most
	// recent one has since disconnected and nobody else has sent
	// data since.
	last *sclient

	// sendHistory is a log of which members of set have sent
	// frames to the derp server, with adjacent duplicates
	// removed. When a member of set is removed, the same
	// element(s) are removed from sendHistory.
	sendHistory []*sclient
}
|
|
|
|
func (s *clientSet) pickActiveClient() *sclient {
|
|
d := s.dup
|
|
if d == nil {
|
|
return s.activeClient.Load()
|
|
}
|
|
if d.last != nil && !d.last.isDisabled.Load() {
|
|
return d.last
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// removeClient removes c from s and reports whether it was in s
|
|
// to begin with.
|
|
func (s *dupClientSet) removeClient(c *sclient) bool {
|
|
n := len(s.set)
|
|
delete(s.set, c)
|
|
if s.last == c {
|
|
s.last = nil
|
|
}
|
|
if len(s.set) == n {
|
|
return false
|
|
}
|
|
|
|
trim := s.sendHistory[:0]
|
|
for _, v := range s.sendHistory {
|
|
if s.set.Contains(v) && (len(trim) == 0 || trim[len(trim)-1] != v) {
|
|
trim = append(trim, v)
|
|
}
|
|
}
|
|
for i := len(trim); i < len(s.sendHistory); i++ {
|
|
s.sendHistory[i] = nil
|
|
}
|
|
s.sendHistory = trim
|
|
if s.last == nil && len(s.sendHistory) > 0 {
|
|
s.last = s.sendHistory[len(s.sendHistory)-1]
|
|
}
|
|
return true
|
|
}
|
|
|
|
// PacketForwarder is something that can forward packets.
//
// It's mostly an interface for circular dependency reasons; the
// typical implementation is derphttp.Client. The other implementation
// is a multiForwarder, which this package creates as needed if a
// public key gets more than one PacketForwarder registered for it.
type PacketForwarder interface {
	// ForwardPacket forwards payload from src to dst.
	// NOTE(review): semantics inferred from the signature; confirm
	// against the implementations.
	ForwardPacket(src, dst key.NodePublic, payload []byte) error
	String() string
}
|
|
|
|
// Conn is the subset of the underlying net.Conn the DERP Server needs.
// It is a defined type so that non-net connections can be used.
//
// It has no Read method: Server.Accept is given a separate
// *bufio.ReadWriter, already connected to the Conn, and reads from that.
type Conn interface {
	io.WriteCloser
	LocalAddr() net.Addr
	// The *Deadline methods follow the semantics of net.Conn.
	SetDeadline(time.Time) error
	SetReadDeadline(time.Time) error
	SetWriteDeadline(time.Time) error
}
|
|
|
|
// NewServer returns a new DERP server. It doesn't listen on its own.
|
|
// Connections are given to it via Server.Accept.
|
|
func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
|
|
var ms runtime.MemStats
|
|
runtime.ReadMemStats(&ms)
|
|
|
|
s := &Server{
|
|
debug: envknob.Bool("DERP_DEBUG_LOGS"),
|
|
privateKey: privateKey,
|
|
publicKey: privateKey.Public(),
|
|
logf: logf,
|
|
limitedLogf: logger.RateLimitedFn(logf, 30*time.Second, 5, 100),
|
|
packetsRecvByKind: metrics.LabelMap{Label: "kind"},
|
|
packetsDroppedReason: metrics.LabelMap{Label: "reason"},
|
|
packetsDroppedType: metrics.LabelMap{Label: "type"},
|
|
clients: map[key.NodePublic]*clientSet{},
|
|
clientsMesh: map[key.NodePublic]PacketForwarder{},
|
|
netConns: map[Conn]chan struct{}{},
|
|
memSys0: ms.Sys,
|
|
watchers: set.Set[*sclient]{},
|
|
peerGoneWatchers: map[key.NodePublic]set.HandleSet[func(key.NodePublic)]{},
|
|
avgQueueDuration: new(uint64),
|
|
tcpRtt: metrics.LabelMap{Label: "le"},
|
|
meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}),
|
|
meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}),
|
|
bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}),
|
|
keyOfAddr: map[netip.AddrPort]key.NodePublic{},
|
|
clock: tstime.StdClock{},
|
|
}
|
|
s.initMetacert()
|
|
s.packetsRecvDisco = s.packetsRecvByKind.Get("disco")
|
|
s.packetsRecvOther = s.packetsRecvByKind.Get("other")
|
|
|
|
s.packetsDroppedReasonCounters = s.genPacketsDroppedReasonCounters()
|
|
|
|
s.packetsDroppedTypeDisco = s.packetsDroppedType.Get("disco")
|
|
s.packetsDroppedTypeOther = s.packetsDroppedType.Get("other")
|
|
return s
|
|
}
|
|
|
|
func (s *Server) genPacketsDroppedReasonCounters() []*expvar.Int {
|
|
getMetric := s.packetsDroppedReason.Get
|
|
ret := []*expvar.Int{
|
|
dropReasonUnknownDest: getMetric("unknown_dest"),
|
|
dropReasonUnknownDestOnFwd: getMetric("unknown_dest_on_fwd"),
|
|
dropReasonGoneDisconnected: getMetric("gone_disconnected"),
|
|
dropReasonQueueHead: getMetric("queue_head"),
|
|
dropReasonQueueTail: getMetric("queue_tail"),
|
|
dropReasonWriteError: getMetric("write_error"),
|
|
dropReasonDupClient: getMetric("dup_client"),
|
|
}
|
|
if len(ret) != int(numDropReasons) {
|
|
panic("dropReason metrics out of sync")
|
|
}
|
|
for i := range numDropReasons {
|
|
if ret[i] == nil {
|
|
panic("dropReason metrics out of sync")
|
|
}
|
|
}
|
|
return ret
|
|
}
|
|
|
|
// SetMeshKey sets the pre-shared key that regional DERP servers use to mesh
// amongst themselves.
//
// It must be called before serving begins.
func (s *Server) SetMeshKey(v string) {
	s.meshKey = v
}
|
|
|
|
// SetVerifyClient sets whether this DERP server verifies clients through tailscaled.
//
// It must be called before serving begins.
func (s *Server) SetVerifyClient(v bool) {
	s.verifyClientsLocalTailscaled = v
}
|
|
|
|
// SetVerifyClientURL sets the admission controller URL to use for verifying clients.
// If empty, all clients are accepted (unless restricted by SetVerifyClient checking
// against tailscaled).
//
// NOTE(review): the field is written without holding s.mu, so presumably
// this must be called before serving begins, like SetMeshKey; confirm.
func (s *Server) SetVerifyClientURL(v string) {
	s.verifyClientsURL = v
}
|
|
|
|
// SetVerifyClientURLFailOpen sets whether to allow clients to connect if the
// admission controller URL is unreachable.
//
// NOTE(review): the field is written without holding s.mu, so presumably
// this must be called before serving begins, like SetMeshKey; confirm.
func (s *Server) SetVerifyClientURLFailOpen(v bool) {
	s.verifyClientsURLFailOpen = v
}
|
|
|
|
// HasMeshKey reports whether the server is configured with a mesh key.
|
|
func (s *Server) HasMeshKey() bool { return s.meshKey != "" }
|
|
|
|
// MeshKey returns the configured mesh key, if any.
|
|
func (s *Server) MeshKey() string { return s.meshKey }
|
|
|
|
// PrivateKey returns the server's private key.
|
|
func (s *Server) PrivateKey() key.NodePrivate { return s.privateKey }
|
|
|
|
// PublicKey returns the server's public key.
|
|
func (s *Server) PublicKey() key.NodePublic { return s.publicKey }
|
|
|
|
// Close closes the server and waits for the connections to disconnect.
|
|
func (s *Server) Close() error {
|
|
s.mu.Lock()
|
|
wasClosed := s.closed
|
|
s.closed = true
|
|
s.mu.Unlock()
|
|
if wasClosed {
|
|
return nil
|
|
}
|
|
|
|
var closedChs []chan struct{}
|
|
|
|
s.mu.Lock()
|
|
for nc, closed := range s.netConns {
|
|
nc.Close()
|
|
closedChs = append(closedChs, closed)
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
for _, closed := range closedChs {
|
|
<-closed
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) isClosed() bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.closed
|
|
}
|
|
|
|
// IsClientConnectedForTest reports whether the client with specified key is connected.
|
|
// This is used in tests to verify that nodes are connected.
|
|
func (s *Server) IsClientConnectedForTest(k key.NodePublic) bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
x, ok := s.clients[k]
|
|
if !ok {
|
|
return false
|
|
}
|
|
return x.activeClient.Load() != nil
|
|
}
|
|
|
|
// Accept adds a new connection to the server and serves it.
|
|
//
|
|
// The provided bufio ReadWriter must be already connected to nc.
|
|
// Accept blocks until the Server is closed or the connection closes
|
|
// on its own.
|
|
//
|
|
// Accept closes nc.
|
|
func (s *Server) Accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, remoteAddr string) {
|
|
closed := make(chan struct{})
|
|
|
|
s.mu.Lock()
|
|
s.accepts.Add(1) // while holding s.mu for connNum read on next line
|
|
connNum := s.accepts.Value() // expvar sadly doesn't return new value on Add(1)
|
|
s.netConns[nc] = closed
|
|
s.mu.Unlock()
|
|
|
|
defer func() {
|
|
nc.Close()
|
|
close(closed)
|
|
|
|
s.mu.Lock()
|
|
delete(s.netConns, nc)
|
|
s.mu.Unlock()
|
|
}()
|
|
|
|
if err := s.accept(ctx, nc, brw, remoteAddr, connNum); err != nil && !s.isClosed() {
|
|
s.logf("derp: %s: %v", remoteAddr, err)
|
|
}
|
|
}
|
|
|
|
// initMetacert initialized s.metaCert with a self-signed x509 cert
|
|
// encoding this server's public key and protocol version. cmd/derper
|
|
// then sends this after the Let's Encrypt leaf + intermediate certs
|
|
// after the ServerHello (encrypted in TLS 1.3, not that it matters
|
|
// much).
|
|
//
|
|
// Then the client can save a round trip getting that and can start
|
|
// speaking DERP right away. (We don't use ALPN because that's sent in
|
|
// the clear and we're being paranoid to not look too weird to any
|
|
// middleboxes, given that DERP is an ultimate fallback path). But
|
|
// since the post-ServerHello certs are encrypted we can have the
|
|
// client also use them as a signal to be able to start speaking DERP
|
|
// right away, starting with its identity proof, encrypted to the
|
|
// server's public key.
|
|
//
|
|
// This RTT optimization fails where there's a corp-mandated
|
|
// TLS proxy with corp-mandated root certs on employee machines and
|
|
// and TLS proxy cleans up unnecessary certs. In that case we just fall
|
|
// back to the extra RTT.
|
|
func (s *Server) initMetacert() {
|
|
pub, priv, err := ed25519.GenerateKey(crand.Reader)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
tmpl := &x509.Certificate{
|
|
SerialNumber: big.NewInt(ProtocolVersion),
|
|
Subject: pkix.Name{
|
|
CommonName: fmt.Sprintf("derpkey%s", s.publicKey.UntypedHexString()),
|
|
},
|
|
// Windows requires NotAfter and NotBefore set:
|
|
NotAfter: s.clock.Now().Add(30 * 24 * time.Hour),
|
|
NotBefore: s.clock.Now().Add(-30 * 24 * time.Hour),
|
|
// Per https://github.com/golang/go/issues/51759#issuecomment-1071147836,
|
|
// macOS requires BasicConstraints when subject == issuer:
|
|
BasicConstraintsValid: true,
|
|
}
|
|
cert, err := x509.CreateCertificate(crand.Reader, tmpl, tmpl, pub, priv)
|
|
if err != nil {
|
|
log.Fatalf("CreateCertificate: %v", err)
|
|
}
|
|
s.metaCert = cert
|
|
}
|
|
|
|
// MetaCert returns the server metadata cert that can be sent by the
|
|
// TLS server to let the client skip a round trip during start-up.
|
|
func (s *Server) MetaCert() []byte { return s.metaCert }
|
|
|
|
// registerClient notes that client c is now authenticated and ready for packets.
|
|
//
|
|
// If c.key is connected more than once, the earlier connection(s) are
|
|
// placed in a non-active state where we read from them (primarily to
|
|
// observe EOFs/timeouts) but won't send them frames on the assumption
|
|
// that they're dead.
|
|
func (s *Server) registerClient(c *sclient) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
cs, ok := s.clients[c.key]
|
|
if !ok {
|
|
c.debugLogf("register single client")
|
|
cs = &clientSet{}
|
|
s.clients[c.key] = cs
|
|
}
|
|
was := cs.activeClient.Load()
|
|
if was == nil {
|
|
// Common case.
|
|
} else {
|
|
was.isDup.Store(true)
|
|
c.isDup.Store(true)
|
|
}
|
|
|
|
dup := cs.dup
|
|
if dup == nil && was != nil {
|
|
s.dupClientKeys.Add(1)
|
|
s.dupClientConns.Add(2) // both old and new count
|
|
s.dupClientConnTotal.Add(1)
|
|
dup = &dupClientSet{
|
|
set: set.Of(c, was),
|
|
last: c,
|
|
sendHistory: []*sclient{was},
|
|
}
|
|
cs.dup = dup
|
|
c.debugLogf("register duplicate client")
|
|
} else if dup != nil {
|
|
s.dupClientConns.Add(1) // the gauge
|
|
s.dupClientConnTotal.Add(1) // the counter
|
|
dup.set.Add(c)
|
|
dup.last = c
|
|
dup.sendHistory = append(dup.sendHistory, c)
|
|
c.debugLogf("register another duplicate client")
|
|
}
|
|
|
|
cs.activeClient.Store(c)
|
|
|
|
if _, ok := s.clientsMesh[c.key]; !ok {
|
|
s.clientsMesh[c.key] = nil // just for varz of total users in cluster
|
|
}
|
|
s.keyOfAddr[c.remoteIPPort] = c.key
|
|
s.curClients.Add(1)
|
|
if c.isNotIdealConn {
|
|
s.curClientsNotIdeal.Add(1)
|
|
}
|
|
s.broadcastPeerStateChangeLocked(c.key, c.remoteIPPort, c.presentFlags(), true)
|
|
}
|
|
|
|
// broadcastPeerStateChangeLocked enqueues a message to all watchers
|
|
// (other DERP nodes in the region, or trusted clients) that peer's
|
|
// presence changed.
|
|
//
|
|
// s.mu must be held.
|
|
func (s *Server) broadcastPeerStateChangeLocked(peer key.NodePublic, ipPort netip.AddrPort, flags PeerPresentFlags, present bool) {
|
|
for w := range s.watchers {
|
|
w.peerStateChange = append(w.peerStateChange, peerConnState{
|
|
peer: peer,
|
|
present: present,
|
|
ipPort: ipPort,
|
|
flags: flags,
|
|
})
|
|
go w.requestMeshUpdate()
|
|
}
|
|
}
|
|
|
|
// unregisterClient removes a client from the server.
|
|
func (s *Server) unregisterClient(c *sclient) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
set, ok := s.clients[c.key]
|
|
if !ok {
|
|
c.logf("[unexpected]; clients map is empty")
|
|
return
|
|
}
|
|
|
|
dup := set.dup
|
|
if dup == nil {
|
|
// The common case.
|
|
cur := set.activeClient.Load()
|
|
if cur == nil {
|
|
c.logf("[unexpected]; active client is nil")
|
|
return
|
|
}
|
|
if cur != c {
|
|
c.logf("[unexpected]; active client is not c")
|
|
return
|
|
}
|
|
c.debugLogf("removed connection")
|
|
set.activeClient.Store(nil)
|
|
delete(s.clients, c.key)
|
|
if v, ok := s.clientsMesh[c.key]; ok && v == nil {
|
|
delete(s.clientsMesh, c.key)
|
|
s.notePeerGoneFromRegionLocked(c.key)
|
|
}
|
|
s.broadcastPeerStateChangeLocked(c.key, netip.AddrPort{}, 0, false)
|
|
} else {
|
|
c.debugLogf("removed duplicate client")
|
|
if dup.removeClient(c) {
|
|
s.dupClientConns.Add(-1)
|
|
} else {
|
|
c.logf("[unexpected]; dup client set didn't shrink")
|
|
}
|
|
if dup.set.Len() == 1 {
|
|
// If we drop down to one connection, demote it down
|
|
// to a regular single client (a nil dup set).
|
|
set.dup = nil
|
|
s.dupClientConns.Add(-1) // again; for the original one's
|
|
s.dupClientKeys.Add(-1)
|
|
var remain *sclient
|
|
for remain = range dup.set {
|
|
break
|
|
}
|
|
if remain == nil {
|
|
panic("unexpected nil remain from single element dup set")
|
|
}
|
|
remain.isDisabled.Store(false)
|
|
remain.isDup.Store(false)
|
|
set.activeClient.Store(remain)
|
|
} else {
|
|
// Still a duplicate. Pick a winner.
|
|
set.activeClient.Store(set.pickActiveClient())
|
|
}
|
|
}
|
|
|
|
if c.canMesh {
|
|
delete(s.watchers, c)
|
|
}
|
|
|
|
delete(s.keyOfAddr, c.remoteIPPort)
|
|
|
|
s.curClients.Add(-1)
|
|
if c.preferred {
|
|
s.curHomeClients.Add(-1)
|
|
}
|
|
if c.isNotIdealConn {
|
|
s.curClientsNotIdeal.Add(-1)
|
|
}
|
|
}
|
|
|
|
// addPeerGoneFromRegionWatcher adds a function to be called when peer is gone
|
|
// from the region overall. It returns a handle that can be used to remove the
|
|
// watcher later.
|
|
//
|
|
// The provided f func is usually [sclient.onPeerGoneFromRegion], added by
|
|
// [sclient.noteSendFromSrc]; this func doesn't take a whole *sclient to make it
|
|
// clear what has access to what.
|
|
func (s *Server) addPeerGoneFromRegionWatcher(peer key.NodePublic, f func(key.NodePublic)) set.Handle {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
hset, ok := s.peerGoneWatchers[peer]
|
|
if !ok {
|
|
hset = set.HandleSet[func(key.NodePublic)]{}
|
|
s.peerGoneWatchers[peer] = hset
|
|
}
|
|
return hset.Add(f)
|
|
}
|
|
|
|
// removePeerGoneFromRegionWatcher removes a peer watcher previously added by
|
|
// addPeerGoneFromRegionWatcher, using the handle returned by
|
|
// addPeerGoneFromRegionWatcher.
|
|
func (s *Server) removePeerGoneFromRegionWatcher(peer key.NodePublic, h set.Handle) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
hset, ok := s.peerGoneWatchers[peer]
|
|
if !ok {
|
|
return
|
|
}
|
|
delete(hset, h)
|
|
if len(hset) == 0 {
|
|
delete(s.peerGoneWatchers, peer)
|
|
}
|
|
}
|
|
|
|
// notePeerGoneFromRegionLocked sends peerGone frames to parties that
|
|
// key has sent to previously (whether those sends were from a local
|
|
// client or forwarded). It must only be called after the key has
|
|
// been removed from clientsMesh.
|
|
func (s *Server) notePeerGoneFromRegionLocked(key key.NodePublic) {
|
|
if _, ok := s.clientsMesh[key]; ok {
|
|
panic("usage")
|
|
}
|
|
|
|
// Find still-connected peers and either notify that we've gone away
|
|
// so they can drop their route entries to us (issue 150)
|
|
// or move them over to the active client (in case a replaced client
|
|
// connection is being unregistered).
|
|
set := s.peerGoneWatchers[key]
|
|
for _, f := range set {
|
|
go f(key)
|
|
}
|
|
delete(s.peerGoneWatchers, key)
|
|
}
|
|
|
|
// requestPeerGoneWriteLimited sends a request to write a "peer gone"
|
|
// frame, but only in reply to a disco packet, and only if we haven't
|
|
// sent one recently.
|
|
func (c *sclient) requestPeerGoneWriteLimited(peer key.NodePublic, contents []byte, reason PeerGoneReasonType) {
|
|
if disco.LooksLikeDiscoWrapper(contents) != true {
|
|
return
|
|
}
|
|
|
|
if c.peerGoneLim.Allow() {
|
|
go c.requestPeerGoneWrite(peer, reason)
|
|
}
|
|
}
|
|
|
|
func (s *Server) addWatcher(c *sclient) {
|
|
if !c.canMesh {
|
|
panic("invariant: addWatcher called without permissions")
|
|
}
|
|
|
|
if c.key == s.publicKey {
|
|
// We're connecting to ourself. Do nothing.
|
|
return
|
|
}
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
// Queue messages for each already-connected client.
|
|
for peer, clientSet := range s.clients {
|
|
ac := clientSet.activeClient.Load()
|
|
if ac == nil {
|
|
continue
|
|
}
|
|
c.peerStateChange = append(c.peerStateChange, peerConnState{
|
|
peer: peer,
|
|
present: true,
|
|
ipPort: ac.remoteIPPort,
|
|
flags: ac.presentFlags(),
|
|
})
|
|
}
|
|
|
|
// And enroll the watcher in future updates (of both
|
|
// connections & disconnections).
|
|
s.watchers.Add(c)
|
|
|
|
go c.requestMeshUpdate()
|
|
}
|
|
|
|
func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error {
|
|
br := brw.Reader
|
|
nc.SetDeadline(time.Now().Add(10 * time.Second))
|
|
bw := &lazyBufioWriter{w: nc, lbw: brw.Writer}
|
|
if err := s.sendServerKey(bw); err != nil {
|
|
return fmt.Errorf("send server key: %v", err)
|
|
}
|
|
nc.SetDeadline(time.Now().Add(10 * time.Second))
|
|
clientKey, clientInfo, err := s.recvClientKey(br)
|
|
if err != nil {
|
|
return fmt.Errorf("receive client key: %v", err)
|
|
}
|
|
|
|
remoteIPPort, _ := netip.ParseAddrPort(remoteAddr)
|
|
if err := s.verifyClient(ctx, clientKey, clientInfo, remoteIPPort.Addr()); err != nil {
|
|
return fmt.Errorf("client %v rejected: %v", clientKey, err)
|
|
}
|
|
|
|
// At this point we trust the client so we don't time out.
|
|
nc.SetDeadline(time.Time{})
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
c := &sclient{
|
|
connNum: connNum,
|
|
s: s,
|
|
key: clientKey,
|
|
nc: nc,
|
|
br: br,
|
|
bw: bw,
|
|
logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v%s: ", remoteAddr, clientKey.ShortString())),
|
|
done: ctx.Done(),
|
|
remoteIPPort: remoteIPPort,
|
|
connectedAt: s.clock.Now(),
|
|
sendQueue: make(chan pkt, perClientSendQueueDepth),
|
|
discoSendQueue: make(chan pkt, perClientSendQueueDepth),
|
|
sendPongCh: make(chan [8]byte, 1),
|
|
peerGone: make(chan peerGoneMsg),
|
|
canMesh: s.isMeshPeer(clientInfo),
|
|
isNotIdealConn: IdealNodeContextKey.Value(ctx) != "",
|
|
peerGoneLim: rate.NewLimiter(rate.Every(time.Second), 3),
|
|
}
|
|
|
|
if c.canMesh {
|
|
c.meshUpdate = make(chan struct{}, 1) // must be buffered; >1 is fine but wasteful
|
|
}
|
|
if clientInfo != nil {
|
|
c.info = *clientInfo
|
|
if envknob.Bool("DERP_PROBER_DEBUG_LOGS") && clientInfo.IsProber {
|
|
c.debug = true
|
|
}
|
|
}
|
|
if s.debug {
|
|
c.debug = true
|
|
}
|
|
|
|
s.registerClient(c)
|
|
defer s.unregisterClient(c)
|
|
|
|
err = s.sendServerInfo(c.bw, clientKey)
|
|
if err != nil {
|
|
return fmt.Errorf("send server info: %v", err)
|
|
}
|
|
|
|
return c.run(ctx)
|
|
}
|
|
|
|
func (s *Server) debugLogf(format string, v ...any) {
|
|
if s.debug {
|
|
s.logf(format, v...)
|
|
}
|
|
}
|
|
|
|
// run serves the client until there's an error.
|
|
// If the client hangs up or the server is closed, run returns nil, otherwise run returns an error.
|
|
func (c *sclient) run(ctx context.Context) error {
|
|
// Launch sender, but don't return from run until sender goroutine is done.
|
|
var grp errgroup.Group
|
|
sendCtx, cancelSender := context.WithCancel(ctx)
|
|
grp.Go(func() error { return c.sendLoop(sendCtx) })
|
|
defer func() {
|
|
cancelSender()
|
|
if err := grp.Wait(); err != nil && !c.s.isClosed() {
|
|
if errors.Is(err, context.Canceled) {
|
|
c.debugLogf("sender canceled by reader exiting")
|
|
} else {
|
|
if errors.Is(err, os.ErrDeadlineExceeded) {
|
|
c.s.sclientWriteTimeouts.Add(1)
|
|
}
|
|
c.logf("sender failed: %v", err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
c.startStatsLoop(sendCtx)
|
|
|
|
for {
|
|
ft, fl, err := readFrameHeader(c.br)
|
|
c.debugLogf("read frame type %d len %d err %v", ft, fl, err)
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
c.debugLogf("read EOF")
|
|
return nil
|
|
}
|
|
if c.s.isClosed() {
|
|
c.logf("closing; server closed")
|
|
return nil
|
|
}
|
|
return fmt.Errorf("client %s: readFrameHeader: %w", c.key.ShortString(), err)
|
|
}
|
|
c.s.noteClientActivity(c)
|
|
switch ft {
|
|
case frameNotePreferred:
|
|
err = c.handleFrameNotePreferred(ft, fl)
|
|
case frameSendPacket:
|
|
err = c.handleFrameSendPacket(ft, fl)
|
|
case frameForwardPacket:
|
|
err = c.handleFrameForwardPacket(ft, fl)
|
|
case frameWatchConns:
|
|
err = c.handleFrameWatchConns(ft, fl)
|
|
case frameClosePeer:
|
|
err = c.handleFrameClosePeer(ft, fl)
|
|
case framePing:
|
|
err = c.handleFramePing(ft, fl)
|
|
default:
|
|
err = c.handleUnknownFrame(ft, fl)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *sclient) handleUnknownFrame(ft frameType, fl uint32) error {
|
|
_, err := io.CopyN(io.Discard, c.br, int64(fl))
|
|
return err
|
|
}
|
|
|
|
func (c *sclient) handleFrameNotePreferred(ft frameType, fl uint32) error {
|
|
if fl != 1 {
|
|
return fmt.Errorf("frameNotePreferred wrong size")
|
|
}
|
|
v, err := c.br.ReadByte()
|
|
if err != nil {
|
|
return fmt.Errorf("frameNotePreferred ReadByte: %v", err)
|
|
}
|
|
c.setPreferred(v != 0)
|
|
return nil
|
|
}
|
|
|
|
func (c *sclient) handleFrameWatchConns(ft frameType, fl uint32) error {
|
|
if fl != 0 {
|
|
return fmt.Errorf("handleFrameWatchConns wrong size")
|
|
}
|
|
if !c.canMesh {
|
|
return fmt.Errorf("insufficient permissions")
|
|
}
|
|
c.s.addWatcher(c)
|
|
return nil
|
|
}
|
|
|
|
func (c *sclient) handleFramePing(ft frameType, fl uint32) error {
|
|
c.s.gotPing.Add(1)
|
|
var m PingMessage
|
|
if fl < uint32(len(m)) {
|
|
return fmt.Errorf("short ping: %v", fl)
|
|
}
|
|
if fl > 1000 {
|
|
// unreasonably extra large. We leave some extra
|
|
// space for future extensibility, but not too much.
|
|
return fmt.Errorf("ping body too large: %v", fl)
|
|
}
|
|
_, err := io.ReadFull(c.br, m[:])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if extra := int64(fl) - int64(len(m)); extra > 0 {
|
|
_, err = io.CopyN(io.Discard, c.br, extra)
|
|
}
|
|
select {
|
|
case c.sendPongCh <- [8]byte(m):
|
|
default:
|
|
// They're pinging too fast. Ignore.
|
|
// TODO(bradfitz): add a rate limiter too.
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error {
|
|
if fl != keyLen {
|
|
return fmt.Errorf("handleFrameClosePeer wrong size")
|
|
}
|
|
if !c.canMesh {
|
|
return fmt.Errorf("insufficient permissions")
|
|
}
|
|
var targetKey key.NodePublic
|
|
if err := targetKey.ReadRawWithoutAllocating(c.br); err != nil {
|
|
return err
|
|
}
|
|
s := c.s
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if set, ok := s.clients[targetKey]; ok {
|
|
if set.Len() == 1 {
|
|
c.logf("frameClosePeer closing peer %x", targetKey)
|
|
} else {
|
|
c.logf("frameClosePeer closing peer %x (%d connections)", targetKey, set.Len())
|
|
}
|
|
set.ForeachClient(func(target *sclient) {
|
|
go target.nc.Close()
|
|
})
|
|
} else {
|
|
c.logf("frameClosePeer failed to find peer %x", targetKey)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// handleFrameForwardPacket reads a "forward packet" frame from the client
|
|
// (which must be a trusted client, a peer in our mesh).
|
|
func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
|
|
if !c.canMesh {
|
|
return fmt.Errorf("insufficient permissions")
|
|
}
|
|
s := c.s
|
|
|
|
srcKey, dstKey, contents, err := s.recvForwardPacket(c.br, fl)
|
|
if err != nil {
|
|
return fmt.Errorf("client %v: recvForwardPacket: %v", c.key, err)
|
|
}
|
|
s.packetsForwardedIn.Add(1)
|
|
|
|
var dstLen int
|
|
var dst *sclient
|
|
|
|
s.mu.Lock()
|
|
if set, ok := s.clients[dstKey]; ok {
|
|
dstLen = set.Len()
|
|
dst = set.activeClient.Load()
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
if dst == nil {
|
|
reason := dropReasonUnknownDestOnFwd
|
|
if dstLen > 1 {
|
|
reason = dropReasonDupClient
|
|
} else {
|
|
c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
|
|
}
|
|
s.recordDrop(contents, srcKey, dstKey, reason)
|
|
return nil
|
|
}
|
|
|
|
dst.debugLogf("received forwarded packet from %s via %s", srcKey.ShortString(), c.key.ShortString())
|
|
|
|
return c.sendPkt(dst, pkt{
|
|
bs: contents,
|
|
enqueuedAt: c.s.clock.Now(),
|
|
src: srcKey,
|
|
})
|
|
}
|
|
|
|
// handleFrameSendPacket reads a "send packet" frame from the client.
|
|
func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
|
|
s := c.s
|
|
|
|
dstKey, contents, err := s.recvPacket(c.br, fl)
|
|
if err != nil {
|
|
return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
|
|
}
|
|
|
|
var fwd PacketForwarder
|
|
var dstLen int
|
|
var dst *sclient
|
|
|
|
s.mu.Lock()
|
|
if set, ok := s.clients[dstKey]; ok {
|
|
dstLen = set.Len()
|
|
dst = set.activeClient.Load()
|
|
}
|
|
if dst == nil && dstLen < 1 {
|
|
fwd = s.clientsMesh[dstKey]
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
if dst == nil {
|
|
if fwd != nil {
|
|
s.packetsForwardedOut.Add(1)
|
|
err := fwd.ForwardPacket(c.key, dstKey, contents)
|
|
c.debugLogf("SendPacket for %s, forwarding via %s: %v", dstKey.ShortString(), fwd, err)
|
|
if err != nil {
|
|
// TODO:
|
|
return nil
|
|
}
|
|
return nil
|
|
}
|
|
reason := dropReasonUnknownDest
|
|
if dstLen > 1 {
|
|
reason = dropReasonDupClient
|
|
} else {
|
|
c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
|
|
}
|
|
s.recordDrop(contents, c.key, dstKey, reason)
|
|
c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
|
|
return nil
|
|
}
|
|
c.debugLogf("SendPacket for %s, sending directly", dstKey.ShortString())
|
|
|
|
p := pkt{
|
|
bs: contents,
|
|
enqueuedAt: c.s.clock.Now(),
|
|
src: c.key,
|
|
}
|
|
return c.sendPkt(dst, p)
|
|
}
|
|
|
|
func (c *sclient) debugLogf(format string, v ...any) {
|
|
if c.debug {
|
|
c.logf(format, v...)
|
|
}
|
|
}
|
|
|
|
// dropReason is why we dropped a DERP frame.
type dropReason int

//go:generate go run tailscale.com/cmd/addlicense -file dropreason_string.go go run golang.org/x/tools/cmd/stringer -type=dropReason -trimprefix=dropReason

const (
	// dropReasonUnknownDest: unknown destination pubkey.
	dropReasonUnknownDest dropReason = iota
	// dropReasonUnknownDestOnFwd: unknown destination pubkey on a
	// derp-forwarded packet.
	dropReasonUnknownDestOnFwd
	// dropReasonGoneDisconnected: destination tailscaled disconnected
	// before we could send.
	dropReasonGoneDisconnected
	// dropReasonQueueHead: destination queue is full, dropped packet at
	// queue head.
	dropReasonQueueHead
	// dropReasonQueueTail: destination queue is full, dropped packet at
	// queue tail.
	dropReasonQueueTail
	// dropReasonWriteError: OS write() failed.
	dropReasonWriteError
	// dropReasonDupClient: the public key is connected 2+ times
	// (active/active, fighting).
	dropReasonDupClient
	// numDropReasons is unused as a reason; keep it last.
	numDropReasons
)
|
|
|
|
func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
|
|
s.packetsDropped.Add(1)
|
|
s.packetsDroppedReasonCounters[reason].Add(1)
|
|
looksDisco := disco.LooksLikeDiscoWrapper(packetBytes)
|
|
if looksDisco {
|
|
s.packetsDroppedTypeDisco.Add(1)
|
|
} else {
|
|
s.packetsDroppedTypeOther.Add(1)
|
|
}
|
|
if verboseDropKeys[dstKey] {
|
|
// Preformat the log string prior to calling limitedLogf. The
|
|
// limiter acts based on the format string, and we want to
|
|
// rate-limit per src/dst keys, not on the generic "dropped
|
|
// stuff" message.
|
|
msg := fmt.Sprintf("drop (%s) %s -> %s", srcKey.ShortString(), reason, dstKey.ShortString())
|
|
s.limitedLogf(msg)
|
|
}
|
|
s.debugLogf("dropping packet reason=%s dst=%s disco=%v", reason, dstKey, looksDisco)
|
|
}
|
|
|
|
func (c *sclient) sendPkt(dst *sclient, p pkt) error {
|
|
s := c.s
|
|
dstKey := dst.key
|
|
|
|
// Attempt to queue for sending up to 3 times. On each attempt, if
|
|
// the queue is full, try to drop from queue head to prioritize
|
|
// fresher packets.
|
|
sendQueue := dst.sendQueue
|
|
if disco.LooksLikeDiscoWrapper(p.bs) {
|
|
sendQueue = dst.discoSendQueue
|
|
}
|
|
for attempt := 0; attempt < 3; attempt++ {
|
|
select {
|
|
case <-dst.done:
|
|
s.recordDrop(p.bs, c.key, dstKey, dropReasonGoneDisconnected)
|
|
dst.debugLogf("sendPkt attempt %d dropped, dst gone", attempt)
|
|
return nil
|
|
default:
|
|
}
|
|
select {
|
|
case sendQueue <- p:
|
|
dst.debugLogf("sendPkt attempt %d enqueued", attempt)
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case pkt := <-sendQueue:
|
|
s.recordDrop(pkt.bs, c.key, dstKey, dropReasonQueueHead)
|
|
c.recordQueueTime(pkt.enqueuedAt)
|
|
default:
|
|
}
|
|
}
|
|
// Failed to make room for packet. This can happen in a heavily
|
|
// contended queue with racing writers. Give up and tail-drop in
|
|
// this case to keep reader unblocked.
|
|
s.recordDrop(p.bs, c.key, dstKey, dropReasonQueueTail)
|
|
dst.debugLogf("sendPkt attempt %d dropped, queue full")
|
|
|
|
return nil
|
|
}
|
|
|
|
// onPeerGoneFromRegion is the callback registered with the Server to be
|
|
// notified (in a new goroutine) whenever a peer has disconnected from all DERP
|
|
// nodes in the current region.
|
|
func (c *sclient) onPeerGoneFromRegion(peer key.NodePublic) {
|
|
c.requestPeerGoneWrite(peer, PeerGoneReasonDisconnected)
|
|
}
|
|
|
|
// requestPeerGoneWrite sends a request to write a "peer gone" frame
|
|
// with an explanation of why it is gone. It blocks until either the
|
|
// write request is scheduled, or the client has closed.
|
|
func (c *sclient) requestPeerGoneWrite(peer key.NodePublic, reason PeerGoneReasonType) {
|
|
select {
|
|
case c.peerGone <- peerGoneMsg{
|
|
peer: peer,
|
|
reason: reason,
|
|
}:
|
|
case <-c.done:
|
|
}
|
|
}
|
|
|
|
// requestMeshUpdate notes that a c's peerStateChange has been appended to and
|
|
// should now be written.
|
|
//
|
|
// It does not block. If a meshUpdate is already pending for this client, it
|
|
// does nothing.
|
|
func (c *sclient) requestMeshUpdate() {
|
|
if !c.canMesh {
|
|
panic("unexpected requestMeshUpdate")
|
|
}
|
|
select {
|
|
case c.meshUpdate <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// localClient is the zero-value tailscale.LocalClient that verifyClient uses
// (via WhoIsNodeKey) to ask the local tailscaled about a connecting node key.
var localClient tailscale.LocalClient
|
|
|
|
// isMeshPeer reports whether the client is a trusted mesh peer
|
|
// node in the DERP region.
|
|
func (s *Server) isMeshPeer(info *clientInfo) bool {
|
|
return info != nil && info.MeshKey != "" && info.MeshKey == s.meshKey
|
|
}
|
|
|
|
// verifyClient checks whether the client is allowed to connect to the derper,
|
|
// depending on how & whether the server's been configured to verify.
|
|
func (s *Server) verifyClient(ctx context.Context, clientKey key.NodePublic, info *clientInfo, clientIP netip.Addr) error {
|
|
if s.isMeshPeer(info) {
|
|
// Trusted mesh peer. No need to verify further. In fact, verifying
|
|
// further wouldn't work: it's not part of the tailnet so tailscaled and
|
|
// likely the admission control URL wouldn't know about it.
|
|
return nil
|
|
}
|
|
|
|
// tailscaled-based verification:
|
|
if s.verifyClientsLocalTailscaled {
|
|
_, err := localClient.WhoIsNodeKey(ctx, clientKey)
|
|
if err == tailscale.ErrPeerNotFound {
|
|
return fmt.Errorf("peer %v not authorized (not found in local tailscaled)", clientKey)
|
|
}
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "invalid 'addr' parameter") {
|
|
// Issue 12617
|
|
return errors.New("tailscaled version is too old (out of sync with derper binary)")
|
|
}
|
|
return fmt.Errorf("failed to query local tailscaled status for %v: %w", clientKey, err)
|
|
}
|
|
}
|
|
|
|
// admission controller-based verification:
|
|
if s.verifyClientsURL != "" {
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
jreq, err := json.Marshal(&tailcfg.DERPAdmitClientRequest{
|
|
NodePublic: clientKey,
|
|
Source: clientIP,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, "POST", s.verifyClientsURL, bytes.NewReader(jreq))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
res, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
if s.verifyClientsURLFailOpen {
|
|
s.logf("admission controller unreachable; allowing client %v", clientKey)
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
defer res.Body.Close()
|
|
if res.StatusCode != 200 {
|
|
return fmt.Errorf("admission controller: %v", res.Status)
|
|
}
|
|
var jres tailcfg.DERPAdmitClientResponse
|
|
if err := json.NewDecoder(io.LimitReader(res.Body, 4<<10)).Decode(&jres); err != nil {
|
|
return err
|
|
}
|
|
if !jres.Allow {
|
|
return fmt.Errorf("admission controller: %v/%v not allowed", clientKey, clientIP)
|
|
}
|
|
// TODO(bradfitz): add policy for configurable bandwidth rate per client?
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) sendServerKey(lw *lazyBufioWriter) error {
|
|
buf := make([]byte, 0, len(magic)+key.NodePublicRawLen)
|
|
buf = append(buf, magic...)
|
|
buf = s.publicKey.AppendTo(buf)
|
|
err := writeFrame(lw.bw(), frameServerKey, buf)
|
|
lw.Flush() // redundant (no-op) flush to release bufio.Writer
|
|
return err
|
|
}
|
|
|
|
func (s *Server) noteClientActivity(c *sclient) {
|
|
if !c.isDup.Load() {
|
|
// Fast path for clients that aren't in a dup set.
|
|
return
|
|
}
|
|
if c.isDisabled.Load() {
|
|
// If they're already disabled, no point checking more.
|
|
return
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
cs, ok := s.clients[c.key]
|
|
if !ok {
|
|
return
|
|
}
|
|
dup := cs.dup
|
|
if dup == nil {
|
|
// It became unduped in between the isDup fast path check above
|
|
// and the mutex check. Nothing to do.
|
|
return
|
|
}
|
|
|
|
if s.dupPolicy == lastWriterIsActive {
|
|
dup.last = c
|
|
cs.activeClient.Store(c)
|
|
} else if dup.last == nil {
|
|
// If we didn't have a primary, let the current
|
|
// speaker be the primary.
|
|
dup.last = c
|
|
cs.activeClient.Store(c)
|
|
}
|
|
|
|
if slicesx.LastEqual(dup.sendHistory, c) {
|
|
// The client c was the last client to make activity
|
|
// in this set and it was already recorded. Nothing to
|
|
// do.
|
|
return
|
|
}
|
|
|
|
// If we saw this connection send previously, then consider
|
|
// the group fighting and disable them all.
|
|
if s.dupPolicy == disableFighters {
|
|
for _, prior := range dup.sendHistory {
|
|
if prior == c {
|
|
cs.ForeachClient(func(c *sclient) {
|
|
c.isDisabled.Store(true)
|
|
if cs.activeClient.Load() == c {
|
|
cs.activeClient.Store(nil)
|
|
}
|
|
})
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Append this client to the list of clients who spoke last.
|
|
dup.sendHistory = append(dup.sendHistory, c)
|
|
}
|
|
|
|
// serverInfo is the JSON message that sendServerInfo marshals, seals to the
// client key and sends in a frameServerInfo frame. All fields are omitted
// from the JSON when zero.
type serverInfo struct {
	// Version is encoded under the JSON key "version".
	Version int `json:"version,omitempty"`

	// The token bucket fields keep their Go names as JSON keys.
	TokenBucketBytesPerSecond int `json:",omitempty"`
	TokenBucketBytesBurst     int `json:",omitempty"`
}
|
|
|
|
func (s *Server) sendServerInfo(bw *lazyBufioWriter, clientKey key.NodePublic) error {
|
|
msg, err := json.Marshal(serverInfo{Version: ProtocolVersion})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
msgbox := s.privateKey.SealTo(clientKey, msg)
|
|
if err := writeFrameHeader(bw.bw(), frameServerInfo, uint32(len(msgbox))); err != nil {
|
|
return err
|
|
}
|
|
if _, err := bw.Write(msgbox); err != nil {
|
|
return err
|
|
}
|
|
return bw.Flush()
|
|
}
|
|
|
|
// recvClientKey reads the frameClientInfo frame from the client (its
|
|
// proof of identity) upon its initial connection. It should be
|
|
// considered especially untrusted at this point.
|
|
func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.NodePublic, info *clientInfo, err error) {
|
|
fl, err := readFrameTypeHeader(br, frameClientInfo)
|
|
if err != nil {
|
|
return zpub, nil, err
|
|
}
|
|
const minLen = keyLen + nonceLen
|
|
if fl < minLen {
|
|
return zpub, nil, errors.New("short client info")
|
|
}
|
|
// We don't trust the client at all yet, so limit its input size to limit
|
|
// things like JSON resource exhausting (http://github.com/golang/go/issues/31789).
|
|
if fl > 256<<10 {
|
|
return zpub, nil, errors.New("long client info")
|
|
}
|
|
if err := clientKey.ReadRawWithoutAllocating(br); err != nil {
|
|
return zpub, nil, err
|
|
}
|
|
msgLen := int(fl - keyLen)
|
|
msgbox := make([]byte, msgLen)
|
|
if _, err := io.ReadFull(br, msgbox); err != nil {
|
|
return zpub, nil, fmt.Errorf("msgbox: %v", err)
|
|
}
|
|
msg, ok := s.privateKey.OpenFrom(clientKey, msgbox)
|
|
if !ok {
|
|
return zpub, nil, fmt.Errorf("msgbox: cannot open len=%d with client key %s", msgLen, clientKey)
|
|
}
|
|
info = new(clientInfo)
|
|
if err := json.Unmarshal(msg, info); err != nil {
|
|
return zpub, nil, fmt.Errorf("msg: %v", err)
|
|
}
|
|
return clientKey, info, nil
|
|
}
|
|
|
|
func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
|
|
if frameLen < keyLen {
|
|
return zpub, nil, errors.New("short send packet frame")
|
|
}
|
|
if err := dstKey.ReadRawWithoutAllocating(br); err != nil {
|
|
return zpub, nil, err
|
|
}
|
|
packetLen := frameLen - keyLen
|
|
if packetLen > MaxPacketSize {
|
|
return zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, MaxPacketSize)
|
|
}
|
|
contents = make([]byte, packetLen)
|
|
if _, err := io.ReadFull(br, contents); err != nil {
|
|
return zpub, nil, err
|
|
}
|
|
s.packetsRecv.Add(1)
|
|
s.bytesRecv.Add(int64(len(contents)))
|
|
if disco.LooksLikeDiscoWrapper(contents) {
|
|
s.packetsRecvDisco.Add(1)
|
|
} else {
|
|
s.packetsRecvOther.Add(1)
|
|
}
|
|
return dstKey, contents, nil
|
|
}
|
|
|
|
// zpub is the key.NodePublic zero value.
|
|
var zpub key.NodePublic
|
|
|
|
func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, dstKey key.NodePublic, contents []byte, err error) {
|
|
if frameLen < keyLen*2 {
|
|
return zpub, zpub, nil, errors.New("short send packet frame")
|
|
}
|
|
if err := srcKey.ReadRawWithoutAllocating(br); err != nil {
|
|
return zpub, zpub, nil, err
|
|
}
|
|
if err := dstKey.ReadRawWithoutAllocating(br); err != nil {
|
|
return zpub, zpub, nil, err
|
|
}
|
|
packetLen := frameLen - keyLen*2
|
|
if packetLen > MaxPacketSize {
|
|
return zpub, zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, MaxPacketSize)
|
|
}
|
|
contents = make([]byte, packetLen)
|
|
if _, err := io.ReadFull(br, contents); err != nil {
|
|
return zpub, zpub, nil, err
|
|
}
|
|
// TODO: was s.packetsRecv.Add(1)
|
|
// TODO: was s.bytesRecv.Add(int64(len(contents)))
|
|
return srcKey, dstKey, contents, nil
|
|
}
|
|
|
|
// sclient is a client connection to the server.
|
|
//
|
|
// A node (a wireguard public key) can be connected multiple times to a DERP server
|
|
// and thus have multiple sclient instances. An sclient represents
|
|
// only one of these possibly multiple connections. See clientSet for the
|
|
// type that represents the set of all connections for a given key.
|
|
//
|
|
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
|
|
type sclient struct {
|
|
// Static after construction.
|
|
connNum int64 // process-wide unique counter, incremented each Accept
|
|
s *Server
|
|
nc Conn
|
|
key key.NodePublic
|
|
info clientInfo
|
|
logf logger.Logf
|
|
done <-chan struct{} // closed when connection closes
|
|
remoteIPPort netip.AddrPort // zero if remoteAddr is not ip:port.
|
|
sendQueue chan pkt // packets queued to this client; never closed
|
|
discoSendQueue chan pkt // important packets queued to this client; never closed
|
|
sendPongCh chan [8]byte // pong replies to send to the client; never closed
|
|
peerGone chan peerGoneMsg // write request that a peer is not at this server (not used by mesh peers)
|
|
meshUpdate chan struct{} // write request to write peerStateChange
|
|
canMesh bool // clientInfo had correct mesh token for inter-region routing
|
|
isNotIdealConn bool // client indicated it is not its ideal node in the region
|
|
isDup atomic.Bool // whether more than 1 sclient for key is connected
|
|
isDisabled atomic.Bool // whether sends to this peer are disabled due to active/active dups
|
|
debug bool // turn on for verbose logging
|
|
|
|
// Owned by run, not thread-safe.
|
|
br *bufio.Reader
|
|
connectedAt time.Time
|
|
preferred bool
|
|
|
|
// Owned by sendLoop, not thread-safe.
|
|
sawSrc map[key.NodePublic]set.Handle
|
|
bw *lazyBufioWriter
|
|
|
|
// Guarded by s.mu
|
|
//
|
|
// peerStateChange is used by mesh peers (a set of regional
|
|
// DERP servers) and contains records that need to be sent to
|
|
// the client for them to update their map of who's connected
|
|
// to this node.
|
|
peerStateChange []peerConnState
|
|
|
|
// peerGoneLimiter limits how often the server will inform a
|
|
// client that it's trying to establish a direct connection
|
|
// through us with a peer we have no record of.
|
|
peerGoneLim *rate.Limiter
|
|
}
|
|
|
|
func (c *sclient) presentFlags() PeerPresentFlags {
|
|
var f PeerPresentFlags
|
|
if c.info.IsProber {
|
|
f |= PeerPresentIsProber
|
|
}
|
|
if c.canMesh {
|
|
f |= PeerPresentIsMeshPeer
|
|
}
|
|
if c.isNotIdealConn {
|
|
f |= PeerPresentNotIdeal
|
|
}
|
|
if f == 0 {
|
|
return PeerPresentIsRegular
|
|
}
|
|
return f
|
|
}
|
|
|
|
// peerConnState represents whether a peer is connected to the server
|
|
// or not.
|
|
type peerConnState struct {
|
|
ipPort netip.AddrPort // if present, the peer's IP:port
|
|
peer key.NodePublic
|
|
flags PeerPresentFlags
|
|
present bool
|
|
}
|
|
|
|
// pkt is a request to write a data frame to an sclient.
|
|
type pkt struct {
|
|
// enqueuedAt is when a packet was put onto a queue before it was sent,
|
|
// and is used for reporting metrics on the duration of packets in the queue.
|
|
enqueuedAt time.Time
|
|
|
|
// bs is the data packet bytes.
|
|
// The memory is owned by pkt.
|
|
bs []byte
|
|
|
|
// src is the who's the sender of the packet.
|
|
src key.NodePublic
|
|
}
|
|
|
|
// peerGoneMsg is a request to write a peerGone frame to an sclient
|
|
type peerGoneMsg struct {
|
|
peer key.NodePublic
|
|
reason PeerGoneReasonType
|
|
}
|
|
|
|
func (c *sclient) setPreferred(v bool) {
|
|
if c.preferred == v {
|
|
return
|
|
}
|
|
c.preferred = v
|
|
var homeMove *expvar.Int
|
|
if v {
|
|
c.s.curHomeClients.Add(1)
|
|
homeMove = &c.s.homeMovesIn
|
|
} else {
|
|
c.s.curHomeClients.Add(-1)
|
|
homeMove = &c.s.homeMovesOut
|
|
}
|
|
|
|
// Keep track of varz for home serve moves in/out. But ignore
|
|
// the initial packet set when a client connects, which we
|
|
// assume happens within 5 seconds. In any case, just for
|
|
// graphs, so not important to miss a move. But it shouldn't:
|
|
// the netcheck/re-STUNs in magicsock only happen about every
|
|
// 30 seconds.
|
|
if c.s.clock.Since(c.connectedAt) > 5*time.Second {
|
|
homeMove.Add(1)
|
|
}
|
|
}
|
|
|
|
// expMovingAverage returns the updated exponential moving average: newValue
// weighted by alpha plus the previous average prev weighted by (1 - alpha).
// https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
func expMovingAverage(prev, newValue, alpha float64) float64 {
	fresh := alpha * newValue
	carried := (1 - alpha) * prev
	return fresh + carried
}
|
|
|
|
// recordQueueTime updates the average queue duration metric after a packet has been sent.
|
|
func (c *sclient) recordQueueTime(enqueuedAt time.Time) {
|
|
elapsed := float64(c.s.clock.Since(enqueuedAt).Milliseconds())
|
|
for {
|
|
old := atomic.LoadUint64(c.s.avgQueueDuration)
|
|
newAvg := expMovingAverage(math.Float64frombits(old), elapsed, 0.1)
|
|
if atomic.CompareAndSwapUint64(c.s.avgQueueDuration, old, math.Float64bits(newAvg)) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// onSendLoopDone is called when the send loop is done
|
|
// to clean up.
|
|
//
|
|
// It must only be called from the sendLoop goroutine.
|
|
func (c *sclient) onSendLoopDone() {
|
|
// If the sender shuts down unilaterally due to an error, close so
|
|
// that the receive loop unblocks and cleans up the rest.
|
|
c.nc.Close()
|
|
|
|
// Clean up watches.
|
|
for peer, h := range c.sawSrc {
|
|
c.s.removePeerGoneFromRegionWatcher(peer, h)
|
|
}
|
|
|
|
// Drain the send queue to count dropped packets
|
|
for {
|
|
select {
|
|
case pkt := <-c.sendQueue:
|
|
c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
|
|
case pkt := <-c.discoSendQueue:
|
|
c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (c *sclient) sendLoop(ctx context.Context) error {
|
|
defer c.onSendLoopDone()
|
|
|
|
jitter := rand.N(5 * time.Second)
|
|
keepAliveTick, keepAliveTickChannel := c.s.clock.NewTicker(keepAlive + jitter)
|
|
defer keepAliveTick.Stop()
|
|
|
|
var werr error // last write error
|
|
inBatch := -1 // for bufferedWriteFrames
|
|
for {
|
|
if werr != nil {
|
|
return werr
|
|
}
|
|
inBatch++
|
|
// First, a non-blocking select (with a default) that
|
|
// does as many non-flushing writes as possible.
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case msg := <-c.peerGone:
|
|
werr = c.sendPeerGone(msg.peer, msg.reason)
|
|
continue
|
|
case <-c.meshUpdate:
|
|
werr = c.sendMeshUpdates()
|
|
continue
|
|
case msg := <-c.sendQueue:
|
|
werr = c.sendPacket(msg.src, msg.bs)
|
|
c.recordQueueTime(msg.enqueuedAt)
|
|
continue
|
|
case msg := <-c.discoSendQueue:
|
|
werr = c.sendPacket(msg.src, msg.bs)
|
|
c.recordQueueTime(msg.enqueuedAt)
|
|
continue
|
|
case msg := <-c.sendPongCh:
|
|
werr = c.sendPong(msg)
|
|
continue
|
|
case <-keepAliveTickChannel:
|
|
werr = c.sendKeepAlive()
|
|
continue
|
|
default:
|
|
// Flush any writes from the 3 sends above, or from
|
|
// the blocking loop below.
|
|
if werr = c.bw.Flush(); werr != nil {
|
|
return werr
|
|
}
|
|
if inBatch != 0 { // the first loop will almost always hit default & be size zero
|
|
c.s.bufferedWriteFrames.Observe(float64(inBatch))
|
|
inBatch = 0
|
|
}
|
|
}
|
|
|
|
// Then a blocking select with same:
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case msg := <-c.peerGone:
|
|
werr = c.sendPeerGone(msg.peer, msg.reason)
|
|
case <-c.meshUpdate:
|
|
werr = c.sendMeshUpdates()
|
|
case msg := <-c.sendQueue:
|
|
werr = c.sendPacket(msg.src, msg.bs)
|
|
c.recordQueueTime(msg.enqueuedAt)
|
|
case msg := <-c.discoSendQueue:
|
|
werr = c.sendPacket(msg.src, msg.bs)
|
|
c.recordQueueTime(msg.enqueuedAt)
|
|
case msg := <-c.sendPongCh:
|
|
werr = c.sendPong(msg)
|
|
case <-keepAliveTickChannel:
|
|
werr = c.sendKeepAlive()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *sclient) setWriteDeadline() {
|
|
d := writeTimeout
|
|
if c.canMesh {
|
|
// Trusted peers get more tolerance.
|
|
//
|
|
// The "canMesh" is a bit of a misnomer; mesh peers typically run over a
|
|
// different interface for a per-region private VPC and are not
|
|
// throttled. But monitoring software elsewhere over the internet also
|
|
// use the private mesh key to subscribe to connect/disconnect events
|
|
// and might hit throttling and need more time to get the initial dump
|
|
// of connected peers.
|
|
d = privilegedWriteTimeout
|
|
}
|
|
c.nc.SetWriteDeadline(time.Now().Add(d))
|
|
}
|
|
|
|
// sendKeepAlive sends a keep-alive frame, without flushing.
|
|
func (c *sclient) sendKeepAlive() error {
|
|
c.setWriteDeadline()
|
|
return writeFrameHeader(c.bw.bw(), frameKeepAlive, 0)
|
|
}
|
|
|
|
// sendPong sends a pong reply, without flushing.
|
|
func (c *sclient) sendPong(data [8]byte) error {
|
|
c.s.sentPong.Add(1)
|
|
c.setWriteDeadline()
|
|
if err := writeFrameHeader(c.bw.bw(), framePong, uint32(len(data))); err != nil {
|
|
return err
|
|
}
|
|
_, err := c.bw.Write(data[:])
|
|
return err
|
|
}
|
|
|
|
const (
|
|
peerGoneFrameLen = keyLen + 1
|
|
peerPresentFrameLen = keyLen + 16 + 2 + 1 // 16 byte IP + 2 byte port + 1 byte flags
|
|
)
|
|
|
|
// sendPeerGone sends a peerGone frame, without flushing.
|
|
func (c *sclient) sendPeerGone(peer key.NodePublic, reason PeerGoneReasonType) error {
|
|
switch reason {
|
|
case PeerGoneReasonDisconnected:
|
|
c.s.peerGoneDisconnectedFrames.Add(1)
|
|
case PeerGoneReasonNotHere:
|
|
c.s.peerGoneNotHereFrames.Add(1)
|
|
}
|
|
c.setWriteDeadline()
|
|
data := make([]byte, 0, peerGoneFrameLen)
|
|
data = peer.AppendTo(data)
|
|
data = append(data, byte(reason))
|
|
if err := writeFrameHeader(c.bw.bw(), framePeerGone, uint32(len(data))); err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err := c.bw.Write(data)
|
|
return err
|
|
}
|
|
|
|
// sendPeerPresent sends a peerPresent frame, without flushing.
|
|
func (c *sclient) sendPeerPresent(peer key.NodePublic, ipPort netip.AddrPort, flags PeerPresentFlags) error {
|
|
c.setWriteDeadline()
|
|
if err := writeFrameHeader(c.bw.bw(), framePeerPresent, peerPresentFrameLen); err != nil {
|
|
return err
|
|
}
|
|
payload := make([]byte, peerPresentFrameLen)
|
|
_ = peer.AppendTo(payload[:0])
|
|
a16 := ipPort.Addr().As16()
|
|
copy(payload[keyLen:], a16[:])
|
|
binary.BigEndian.PutUint16(payload[keyLen+16:], ipPort.Port())
|
|
payload[keyLen+18] = byte(flags)
|
|
_, err := c.bw.Write(payload)
|
|
return err
|
|
}
|
|
|
|
// sendMeshUpdates drains all mesh peerStateChange entries into the write buffer
|
|
// without flushing.
|
|
func (c *sclient) sendMeshUpdates() error {
|
|
var lastBatch []peerConnState // memory to best effort reuse
|
|
|
|
// takeAll returns c.peerStateChange and empties it.
|
|
takeAll := func() []peerConnState {
|
|
c.s.mu.Lock()
|
|
defer c.s.mu.Unlock()
|
|
if len(c.peerStateChange) == 0 {
|
|
return nil
|
|
}
|
|
batch := c.peerStateChange
|
|
if cap(lastBatch) > 16 {
|
|
lastBatch = nil
|
|
}
|
|
c.peerStateChange = lastBatch[:0]
|
|
return batch
|
|
}
|
|
|
|
for loops := 0; ; loops++ {
|
|
batch := takeAll()
|
|
if len(batch) == 0 {
|
|
c.s.meshUpdateLoopCount.Observe(float64(loops))
|
|
return nil
|
|
}
|
|
c.s.meshUpdateBatchSize.Observe(float64(len(batch)))
|
|
|
|
for _, pcs := range batch {
|
|
var err error
|
|
if pcs.present {
|
|
err = c.sendPeerPresent(pcs.peer, pcs.ipPort, pcs.flags)
|
|
} else {
|
|
err = c.sendPeerGone(pcs.peer, PeerGoneReasonDisconnected)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
lastBatch = batch
|
|
}
|
|
}
|
|
|
|
// sendPacket writes contents to the client in a RecvPacket frame. If
|
|
// srcKey.IsZero, uses the old DERPv1 framing format, otherwise uses
|
|
// DERPv2. The bytes of contents are only valid until this function
|
|
// returns, do not retain slices.
|
|
// It does not flush its bufio.Writer.
|
|
func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error) {
|
|
defer func() {
|
|
// Stats update.
|
|
if err != nil {
|
|
c.s.recordDrop(contents, srcKey, c.key, dropReasonWriteError)
|
|
} else {
|
|
c.s.packetsSent.Add(1)
|
|
c.s.bytesSent.Add(int64(len(contents)))
|
|
}
|
|
c.debugLogf("sendPacket from %s: %v", srcKey.ShortString(), err)
|
|
}()
|
|
|
|
c.setWriteDeadline()
|
|
|
|
withKey := !srcKey.IsZero()
|
|
pktLen := len(contents)
|
|
if withKey {
|
|
pktLen += key.NodePublicRawLen
|
|
c.noteSendFromSrc(srcKey)
|
|
}
|
|
if err = writeFrameHeader(c.bw.bw(), frameRecvPacket, uint32(pktLen)); err != nil {
|
|
return err
|
|
}
|
|
if withKey {
|
|
if err := srcKey.WriteRawWithoutAllocating(c.bw.bw()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
_, err = c.bw.Write(contents)
|
|
return err
|
|
}
|
|
|
|
// noteSendFromSrc notes that we are about to write a packet
|
|
// from src to sclient.
|
|
//
|
|
// It must only be called from the sendLoop goroutine.
|
|
func (c *sclient) noteSendFromSrc(src key.NodePublic) {
|
|
if _, ok := c.sawSrc[src]; ok {
|
|
return
|
|
}
|
|
h := c.s.addPeerGoneFromRegionWatcher(src, c.onPeerGoneFromRegion)
|
|
mak.Set(&c.sawSrc, src, h)
|
|
}
|
|
|
|
// AddPacketForwarder registers fwd as a packet forwarder for dst.
|
|
// fwd must be comparable.
|
|
func (s *Server) AddPacketForwarder(dst key.NodePublic, fwd PacketForwarder) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if prev, ok := s.clientsMesh[dst]; ok {
|
|
if prev == fwd {
|
|
// Duplicate registration of same forwarder. Ignore.
|
|
return
|
|
}
|
|
if m, ok := prev.(*multiForwarder); ok {
|
|
if _, ok := m.all[fwd]; ok {
|
|
// Duplicate registration of same forwarder in set; ignore.
|
|
return
|
|
}
|
|
m.add(fwd)
|
|
return
|
|
}
|
|
if prev != nil {
|
|
// Otherwise, the existing value is not a set,
|
|
// not a dup, and not local-only (nil) so make
|
|
// it a set. `prev` existed first, so will have higher
|
|
// priority.
|
|
fwd = newMultiForwarder(prev, fwd)
|
|
s.multiForwarderCreated.Add(1)
|
|
}
|
|
}
|
|
s.clientsMesh[dst] = fwd
|
|
}
|
|
|
|
// RemovePacketForwarder removes fwd as a packet forwarder for dst.
|
|
// fwd must be comparable.
|
|
func (s *Server) RemovePacketForwarder(dst key.NodePublic, fwd PacketForwarder) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
v, ok := s.clientsMesh[dst]
|
|
if !ok {
|
|
return
|
|
}
|
|
if m, ok := v.(*multiForwarder); ok {
|
|
if len(m.all) < 2 {
|
|
panic("unexpected")
|
|
}
|
|
if remain, isLast := m.deleteLocked(fwd); isLast {
|
|
// If fwd was in m and we no longer need to be a
|
|
// multiForwarder, replace the entry with the
|
|
// remaining PacketForwarder.
|
|
s.clientsMesh[dst] = remain
|
|
s.multiForwarderDeleted.Add(1)
|
|
}
|
|
return
|
|
}
|
|
if v != fwd {
|
|
s.removePktForwardOther.Add(1)
|
|
// Delete of an entry that wasn't in the
|
|
// map. Harmless, so ignore.
|
|
// (This might happen if a user is moving around
|
|
// between nodes and/or the server sent duplicate
|
|
// connection change broadcasts.)
|
|
return
|
|
}
|
|
|
|
if _, isLocal := s.clients[dst]; isLocal {
|
|
s.clientsMesh[dst] = nil
|
|
} else {
|
|
delete(s.clientsMesh, dst)
|
|
s.notePeerGoneFromRegionLocked(dst)
|
|
}
|
|
}
|
|
|
|
// multiForwarder is a PacketForwarder that represents a set of
|
|
// forwarding options. It's used in the rare cases that a client is
|
|
// connected to multiple DERP nodes in a region. That shouldn't really
|
|
// happen except for perhaps during brief moments while the client is
|
|
// reconfiguring, in which case we don't want to forget where the
|
|
// client is. The map value is unique connection number; the lowest
|
|
// one has been seen the longest. It's used to make sure we forward
|
|
// packets consistently to the same node and don't pick randomly.
|
|
type multiForwarder struct {
|
|
fwd syncs.AtomicValue[PacketForwarder] // preferred forwarder.
|
|
all map[PacketForwarder]uint8 // all forwarders, protected by s.mu.
|
|
}
|
|
|
|
// newMultiForwarder creates a new multiForwarder.
|
|
// The first PacketForwarder passed to this function will be the preferred one.
|
|
func newMultiForwarder(fwds ...PacketForwarder) *multiForwarder {
|
|
f := &multiForwarder{all: make(map[PacketForwarder]uint8)}
|
|
f.fwd.Store(fwds[0])
|
|
for idx, fwd := range fwds {
|
|
f.all[fwd] = uint8(idx)
|
|
}
|
|
return f
|
|
}
|
|
|
|
// add adds a new forwarder to the map with a connection number that
|
|
// is higher than the existing ones.
|
|
func (f *multiForwarder) add(fwd PacketForwarder) {
|
|
var max uint8
|
|
for _, v := range f.all {
|
|
if v > max {
|
|
max = v
|
|
}
|
|
}
|
|
f.all[fwd] = max + 1
|
|
}
|
|
|
|
// deleteLocked removes a packet forwarder from the map. It expects Server.mu to be held.
|
|
// If only one forwarder remains after the removal, it will be returned alongside a `true` boolean value.
|
|
func (f *multiForwarder) deleteLocked(fwd PacketForwarder) (_ PacketForwarder, isLast bool) {
|
|
delete(f.all, fwd)
|
|
|
|
if fwd == f.fwd.Load() {
|
|
// The preferred forwarder has been removed, choose a new one
|
|
// based on the lowest index.
|
|
var lowestfwd PacketForwarder
|
|
var lowest uint8
|
|
for k, v := range f.all {
|
|
if lowestfwd == nil || v < lowest {
|
|
lowestfwd = k
|
|
lowest = v
|
|
}
|
|
}
|
|
if lowestfwd != nil {
|
|
f.fwd.Store(lowestfwd)
|
|
}
|
|
}
|
|
|
|
if len(f.all) == 1 {
|
|
for k := range f.all {
|
|
return k, true
|
|
}
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
func (f *multiForwarder) ForwardPacket(src, dst key.NodePublic, payload []byte) error {
|
|
return f.fwd.Load().ForwardPacket(src, dst, payload)
|
|
}
|
|
|
|
func (f *multiForwarder) String() string {
|
|
return fmt.Sprintf("<MultiForwarder fwd=%s total=%d>", f.fwd.Load(), len(f.all))
|
|
}
|
|
|
|
func (s *Server) expVarFunc(f func() any) expvar.Func {
|
|
return expvar.Func(func() any {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return f()
|
|
})
|
|
}
|
|
|
|
// ExpVar returns an expvar variable suitable for registering with expvar.Publish.
|
|
func (s *Server) ExpVar() expvar.Var {
|
|
m := new(metrics.Set)
|
|
m.Set("gauge_memstats_sys0", expvar.Func(func() any { return int64(s.memSys0) }))
|
|
m.Set("gauge_watchers", s.expVarFunc(func() any { return len(s.watchers) }))
|
|
m.Set("gauge_current_file_descriptors", expvar.Func(func() any { return metrics.CurrentFDs() }))
|
|
m.Set("gauge_current_connections", &s.curClients)
|
|
m.Set("gauge_current_home_connections", &s.curHomeClients)
|
|
m.Set("gauge_current_notideal_connections", &s.curClientsNotIdeal)
|
|
m.Set("gauge_clients_total", expvar.Func(func() any { return len(s.clientsMesh) }))
|
|
m.Set("gauge_clients_local", expvar.Func(func() any { return len(s.clients) }))
|
|
m.Set("gauge_clients_remote", expvar.Func(func() any { return len(s.clientsMesh) - len(s.clients) }))
|
|
m.Set("gauge_current_dup_client_keys", &s.dupClientKeys)
|
|
m.Set("gauge_current_dup_client_conns", &s.dupClientConns)
|
|
m.Set("counter_total_dup_client_conns", &s.dupClientConnTotal)
|
|
m.Set("accepts", &s.accepts)
|
|
m.Set("bytes_received", &s.bytesRecv)
|
|
m.Set("bytes_sent", &s.bytesSent)
|
|
m.Set("packets_dropped", &s.packetsDropped)
|
|
m.Set("counter_packets_dropped_reason", &s.packetsDroppedReason)
|
|
m.Set("counter_packets_dropped_type", &s.packetsDroppedType)
|
|
m.Set("counter_packets_received_kind", &s.packetsRecvByKind)
|
|
m.Set("packets_sent", &s.packetsSent)
|
|
m.Set("packets_received", &s.packetsRecv)
|
|
m.Set("unknown_frames", &s.unknownFrames)
|
|
m.Set("home_moves_in", &s.homeMovesIn)
|
|
m.Set("home_moves_out", &s.homeMovesOut)
|
|
m.Set("got_ping", &s.gotPing)
|
|
m.Set("sent_pong", &s.sentPong)
|
|
m.Set("peer_gone_disconnected_frames", &s.peerGoneDisconnectedFrames)
|
|
m.Set("peer_gone_not_here_frames", &s.peerGoneNotHereFrames)
|
|
m.Set("packets_forwarded_out", &s.packetsForwardedOut)
|
|
m.Set("packets_forwarded_in", &s.packetsForwardedIn)
|
|
m.Set("multiforwarder_created", &s.multiForwarderCreated)
|
|
m.Set("multiforwarder_deleted", &s.multiForwarderDeleted)
|
|
m.Set("packet_forwarder_delete_other_value", &s.removePktForwardOther)
|
|
m.Set("sclient_write_timeouts", &s.sclientWriteTimeouts)
|
|
m.Set("average_queue_duration_ms", expvar.Func(func() any {
|
|
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
|
|
}))
|
|
m.Set("counter_tcp_rtt", &s.tcpRtt)
|
|
m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize)
|
|
m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount)
|
|
m.Set("counter_buffered_write_frames", s.bufferedWriteFrames)
|
|
var expvarVersion expvar.String
|
|
expvarVersion.Set(version.Long())
|
|
m.Set("version", &expvarVersion)
|
|
return m
|
|
}
|
|
|
|
func (s *Server) ConsistencyCheck() error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
var errs []string
|
|
|
|
var nilMeshNotInClient int
|
|
for k, f := range s.clientsMesh {
|
|
if f == nil {
|
|
if _, ok := s.clients[k]; !ok {
|
|
nilMeshNotInClient++
|
|
}
|
|
}
|
|
}
|
|
if nilMeshNotInClient != 0 {
|
|
errs = append(errs, fmt.Sprintf("%d s.clientsMesh keys not in s.clients", nilMeshNotInClient))
|
|
}
|
|
|
|
var clientNotInMesh int
|
|
for k := range s.clients {
|
|
if _, ok := s.clientsMesh[k]; !ok {
|
|
clientNotInMesh++
|
|
}
|
|
}
|
|
if clientNotInMesh != 0 {
|
|
errs = append(errs, fmt.Sprintf("%d s.clients keys not in s.clientsMesh", clientNotInMesh))
|
|
}
|
|
|
|
if s.curClients.Value() != int64(len(s.clients)) {
|
|
errs = append(errs, fmt.Sprintf("expvar connections = %d != clients map says of %d",
|
|
s.curClients.Value(),
|
|
len(s.clients)))
|
|
}
|
|
|
|
if s.verifyClientsLocalTailscaled {
|
|
if err := s.checkVerifyClientsLocalTailscaled(); err != nil {
|
|
errs = append(errs, err.Error())
|
|
}
|
|
}
|
|
|
|
if len(errs) == 0 {
|
|
return nil
|
|
}
|
|
return errors.New(strings.Join(errs, ", "))
|
|
}
|
|
|
|
// checkVerifyClientsLocalTailscaled checks that a verifyClients call can be made successfully for the derper hosts own node key.
|
|
func (s *Server) checkVerifyClientsLocalTailscaled() error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
status, err := localClient.StatusWithoutPeers(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("localClient.Status: %w", err)
|
|
}
|
|
info := &clientInfo{
|
|
IsProber: true,
|
|
}
|
|
clientIP := netip.IPv6Loopback()
|
|
if err := s.verifyClient(ctx, status.Self.PublicKey, info, clientIP); err != nil {
|
|
return fmt.Errorf("verifyClient for self nodekey: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// minTimeBetweenLogs is how long ServeDebugTraffic waits between two
// successive traffic samples.
const minTimeBetweenLogs time.Duration = 2 * time.Second
|
|
|
|
// BytesSentRecv records the number of bytes that have been sent since the last traffic check
|
|
// for a given process, as well as the public key of the process sending those bytes.
|
|
type BytesSentRecv struct {
|
|
Sent uint64
|
|
Recv uint64
|
|
// Key is the public key of the client which sent/received these bytes.
|
|
Key key.NodePublic
|
|
}
|
|
|
|
// parseSSOutput parses the output from the specific call to ss in ServeDebugTraffic.
|
|
// Separated out for ease of testing.
|
|
func parseSSOutput(raw string) map[netip.AddrPort]BytesSentRecv {
|
|
newState := map[netip.AddrPort]BytesSentRecv{}
|
|
// parse every 2 lines and get src and dst ips, and kv pairs
|
|
lines := strings.Split(raw, "\n")
|
|
for i := 0; i < len(lines); i += 2 {
|
|
ipInfo := strings.Fields(strings.TrimSpace(lines[i]))
|
|
if len(ipInfo) < 5 {
|
|
continue
|
|
}
|
|
src, err := netip.ParseAddrPort(ipInfo[4])
|
|
if err != nil {
|
|
continue
|
|
}
|
|
stats := strings.Fields(strings.TrimSpace(lines[i+1]))
|
|
stat := BytesSentRecv{}
|
|
for _, s := range stats {
|
|
if strings.Contains(s, "bytes_sent") {
|
|
sent, err := strconv.Atoi(s[strings.Index(s, ":")+1:])
|
|
if err == nil {
|
|
stat.Sent = uint64(sent)
|
|
}
|
|
} else if strings.Contains(s, "bytes_received") {
|
|
recv, err := strconv.Atoi(s[strings.Index(s, ":")+1:])
|
|
if err == nil {
|
|
stat.Recv = uint64(recv)
|
|
}
|
|
}
|
|
}
|
|
newState[src] = stat
|
|
}
|
|
return newState
|
|
}
|
|
|
|
func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) {
|
|
prevState := map[netip.AddrPort]BytesSentRecv{}
|
|
enc := json.NewEncoder(w)
|
|
for r.Context().Err() == nil {
|
|
output, err := exec.Command("ss", "-i", "-H", "-t").Output()
|
|
if err != nil {
|
|
fmt.Fprintf(w, "ss failed: %v", err)
|
|
return
|
|
}
|
|
newState := parseSSOutput(string(output))
|
|
s.mu.Lock()
|
|
for k, next := range newState {
|
|
prev := prevState[k]
|
|
if prev.Sent < next.Sent || prev.Recv < next.Recv {
|
|
if pkey, ok := s.keyOfAddr[k]; ok {
|
|
next.Key = pkey
|
|
if err := enc.Encode(next); err != nil {
|
|
s.mu.Unlock()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
prevState = newState
|
|
if _, err := fmt.Fprintln(w); err != nil {
|
|
return
|
|
}
|
|
if f, ok := w.(http.Flusher); ok {
|
|
f.Flush()
|
|
}
|
|
time.Sleep(minTimeBetweenLogs)
|
|
}
|
|
}
|
|
|
|
// bufioWriterPool holds idle *bufio.Writer values for reuse by
// lazyBufioWriter. Freshly created writers have a 2 KiB buffer and are
// attached to io.Discard until a user resets them onto a real destination.
var bufioWriterPool = &sync.Pool{
	New: newPooledBufioWriter,
}

// newPooledBufioWriter allocates a new writer for bufioWriterPool.
func newPooledBufioWriter() any {
	return bufio.NewWriterSize(io.Discard, 2<<10)
}
|
|
|
|
// lazyBufioWriter behaves like a bufio.Writer around an underlying
// io.Writer, except that the actual bufio.Writer is only borrowed from a
// sync.Pool when it is first needed and is handed back to the pool on
// every flush.
//
// The point is to save memory: most DERP connections are idle, and
// permanently allocated bufio.Writers for idle connections accounted for
// 30% of overall memory usage.
type lazyBufioWriter struct {
	// w is the underlying destination.
	w io.Writer
	// lbw is the lazily attached buffer; nil means no buffer is
	// currently associated and one must be obtained before writing.
	lbw *bufio.Writer
}
|
|
|
|
func (w *lazyBufioWriter) bw() *bufio.Writer {
|
|
if w.lbw == nil {
|
|
w.lbw = bufioWriterPool.Get().(*bufio.Writer)
|
|
w.lbw.Reset(w.w)
|
|
}
|
|
return w.lbw
|
|
}
|
|
|
|
// Available reports how many bytes are unused in the buffer. A buffer is
// attached first if w does not currently have one.
func (w *lazyBufioWriter) Available() int {
	buf := w.bw()
	return buf.Available()
}
|
|
|
|
// Write writes p through the attached buffer, attaching one first if
// needed, and returns the buffer's result.
func (w *lazyBufioWriter) Write(p []byte) (int, error) {
	buf := w.bw()
	return buf.Write(p)
}
|
|
|
|
func (w *lazyBufioWriter) Flush() error {
|
|
if w.lbw == nil {
|
|
return nil
|
|
}
|
|
err := w.lbw.Flush()
|
|
|
|
w.lbw.Reset(io.Discard)
|
|
bufioWriterPool.Put(w.lbw)
|
|
w.lbw = nil
|
|
|
|
return err
|
|
}
|