// Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package derp import ( "bufio" "encoding/binary" "encoding/json" "errors" "fmt" "io" "sync" "sync/atomic" "time" "go4.org/mem" "golang.org/x/time/rate" "tailscale.com/net/netaddr" "tailscale.com/types/key" "tailscale.com/types/logger" ) // Client is a DERP client. type Client struct { serverKey key.NodePublic // of the DERP server; not a machine or node key privateKey key.NodePrivate publicKey key.NodePublic // of privateKey logf logger.Logf nc Conn br *bufio.Reader meshKey string canAckPings bool isProber bool wmu sync.Mutex // hold while writing to bw bw *bufio.Writer rate *rate.Limiter // if non-nil, rate limiter to use // Owned by Recv: peeked int // bytes to discard on next Recv readErr atomic.Value // of error; sticky (set by Recv) } // ClientOpt is an option passed to NewClient. type ClientOpt interface { update(*clientOpt) } type clientOptFunc func(*clientOpt) func (f clientOptFunc) update(o *clientOpt) { f(o) } // clientOpt are the options passed to newClient. type clientOpt struct { MeshKey string ServerPub key.NodePublic CanAckPings bool IsProber bool } // MeshKey returns a ClientOpt to pass to the DERP server during connect to get // access to join the mesh. // // An empty key means to not use a mesh key. func MeshKey(key string) ClientOpt { return clientOptFunc(func(o *clientOpt) { o.MeshKey = key }) } // IsProber returns a ClientOpt to pass to the DERP server during connect to // declare that this client is a a prober. func IsProber(v bool) ClientOpt { return clientOptFunc(func(o *clientOpt) { o.IsProber = v }) } // ServerPublicKey returns a ClientOpt to declare that the server's DERP public key is known. // If key is the zero value, the returned ClientOpt is a no-op. func ServerPublicKey(key key.NodePublic) ClientOpt { return clientOptFunc(func(o *clientOpt) { o.ServerPub = key }) } // CanAckPings returns a ClientOpt to set whether it advertises to the // server that it's capable of acknowledging ping requests. func CanAckPings(v bool) ClientOpt { return clientOptFunc(func(o *clientOpt) { o.CanAckPings = v }) } func NewClient(privateKey key.NodePrivate, nc Conn, brw *bufio.ReadWriter, logf logger.Logf, opts ...ClientOpt) (*Client, error) { var opt clientOpt for _, o := range opts { if o == nil { return nil, errors.New("nil ClientOpt") } o.update(&opt) } return newClient(privateKey, nc, brw, logf, opt) } func newClient(privateKey key.NodePrivate, nc Conn, brw *bufio.ReadWriter, logf logger.Logf, opt clientOpt) (*Client, error) { c := &Client{ privateKey: privateKey, publicKey: privateKey.Public(), logf: logf, nc: nc, br: brw.Reader, bw: brw.Writer, meshKey: opt.MeshKey, canAckPings: opt.CanAckPings, isProber: opt.IsProber, } if opt.ServerPub.IsZero() { if err := c.recvServerKey(); err != nil { return nil, fmt.Errorf("derp.Client: failed to receive server key: %v", err) } } else { c.serverKey = opt.ServerPub } if err := c.sendClientKey(); err != nil { return nil, fmt.Errorf("derp.Client: failed to send client key: %v", err) } return c, nil } func (c *Client) recvServerKey() error { var buf [40]byte t, flen, err := readFrame(c.br, 1<<10, buf[:]) if err == io.ErrShortBuffer { // For future-proofing, allow server to send more in its greeting. err = nil } if err != nil { return err } if flen < uint32(len(buf)) || t != frameServerKey || string(buf[:len(magic)]) != magic { return errors.New("invalid server greeting") } c.serverKey = key.NodePublicFromRaw32(mem.B(buf[len(magic):])) return nil } func (c *Client) parseServerInfo(b []byte) (*serverInfo, error) { const maxLength = nonceLen + maxInfoLen fl := len(b) if fl < nonceLen { return nil, fmt.Errorf("short serverInfo frame") } if fl > maxLength { return nil, fmt.Errorf("long serverInfo frame") } msg, ok := c.privateKey.OpenFrom(c.serverKey, b) if !ok { return nil, fmt.Errorf("failed to open naclbox from server key %s", c.serverKey) } info := new(serverInfo) if err := json.Unmarshal(msg, info); err != nil { return nil, fmt.Errorf("invalid JSON: %v", err) } return info, nil } type clientInfo struct { // Version is the DERP protocol version that the client was built with. // See the ProtocolVersion const. Version int `json:"version,omitempty"` // MeshKey optionally specifies a pre-shared key used by // trusted clients. It's required to subscribe to the // connection list & forward packets. It's empty for regular // users. MeshKey string `json:"meshKey,omitempty"` // CanAckPings is whether the client declares it's able to ack // pings. CanAckPings bool // IsProber is whether this client is a prober. IsProber bool `json:",omitempty"` } func (c *Client) sendClientKey() error { msg, err := json.Marshal(clientInfo{ Version: ProtocolVersion, MeshKey: c.meshKey, CanAckPings: c.canAckPings, IsProber: c.isProber, }) if err != nil { return err } msgbox := c.privateKey.SealTo(c.serverKey, msg) buf := make([]byte, 0, keyLen+len(msgbox)) buf = c.publicKey.AppendTo(buf) buf = append(buf, msgbox...) return writeFrame(c.bw, frameClientInfo, buf) } // ServerPublicKey returns the server's public key. func (c *Client) ServerPublicKey() key.NodePublic { return c.serverKey } // Send sends a packet to the Tailscale node identified by dstKey. // // It is an error if the packet is larger than 64KB. func (c *Client) Send(dstKey key.NodePublic, pkt []byte) error { return c.send(dstKey, pkt) } func (c *Client) send(dstKey key.NodePublic, pkt []byte) (ret error) { defer func() { if ret != nil { ret = fmt.Errorf("derp.Send: %w", ret) } }() if len(pkt) > MaxPacketSize { return fmt.Errorf("packet too big: %d", len(pkt)) } c.wmu.Lock() defer c.wmu.Unlock() if c.rate != nil { pktLen := frameHeaderLen + key.NodePublicRawLen + len(pkt) if !c.rate.AllowN(time.Now(), pktLen) { return nil // drop } } if err := writeFrameHeader(c.bw, frameSendPacket, uint32(key.NodePublicRawLen+len(pkt))); err != nil { return err } if _, err := c.bw.Write(dstKey.AppendTo(nil)); err != nil { return err } if _, err := c.bw.Write(pkt); err != nil { return err } return c.bw.Flush() } func (c *Client) ForwardPacket(srcKey, dstKey key.NodePublic, pkt []byte) (err error) { defer func() { if err != nil { err = fmt.Errorf("derp.ForwardPacket: %w", err) } }() if len(pkt) > MaxPacketSize { return fmt.Errorf("packet too big: %d", len(pkt)) } c.wmu.Lock() defer c.wmu.Unlock() timer := time.AfterFunc(5*time.Second, c.writeTimeoutFired) defer timer.Stop() if err := writeFrameHeader(c.bw, frameForwardPacket, uint32(keyLen*2+len(pkt))); err != nil { return err } if _, err := c.bw.Write(srcKey.AppendTo(nil)); err != nil { return err } if _, err := c.bw.Write(dstKey.AppendTo(nil)); err != nil { return err } if _, err := c.bw.Write(pkt); err != nil { return err } return c.bw.Flush() } func (c *Client) writeTimeoutFired() { c.nc.Close() } func (c *Client) SendPing(data [8]byte) error { return c.sendPingOrPong(framePing, data) } func (c *Client) SendPong(data [8]byte) error { return c.sendPingOrPong(framePong, data) } func (c *Client) sendPingOrPong(typ frameType, data [8]byte) error { c.wmu.Lock() defer c.wmu.Unlock() if err := writeFrameHeader(c.bw, typ, 8); err != nil { return err } if _, err := c.bw.Write(data[:]); err != nil { return err } return c.bw.Flush() } // NotePreferred sends a packet that tells the server whether this // client is the user's preferred server. This is only used in the // server for stats. func (c *Client) NotePreferred(preferred bool) (err error) { defer func() { if err != nil { err = fmt.Errorf("derp.NotePreferred: %v", err) } }() c.wmu.Lock() defer c.wmu.Unlock() if err := writeFrameHeader(c.bw, frameNotePreferred, 1); err != nil { return err } var b byte = 0x00 if preferred { b = 0x01 } if err := c.bw.WriteByte(b); err != nil { return err } return c.bw.Flush() } // WatchConnectionChanges sends a request to subscribe to the peer's connection list. // It's a fatal error if the client wasn't created using MeshKey. func (c *Client) WatchConnectionChanges() error { c.wmu.Lock() defer c.wmu.Unlock() if err := writeFrameHeader(c.bw, frameWatchConns, 0); err != nil { return err } return c.bw.Flush() } // ClosePeer asks the server to close target's TCP connection. // It's a fatal error if the client wasn't created using MeshKey. func (c *Client) ClosePeer(target key.NodePublic) error { c.wmu.Lock() defer c.wmu.Unlock() return writeFrame(c.bw, frameClosePeer, target.AppendTo(nil)) } // ReceivedMessage represents a type returned by Client.Recv. Unless // otherwise documented, the returned message aliases the byte slice // provided to Recv and thus the message is only as good as that // buffer, which is up to the caller. type ReceivedMessage interface { msg() } // ReceivedPacket is a ReceivedMessage representing an incoming packet. type ReceivedPacket struct { Source key.NodePublic // Data is the received packet bytes. It aliases the memory // passed to Client.Recv. Data []byte } func (ReceivedPacket) msg() {} // PeerGoneMessage is a ReceivedMessage that indicates that the client // identified by the underlying public key had previously sent you a // packet but has now disconnected from the server. type PeerGoneMessage key.NodePublic func (PeerGoneMessage) msg() {} // PeerPresentMessage is a ReceivedMessage that indicates that the client // is connected to the server. (Only used by trusted mesh clients) type PeerPresentMessage key.NodePublic func (PeerPresentMessage) msg() {} // ServerInfoMessage is sent by the server upon first connect. type ServerInfoMessage struct { // TokenBucketBytesPerSecond is how many bytes per second the // server says it will accept, including all framing bytes. // // Zero means unspecified. There might be a limit, but the // client need not try to respect it. TokenBucketBytesPerSecond int // TokenBucketBytesBurst is how many bytes the server will // allow to burst, temporarily violating // TokenBucketBytesPerSecond. // // Zero means unspecified. There might be a limit, but the // client need not try to respect it. TokenBucketBytesBurst int } func (ServerInfoMessage) msg() {} // PingMessage is a request from a client or server to reply to the // other side with a PongMessage with the given payload. type PingMessage [8]byte func (PingMessage) msg() {} // PongMessage is a reply to a PingMessage from a client or server // with the payload sent previously in a PingMessage. type PongMessage [8]byte func (PongMessage) msg() {} // KeepAliveMessage is a one-way empty message from server to client, just to // keep the connection alive. It's like a PingMessage, but doesn't solicit // a reply from the client. type KeepAliveMessage struct{} func (KeepAliveMessage) msg() {} // HealthMessage is a one-way message from server to client, declaring the // connection health state. type HealthMessage struct { // Problem, if non-empty, is a description of why the connection // is unhealthy. // // The empty string means the connection is healthy again. // // The default condition is healthy, so the server doesn't // broadcast a HealthMessage until a problem exists. Problem string } func (HealthMessage) msg() {} // ServerRestartingMessage is a one-way message from server to client, // advertising that the server is restarting. type ServerRestartingMessage struct { // ReconnectIn is an advisory duration that the client should wait // before attempting to reconnect. It might be zero. // It exists for the server to smear out the reconnects. ReconnectIn time.Duration // TryFor is an advisory duration for how long the client // should attempt to reconnect before giving up and proceeding // with its normal connection failure logic. The interval // between retries is undefined for now. // A server should not send a TryFor duration more than a few // seconds. TryFor time.Duration } func (ServerRestartingMessage) msg() {} // Recv reads a message from the DERP server. // // The returned message may alias memory owned by the Client; it // should only be accessed until the next call to Client. // // Once Recv returns an error, the Client is dead forever. func (c *Client) Recv() (m ReceivedMessage, err error) { return c.recvTimeout(120 * time.Second) } func (c *Client) recvTimeout(timeout time.Duration) (m ReceivedMessage, err error) { readErr, _ := c.readErr.Load().(error) if readErr != nil { return nil, readErr } defer func() { if err != nil { err = fmt.Errorf("derp.Recv: %w", err) c.readErr.Store(err) } }() for { c.nc.SetReadDeadline(time.Now().Add(timeout)) // Discard any peeked bytes from a previous Recv call. if c.peeked != 0 { if n, err := c.br.Discard(c.peeked); err != nil || n != c.peeked { // Documented to never fail, but might as well check. return nil, fmt.Errorf("bufio.Reader.Discard(%d bytes): got %v, %v", c.peeked, n, err) } c.peeked = 0 } t, n, err := readFrameHeader(c.br) if err != nil { return nil, err } if n > 1<<20 { return nil, fmt.Errorf("unexpectedly large frame of %d bytes returned", n) } var b []byte // frame payload (past the 5 byte header) // If the frame fits in our bufio.Reader buffer, just use it. // In practice it's 4KB (from derphttp.Client's bufio.NewReader(httpConn)) and // in practive, WireGuard packets (and thus DERP frames) are under 1.5KB. // So this is the common path. if int(n) <= c.br.Size() { b, err = c.br.Peek(int(n)) c.peeked = int(n) } else { // But if for some reason we read a large DERP message (which isn't necessarily // a WireGuard packet), then just allocate memory for it. // TODO(bradfitz): use a pool if large frames ever happen in practice. b = make([]byte, n) _, err = io.ReadFull(c.br, b) } if err != nil { return nil, err } switch t { default: continue case frameServerInfo: // Server sends this at start-up. Currently unused. // Just has a JSON message saying "version: 2", // but the protocol seems extensible enough as-is without // needing to wait an RTT to discover the version at startup. // We'd prefer to give the connection to the client (magicsock) // to start writing as soon as possible. si, err := c.parseServerInfo(b) if err != nil { return nil, fmt.Errorf("invalid server info frame: %v", err) } sm := ServerInfoMessage{ TokenBucketBytesPerSecond: si.TokenBucketBytesPerSecond, TokenBucketBytesBurst: si.TokenBucketBytesBurst, } c.setSendRateLimiter(sm) return sm, nil case frameKeepAlive: // A one-way keep-alive message that doesn't require an acknowledgement. // This predated framePing/framePong. return KeepAliveMessage{}, nil case framePeerGone: if n < keyLen { c.logf("[unexpected] dropping short peerGone frame from DERP server") continue } pg := PeerGoneMessage(key.NodePublicFromRaw32(mem.B(b[:keyLen]))) return pg, nil case framePeerPresent: if n < keyLen { c.logf("[unexpected] dropping short peerPresent frame from DERP server") continue } pg := PeerPresentMessage(key.NodePublicFromRaw32(mem.B(b[:keyLen]))) return pg, nil case frameRecvPacket: var rp ReceivedPacket if n < keyLen { c.logf("[unexpected] dropping short packet from DERP server") continue } rp.Source = key.NodePublicFromRaw32(mem.B(b[:keyLen])) rp.Data = b[keyLen:n] return rp, nil case framePing: var pm PingMessage if n < 8 { c.logf("[unexpected] dropping short ping frame") continue } copy(pm[:], b[:]) return pm, nil case framePong: var pm PongMessage if n < 8 { c.logf("[unexpected] dropping short ping frame") continue } copy(pm[:], b[:]) return pm, nil case frameHealth: return HealthMessage{Problem: string(b[:])}, nil case frameRestarting: var m ServerRestartingMessage if n < 8 { c.logf("[unexpected] dropping short server restarting frame") continue } m.ReconnectIn = time.Duration(binary.BigEndian.Uint32(b[0:4])) * time.Millisecond m.TryFor = time.Duration(binary.BigEndian.Uint32(b[4:8])) * time.Millisecond return m, nil } } } func (c *Client) setSendRateLimiter(sm ServerInfoMessage) { c.wmu.Lock() defer c.wmu.Unlock() if sm.TokenBucketBytesPerSecond == 0 { c.rate = nil } else { c.rate = rate.NewLimiter( rate.Limit(sm.TokenBucketBytesPerSecond), sm.TokenBucketBytesBurst) } } // LocalAddr returns the TCP connection's local address. // // If the client is broken in some previously detectable way, it // returns an error. func (c *Client) LocalAddr() (netaddr.IPPort, error) { readErr, _ := c.readErr.Load().(error) if readErr != nil { return netaddr.IPPort{}, readErr } if c.nc == nil { return netaddr.IPPort{}, errors.New("nil conn") } a := c.nc.LocalAddr() if a == nil { return netaddr.IPPort{}, errors.New("nil addr") } return netaddr.ParseIPPort(a.String()) }