mirror of
https://github.com/tailscale/tailscale.git
synced 2025-01-09 17:43:40 +00:00
f0230ce0b5
This commit implements TCP GRO for packets being written to gVisor on Linux. Windows support will follow later. The wireguard-go dependency is updated in order to make use of newly exported IP checksum functions. gVisor is updated in order to make use of newly exported stack.PacketBuffer GRO logic. TCP throughput towards gVisor, i.e. TUN write direction, is dramatically improved as a result of this commit. Benchmarks show substantial improvement, sometimes as high as 2x. High bandwidth-delay product paths remain receive window limited, bottlenecked by gVisor's default TCP receive socket buffer size. This will be addressed in a follow-on commit. The iperf3 results below demonstrate the effect of this commit between two Linux computers with i5-12400 CPUs. There is roughly ~13us of round trip latency between them. The first result is from commit 57856fc without TCP GRO. Starting Test: protocol: TCP, 1 streams, 131072 byte blocks - - - - - - - - - - - - - - - - - - - - - - - - - Test Complete. Summary Results: [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-10.00 sec 4.77 GBytes 4.10 Gbits/sec 20 sender [ 5] 0.00-10.00 sec 4.77 GBytes 4.10 Gbits/sec receiver The second result is from this commit with TCP GRO. Starting Test: protocol: TCP, 1 streams, 131072 byte blocks - - - - - - - - - - - - - - - - - - - - - - - - - Test Complete. Summary Results: [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-10.00 sec 10.6 GBytes 9.14 Gbits/sec 20 sender [ 5] 0.00-10.00 sec 10.6 GBytes 9.14 Gbits/sec receiver Updates #6816 Signed-off-by: Jordan Whited <jordan@tailscale.com>
416 lines
12 KiB
Go
416 lines
12 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package netstack
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/tailscale/wireguard-go/tun"
|
|
"gvisor.dev/gvisor/pkg/buffer"
|
|
"gvisor.dev/gvisor/pkg/tcpip"
|
|
"gvisor.dev/gvisor/pkg/tcpip/header"
|
|
"gvisor.dev/gvisor/pkg/tcpip/header/parse"
|
|
"gvisor.dev/gvisor/pkg/tcpip/stack"
|
|
"gvisor.dev/gvisor/pkg/tcpip/stack/gro"
|
|
"tailscale.com/net/packet"
|
|
"tailscale.com/types/ipproto"
|
|
)
|
|
|
|
type queue struct {
|
|
// TODO(jwhited): evaluate performance with mu as Mutex and/or alternative
|
|
// non-channel buffer.
|
|
c chan *stack.PacketBuffer
|
|
mu sync.RWMutex // mu guards closed
|
|
closed bool
|
|
}
|
|
|
|
func (q *queue) Close() {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
if !q.closed {
|
|
close(q.c)
|
|
}
|
|
q.closed = true
|
|
}
|
|
|
|
func (q *queue) Read() *stack.PacketBuffer {
|
|
select {
|
|
case p := <-q.c:
|
|
return p
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (q *queue) ReadContext(ctx context.Context) *stack.PacketBuffer {
|
|
select {
|
|
case pkt := <-q.c:
|
|
return pkt
|
|
case <-ctx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (q *queue) Write(pkt *stack.PacketBuffer) tcpip.Error {
|
|
// q holds the PacketBuffer.
|
|
q.mu.RLock()
|
|
defer q.mu.RUnlock()
|
|
if q.closed {
|
|
return &tcpip.ErrClosedForSend{}
|
|
}
|
|
|
|
wrote := false
|
|
select {
|
|
case q.c <- pkt.IncRef():
|
|
wrote = true
|
|
default:
|
|
// TODO(jwhited): reconsider/count
|
|
pkt.DecRef()
|
|
}
|
|
|
|
if wrote {
|
|
return nil
|
|
}
|
|
return &tcpip.ErrNoBufferSpace{}
|
|
}
|
|
|
|
func (q *queue) Num() int {
|
|
return len(q.c)
|
|
}
|
|
|
|
var _ stack.LinkEndpoint = (*linkEndpoint)(nil)
|
|
var _ stack.GSOEndpoint = (*linkEndpoint)(nil)
|
|
|
|
// linkEndpoint implements stack.LinkEndpoint and stack.GSOEndpoint. Outbound
|
|
// packets written by gVisor towards Tailscale are stored in a channel.
|
|
// Inbound is fed to gVisor via injectInbound or enqueueGRO. This is loosely
|
|
// modeled after gvisor.dev/pkg/tcpip/link/channel.Endpoint.
|
|
type linkEndpoint struct {
|
|
SupportedGSOKind stack.SupportedGSO
|
|
initGRO initGRO
|
|
|
|
mu sync.RWMutex // mu guards the following fields
|
|
dispatcher stack.NetworkDispatcher
|
|
linkAddr tcpip.LinkAddress
|
|
mtu uint32
|
|
gro gro.GRO // mu only guards access to gro.Dispatcher
|
|
|
|
q *queue // outbound
|
|
}
|
|
|
|
// TODO(jwhited): move to linkEndpointOpts struct or similar.
|
|
type initGRO bool
|
|
|
|
const (
|
|
disableGRO initGRO = false
|
|
enableGRO initGRO = true
|
|
)
|
|
|
|
func newLinkEndpoint(size int, mtu uint32, linkAddr tcpip.LinkAddress, gro initGRO) *linkEndpoint {
|
|
le := &linkEndpoint{
|
|
q: &queue{
|
|
c: make(chan *stack.PacketBuffer, size),
|
|
},
|
|
mtu: mtu,
|
|
linkAddr: linkAddr,
|
|
}
|
|
le.initGRO = gro
|
|
le.gro.Init(bool(gro))
|
|
return le
|
|
}
|
|
|
|
// Close closes l. Further packet injections will return an error, and all
|
|
// pending packets are discarded. Close may be called concurrently with
|
|
// WritePackets.
|
|
func (l *linkEndpoint) Close() {
|
|
l.mu.Lock()
|
|
if l.gro.Dispatcher != nil {
|
|
l.gro.Flush()
|
|
}
|
|
l.dispatcher = nil
|
|
l.gro.Dispatcher = nil
|
|
l.mu.Unlock()
|
|
l.q.Close()
|
|
l.Drain()
|
|
}
|
|
|
|
// Read does non-blocking read one packet from the outbound packet queue.
|
|
func (l *linkEndpoint) Read() *stack.PacketBuffer {
|
|
return l.q.Read()
|
|
}
|
|
|
|
// ReadContext does blocking read for one packet from the outbound packet queue.
|
|
// It can be cancelled by ctx, and in this case, it returns nil.
|
|
func (l *linkEndpoint) ReadContext(ctx context.Context) *stack.PacketBuffer {
|
|
return l.q.ReadContext(ctx)
|
|
}
|
|
|
|
// Drain removes all outbound packets from the channel and counts them.
|
|
func (l *linkEndpoint) Drain() int {
|
|
c := 0
|
|
for pkt := l.Read(); pkt != nil; pkt = l.Read() {
|
|
pkt.DecRef()
|
|
c++
|
|
}
|
|
return c
|
|
}
|
|
|
|
// NumQueued returns the number of packets queued for outbound.
|
|
func (l *linkEndpoint) NumQueued() int {
|
|
return l.q.Num()
|
|
}
|
|
|
|
// rxChecksumOffload validates IPv4, TCP, and UDP header checksums in p,
|
|
// returning an equivalent *stack.PacketBuffer if they are valid, otherwise nil.
|
|
// The set of headers validated covers where gVisor would perform validation if
|
|
// !stack.PacketBuffer.RXChecksumValidated, i.e. it satisfies
|
|
// stack.CapabilityRXChecksumOffload. Other protocols with checksum fields,
|
|
// e.g. ICMP{v6}, are still validated by gVisor regardless of rx checksum
|
|
// offloading capabilities.
|
|
func rxChecksumOffload(p *packet.Parsed) *stack.PacketBuffer {
|
|
var (
|
|
pn tcpip.NetworkProtocolNumber
|
|
csumStart int
|
|
)
|
|
buf := p.Buffer()
|
|
|
|
switch p.IPVersion {
|
|
case 4:
|
|
if len(buf) < header.IPv4MinimumSize {
|
|
return nil
|
|
}
|
|
csumStart = int((buf[0] & 0x0F) * 4)
|
|
if csumStart < header.IPv4MinimumSize || csumStart > header.IPv4MaximumHeaderSize || len(buf) < csumStart {
|
|
return nil
|
|
}
|
|
if ^tun.Checksum(buf[:csumStart], 0) != 0 {
|
|
return nil
|
|
}
|
|
pn = header.IPv4ProtocolNumber
|
|
case 6:
|
|
if len(buf) < header.IPv6FixedHeaderSize {
|
|
return nil
|
|
}
|
|
csumStart = header.IPv6FixedHeaderSize
|
|
pn = header.IPv6ProtocolNumber
|
|
if p.IPProto != ipproto.ICMPv6 && p.IPProto != ipproto.TCP && p.IPProto != ipproto.UDP {
|
|
// buf could have extension headers before a UDP or TCP header, but
|
|
// packet.Parsed.IPProto will be set to the ext header type, so we
|
|
// have to look deeper. We are still responsible for validating the
|
|
// L4 checksum in this case. So, make use of gVisor's existing
|
|
// extension header parsing via parse.IPv6() in order to unpack the
|
|
// L4 csumStart index. This is not particularly efficient as we have
|
|
// to allocate a short-lived stack.PacketBuffer that cannot be
|
|
// re-used. parse.IPv6() "consumes" the IPv6 headers, so we can't
|
|
// inject this stack.PacketBuffer into the stack at a later point.
|
|
packetBuf := stack.NewPacketBuffer(stack.PacketBufferOptions{
|
|
Payload: buffer.MakeWithData(bytes.Clone(buf)),
|
|
})
|
|
defer packetBuf.DecRef()
|
|
// The rightmost bool returns false only if packetBuf is too short,
|
|
// which we've already accounted for above.
|
|
transportProto, _, _, _, _ := parse.IPv6(packetBuf)
|
|
if transportProto == header.TCPProtocolNumber || transportProto == header.UDPProtocolNumber {
|
|
csumLen := packetBuf.Data().Size()
|
|
if len(buf) < csumLen {
|
|
return nil
|
|
}
|
|
csumStart = len(buf) - csumLen
|
|
p.IPProto = ipproto.Proto(transportProto)
|
|
}
|
|
}
|
|
}
|
|
|
|
if p.IPProto == ipproto.TCP || p.IPProto == ipproto.UDP {
|
|
lenForPseudo := len(buf) - csumStart
|
|
csum := tun.PseudoHeaderChecksum(
|
|
uint8(p.IPProto),
|
|
p.Src.Addr().AsSlice(),
|
|
p.Dst.Addr().AsSlice(),
|
|
uint16(lenForPseudo))
|
|
csum = tun.Checksum(buf[csumStart:], csum)
|
|
if ^csum != 0 {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
packetBuf := stack.NewPacketBuffer(stack.PacketBufferOptions{
|
|
Payload: buffer.MakeWithData(bytes.Clone(buf)),
|
|
})
|
|
packetBuf.NetworkProtocolNumber = pn
|
|
// Setting this is not technically required. gVisor overrides where
|
|
// stack.CapabilityRXChecksumOffload is advertised from Capabilities().
|
|
// https://github.com/google/gvisor/blob/64c016c92987cc04dfd4c7b091ddd21bdad875f8/pkg/tcpip/stack/nic.go#L763
|
|
// This is also why we offload for all packets since we cannot signal this
|
|
// per-packet.
|
|
packetBuf.RXChecksumValidated = true
|
|
return packetBuf
|
|
}
|
|
|
|
func (l *linkEndpoint) injectInbound(p *packet.Parsed) {
|
|
l.mu.RLock()
|
|
d := l.dispatcher
|
|
l.mu.RUnlock()
|
|
if d == nil {
|
|
return
|
|
}
|
|
pkt := rxChecksumOffload(p)
|
|
if pkt == nil {
|
|
return
|
|
}
|
|
d.DeliverNetworkPacket(pkt.NetworkProtocolNumber, pkt)
|
|
pkt.DecRef()
|
|
}
|
|
|
|
// enqueueGRO enqueues the provided packet for GRO. It may immediately deliver
|
|
// it to the underlying stack.NetworkDispatcher depending on its contents and if
|
|
// GRO was initialized via newLinkEndpoint. To explicitly flush previously
|
|
// enqueued packets see flushGRO. enqueueGRO is not thread-safe and must not
|
|
// be called concurrently with flushGRO.
|
|
func (l *linkEndpoint) enqueueGRO(p *packet.Parsed) {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
if l.gro.Dispatcher == nil {
|
|
return
|
|
}
|
|
pkt := rxChecksumOffload(p)
|
|
if pkt == nil {
|
|
return
|
|
}
|
|
// TODO(jwhited): gro.Enqueue() duplicates a lot of p.Decode().
|
|
// We may want to push stack.PacketBuffer further up as a
|
|
// replacement for packet.Parsed, or inversely push packet.Parsed
|
|
// down into refactored GRO logic.
|
|
l.gro.Enqueue(pkt)
|
|
pkt.DecRef()
|
|
}
|
|
|
|
// flushGRO flushes previously enqueueGRO'd packets to the underlying
|
|
// stack.NetworkDispatcher. flushGRO is not thread-safe, and must not be
|
|
// called concurrently with enqueueGRO.
|
|
func (l *linkEndpoint) flushGRO() {
|
|
if !l.initGRO {
|
|
// If GRO was not initialized fast path return to avoid scanning GRO
|
|
// buckets (see l.gro.Flush()) that will always be empty.
|
|
return
|
|
}
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
if l.gro.Dispatcher != nil {
|
|
l.gro.Flush()
|
|
}
|
|
}
|
|
|
|
// Attach saves the stack network-layer dispatcher for use later when packets
|
|
// are injected.
|
|
func (l *linkEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
l.dispatcher = dispatcher
|
|
l.gro.Dispatcher = dispatcher
|
|
}
|
|
|
|
// IsAttached implements stack.LinkEndpoint.IsAttached.
|
|
func (l *linkEndpoint) IsAttached() bool {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.dispatcher != nil
|
|
}
|
|
|
|
// MTU implements stack.LinkEndpoint.MTU.
|
|
func (l *linkEndpoint) MTU() uint32 {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.mtu
|
|
}
|
|
|
|
// SetMTU implements stack.LinkEndpoint.SetMTU.
|
|
func (l *linkEndpoint) SetMTU(mtu uint32) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
l.mtu = mtu
|
|
}
|
|
|
|
// Capabilities implements stack.LinkEndpoint.Capabilities.
|
|
func (l *linkEndpoint) Capabilities() stack.LinkEndpointCapabilities {
|
|
// We are required to offload RX checksum validation for the purposes of
|
|
// GRO.
|
|
return stack.CapabilityRXChecksumOffload
|
|
}
|
|
|
|
// GSOMaxSize implements stack.GSOEndpoint.
|
|
func (*linkEndpoint) GSOMaxSize() uint32 {
|
|
// This an increase from 32k returned by channel.Endpoint.GSOMaxSize() to
|
|
// 64k, which improves throughput.
|
|
return (1 << 16) - 1
|
|
}
|
|
|
|
// SupportedGSO implements stack.GSOEndpoint.
|
|
func (l *linkEndpoint) SupportedGSO() stack.SupportedGSO {
|
|
return l.SupportedGSOKind
|
|
}
|
|
|
|
// MaxHeaderLength returns the maximum size of the link layer header. Given it
|
|
// doesn't have a header, it just returns 0.
|
|
func (*linkEndpoint) MaxHeaderLength() uint16 {
|
|
return 0
|
|
}
|
|
|
|
// LinkAddress returns the link address of this endpoint.
|
|
func (l *linkEndpoint) LinkAddress() tcpip.LinkAddress {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.linkAddr
|
|
}
|
|
|
|
// SetLinkAddress implements stack.LinkEndpoint.SetLinkAddress.
|
|
func (l *linkEndpoint) SetLinkAddress(addr tcpip.LinkAddress) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
l.linkAddr = addr
|
|
}
|
|
|
|
// WritePackets stores outbound packets into the channel.
|
|
// Multiple concurrent calls are permitted.
|
|
func (l *linkEndpoint) WritePackets(pkts stack.PacketBufferList) (int, tcpip.Error) {
|
|
n := 0
|
|
// TODO(jwhited): evaluate writing a stack.PacketBufferList instead of a
|
|
// single packet. We can split 2 x 64K GSO across
|
|
// wireguard-go/conn.IdealBatchSize (128 slots) @ 1280 MTU, and non-GSO we
|
|
// could do more. Read API would need to change to take advantage. Verify
|
|
// gVisor limits around max number of segments packed together. Since we
|
|
// control MTU (and by effect TCP MSS in gVisor) we *shouldn't* expect to
|
|
// ever overflow 128 slots (see wireguard-go/tun.ErrTooManySegments usage).
|
|
for _, pkt := range pkts.AsSlice() {
|
|
if err := l.q.Write(pkt); err != nil {
|
|
if _, ok := err.(*tcpip.ErrNoBufferSpace); !ok && n == 0 {
|
|
return 0, err
|
|
}
|
|
break
|
|
}
|
|
n++
|
|
}
|
|
|
|
return n, nil
|
|
}
|
|
|
|
// Wait implements stack.LinkEndpoint.Wait.
|
|
func (*linkEndpoint) Wait() {}
|
|
|
|
// ARPHardwareType implements stack.LinkEndpoint.ARPHardwareType.
|
|
func (*linkEndpoint) ARPHardwareType() header.ARPHardwareType {
|
|
return header.ARPHardwareNone
|
|
}
|
|
|
|
// AddHeader implements stack.LinkEndpoint.AddHeader.
|
|
func (*linkEndpoint) AddHeader(*stack.PacketBuffer) {}
|
|
|
|
// ParseHeader implements stack.LinkEndpoint.ParseHeader.
|
|
func (*linkEndpoint) ParseHeader(*stack.PacketBuffer) bool { return true }
|
|
|
|
// SetOnCloseAction implements stack.LinkEndpoint.
|
|
func (*linkEndpoint) SetOnCloseAction(func()) {}
|