From 6d895708602bd8c623c0f4f4b6a90fc5ce496e62 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 2 May 2020 06:44:51 -0500 Subject: [PATCH] eliminate most sync.Pool use, gives a safer but slightly slower interface --- src/crypto/crypto.go | 18 ++++++++++++------ src/tuntap/conn.go | 5 ----- src/tuntap/iface.go | 16 +++++++--------- src/util/bytes_mobile.go | 21 --------------------- src/util/bytes_other.go | 18 ------------------ src/yggdrasil/conn.go | 6 +++--- src/yggdrasil/link.go | 4 ---- src/yggdrasil/packetqueue.go | 4 +--- src/yggdrasil/peer.go | 3 --- src/yggdrasil/pool.go | 20 ++++++++++++++++++++ src/yggdrasil/router.go | 4 ---- src/yggdrasil/session.go | 15 +-------------- src/yggdrasil/stream.go | 7 ++++--- src/yggdrasil/wire.go | 8 +++++--- 14 files changed, 53 insertions(+), 96 deletions(-) delete mode 100644 src/util/bytes_mobile.go delete mode 100644 src/util/bytes_other.go create mode 100644 src/yggdrasil/pool.go diff --git a/src/crypto/crypto.go b/src/crypto/crypto.go index 211a0e54..595e6f40 100644 --- a/src/crypto/crypto.go +++ b/src/crypto/crypto.go @@ -17,12 +17,11 @@ import ( "crypto/rand" "crypto/sha512" "encoding/hex" + "sync" "golang.org/x/crypto/curve25519" "golang.org/x/crypto/ed25519" "golang.org/x/crypto/nacl/box" - - "github.com/yggdrasil-network/yggdrasil-go/src/util" ) //////////////////////////////////////////////////////////////////////////////// @@ -225,29 +224,36 @@ func GetSharedKey(myPrivKey *BoxPrivKey, return (*BoxSharedKey)(&shared) } +// pool is used internally by BoxOpen and BoxSeal to avoid allocating temporary space +var pool = sync.Pool{New: func() interface{} { return []byte(nil) }} + // BoxOpen returns a message and true if it successfully opens a crypto box using the provided shared key and nonce. +// The boxed input slice's backing array is reused for the unboxed output when possible. func BoxOpen(shared *BoxSharedKey, boxed []byte, nonce *BoxNonce) ([]byte, bool) { - out := util.GetBytes() s := (*[BoxSharedKeyLen]byte)(shared) n := (*[BoxNonceLen]byte)(nonce) - unboxed, success := box.OpenAfterPrecomputation(out, boxed, n, s) + temp := append(pool.Get().([]byte), boxed...) + unboxed, success := box.OpenAfterPrecomputation(boxed[:0], temp, n, s) + pool.Put(temp[:0]) return unboxed, success } // BoxSeal seals a crypto box using the provided shared key, returning the box and the nonce needed to decrypt it. // If nonce is nil, a random BoxNonce will be used and returned. // If nonce is non-nil, then nonce.Increment() will be called before using it, and the incremented BoxNonce is what is returned. +// The unboxed input slice's backing array is reused for the boxed output when possible. func BoxSeal(shared *BoxSharedKey, unboxed []byte, nonce *BoxNonce) ([]byte, *BoxNonce) { if nonce == nil { nonce = NewBoxNonce() } nonce.Increment() - out := util.GetBytes() s := (*[BoxSharedKeyLen]byte)(shared) n := (*[BoxNonceLen]byte)(nonce) - boxed := box.SealAfterPrecomputation(out, unboxed, n, s) + temp := append(pool.Get().([]byte), unboxed...) + boxed := box.SealAfterPrecomputation(unboxed[:0], temp, n, s) + pool.Put(temp[:0]) return boxed, nonce } diff --git a/src/tuntap/conn.go b/src/tuntap/conn.go index 24ea5ef3..ddd89e9b 100644 --- a/src/tuntap/conn.go +++ b/src/tuntap/conn.go @@ -44,13 +44,11 @@ func (s *tunConn) _read(bs []byte) (err error) { select { case <-s.stop: err = errors.New("session was already closed") - util.PutBytes(bs) return default: } if len(bs) == 0 { err = errors.New("read packet with 0 size") - util.PutBytes(bs) return } ipv4 := len(bs) > 20 && bs[0]&0xf0 == 0x40 @@ -107,7 +105,6 @@ func (s *tunConn) _read(bs []byte) (err error) { } if skip { err = errors.New("address not allowed") - util.PutBytes(bs) return } s.tun.writer.writeFrom(s, bs) @@ -125,7 +122,6 @@ func (s *tunConn) _write(bs []byte) (err error) { select { case <-s.stop: err = errors.New("session was already closed") - util.PutBytes(bs) return default: } @@ -183,7 +179,6 @@ func (s *tunConn) _write(bs []byte) (err error) { } if skip { err = errors.New("address not allowed") - util.PutBytes(bs) return } msg := yggdrasil.FlowKeyMessage{ diff --git a/src/tuntap/iface.go b/src/tuntap/iface.go index 1e5902e8..9250665a 100644 --- a/src/tuntap/iface.go +++ b/src/tuntap/iface.go @@ -3,7 +3,6 @@ package tuntap import ( "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" "github.com/Arceliar/phony" @@ -14,6 +13,7 @@ const TUN_OFFSET_BYTES = 4 type tunWriter struct { phony.Inbox tun *TunAdapter + buf [TUN_OFFSET_BYTES + 65536]byte } func (w *tunWriter) writeFrom(from phony.Actor, b []byte) { @@ -25,15 +25,13 @@ func (w *tunWriter) writeFrom(from phony.Actor, b []byte) { // write is pretty loose with the memory safety rules, e.g. it assumes it can // read w.tun.iface.IsTap() safely func (w *tunWriter) _write(b []byte) { - defer util.PutBytes(b) var written int var err error n := len(b) if n == 0 { return } - temp := append(util.ResizeBytes(util.GetBytes(), TUN_OFFSET_BYTES), b...) - defer util.PutBytes(temp) + temp := append(w.buf[:TUN_OFFSET_BYTES], b...) written, err = w.tun.iface.Write(temp, TUN_OFFSET_BYTES) if err != nil { w.tun.Act(w, func() { @@ -51,22 +49,23 @@ func (w *tunWriter) _write(b []byte) { type tunReader struct { phony.Inbox tun *TunAdapter + buf [TUN_OFFSET_BYTES + 65536]byte } func (r *tunReader) _read() { // Get a slice to store the packet in - recvd := util.ResizeBytes(util.GetBytes(), int(r.tun.mtu)+TUN_OFFSET_BYTES) // Wait for a packet to be delivered to us through the TUN adapter - n, err := r.tun.iface.Read(recvd, TUN_OFFSET_BYTES) + n, err := r.tun.iface.Read(r.buf[:], TUN_OFFSET_BYTES) if n <= TUN_OFFSET_BYTES || err != nil { r.tun.log.Errorln("Error reading TUN:", err) ferr := r.tun.iface.Flush() if ferr != nil { r.tun.log.Errorln("Unable to flush packets:", ferr) } - util.PutBytes(recvd) } else { - r.tun.handlePacketFrom(r, recvd[TUN_OFFSET_BYTES:n+TUN_OFFSET_BYTES], err) + bs := make([]byte, n, n+crypto.BoxOverhead) // extra capacity for later... + copy(bs, r.buf[TUN_OFFSET_BYTES:n+TUN_OFFSET_BYTES]) + r.tun.handlePacketFrom(r, bs, err) } if err == nil { // Now read again @@ -175,7 +174,6 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) { _, known := tun.dials[dstString] tun.dials[dstString] = append(tun.dials[dstString], bs) for len(tun.dials[dstString]) > 32 { - util.PutBytes(tun.dials[dstString][0]) tun.dials[dstString] = tun.dials[dstString][1:] } if !known { diff --git a/src/util/bytes_mobile.go b/src/util/bytes_mobile.go deleted file mode 100644 index f862c0cd..00000000 --- a/src/util/bytes_mobile.go +++ /dev/null @@ -1,21 +0,0 @@ -//+build mobile - -package util - -import "runtime/debug" - -func init() { - debug.SetGCPercent(25) -} - -// GetBytes always returns a nil slice on mobile platforms. -func GetBytes() []byte { - return nil -} - -// PutBytes does literally nothing on mobile platforms. -// This is done rather than keeping a free list of bytes on platforms with memory constraints. -// It's needed to help keep memory usage low enough to fall under the limits set for e.g. iOS NEPacketTunnelProvider apps. -func PutBytes(bs []byte) { - return -} diff --git a/src/util/bytes_other.go b/src/util/bytes_other.go deleted file mode 100644 index 7c966087..00000000 --- a/src/util/bytes_other.go +++ /dev/null @@ -1,18 +0,0 @@ -//+build !mobile - -package util - -import "sync" - -// This is used to buffer recently used slices of bytes, to prevent allocations in the hot loops. -var byteStore = sync.Pool{New: func() interface{} { return []byte(nil) }} - -// GetBytes returns a 0-length (possibly nil) slice of bytes from a free list, so it may have a larger capacity. -func GetBytes() []byte { - return byteStore.Get().([]byte)[:0] -} - -// PutBytes stores a slice in a free list, where it can potentially be reused to prevent future allocations. -func PutBytes(bs []byte) { - byteStore.Put(bs) -} diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index eef57683..ae34e45a 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -252,7 +252,6 @@ func (c *Conn) Read(b []byte) (int, error) { } // Copy results to the output slice and clean up copy(b, bs) - util.PutBytes(bs) // Return the number of bytes copied to the slice, along with any error return n, err } @@ -323,10 +322,11 @@ func (c *Conn) writeNoCopy(msg FlowKeyMessage) error { // returned. func (c *Conn) Write(b []byte) (int, error) { written := len(b) - msg := FlowKeyMessage{Message: append(util.GetBytes(), b...)} + bs := make([]byte, 0, len(b)+crypto.BoxOverhead) + bs = append(bs, b...) + msg := FlowKeyMessage{Message: bs} err := c.writeNoCopy(msg) if err != nil { - util.PutBytes(msg.Message) written = 0 } return written, err diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 78986286..733b9ac1 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -406,10 +406,6 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool w.intf.notifySending(size, isLinkTraffic) w.intf.msgIO.writeMsgs(bss) w.intf.notifySent(size, isLinkTraffic) - // Cleanup - for _, bs := range bss { - util.PutBytes(bs) - } }) } diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index ff717258..2000ffa6 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -2,8 +2,6 @@ package yggdrasil import ( "time" - - "github.com/yggdrasil-network/yggdrasil-go/src/util" ) // TODO take max size from config @@ -59,7 +57,7 @@ func (q *packetQueue) cleanup() { worstStream.infos = worstStream.infos[1:] worstStream.size -= uint64(len(packet)) q.size -= uint64(len(packet)) - util.PutBytes(packet) + pool_putBytes(packet) // save the modified stream to queues if len(worstStream.infos) > 0 { q.streams[worst] = worstStream diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index bc9de04c..7eef9a11 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -9,7 +9,6 @@ import ( "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/Arceliar/phony" ) @@ -241,7 +240,6 @@ func (p *peer) _handlePacket(packet []byte) { case wire_LinkProtocolTraffic: p._handleLinkTraffic(packet) default: - util.PutBytes(packet) } } @@ -347,7 +345,6 @@ func (p *peer) _handleLinkTraffic(bs []byte) { case wire_SwitchMsg: p._handleSwitchMsg(payload) default: - util.PutBytes(bs) } } diff --git a/src/yggdrasil/pool.go b/src/yggdrasil/pool.go new file mode 100644 index 00000000..e95236a5 --- /dev/null +++ b/src/yggdrasil/pool.go @@ -0,0 +1,20 @@ +package yggdrasil + +import "sync" + +// Used internally to reduce allocations in the hot loop +// I.e. packets being switched or between the crypto and the switch +// For safety reasons, these must not escape this package +var pool = sync.Pool{New: func() interface{} { return []byte(nil) }} + +func pool_getBytes(size int) []byte { + bs := pool.Get().([]byte) + if cap(bs) < size { + bs = make([]byte, size) + } + return bs[:size] +} + +func pool_putBytes(bs []byte) { + pool.Put(bs) +} diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 1be94661..71d92609 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -29,7 +29,6 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/Arceliar/phony" ) @@ -178,14 +177,12 @@ func (r *router) _handlePacket(packet []byte) { // Handles incoming traffic, i.e. encapuslated ordinary IPv6 packets. // Passes them to the crypto session worker to be decrypted and sent to the adapter. func (r *router) _handleTraffic(packet []byte) { - defer util.PutBytes(packet) p := wire_trafficPacket{} if !p.decode(packet) { return } sinfo, isIn := r.sessions.getSessionForHandle(&p.Handle) if !isIn { - util.PutBytes(p.Payload) return } sinfo.recv(r, &p) @@ -231,7 +228,6 @@ func (r *router) _handleProto(packet []byte) { case wire_DHTLookupResponse: r._handleDHTRes(bs, &p.FromKey) default: - util.PutBytes(packet) } } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 223ea33f..360f2a1b 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -448,12 +448,9 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { select { case <-sinfo.init: default: - // TODO find a better way to drop things until initialized - util.PutBytes(p.Payload) return } if !sinfo._nonceIsOK(&p.Nonce) { - util.PutBytes(p.Payload) return } k := sinfo.sharedSesKey @@ -463,11 +460,9 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { poolFunc := func() { bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce) callback := func() { - util.PutBytes(p.Payload) if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) { // Either we failed to decrypt, or the session was updated, or we // received this packet in the mean time - util.PutBytes(bs) return } sinfo._updateNonce(&p.Nonce) @@ -485,8 +480,6 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { select { case <-sinfo.init: default: - // TODO find a better way to drop things until initialized - util.PutBytes(msg.Message) return } sinfo.bytesSent += uint64(len(msg.Message)) @@ -505,14 +498,8 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { ch := make(chan func(), 1) poolFunc := func() { p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce) + packet := p.encode() callback := func() { - // Encoding may block on a util.GetBytes(), so kept out of the worker pool - packet := p.encode() - // Cleanup - util.PutBytes(msg.Message) - util.PutBytes(p.Payload) - // Send the packet - // TODO replace this with a send to the peer struct if that becomes an actor sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) }) diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index 4ab37c29..be1398fc 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -6,8 +6,6 @@ import ( "fmt" "io" "net" - - "github.com/yggdrasil-network/yggdrasil-go/src/util" ) // Test that this matches the interface we expect @@ -46,6 +44,9 @@ func (s *stream) writeMsgs(bss [][]byte) (int, error) { } s.outputBuffer = buf[:0] // So we can reuse the same underlying array later _, err := buf.WriteTo(s.rwc) + for _, bs := range bss { + pool_putBytes(bs) + } // TODO only include number of bytes from bs *successfully* written? return written, err } @@ -112,7 +113,7 @@ func (s *stream) readMsgFromBuffer() ([]byte, error) { if msgLen > streamMsgSize { return nil, errors.New("oversized message") } - msg := util.ResizeBytes(util.GetBytes(), int(msgLen)) + msg := pool_getBytes(int(msgLen)) _, err = io.ReadFull(s.inputBuffer, msg) return msg, err } diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 18a647d8..9746d46e 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -9,7 +9,6 @@ package yggdrasil import ( "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" ) const ( @@ -230,8 +229,9 @@ type wire_trafficPacket struct { } // Encodes a wire_trafficPacket into its wire format. +// The returned slice was taken from the pool. func (p *wire_trafficPacket) encode() []byte { - bs := util.GetBytes() + bs := pool_getBytes(0) bs = wire_put_uint64(wire_Traffic, bs) bs = wire_put_coords(p.Coords, bs) bs = append(bs, p.Handle[:]...) @@ -241,7 +241,9 @@ func (p *wire_trafficPacket) encode() []byte { } // Decodes an encoded wire_trafficPacket into the struct, returning true if successful. +// Either way, the argument slice is added to the pool. func (p *wire_trafficPacket) decode(bs []byte) bool { + defer pool_putBytes(bs) var pType uint64 switch { case !wire_chop_uint64(&pType, &bs): @@ -255,7 +257,7 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { case !wire_chop_slice(p.Nonce[:], &bs): return false } - p.Payload = append(util.GetBytes(), bs...) + p.Payload = append(p.Payload, bs...) return true }