mirror of
https://github.com/tailscale/tailscale.git
synced 2025-01-10 01:53:49 +00:00
df6014f1d7
In 2f27319baf71681e221904d3a3ffe1badedc8e2e we disabled GRO due to a data race around concurrent calls to tstun.Wrapper.Write(). This commit refactors GRO to be thread-safe, and re-enables it on Linux. This refactor now carries a GRO type across tstun and netstack APIs with a lifetime that is scoped to a single tstun.Wrapper.Write() call. In 25f0a3fc8f6f9cf681bb5afda8e1762816c67a8b we used build tags to prevent importation of gVisor's GRO package on iOS as at the time we believed it was contributing to additional memory usage on that platform. It wasn't, so this commit simplifies and removes those build tags. Updates tailscale/corp#22353 Updates tailscale/corp#22125 Updates #6816 Signed-off-by: Jordan Whited <jordan@tailscale.com>
303 lines
7.7 KiB
Go
303 lines
7.7 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package netstack
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"gvisor.dev/gvisor/pkg/tcpip"
|
|
"gvisor.dev/gvisor/pkg/tcpip/header"
|
|
"gvisor.dev/gvisor/pkg/tcpip/stack"
|
|
"tailscale.com/net/packet"
|
|
"tailscale.com/types/ipproto"
|
|
"tailscale.com/wgengine/netstack/gro"
|
|
)
|
|
|
|
type queue struct {
|
|
// TODO(jwhited): evaluate performance with mu as Mutex and/or alternative
|
|
// non-channel buffer.
|
|
c chan *stack.PacketBuffer
|
|
mu sync.RWMutex // mu guards closed
|
|
closed bool
|
|
}
|
|
|
|
func (q *queue) Close() {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
if !q.closed {
|
|
close(q.c)
|
|
}
|
|
q.closed = true
|
|
}
|
|
|
|
func (q *queue) Read() *stack.PacketBuffer {
|
|
select {
|
|
case p := <-q.c:
|
|
return p
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (q *queue) ReadContext(ctx context.Context) *stack.PacketBuffer {
|
|
select {
|
|
case pkt := <-q.c:
|
|
return pkt
|
|
case <-ctx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (q *queue) Write(pkt *stack.PacketBuffer) tcpip.Error {
|
|
// q holds the PacketBuffer.
|
|
q.mu.RLock()
|
|
defer q.mu.RUnlock()
|
|
if q.closed {
|
|
return &tcpip.ErrClosedForSend{}
|
|
}
|
|
|
|
wrote := false
|
|
select {
|
|
case q.c <- pkt.IncRef():
|
|
wrote = true
|
|
default:
|
|
// TODO(jwhited): reconsider/count
|
|
pkt.DecRef()
|
|
}
|
|
|
|
if wrote {
|
|
return nil
|
|
}
|
|
return &tcpip.ErrNoBufferSpace{}
|
|
}
|
|
|
|
func (q *queue) Num() int {
|
|
return len(q.c)
|
|
}
|
|
|
|
var _ stack.LinkEndpoint = (*linkEndpoint)(nil)
|
|
var _ stack.GSOEndpoint = (*linkEndpoint)(nil)
|
|
|
|
// supportedGRO enumerates the kinds of GRO a linkEndpoint may be configured
// with.
type supportedGRO int

const (
	// groNotSupported is the zero value: no GRO is attempted.
	groNotSupported supportedGRO = iota
	// tcpGROSupported enables the GRO path for TCP packets.
	tcpGROSupported
)
|
|
|
|
// linkEndpoint implements stack.LinkEndpoint and stack.GSOEndpoint. Outbound
|
|
// packets written by gVisor towards Tailscale are stored in a channel.
|
|
// Inbound is fed to gVisor via injectInbound or gro. This is loosely
|
|
// modeled after gvisor.dev/pkg/tcpip/link/channel.Endpoint.
|
|
type linkEndpoint struct {
|
|
SupportedGSOKind stack.SupportedGSO
|
|
supportedGRO supportedGRO
|
|
|
|
mu sync.RWMutex // mu guards the following fields
|
|
dispatcher stack.NetworkDispatcher
|
|
linkAddr tcpip.LinkAddress
|
|
mtu uint32
|
|
|
|
q *queue // outbound
|
|
}
|
|
|
|
func newLinkEndpoint(size int, mtu uint32, linkAddr tcpip.LinkAddress, supportedGRO supportedGRO) *linkEndpoint {
|
|
le := &linkEndpoint{
|
|
supportedGRO: supportedGRO,
|
|
q: &queue{
|
|
c: make(chan *stack.PacketBuffer, size),
|
|
},
|
|
mtu: mtu,
|
|
linkAddr: linkAddr,
|
|
}
|
|
return le
|
|
}
|
|
|
|
// gro attempts to enqueue p on g if l supports a GRO kind matching the
|
|
// transport protocol carried in p. gro may allocate g if it is nil. gro can
|
|
// either return the existing g, a newly allocated one, or nil. Callers are
|
|
// responsible for calling Flush() on the returned value if it is non-nil once
|
|
// they have finished iterating through all GRO candidates for a given vector.
|
|
// If gro allocates a *gro.GRO it will have l's stack.NetworkDispatcher set via
|
|
// SetDispatcher().
|
|
func (l *linkEndpoint) gro(p *packet.Parsed, g *gro.GRO) *gro.GRO {
|
|
if l.supportedGRO == groNotSupported || p.IPProto != ipproto.TCP {
|
|
// IPv6 may have extension headers preceding a TCP header, but we trade
|
|
// for a fast path and assume p cannot be coalesced in such a case.
|
|
l.injectInbound(p)
|
|
return g
|
|
}
|
|
if g == nil {
|
|
l.mu.RLock()
|
|
d := l.dispatcher
|
|
l.mu.RUnlock()
|
|
g = gro.NewGRO()
|
|
g.SetDispatcher(d)
|
|
}
|
|
g.Enqueue(p)
|
|
return g
|
|
}
|
|
|
|
// Close closes l. Further packet injections will return an error, and all
|
|
// pending packets are discarded. Close may be called concurrently with
|
|
// WritePackets.
|
|
func (l *linkEndpoint) Close() {
|
|
l.mu.Lock()
|
|
l.dispatcher = nil
|
|
l.mu.Unlock()
|
|
l.q.Close()
|
|
l.Drain()
|
|
}
|
|
|
|
// Read does non-blocking read one packet from the outbound packet queue.
|
|
func (l *linkEndpoint) Read() *stack.PacketBuffer {
|
|
return l.q.Read()
|
|
}
|
|
|
|
// ReadContext does blocking read for one packet from the outbound packet queue.
|
|
// It can be cancelled by ctx, and in this case, it returns nil.
|
|
func (l *linkEndpoint) ReadContext(ctx context.Context) *stack.PacketBuffer {
|
|
return l.q.ReadContext(ctx)
|
|
}
|
|
|
|
// Drain removes all outbound packets from the channel and counts them.
|
|
func (l *linkEndpoint) Drain() int {
|
|
c := 0
|
|
for pkt := l.Read(); pkt != nil; pkt = l.Read() {
|
|
pkt.DecRef()
|
|
c++
|
|
}
|
|
return c
|
|
}
|
|
|
|
// NumQueued returns the number of packets queued for outbound.
|
|
func (l *linkEndpoint) NumQueued() int {
|
|
return l.q.Num()
|
|
}
|
|
|
|
func (l *linkEndpoint) injectInbound(p *packet.Parsed) {
|
|
l.mu.RLock()
|
|
d := l.dispatcher
|
|
l.mu.RUnlock()
|
|
if d == nil {
|
|
return
|
|
}
|
|
pkt := gro.RXChecksumOffload(p)
|
|
if pkt == nil {
|
|
return
|
|
}
|
|
d.DeliverNetworkPacket(pkt.NetworkProtocolNumber, pkt)
|
|
pkt.DecRef()
|
|
}
|
|
|
|
// Attach saves the stack network-layer dispatcher for use later when packets
|
|
// are injected.
|
|
func (l *linkEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
l.dispatcher = dispatcher
|
|
}
|
|
|
|
// IsAttached implements stack.LinkEndpoint.IsAttached.
|
|
func (l *linkEndpoint) IsAttached() bool {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.dispatcher != nil
|
|
}
|
|
|
|
// MTU implements stack.LinkEndpoint.MTU.
|
|
func (l *linkEndpoint) MTU() uint32 {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.mtu
|
|
}
|
|
|
|
// SetMTU implements stack.LinkEndpoint.SetMTU.
|
|
func (l *linkEndpoint) SetMTU(mtu uint32) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
l.mtu = mtu
|
|
}
|
|
|
|
// Capabilities implements stack.LinkEndpoint.Capabilities.
|
|
func (l *linkEndpoint) Capabilities() stack.LinkEndpointCapabilities {
|
|
// We are required to offload RX checksum validation for the purposes of
|
|
// GRO.
|
|
return stack.CapabilityRXChecksumOffload
|
|
}
|
|
|
|
// GSOMaxSize implements stack.GSOEndpoint.
|
|
func (*linkEndpoint) GSOMaxSize() uint32 {
|
|
// This an increase from 32k returned by channel.Endpoint.GSOMaxSize() to
|
|
// 64k, which improves throughput.
|
|
return (1 << 16) - 1
|
|
}
|
|
|
|
// SupportedGSO implements stack.GSOEndpoint.
|
|
func (l *linkEndpoint) SupportedGSO() stack.SupportedGSO {
|
|
return l.SupportedGSOKind
|
|
}
|
|
|
|
// MaxHeaderLength returns the maximum size of the link layer header. Given it
|
|
// doesn't have a header, it just returns 0.
|
|
func (*linkEndpoint) MaxHeaderLength() uint16 {
|
|
return 0
|
|
}
|
|
|
|
// LinkAddress returns the link address of this endpoint.
|
|
func (l *linkEndpoint) LinkAddress() tcpip.LinkAddress {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.linkAddr
|
|
}
|
|
|
|
// SetLinkAddress implements stack.LinkEndpoint.SetLinkAddress.
|
|
func (l *linkEndpoint) SetLinkAddress(addr tcpip.LinkAddress) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
l.linkAddr = addr
|
|
}
|
|
|
|
// WritePackets stores outbound packets into the channel.
|
|
// Multiple concurrent calls are permitted.
|
|
func (l *linkEndpoint) WritePackets(pkts stack.PacketBufferList) (int, tcpip.Error) {
|
|
n := 0
|
|
// TODO(jwhited): evaluate writing a stack.PacketBufferList instead of a
|
|
// single packet. We can split 2 x 64K GSO across
|
|
// wireguard-go/conn.IdealBatchSize (128 slots) @ 1280 MTU, and non-GSO we
|
|
// could do more. Read API would need to change to take advantage. Verify
|
|
// gVisor limits around max number of segments packed together. Since we
|
|
// control MTU (and by effect TCP MSS in gVisor) we *shouldn't* expect to
|
|
// ever overflow 128 slots (see wireguard-go/tun.ErrTooManySegments usage).
|
|
for _, pkt := range pkts.AsSlice() {
|
|
if err := l.q.Write(pkt); err != nil {
|
|
if _, ok := err.(*tcpip.ErrNoBufferSpace); !ok && n == 0 {
|
|
return 0, err
|
|
}
|
|
break
|
|
}
|
|
n++
|
|
}
|
|
|
|
return n, nil
|
|
}
|
|
|
|
// Wait implements stack.LinkEndpoint.Wait.
|
|
func (*linkEndpoint) Wait() {}
|
|
|
|
// ARPHardwareType implements stack.LinkEndpoint.ARPHardwareType.
|
|
func (*linkEndpoint) ARPHardwareType() header.ARPHardwareType {
|
|
return header.ARPHardwareNone
|
|
}
|
|
|
|
// AddHeader implements stack.LinkEndpoint.AddHeader.
|
|
func (*linkEndpoint) AddHeader(*stack.PacketBuffer) {}
|
|
|
|
// ParseHeader implements stack.LinkEndpoint.ParseHeader.
|
|
func (*linkEndpoint) ParseHeader(*stack.PacketBuffer) bool { return true }
|
|
|
|
// SetOnCloseAction implements stack.LinkEndpoint.
|
|
func (*linkEndpoint) SetOnCloseAction(func()) {}
|