David Anderson cb96b14bf4 net/packet: remove the custom IP4/IP6 types in favor of netaddr.IP.
Upstream netaddr has a change that makes it alloc-free, so it's safe to
use in hot codepaths. This gets rid of one of the many IP types in our
codebase.

Performance is currently worse across the board. This is likely due in
part to netaddr.IP being a larger value type (4b -> 24b for IPv4,
16b -> 24b for IPv6), and in other part due to missing low-hanging fruit
optimizations in netaddr. However, the regression is less bad than
it looks at first glance, because we'd micro-optimized packet.IP* in
the past few weeks. This change drops us back to roughly where we
were at the 1.2 release, but with the benefit of a significant
code and architectural simplification.

name                   old time/op    new time/op    delta
pkg:tailscale.com/net/packet goos:linux goarch:amd64
Decode/tcp4-8            12.2ns ± 5%    29.7ns ± 2%  +142.32%  (p=0.008 n=5+5)
Decode/tcp6-8            12.6ns ± 3%    65.1ns ± 2%  +418.47%  (p=0.008 n=5+5)
Decode/udp4-8            11.8ns ± 3%    30.5ns ± 2%  +157.94%  (p=0.008 n=5+5)
Decode/udp6-8            27.1ns ± 1%    65.7ns ± 2%  +142.36%  (p=0.016 n=4+5)
Decode/icmp4-8           24.6ns ± 2%    30.5ns ± 2%   +23.65%  (p=0.016 n=4+5)
Decode/icmp6-8           22.9ns ±51%    65.5ns ± 2%  +186.19%  (p=0.008 n=5+5)
Decode/igmp-8            18.1ns ±44%    30.2ns ± 1%   +66.89%  (p=0.008 n=5+5)
Decode/unknown-8         20.8ns ± 1%    10.6ns ± 9%   -49.11%  (p=0.016 n=4+5)
pkg:tailscale.com/wgengine/filter goos:linux goarch:amd64
Filter/icmp4-8           30.5ns ± 1%    77.9ns ± 3%  +155.01%  (p=0.008 n=5+5)
Filter/tcp4_syn_in-8     43.7ns ± 3%   123.0ns ± 3%  +181.72%  (p=0.008 n=5+5)
Filter/tcp4_syn_out-8    24.5ns ± 2%    45.7ns ± 6%   +86.22%  (p=0.008 n=5+5)
Filter/udp4_in-8         64.8ns ± 1%   210.0ns ± 2%  +223.87%  (p=0.008 n=5+5)
Filter/udp4_out-8         119ns ± 0%     278ns ± 0%  +133.78%  (p=0.016 n=4+5)
Filter/icmp6-8           40.3ns ± 2%   204.4ns ± 4%  +407.70%  (p=0.008 n=5+5)
Filter/tcp6_syn_in-8     35.3ns ± 3%   199.2ns ± 2%  +464.95%  (p=0.008 n=5+5)
Filter/tcp6_syn_out-8    32.8ns ± 2%    81.0ns ± 2%  +147.10%  (p=0.008 n=5+5)
Filter/udp6_in-8          106ns ± 2%     290ns ± 2%  +174.48%  (p=0.008 n=5+5)
Filter/udp6_out-8         184ns ± 2%     314ns ± 3%   +70.43%  (p=0.016 n=4+5)
pkg:tailscale.com/wgengine/tstun goos:linux goarch:amd64
Write-8                  9.02ns ± 3%    8.92ns ± 1%      ~     (p=0.421 n=5+5)

name                   old alloc/op   new alloc/op   delta
pkg:tailscale.com/net/packet goos:linux goarch:amd64
Decode/tcp4-8             0.00B          0.00B           ~     (all equal)
Decode/tcp6-8             0.00B          0.00B           ~     (all equal)
Decode/udp4-8             0.00B          0.00B           ~     (all equal)
Decode/udp6-8             0.00B          0.00B           ~     (all equal)
Decode/icmp4-8            0.00B          0.00B           ~     (all equal)
Decode/icmp6-8            0.00B          0.00B           ~     (all equal)
Decode/igmp-8             0.00B          0.00B           ~     (all equal)
Decode/unknown-8          0.00B          0.00B           ~     (all equal)
pkg:tailscale.com/wgengine/filter goos:linux goarch:amd64
Filter/icmp4-8            0.00B          0.00B           ~     (all equal)
Filter/tcp4_syn_in-8      0.00B          0.00B           ~     (all equal)
Filter/tcp4_syn_out-8     0.00B          0.00B           ~     (all equal)
Filter/udp4_in-8          0.00B          0.00B           ~     (all equal)
Filter/udp4_out-8         16.0B ± 0%     64.0B ± 0%  +300.00%  (p=0.008 n=5+5)
Filter/icmp6-8            0.00B          0.00B           ~     (all equal)
Filter/tcp6_syn_in-8      0.00B          0.00B           ~     (all equal)
Filter/tcp6_syn_out-8     0.00B          0.00B           ~     (all equal)
Filter/udp6_in-8          0.00B          0.00B           ~     (all equal)
Filter/udp6_out-8         48.0B ± 0%     64.0B ± 0%   +33.33%  (p=0.008 n=5+5)

name                   old allocs/op  new allocs/op  delta
pkg:tailscale.com/net/packet goos:linux goarch:amd64
Decode/tcp4-8              0.00           0.00           ~     (all equal)
Decode/tcp6-8              0.00           0.00           ~     (all equal)
Decode/udp4-8              0.00           0.00           ~     (all equal)
Decode/udp6-8              0.00           0.00           ~     (all equal)
Decode/icmp4-8             0.00           0.00           ~     (all equal)
Decode/icmp6-8             0.00           0.00           ~     (all equal)
Decode/igmp-8              0.00           0.00           ~     (all equal)
Decode/unknown-8           0.00           0.00           ~     (all equal)
pkg:tailscale.com/wgengine/filter goos:linux goarch:amd64
Filter/icmp4-8             0.00           0.00           ~     (all equal)
Filter/tcp4_syn_in-8       0.00           0.00           ~     (all equal)
Filter/tcp4_syn_out-8      0.00           0.00           ~     (all equal)
Filter/udp4_in-8           0.00           0.00           ~     (all equal)
Filter/udp4_out-8          1.00 ± 0%      1.00 ± 0%      ~     (all equal)
Filter/icmp6-8             0.00           0.00           ~     (all equal)
Filter/tcp6_syn_in-8       0.00           0.00           ~     (all equal)
Filter/tcp6_syn_out-8      0.00           0.00           ~     (all equal)
Filter/udp6_in-8           0.00           0.00           ~     (all equal)
Filter/udp6_out-8          1.00 ± 0%      1.00 ± 0%      ~     (all equal)

Signed-off-by: David Anderson <danderson@tailscale.com>
2020-12-22 14:44:36 -08:00

422 lines
12 KiB
Go

// Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package tstun provides a TUN struct implementing the tun.Device interface
// with additional features as required by wgengine.
package tstun
import (
"errors"
"io"
"os"
"sync"
"sync/atomic"
"time"
"github.com/tailscale/wireguard-go/device"
"github.com/tailscale/wireguard-go/tun"
"inet.af/netaddr"
"tailscale.com/net/packet"
"tailscale.com/types/logger"
"tailscale.com/wgengine/filter"
)
const maxBufferSize = device.MaxMessageSize
// PacketStartOffset is the minimal amount of leading space that must exist
// before &packet[offset] in a packet passed to Read, Write, or InjectInboundDirect.
// This is necessary to avoid reallocation in wireguard-go internals.
const PacketStartOffset = device.MessageTransportHeaderSize
// MaxPacketSize is the maximum size (in bytes)
// of a packet that can be injected into a tstun.TUN.
const MaxPacketSize = device.MaxContentSize
var (
// ErrClosed is returned when attempting an operation on a closed TUN.
ErrClosed = errors.New("device closed")
// ErrFiltered is returned when the acted-on packet is rejected by a filter.
ErrFiltered = errors.New("packet dropped by filter")
)
var (
errPacketTooBig = errors.New("packet too big")
errOffsetTooBig = errors.New("offset larger than buffer length")
errOffsetTooSmall = errors.New("offset smaller than PacketStartOffset")
)
// parsedPacketPool holds a pool of Parsed structs for use in filtering.
// This is needed because escape analysis cannot see that parsed packets
// do not escape through {Pre,Post}Filter{In,Out}.
var parsedPacketPool = sync.Pool{New: func() interface{} { return new(packet.Parsed) }}
// FilterFunc is a packet-filtering function with access to the TUN device.
// It must not hold onto the packet struct, as its backing storage will be reused.
type FilterFunc func(*packet.Parsed, *TUN) filter.Response
// TUN wraps a tun.Device from wireguard-go,
// augmenting it with filtering and packet injection.
// All the added work happens in Read and Write:
// the other methods delegate to the underlying tdev.
type TUN struct {
logf logger.Logf
// tdev is the underlying TUN device.
tdev tun.Device
closeOnce sync.Once
lastActivityAtomic int64 // unix seconds of last send or receive
destIPActivity atomic.Value // of map[netaddr.IP]func()
// buffer stores the oldest unconsumed packet from tdev.
// It is made a static buffer in order to avoid allocations.
buffer [maxBufferSize]byte
// bufferConsumed synchronizes access to buffer (shared by Read and poll).
bufferConsumed chan struct{}
// closed signals poll (by closing) when the device is closed.
closed chan struct{}
// errors is the error queue populated by poll.
errors chan error
// outbound is the queue by which packets leave the TUN device.
//
// The directions are relative to the network, not the device:
// inbound packets arrive via UDP and are written into the TUN device;
// outbound packets are read from the TUN device and sent out via UDP.
// This queue is needed because although inbound writes are synchronous,
// the other direction must wait on a Wireguard goroutine to poll it.
//
// Empty reads are skipped by Wireguard, so it is always legal
// to discard an empty packet instead of sending it through t.outbound.
outbound chan []byte
// fitler stores the currently active package filter
filter atomic.Value // of *filter.Filter
// filterFlags control the verbosity of logging packet drops/accepts.
filterFlags filter.RunFlags
// PreFilterIn is the inbound filter function that runs before the main filter
// and therefore sees the packets that may be later dropped by it.
PreFilterIn FilterFunc
// PostFilterIn is the inbound filter function that runs after the main filter.
PostFilterIn FilterFunc
// PreFilterOut is the outbound filter function that runs before the main filter
// and therefore sees the packets that may be later dropped by it.
PreFilterOut FilterFunc
// PostFilterOut is the outbound filter function that runs after the main filter.
PostFilterOut FilterFunc
// disableFilter disables all filtering when set. This should only be used in tests.
disableFilter bool
}
func WrapTUN(logf logger.Logf, tdev tun.Device) *TUN {
tun := &TUN{
logf: logger.WithPrefix(logf, "tstun: "),
tdev: tdev,
// bufferConsumed is conceptually a condition variable:
// a goroutine should not block when setting it, even with no listeners.
bufferConsumed: make(chan struct{}, 1),
closed: make(chan struct{}),
errors: make(chan error),
outbound: make(chan []byte),
// TODO(dmytro): (highly rate-limited) hexdumps should happen on unknown packets.
filterFlags: filter.LogAccepts | filter.LogDrops,
}
go tun.poll()
// The buffer starts out consumed.
tun.bufferConsumed <- struct{}{}
return tun
}
// SetDestIPActivityFuncs sets a map of funcs to run per packet
// destination (the map keys).
//
// The map ownership passes to the TUN. It must be non-nil.
func (t *TUN) SetDestIPActivityFuncs(m map[netaddr.IP]func()) {
t.destIPActivity.Store(m)
}
func (t *TUN) Close() error {
var err error
t.closeOnce.Do(func() {
// Other channels need not be closed: poll will exit gracefully after this.
close(t.closed)
err = t.tdev.Close()
})
return err
}
func (t *TUN) Events() chan tun.Event {
return t.tdev.Events()
}
func (t *TUN) File() *os.File {
return t.tdev.File()
}
func (t *TUN) Flush() error {
return t.tdev.Flush()
}
func (t *TUN) MTU() (int, error) {
return t.tdev.MTU()
}
func (t *TUN) Name() (string, error) {
return t.tdev.Name()
}
// poll polls t.tdev.Read, placing the oldest unconsumed packet into t.buffer.
// This is needed because t.tdev.Read in general may block (it does on Windows),
// so packets may be stuck in t.outbound if t.Read called t.tdev.Read directly.
func (t *TUN) poll() {
for {
select {
case <-t.closed:
return
case <-t.bufferConsumed:
// continue
}
// Read may use memory in t.buffer before PacketStartOffset for mandatory headers.
// This is the rationale behind the tun.TUN.{Read,Write} interfaces
// and the reason t.buffer has size MaxMessageSize and not MaxContentSize.
n, err := t.tdev.Read(t.buffer[:], PacketStartOffset)
if err != nil {
select {
case <-t.closed:
return
case t.errors <- err:
// In principle, read errors are not fatal (but wireguard-go disagrees).
t.bufferConsumed <- struct{}{}
}
continue
}
// Wireguard will skip an empty read,
// so we might as well do it here to avoid the send through t.outbound.
if n == 0 {
t.bufferConsumed <- struct{}{}
continue
}
select {
case <-t.closed:
return
case t.outbound <- t.buffer[PacketStartOffset : PacketStartOffset+n]:
// continue
}
}
}
func (t *TUN) filterOut(p *packet.Parsed) filter.Response {
if t.PreFilterOut != nil {
if t.PreFilterOut(p, t) == filter.Drop {
return filter.Drop
}
}
filt, _ := t.filter.Load().(*filter.Filter)
if filt == nil {
return filter.Drop
}
if filt.RunOut(p, t.filterFlags) != filter.Accept {
return filter.Drop
}
if t.PostFilterOut != nil {
if t.PostFilterOut(p, t) == filter.Drop {
return filter.Drop
}
}
return filter.Accept
}
// noteActivity records that there was a read or write at the current time.
func (t *TUN) noteActivity() {
atomic.StoreInt64(&t.lastActivityAtomic, time.Now().Unix())
}
// IdleDuration reports how long it's been since the last read or write to this device.
//
// Its value is only accurate to roughly second granularity.
// If there's never been activity, the duration is since 1970.
func (t *TUN) IdleDuration() time.Duration {
sec := atomic.LoadInt64(&t.lastActivityAtomic)
return time.Since(time.Unix(sec, 0))
}
func (t *TUN) Read(buf []byte, offset int) (int, error) {
var n int
select {
case <-t.closed:
return 0, io.EOF
case err := <-t.errors:
return 0, err
case packet := <-t.outbound:
n = copy(buf[offset:], packet)
// t.buffer has a fixed location in memory,
// so this is the easiest way to tell when it has been consumed.
// &packet[0] can be used because empty packets do not reach t.outbound.
if &packet[0] == &t.buffer[PacketStartOffset] {
t.bufferConsumed <- struct{}{}
} else {
// If the packet is not from t.buffer, then it is an injected packet.
// In this case, we return early to bypass filtering
t.noteActivity()
return n, nil
}
}
p := parsedPacketPool.Get().(*packet.Parsed)
defer parsedPacketPool.Put(p)
p.Decode(buf[offset : offset+n])
if m, ok := t.destIPActivity.Load().(map[netaddr.IP]func()); ok {
if fn := m[p.Dst.IP]; fn != nil {
fn()
}
}
if !t.disableFilter {
response := t.filterOut(p)
if response != filter.Accept {
// Wireguard considers read errors fatal; pretend nothing was read
return 0, nil
}
}
t.noteActivity()
return n, nil
}
func (t *TUN) filterIn(buf []byte) filter.Response {
p := parsedPacketPool.Get().(*packet.Parsed)
defer parsedPacketPool.Put(p)
p.Decode(buf)
if t.PreFilterIn != nil {
if t.PreFilterIn(p, t) == filter.Drop {
return filter.Drop
}
}
filt, _ := t.filter.Load().(*filter.Filter)
if filt == nil {
return filter.Drop
}
if filt.RunIn(p, t.filterFlags) != filter.Accept {
return filter.Drop
}
if t.PostFilterIn != nil {
if t.PostFilterIn(p, t) == filter.Drop {
return filter.Drop
}
}
return filter.Accept
}
func (t *TUN) Write(buf []byte, offset int) (int, error) {
if !t.disableFilter {
response := t.filterIn(buf[offset:])
if response != filter.Accept {
return 0, ErrFiltered
}
}
t.noteActivity()
return t.tdev.Write(buf, offset)
}
func (t *TUN) GetFilter() *filter.Filter {
filt, _ := t.filter.Load().(*filter.Filter)
return filt
}
func (t *TUN) SetFilter(filt *filter.Filter) {
t.filter.Store(filt)
}
// InjectInboundDirect makes the TUN device behave as if a packet
// with the given contents was received from the network.
// It blocks and does not take ownership of the packet.
// The injected packet will not pass through inbound filters.
//
// The packet contents are to start at &buf[offset].
// offset must be greater or equal to PacketStartOffset.
// The space before &buf[offset] will be used by Wireguard.
func (t *TUN) InjectInboundDirect(buf []byte, offset int) error {
if len(buf) > MaxPacketSize {
return errPacketTooBig
}
if len(buf) < offset {
return errOffsetTooBig
}
if offset < PacketStartOffset {
return errOffsetTooSmall
}
// Write to the underlying device to skip filters.
_, err := t.tdev.Write(buf, offset)
return err
}
// InjectInboundCopy takes a packet without leading space,
// reallocates it to conform to the InjectInboundDirect interface
// and calls InjectInboundDirect on it. Injecting a nil packet is a no-op.
func (t *TUN) InjectInboundCopy(packet []byte) error {
// We duplicate this check from InjectInboundDirect here
// to avoid wasting an allocation on an oversized packet.
if len(packet) > MaxPacketSize {
return errPacketTooBig
}
if len(packet) == 0 {
return nil
}
buf := make([]byte, PacketStartOffset+len(packet))
copy(buf[PacketStartOffset:], packet)
return t.InjectInboundDirect(buf, PacketStartOffset)
}
// InjectOutbound makes the TUN device behave as if a packet
// with the given contents was sent to the network.
// It does not block, but takes ownership of the packet.
// The injected packet will not pass through outbound filters.
// Injecting an empty packet is a no-op.
func (t *TUN) InjectOutbound(packet []byte) error {
if len(packet) > MaxPacketSize {
return errPacketTooBig
}
if len(packet) == 0 {
return nil
}
select {
case <-t.closed:
return ErrClosed
case t.outbound <- packet:
return nil
}
}
// Unwrap returns the underlying TUN device.
func (t *TUN) Unwrap() tun.Device {
return t.tdev
}