wgengine/magicsock: simplify ReceiveIPv4+DERP path

name           old time/op  new time/op  delta
ReceiveFrom-4  35.8µs ± 3%  21.9µs ± 5%  -38.92%  (p=0.008 n=5+5)

Fixes #1145
Updates #414

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick 2021-01-18 08:39:52 -08:00 committed by Brad Fitzpatrick
parent 9936cffc1a
commit 5c619882bc
2 changed files with 159 additions and 194 deletions

View File

@ -49,7 +49,6 @@
"tailscale.com/types/logger"
"tailscale.com/types/nettype"
"tailscale.com/types/opt"
"tailscale.com/types/structs"
"tailscale.com/types/wgkey"
"tailscale.com/version"
)
@ -147,21 +146,21 @@ type Conn struct {
// TODO(danderson): now that we have global rate-limiting, is this still useful?
sendLogLimit *rate.Limiter
// bufferedIPv4From and bufferedIPv4Packet are owned by
// ReceiveIPv4, and used when both a DERP and IPv4 packet arrive
// at the same time. It stores the IPv4 packet for use in the next call.
bufferedIPv4From netaddr.IPPort // if non-zero, then bufferedIPv4Packet is valid
bufferedIPv4Packet []byte // the received packet (reused, owned by ReceiveIPv4)
// stunReceiveFunc holds the current STUN packet processing func.
// Its Loaded value is always non-nil.
stunReceiveFunc atomic.Value // of func(p []byte, fromAddr *net.UDPAddr)
// udpRecvCh and derpRecvCh are used by ReceiveIPv4 to multiplex
// reads from DERP and the pconn4.
udpRecvCh chan udpReadResult
// derpRecvCh is used by ReceiveIPv4 to read DERP messages.
derpRecvCh chan derpReadResult
// derpRecvCountAtomic is atomically incremented by runDerpReader whenever
// a DERP message arrives. It's incremented before runDerpReader is interrupted.
derpRecvCountAtomic int64
// derpRecvCountLast is used by ReceiveIPv4 to compare against
// its last read value of derpRecvCountAtomic to determine
// whether a DERP channel read should be done.
derpRecvCountLast int64 // owned by ReceiveIPv4
// ============================================================
mu sync.Mutex // guards all following fields; see userspaceEngine lock ordering rules
muCond *sync.Cond
@ -421,7 +420,6 @@ func newConn() *Conn {
addrsByUDP: make(map[netaddr.IPPort]*addrSet),
addrsByKey: make(map[key.Public]*addrSet),
derpRecvCh: make(chan derpReadResult),
udpRecvCh: make(chan udpReadResult),
derpStarted: make(chan struct{}),
peerLastDerp: make(map[key.Public]int),
endpointOfDisco: make(map[tailcfg.DiscoKey]*discoEndpoint),
@ -1379,7 +1377,16 @@ func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr netaddr.IPPort, d
// Ignore.
// TODO: handle endpoint notification messages.
continue
}
// Before we wake up ReceiveIPv4 with SetReadDeadline,
// note that a DERP packet has arrived. ReceiveIPv4
// will read this field to note that its UDP read
// error is due to us.
atomic.AddInt64(&c.derpRecvCountAtomic, 1)
// Cancel the pconn read goroutine.
c.pconn4.SetReadDeadline(aLongTimeAgo)
select {
case <-ctx.Done():
return
@ -1445,54 +1452,16 @@ func (c *Conn) findEndpoint(ipp netaddr.IPPort, addr *net.UDPAddr, packet []byte
}
}
if addr == nil {
addr = ipp.UDPAddr()
}
return c.findLegacyEndpointLocked(ipp, addr, packet)
}
type udpReadResult struct {
_ structs.Incomparable
n int
err error
addr *net.UDPAddr
ipp netaddr.IPPort
}
// aLongTimeAgo is a non-zero time, far in the past, used for
// immediate cancellation of network operations.
var aLongTimeAgo = time.Unix(233431200, 0)
// awaitUDP4 reads a single IPv4 UDP packet (or an error) and sends it
// to c.udpRecvCh, skipping over (but handling) any STUN replies.
func (c *Conn) awaitUDP4(b []byte) {
for {
n, pAddr, err := c.pconn4.ReadFrom(b)
if err != nil {
select {
case c.udpRecvCh <- udpReadResult{err: err}:
case <-c.donec:
}
return
}
addr := pAddr.(*net.UDPAddr)
ipp, ok := netaddr.FromStdAddr(addr.IP, addr.Port, addr.Zone)
if !ok {
continue
}
if stun.Is(b[:n]) {
c.stunReceiveFunc.Load().(func([]byte, netaddr.IPPort))(b[:n], ipp)
continue
}
if c.handleDiscoMessage(b[:n], ipp) {
continue
}
select {
case c.udpRecvCh <- udpReadResult{n: n, addr: addr, ipp: ipp}:
case <-c.donec:
}
return
}
}
// noteRecvActivityFromEndpoint calls the c.noteRecvActivity hook if
// e is a discovery-capable peer and this is the first receive activity
// it's got in awhile (in last 10 seconds).
@ -1505,128 +1474,88 @@ func (c *Conn) noteRecvActivityFromEndpoint(e conn.Endpoint) {
}
}
func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) {
var addr *net.UDPAddr
Top:
// First, process any buffered packet from earlier.
if from := c.bufferedIPv4From; !from.IsZero() {
c.bufferedIPv4From = netaddr.IPPort{}
addr = from.UDPAddr()
ep := c.findEndpoint(from, addr, c.bufferedIPv4Packet)
if ep == nil {
goto Top
}
c.noteRecvActivityFromEndpoint(ep)
return copy(b, c.bufferedIPv4Packet), ep, nil
func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, error) {
if c.pconn6 == nil {
return 0, nil, syscall.EAFNOSUPPORT
}
for {
n, pAddr, err := c.pconn6.ReadFrom(b)
if err != nil {
return 0, nil, err
}
if ep, ok := c.receiveIP(b[:n], pAddr.(*net.UDPAddr)); ok {
return n, ep, nil
}
}
}
go c.awaitUDP4(b)
func (c *Conn) derpPacketArrived() bool {
rc := atomic.LoadInt64(&c.derpRecvCountAtomic)
if rc != c.derpRecvCountLast {
c.derpRecvCountLast = rc
return true
}
return false
}
// Once the above goroutine has started, it owns b until it writes
// to udpRecvCh. The code below must not access b until it's
// completed a successful receive on udpRecvCh.
// ReceiveIPv4 is called by wireguard-go to receive an IPv4 packet.
// In Tailscale's case, that packet might also arrive via DERP. A DERP packet arrival
// aborts the pconn4 read deadline to make it fail.
func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) {
for {
n, pAddr, err := c.pconn4.ReadFrom(b)
if err != nil {
// If the pconn4 read failed, the likely reason is a DERP reader received
// a packet and interrupted us.
// It's possible for ReadFrom to return a non deadline exceeded error
// and for there to have also had a DERP packet arrive, but that's fine:
// we'll get the same error from ReadFrom later.
if c.derpPacketArrived() {
c.pconn4.SetReadDeadline(time.Time{}) // restore
n, ep, err = c.receiveIPv4DERP(b)
if err == errLoopAgain {
continue
}
return n, ep, err
}
return 0, nil, err
}
if ep, ok := c.receiveIP(b[:n], pAddr.(*net.UDPAddr)); ok {
return n, ep, nil
}
}
}
var ipp netaddr.IPPort
// receiveIP is the shared bits of ReceiveIPv4 and ReceiveIPv6.
func (c *Conn) receiveIP(b []byte, ua *net.UDPAddr) (ep conn.Endpoint, ok bool) {
ipp, ok := netaddr.FromStdAddr(ua.IP, ua.Port, ua.Zone)
if !ok {
return
}
if stun.Is(b) {
c.stunReceiveFunc.Load().(func([]byte, netaddr.IPPort))(b, ipp)
return
}
if c.handleDiscoMessage(b, ipp) {
return
}
ep = c.findEndpoint(ipp, ua, b)
if ep == nil {
return
}
c.noteRecvActivityFromEndpoint(ep)
return ep, true
}
var errLoopAgain = errors.New("received packet was not a wireguard-go packet or no endpoint found")
// receiveIPv4DERP reads a packet from c.derpRecvCh into b and returns the associated endpoint.
//
// If the packet was a disco message or the peer endpoint wasn't
// found, the returned error is errLoopAgain.
func (c *Conn) receiveIPv4DERP(b []byte) (n int, ep conn.Endpoint, err error) {
var dm derpReadResult
select {
case dm := <-c.derpRecvCh:
// Cancel the pconn read goroutine
c.pconn4.SetReadDeadline(aLongTimeAgo)
// Wait for the UDP-reading goroutine to be done, since it's currently
// the owner of the b []byte buffer:
select {
case um := <-c.udpRecvCh:
if um.err != nil {
// The normal case. The SetReadDeadline interrupted
// the read and we get an error which we now ignore.
} else {
// The pconn.ReadFrom succeeded and was about to send,
// but DERP sent first. So now we have both ready.
// Save the UDP packet away for use by the next
// ReceiveIPv4 call.
c.bufferedIPv4From = um.ipp
c.bufferedIPv4Packet = append(c.bufferedIPv4Packet[:0], b[:um.n]...)
}
c.pconn4.SetReadDeadline(time.Time{})
case <-c.donec:
return 0, nil, errors.New("Conn closed")
}
var regionID int
n, regionID = dm.n, dm.regionID
ncopy := dm.copyBuf(b)
if ncopy != n {
err = fmt.Errorf("received DERP packet of length %d that's too big for WireGuard ReceiveIPv4 buf size %d", n, ncopy)
c.logf("magicsock: %v", err)
return 0, nil, err
}
ipp = netaddr.IPPort{IP: derpMagicIPAddr, Port: uint16(regionID)}
if c.handleDiscoMessage(b[:n], ipp) {
goto Top
}
var (
didNoteRecvActivity bool
discoEp *discoEndpoint
asEp *addrSet
)
c.mu.Lock()
if dk, ok := c.discoOfNode[tailcfg.NodeKey(dm.src)]; ok {
discoEp = c.endpointOfDisco[dk]
// If we know about the node (it's in discoOfNode) but don't know about the
// endpoint, that's because it's an idle peer that doesn't yet exist in the
// wireguard config. So run the receive hook, if defined, which should
// create the wireguard peer.
if discoEp == nil && c.noteRecvActivity != nil {
didNoteRecvActivity = true
c.mu.Unlock() // release lock before calling noteRecvActivity
c.noteRecvActivity(dk) // (calls back into CreateEndpoint)
// Now require the lock. No invariants need to be rechecked; just
// 1-2 map lookups follow that are harmless if, say, the peer has
// been deleted during this time. In that case we'll treate it as a
// legacy pre-disco UDP receive and hand it to wireguard which'll
// likely just drop it.
c.mu.Lock()
discoEp = c.endpointOfDisco[dk]
c.logf("magicsock: DERP packet received from idle peer %v; created=%v", dm.src.ShortString(), ep != nil)
}
}
if !c.disableLegacy {
asEp = c.addrsByKey[dm.src]
}
c.mu.Unlock()
if discoEp != nil {
ep = discoEp
} else if asEp != nil {
ep = asEp
} else {
key := wgkey.Key(dm.src)
c.logf("magicsock: DERP packet from unknown key: %s", key.ShortString())
ep = c.findEndpoint(ipp, addr, b[:n])
if ep == nil {
goto Top
}
}
if !didNoteRecvActivity {
c.noteRecvActivityFromEndpoint(ep)
}
return n, ep, nil
case um := <-c.udpRecvCh:
if um.err != nil {
return 0, nil, err
}
n, addr, ipp = um.n, um.addr, um.ipp
ep = c.findEndpoint(ipp, addr, b[:n])
if ep == nil {
goto Top
}
c.noteRecvActivityFromEndpoint(ep)
return n, ep, nil
case <-c.donec:
// Socket has been shut down. All the producers of packets
// respond to the context cancellation and go away, so we have
@ -1639,38 +1568,71 @@ func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) {
// Clos() on magicsock, and expects ReceiveIPv4 to unblock
// with an error so it can clean up.
return 0, nil, errors.New("socket closed")
case dm = <-c.derpRecvCh:
// Below.
}
}
func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, error) {
if c.pconn6 == nil {
return 0, nil, syscall.EAFNOSUPPORT
var regionID int
n, regionID = dm.n, dm.regionID
ncopy := dm.copyBuf(b)
if ncopy != n {
err = fmt.Errorf("received DERP packet of length %d that's too big for WireGuard ReceiveIPv4 buf size %d", n, ncopy)
c.logf("magicsock: %v", err)
return 0, nil, err
}
for {
n, pAddr, err := c.pconn6.ReadFrom(b)
if err != nil {
return 0, nil, err
}
addr := pAddr.(*net.UDPAddr)
ipp, ok := netaddr.FromStdAddr(addr.IP, addr.Port, addr.Zone)
if !ok {
continue
}
if stun.Is(b[:n]) {
c.stunReceiveFunc.Load().(func([]byte, netaddr.IPPort))(b[:n], ipp)
continue
}
if c.handleDiscoMessage(b[:n], ipp) {
continue
}
ep := c.findEndpoint(ipp, addr, b[:n])
ipp := netaddr.IPPort{IP: derpMagicIPAddr, Port: uint16(regionID)}
if c.handleDiscoMessage(b[:n], ipp) {
return 0, nil, errLoopAgain
}
var (
didNoteRecvActivity bool
discoEp *discoEndpoint
asEp *addrSet
)
c.mu.Lock()
if dk, ok := c.discoOfNode[tailcfg.NodeKey(dm.src)]; ok {
discoEp = c.endpointOfDisco[dk]
// If we know about the node (it's in discoOfNode) but don't know about the
// endpoint, that's because it's an idle peer that doesn't yet exist in the
// wireguard config. So run the receive hook, if defined, which should
// create the wireguard peer.
if discoEp == nil && c.noteRecvActivity != nil {
didNoteRecvActivity = true
c.mu.Unlock() // release lock before calling noteRecvActivity
c.noteRecvActivity(dk) // (calls back into CreateEndpoint)
// Now require the lock. No invariants need to be rechecked; just
// 1-2 map lookups follow that are harmless if, say, the peer has
// been deleted during this time.
c.mu.Lock()
discoEp = c.endpointOfDisco[dk]
c.logf("magicsock: DERP packet received from idle peer %v; created=%v", dm.src.ShortString(), ep != nil)
}
}
if !c.disableLegacy {
asEp = c.addrsByKey[dm.src]
}
c.mu.Unlock()
if discoEp != nil {
ep = discoEp
} else if asEp != nil {
ep = asEp
} else {
key := wgkey.Key(dm.src)
c.logf("magicsock: DERP packet from unknown key: %s", key.ShortString())
ep = c.findEndpoint(ipp, nil, b[:n])
if ep == nil {
continue
return 0, nil, errLoopAgain
}
c.noteRecvActivityFromEndpoint(ep)
return n, ep, nil
}
if !didNoteRecvActivity {
c.noteRecvActivityFromEndpoint(ep)
}
return n, ep, nil
}
// discoLogLevel controls the verbosity of discovery log messages.

View File

@ -20,6 +20,7 @@
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"
@ -1381,7 +1382,7 @@ func stringifyConfig(cfg wgcfg.Config) string {
return string(j)
}
func TestDiscoEndpointAlignment(t *testing.T) {
func Test32bitAlignment(t *testing.T) {
var de discoEndpoint
off := unsafe.Offsetof(de.lastRecvUnixAtomic)
if off%8 != 0 {
@ -1393,6 +1394,8 @@ func TestDiscoEndpointAlignment(t *testing.T) {
if de.isFirstRecvActivityInAwhile() {
t.Error("expected false on second call")
}
var c Conn
atomic.AddInt64(&c.derpRecvCountAtomic, 1)
}
func BenchmarkReceiveFrom(b *testing.B) {