net/tstun: remove multi-case selects from hot code

Every TUN Read went through several multi-case selects.
We know from past experience with wireguard-go that these are slow
and cause scheduler churn.

The selects served two purposes: they separated errors from data and
gracefully handled shutdown. The first is fairly easy to replace by sending
errors and data over a single channel. The second, less so.

We considered a few approaches: Intricate webs of channels,
global condition variables. They all get ugly fast.

Instead, let's embrace the ugly and handle shutdown ungracefully.
It's horrible, but the horror is simple and localized.

Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
This commit is contained in:
Josh Bleecher Snyder 2021-07-01 14:45:17 -07:00 committed by Josh Bleecher Snyder
parent 64ee6cf64b
commit cc23049cd2

View File

@ -72,12 +72,14 @@ type Wrapper struct {
// It is made a static buffer in order to avoid allocations. // It is made a static buffer in order to avoid allocations.
buffer [maxBufferSize]byte buffer [maxBufferSize]byte
// bufferConsumed synchronizes access to buffer (shared by Read and poll). // bufferConsumed synchronizes access to buffer (shared by Read and poll).
//
// Close closes bufferConsumed. There may be outstanding sends to bufferConsumed
// when that happens; we catch any resulting panics.
// This lets us avoid expensive multi-case selects.
bufferConsumed chan struct{} bufferConsumed chan struct{}
// closed signals poll (by closing) when the device is closed. // closed signals poll (by closing) when the device is closed.
closed chan struct{} closed chan struct{}
// errors is the error queue populated by poll.
errors chan error
// outbound is the queue by which packets leave the TUN device. // outbound is the queue by which packets leave the TUN device.
// //
// The directions are relative to the network, not the device: // The directions are relative to the network, not the device:
@ -88,7 +90,11 @@ type Wrapper struct {
// //
// Empty reads are skipped by Wireguard, so it is always legal // Empty reads are skipped by Wireguard, so it is always legal
// to discard an empty packet instead of sending it through t.outbound. // to discard an empty packet instead of sending it through t.outbound.
outbound chan []byte //
// Close closes outbound. There may be outstanding sends to outbound
// when that happens; we catch any resulting panics.
// This lets us avoid expensive multi-case selects.
outbound chan tunReadResult
// eventsUpDown yields up and down tun.Events that arrive on a Wrapper's events channel. // eventsUpDown yields up and down tun.Events that arrive on a Wrapper's events channel.
eventsUpDown chan tun.Event eventsUpDown chan tun.Event
@ -125,6 +131,14 @@ type Wrapper struct {
disableTSMPRejected bool disableTSMPRejected bool
} }
// tunReadResult is the result of a TUN read: Some data and an error.
// The byte slice is not interpreted in the usual way for a Read method.
// See the comment in the middle of Wrap.Read.
type tunReadResult struct {
data []byte
err error
}
func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper { func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper {
tun := &Wrapper{ tun := &Wrapper{
logf: logger.WithPrefix(logf, "tstun: "), logf: logger.WithPrefix(logf, "tstun: "),
@ -133,8 +147,7 @@ func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper {
// a goroutine should not block when setting it, even with no listeners. // a goroutine should not block when setting it, even with no listeners.
bufferConsumed: make(chan struct{}, 1), bufferConsumed: make(chan struct{}, 1),
closed: make(chan struct{}), closed: make(chan struct{}),
errors: make(chan error), outbound: make(chan tunReadResult),
outbound: make(chan []byte),
eventsUpDown: make(chan tun.Event), eventsUpDown: make(chan tun.Event),
eventsOther: make(chan tun.Event), eventsOther: make(chan tun.Event),
// TODO(dmytro): (highly rate-limited) hexdumps should happen on unknown packets. // TODO(dmytro): (highly rate-limited) hexdumps should happen on unknown packets.
@ -160,9 +173,9 @@ func (t *Wrapper) SetDestIPActivityFuncs(m map[netaddr.IP]func()) {
func (t *Wrapper) Close() error { func (t *Wrapper) Close() error {
var err error var err error
t.closeOnce.Do(func() { t.closeOnce.Do(func() {
// Other channels need not be closed: poll will exit gracefully after this.
close(t.closed) close(t.closed)
close(t.bufferConsumed)
close(t.outbound)
err = t.tdev.Close() err = t.tdev.Close()
}) })
return err return err
@ -230,30 +243,40 @@ func (t *Wrapper) Name() (string, error) {
return t.tdev.Name() return t.tdev.Name()
} }
// allowSendOnClosedChannel suppresses panics due to sending on a closed channel.
// This allows us to avoid synchronization between poll and Close.
// Such synchronization (particularly multi-case selects) is too expensive
// for code like poll or Read that is on the hot path of every packet.
// If this makes you sad or angry, you may want to join our
// weekly Go Performance Delinquents Anonymous meetings on Monday nights.
func allowSendOnClosedChannel() {
r := recover()
if r == nil {
return
}
e, _ := r.(error)
if e != nil && e.Error() == "send on closed channel" {
return
}
panic(r)
}
// poll polls t.tdev.Read, placing the oldest unconsumed packet into t.buffer. // poll polls t.tdev.Read, placing the oldest unconsumed packet into t.buffer.
// This is needed because t.tdev.Read in general may block (it does on Windows), // This is needed because t.tdev.Read in general may block (it does on Windows),
// so packets may be stuck in t.outbound if t.Read called t.tdev.Read directly. // so packets may be stuck in t.outbound if t.Read called t.tdev.Read directly.
func (t *Wrapper) poll() { func (t *Wrapper) poll() {
defer allowSendOnClosedChannel() // for send to t.outbound
for { for {
select { <-t.bufferConsumed
case <-t.closed:
return
case <-t.bufferConsumed:
// continue
}
// Read may use memory in t.buffer before PacketStartOffset for mandatory headers. // Read may use memory in t.buffer before PacketStartOffset for mandatory headers.
// This is the rationale behind the tun.Wrapper.{Read,Write} interfaces // This is the rationale behind the tun.Wrapper.{Read,Write} interfaces
// and the reason t.buffer has size MaxMessageSize and not MaxContentSize. // and the reason t.buffer has size MaxMessageSize and not MaxContentSize.
n, err := t.tdev.Read(t.buffer[:], PacketStartOffset) n, err := t.tdev.Read(t.buffer[:], PacketStartOffset)
if err != nil { if err != nil {
select { t.outbound <- tunReadResult{err: err}
case <-t.closed: // In principle, read errors are not fatal (but wireguard-go disagrees).
return t.bufferConsumed <- struct{}{}
case t.errors <- err:
// In principle, read errors are not fatal (but wireguard-go disagrees).
t.bufferConsumed <- struct{}{}
}
continue continue
} }
@ -264,12 +287,7 @@ func (t *Wrapper) poll() {
continue continue
} }
select { t.outbound <- tunReadResult{data: t.buffer[PacketStartOffset : PacketStartOffset+n]}
case <-t.closed:
return
case t.outbound <- t.buffer[PacketStartOffset : PacketStartOffset+n]:
// continue
}
} }
} }
@ -325,26 +343,26 @@ func (t *Wrapper) IdleDuration() time.Duration {
} }
func (t *Wrapper) Read(buf []byte, offset int) (int, error) { func (t *Wrapper) Read(buf []byte, offset int) (int, error) {
var n int res, ok := <-t.outbound
if !ok {
wasInjectedPacket := false // Wrapper is closed.
select {
case <-t.closed:
return 0, io.EOF return 0, io.EOF
case err := <-t.errors: }
return 0, err if res.err != nil {
case pkt := <-t.outbound: return 0, res.err
n = copy(buf[offset:], pkt) }
// t.buffer has a fixed location in memory, defer allowSendOnClosedChannel() // for send to t.bufferConsumed
// so this is the easiest way to tell when it has been consumed. pkt := res.data
// &pkt[0] can be used because empty packets do not reach t.outbound. n := copy(buf[offset:], pkt)
if &pkt[0] == &t.buffer[PacketStartOffset] { wasInjectedPacket := false
t.bufferConsumed <- struct{}{} // t.buffer has a fixed location in memory,
} else { // so this is the easiest way to tell when it has been consumed.
// If the packet is not from t.buffer, then it is an injected packet. // &pkt[0] can be used because empty packets do not reach t.outbound.
wasInjectedPacket = true if &pkt[0] == &t.buffer[PacketStartOffset] {
} t.bufferConsumed <- struct{}{}
} else {
// If the packet is not from t.buffer, then it is an injected packet.
wasInjectedPacket = true
} }
p := parsedPacketPool.Get().(*packet.Parsed) p := parsedPacketPool.Get().(*packet.Parsed)
@ -566,12 +584,9 @@ func (t *Wrapper) InjectOutbound(packet []byte) error {
if len(packet) == 0 { if len(packet) == 0 {
return nil return nil
} }
select { defer allowSendOnClosedChannel() // for send to t.outbound
case <-t.closed: t.outbound <- tunReadResult{data: packet}
return ErrClosed return nil
case t.outbound <- packet:
return nil
}
} }
// Unwrap returns the underlying tun.Device. // Unwrap returns the underlying tun.Device.