mirror of
https://github.com/tailscale/tailscale.git
synced 2025-10-23 17:18:42 +00:00
wgengine/magicsock: refactor batchingUDPConn to batchingConn interface (#13042)
This commit adds a batchingConn interface, and renames batchingUDPConn to linuxBatchingConn. tryUpgradeToBatchingConn() may return a platform- specific implementation of batchingConn. So far only a Linux implementation of this interface exists, but this refactor is being done in anticipation of a Windows implementation. Updates tailscale/corp#21874 Signed-off-by: Jordan Whited <jordan@tailscale.com>
This commit is contained in:
@@ -25,7 +25,6 @@ import (
|
||||
|
||||
"github.com/tailscale/wireguard-go/conn"
|
||||
"go4.org/mem"
|
||||
"golang.org/x/net/ipv4"
|
||||
"golang.org/x/net/ipv6"
|
||||
|
||||
"tailscale.com/control/controlknobs"
|
||||
@@ -1101,12 +1100,6 @@ var errNoUDP = errors.New("no UDP available on platform")
|
||||
|
||||
var errUnsupportedConnType = errors.New("unsupported connection type")
|
||||
|
||||
var (
|
||||
// This acts as a compile-time check for our usage of ipv6.Message in
|
||||
// batchingUDPConn for both IPv6 and IPv4 operations.
|
||||
_ ipv6.Message = ipv4.Message{}
|
||||
)
|
||||
|
||||
func (c *Conn) sendUDPBatch(addr netip.AddrPort, buffs [][]byte) (sent bool, err error) {
|
||||
isIPv6 := false
|
||||
switch {
|
||||
@@ -2656,153 +2649,6 @@ func (c *Conn) ParseEndpoint(nodeKeyStr string) (conn.Endpoint, error) {
|
||||
return ep, nil
|
||||
}
|
||||
|
||||
func (c *batchingUDPConn) writeBatch(msgs []ipv6.Message) error {
|
||||
var head int
|
||||
for {
|
||||
n, err := c.xpc.WriteBatch(msgs[head:], 0)
|
||||
if err != nil || n == len(msgs[head:]) {
|
||||
// Returning the number of packets written would require
|
||||
// unraveling individual msg len and gso size during a coalesced
|
||||
// write. The top of the call stack disregards partial success,
|
||||
// so keep this simple for now.
|
||||
return err
|
||||
}
|
||||
head += n
|
||||
}
|
||||
}
|
||||
|
||||
// splitCoalescedMessages splits coalesced messages from the tail of dst
|
||||
// beginning at index 'firstMsgAt' into the head of the same slice. It reports
|
||||
// the number of elements to evaluate in msgs for nonzero len (msgs[i].N). An
|
||||
// error is returned if a socket control message cannot be parsed or a split
|
||||
// operation would overflow msgs.
|
||||
func (c *batchingUDPConn) splitCoalescedMessages(msgs []ipv6.Message, firstMsgAt int) (n int, err error) {
|
||||
for i := firstMsgAt; i < len(msgs); i++ {
|
||||
msg := &msgs[i]
|
||||
if msg.N == 0 {
|
||||
return n, err
|
||||
}
|
||||
var (
|
||||
gsoSize int
|
||||
start int
|
||||
end = msg.N
|
||||
numToSplit = 1
|
||||
)
|
||||
gsoSize, err = c.getGSOSizeFromControl(msg.OOB[:msg.NN])
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
if gsoSize > 0 {
|
||||
numToSplit = (msg.N + gsoSize - 1) / gsoSize
|
||||
end = gsoSize
|
||||
}
|
||||
for j := 0; j < numToSplit; j++ {
|
||||
if n > i {
|
||||
return n, errors.New("splitting coalesced packet resulted in overflow")
|
||||
}
|
||||
copied := copy(msgs[n].Buffers[0], msg.Buffers[0][start:end])
|
||||
msgs[n].N = copied
|
||||
msgs[n].Addr = msg.Addr
|
||||
start = end
|
||||
end += gsoSize
|
||||
if end > msg.N {
|
||||
end = msg.N
|
||||
}
|
||||
n++
|
||||
}
|
||||
if i != n-1 {
|
||||
// It is legal for bytes to move within msg.Buffers[0] as a result
|
||||
// of splitting, so we only zero the source msg len when it is not
|
||||
// the destination of the last split operation above.
|
||||
msg.N = 0
|
||||
}
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (c *batchingUDPConn) ReadBatch(msgs []ipv6.Message, flags int) (n int, err error) {
|
||||
if !c.rxOffload || len(msgs) < 2 {
|
||||
return c.xpc.ReadBatch(msgs, flags)
|
||||
}
|
||||
// Read into the tail of msgs, split into the head.
|
||||
readAt := len(msgs) - 2
|
||||
numRead, err := c.xpc.ReadBatch(msgs[readAt:], 0)
|
||||
if err != nil || numRead == 0 {
|
||||
return 0, err
|
||||
}
|
||||
return c.splitCoalescedMessages(msgs, readAt)
|
||||
}
|
||||
|
||||
func (c *batchingUDPConn) LocalAddr() net.Addr {
|
||||
return c.pc.LocalAddr().(*net.UDPAddr)
|
||||
}
|
||||
|
||||
func (c *batchingUDPConn) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error) {
|
||||
return c.pc.WriteToUDPAddrPort(b, addr)
|
||||
}
|
||||
|
||||
func (c *batchingUDPConn) Close() error {
|
||||
return c.pc.Close()
|
||||
}
|
||||
|
||||
// tryUpgradeToBatchingUDPConn probes the capabilities of the OS and pconn, and
|
||||
// upgrades pconn to a *batchingUDPConn if appropriate.
|
||||
func tryUpgradeToBatchingUDPConn(pconn nettype.PacketConn, network string, batchSize int) nettype.PacketConn {
|
||||
if network != "udp4" && network != "udp6" {
|
||||
return pconn
|
||||
}
|
||||
if runtime.GOOS != "linux" {
|
||||
return pconn
|
||||
}
|
||||
if strings.HasPrefix(hostinfo.GetOSVersion(), "2.") {
|
||||
// recvmmsg/sendmmsg were added in 2.6.33, but we support down to
|
||||
// 2.6.32 for old NAS devices. See https://github.com/tailscale/tailscale/issues/6807.
|
||||
// As a cheap heuristic: if the Linux kernel starts with "2", just
|
||||
// consider it too old for mmsg. Nobody who cares about performance runs
|
||||
// such ancient kernels. UDP offload was added much later, so no
|
||||
// upgrades are available.
|
||||
return pconn
|
||||
}
|
||||
uc, ok := pconn.(*net.UDPConn)
|
||||
if !ok {
|
||||
return pconn
|
||||
}
|
||||
b := &batchingUDPConn{
|
||||
pc: pconn,
|
||||
getGSOSizeFromControl: getGSOSizeFromControl,
|
||||
setGSOSizeInControl: setGSOSizeInControl,
|
||||
sendBatchPool: sync.Pool{
|
||||
New: func() any {
|
||||
ua := &net.UDPAddr{
|
||||
IP: make([]byte, 16),
|
||||
}
|
||||
msgs := make([]ipv6.Message, batchSize)
|
||||
for i := range msgs {
|
||||
msgs[i].Buffers = make([][]byte, 1)
|
||||
msgs[i].Addr = ua
|
||||
msgs[i].OOB = make([]byte, controlMessageSize)
|
||||
}
|
||||
return &sendBatch{
|
||||
ua: ua,
|
||||
msgs: msgs,
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
switch network {
|
||||
case "udp4":
|
||||
b.xpc = ipv4.NewPacketConn(uc)
|
||||
case "udp6":
|
||||
b.xpc = ipv6.NewPacketConn(uc)
|
||||
default:
|
||||
panic("bogus network")
|
||||
}
|
||||
var txOffload bool
|
||||
txOffload, b.rxOffload = tryEnableUDPOffload(uc)
|
||||
b.txOffload.Store(txOffload)
|
||||
return b
|
||||
}
|
||||
|
||||
func newBlockForeverConn() *blockForeverConn {
|
||||
c := new(blockForeverConn)
|
||||
c.cond = sync.NewCond(&c.mu)
|
||||
|
Reference in New Issue
Block a user