Merge pull request #682 from Arceliar/bytes

remove util.GetBytes/util.PutBytes and use local sync.Pools instead
This commit is contained in:
Arceliar 2020-05-02 10:39:41 -05:00 committed by GitHub
commit 8b888305e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 53 additions and 96 deletions

View File

@ -17,12 +17,11 @@ import (
"crypto/rand" "crypto/rand"
"crypto/sha512" "crypto/sha512"
"encoding/hex" "encoding/hex"
"sync"
"golang.org/x/crypto/curve25519" "golang.org/x/crypto/curve25519"
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
"golang.org/x/crypto/nacl/box" "golang.org/x/crypto/nacl/box"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -225,29 +224,36 @@ func GetSharedKey(myPrivKey *BoxPrivKey,
return (*BoxSharedKey)(&shared) return (*BoxSharedKey)(&shared)
} }
// pool is used internally by BoxOpen and BoxSeal to avoid allocating temporary space
var pool = sync.Pool{New: func() interface{} { return []byte(nil) }}
// BoxOpen returns a message and true if it successfully opens a crypto box using the provided shared key and nonce. // BoxOpen returns a message and true if it successfully opens a crypto box using the provided shared key and nonce.
// The boxed input slice's backing array is reused for the unboxed output when possible.
func BoxOpen(shared *BoxSharedKey, func BoxOpen(shared *BoxSharedKey,
boxed []byte, boxed []byte,
nonce *BoxNonce) ([]byte, bool) { nonce *BoxNonce) ([]byte, bool) {
out := util.GetBytes()
s := (*[BoxSharedKeyLen]byte)(shared) s := (*[BoxSharedKeyLen]byte)(shared)
n := (*[BoxNonceLen]byte)(nonce) n := (*[BoxNonceLen]byte)(nonce)
unboxed, success := box.OpenAfterPrecomputation(out, boxed, n, s) temp := append(pool.Get().([]byte), boxed...)
unboxed, success := box.OpenAfterPrecomputation(boxed[:0], temp, n, s)
pool.Put(temp[:0])
return unboxed, success return unboxed, success
} }
// BoxSeal seals a crypto box using the provided shared key, returning the box and the nonce needed to decrypt it. // BoxSeal seals a crypto box using the provided shared key, returning the box and the nonce needed to decrypt it.
// If nonce is nil, a random BoxNonce will be used and returned. // If nonce is nil, a random BoxNonce will be used and returned.
// If nonce is non-nil, then nonce.Increment() will be called before using it, and the incremented BoxNonce is what is returned. // If nonce is non-nil, then nonce.Increment() will be called before using it, and the incremented BoxNonce is what is returned.
// The unboxed input slice's backing array is reused for the boxed output when possible.
func BoxSeal(shared *BoxSharedKey, unboxed []byte, nonce *BoxNonce) ([]byte, *BoxNonce) { func BoxSeal(shared *BoxSharedKey, unboxed []byte, nonce *BoxNonce) ([]byte, *BoxNonce) {
if nonce == nil { if nonce == nil {
nonce = NewBoxNonce() nonce = NewBoxNonce()
} }
nonce.Increment() nonce.Increment()
out := util.GetBytes()
s := (*[BoxSharedKeyLen]byte)(shared) s := (*[BoxSharedKeyLen]byte)(shared)
n := (*[BoxNonceLen]byte)(nonce) n := (*[BoxNonceLen]byte)(nonce)
boxed := box.SealAfterPrecomputation(out, unboxed, n, s) temp := append(pool.Get().([]byte), unboxed...)
boxed := box.SealAfterPrecomputation(unboxed[:0], temp, n, s)
pool.Put(temp[:0])
return boxed, nonce return boxed, nonce
} }

View File

@ -44,13 +44,11 @@ func (s *tunConn) _read(bs []byte) (err error) {
select { select {
case <-s.stop: case <-s.stop:
err = errors.New("session was already closed") err = errors.New("session was already closed")
util.PutBytes(bs)
return return
default: default:
} }
if len(bs) == 0 { if len(bs) == 0 {
err = errors.New("read packet with 0 size") err = errors.New("read packet with 0 size")
util.PutBytes(bs)
return return
} }
ipv4 := len(bs) > 20 && bs[0]&0xf0 == 0x40 ipv4 := len(bs) > 20 && bs[0]&0xf0 == 0x40
@ -107,7 +105,6 @@ func (s *tunConn) _read(bs []byte) (err error) {
} }
if skip { if skip {
err = errors.New("address not allowed") err = errors.New("address not allowed")
util.PutBytes(bs)
return return
} }
s.tun.writer.writeFrom(s, bs) s.tun.writer.writeFrom(s, bs)
@ -125,7 +122,6 @@ func (s *tunConn) _write(bs []byte) (err error) {
select { select {
case <-s.stop: case <-s.stop:
err = errors.New("session was already closed") err = errors.New("session was already closed")
util.PutBytes(bs)
return return
default: default:
} }
@ -183,7 +179,6 @@ func (s *tunConn) _write(bs []byte) (err error) {
} }
if skip { if skip {
err = errors.New("address not allowed") err = errors.New("address not allowed")
util.PutBytes(bs)
return return
} }
msg := yggdrasil.FlowKeyMessage{ msg := yggdrasil.FlowKeyMessage{

View File

@ -3,7 +3,6 @@ package tuntap
import ( import (
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
@ -14,6 +13,7 @@ const TUN_OFFSET_BYTES = 4
type tunWriter struct { type tunWriter struct {
phony.Inbox phony.Inbox
tun *TunAdapter tun *TunAdapter
buf [TUN_OFFSET_BYTES + 65536]byte
} }
func (w *tunWriter) writeFrom(from phony.Actor, b []byte) { func (w *tunWriter) writeFrom(from phony.Actor, b []byte) {
@ -25,15 +25,13 @@ func (w *tunWriter) writeFrom(from phony.Actor, b []byte) {
// write is pretty loose with the memory safety rules, e.g. it assumes it can // write is pretty loose with the memory safety rules, e.g. it assumes it can
// read w.tun.iface.IsTap() safely // read w.tun.iface.IsTap() safely
func (w *tunWriter) _write(b []byte) { func (w *tunWriter) _write(b []byte) {
defer util.PutBytes(b)
var written int var written int
var err error var err error
n := len(b) n := len(b)
if n == 0 { if n == 0 {
return return
} }
temp := append(util.ResizeBytes(util.GetBytes(), TUN_OFFSET_BYTES), b...) temp := append(w.buf[:TUN_OFFSET_BYTES], b...)
defer util.PutBytes(temp)
written, err = w.tun.iface.Write(temp, TUN_OFFSET_BYTES) written, err = w.tun.iface.Write(temp, TUN_OFFSET_BYTES)
if err != nil { if err != nil {
w.tun.Act(w, func() { w.tun.Act(w, func() {
@ -51,22 +49,23 @@ func (w *tunWriter) _write(b []byte) {
type tunReader struct { type tunReader struct {
phony.Inbox phony.Inbox
tun *TunAdapter tun *TunAdapter
buf [TUN_OFFSET_BYTES + 65536]byte
} }
func (r *tunReader) _read() { func (r *tunReader) _read() {
// Get a slice to store the packet in // Get a slice to store the packet in
recvd := util.ResizeBytes(util.GetBytes(), int(r.tun.mtu)+TUN_OFFSET_BYTES)
// Wait for a packet to be delivered to us through the TUN adapter // Wait for a packet to be delivered to us through the TUN adapter
n, err := r.tun.iface.Read(recvd, TUN_OFFSET_BYTES) n, err := r.tun.iface.Read(r.buf[:], TUN_OFFSET_BYTES)
if n <= TUN_OFFSET_BYTES || err != nil { if n <= TUN_OFFSET_BYTES || err != nil {
r.tun.log.Errorln("Error reading TUN:", err) r.tun.log.Errorln("Error reading TUN:", err)
ferr := r.tun.iface.Flush() ferr := r.tun.iface.Flush()
if ferr != nil { if ferr != nil {
r.tun.log.Errorln("Unable to flush packets:", ferr) r.tun.log.Errorln("Unable to flush packets:", ferr)
} }
util.PutBytes(recvd)
} else { } else {
r.tun.handlePacketFrom(r, recvd[TUN_OFFSET_BYTES:n+TUN_OFFSET_BYTES], err) bs := make([]byte, n, n+crypto.BoxOverhead) // extra capacity for later...
copy(bs, r.buf[TUN_OFFSET_BYTES:n+TUN_OFFSET_BYTES])
r.tun.handlePacketFrom(r, bs, err)
} }
if err == nil { if err == nil {
// Now read again // Now read again
@ -175,7 +174,6 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) {
_, known := tun.dials[dstString] _, known := tun.dials[dstString]
tun.dials[dstString] = append(tun.dials[dstString], bs) tun.dials[dstString] = append(tun.dials[dstString], bs)
for len(tun.dials[dstString]) > 32 { for len(tun.dials[dstString]) > 32 {
util.PutBytes(tun.dials[dstString][0])
tun.dials[dstString] = tun.dials[dstString][1:] tun.dials[dstString] = tun.dials[dstString][1:]
} }
if !known { if !known {

View File

@ -1,21 +0,0 @@
//+build mobile
package util
import "runtime/debug"
func init() {
debug.SetGCPercent(25)
}
// GetBytes always returns a nil slice on mobile platforms.
func GetBytes() []byte {
return nil
}
// PutBytes does literally nothing on mobile platforms.
// This is done rather than keeping a free list of bytes on platforms with memory constraints.
// It's needed to help keep memory usage low enough to fall under the limits set for e.g. iOS NEPacketTunnelProvider apps.
func PutBytes(bs []byte) {
return
}

View File

@ -1,18 +0,0 @@
//+build !mobile
package util
import "sync"
// This is used to buffer recently used slices of bytes, to prevent allocations in the hot loops.
var byteStore = sync.Pool{New: func() interface{} { return []byte(nil) }}
// GetBytes returns a 0-length (possibly nil) slice of bytes from a free list, so it may have a larger capacity.
func GetBytes() []byte {
return byteStore.Get().([]byte)[:0]
}
// PutBytes stores a slice in a free list, where it can potentially be reused to prevent future allocations.
func PutBytes(bs []byte) {
byteStore.Put(bs)
}

View File

@ -252,7 +252,6 @@ func (c *Conn) Read(b []byte) (int, error) {
} }
// Copy results to the output slice and clean up // Copy results to the output slice and clean up
copy(b, bs) copy(b, bs)
util.PutBytes(bs)
// Return the number of bytes copied to the slice, along with any error // Return the number of bytes copied to the slice, along with any error
return n, err return n, err
} }
@ -323,10 +322,11 @@ func (c *Conn) writeNoCopy(msg FlowKeyMessage) error {
// returned. // returned.
func (c *Conn) Write(b []byte) (int, error) { func (c *Conn) Write(b []byte) (int, error) {
written := len(b) written := len(b)
msg := FlowKeyMessage{Message: append(util.GetBytes(), b...)} bs := make([]byte, 0, len(b)+crypto.BoxOverhead)
bs = append(bs, b...)
msg := FlowKeyMessage{Message: bs}
err := c.writeNoCopy(msg) err := c.writeNoCopy(msg)
if err != nil { if err != nil {
util.PutBytes(msg.Message)
written = 0 written = 0
} }
return written, err return written, err

View File

@ -406,10 +406,6 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool
w.intf.notifySending(size, isLinkTraffic) w.intf.notifySending(size, isLinkTraffic)
w.intf.msgIO.writeMsgs(bss) w.intf.msgIO.writeMsgs(bss)
w.intf.notifySent(size, isLinkTraffic) w.intf.notifySent(size, isLinkTraffic)
// Cleanup
for _, bs := range bss {
util.PutBytes(bs)
}
}) })
} }

View File

@ -2,8 +2,6 @@ package yggdrasil
import ( import (
"time" "time"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
// TODO take max size from config // TODO take max size from config
@ -59,7 +57,7 @@ func (q *packetQueue) cleanup() {
worstStream.infos = worstStream.infos[1:] worstStream.infos = worstStream.infos[1:]
worstStream.size -= uint64(len(packet)) worstStream.size -= uint64(len(packet))
q.size -= uint64(len(packet)) q.size -= uint64(len(packet))
util.PutBytes(packet) pool_putBytes(packet)
// save the modified stream to queues // save the modified stream to queues
if len(worstStream.infos) > 0 { if len(worstStream.infos) > 0 {
q.streams[worst] = worstStream q.streams[worst] = worstStream

View File

@ -9,7 +9,6 @@ import (
"time" "time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
@ -241,7 +240,6 @@ func (p *peer) _handlePacket(packet []byte) {
case wire_LinkProtocolTraffic: case wire_LinkProtocolTraffic:
p._handleLinkTraffic(packet) p._handleLinkTraffic(packet)
default: default:
util.PutBytes(packet)
} }
} }
@ -347,7 +345,6 @@ func (p *peer) _handleLinkTraffic(bs []byte) {
case wire_SwitchMsg: case wire_SwitchMsg:
p._handleSwitchMsg(payload) p._handleSwitchMsg(payload)
default: default:
util.PutBytes(bs)
} }
} }

20
src/yggdrasil/pool.go Normal file
View File

@ -0,0 +1,20 @@
package yggdrasil
import "sync"
// Used internally to reduce allocations in the hot loop
// I.e. packets being switched or between the crypto and the switch
// For safety reasons, these must not escape this package
var pool = sync.Pool{New: func() interface{} { return []byte(nil) }}
func pool_getBytes(size int) []byte {
bs := pool.Get().([]byte)
if cap(bs) < size {
bs = make([]byte, size)
}
return bs[:size]
}
func pool_putBytes(bs []byte) {
pool.Put(bs)
}

View File

@ -29,7 +29,6 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
@ -178,14 +177,12 @@ func (r *router) _handlePacket(packet []byte) {
// Handles incoming traffic, i.e. encapuslated ordinary IPv6 packets. // Handles incoming traffic, i.e. encapuslated ordinary IPv6 packets.
// Passes them to the crypto session worker to be decrypted and sent to the adapter. // Passes them to the crypto session worker to be decrypted and sent to the adapter.
func (r *router) _handleTraffic(packet []byte) { func (r *router) _handleTraffic(packet []byte) {
defer util.PutBytes(packet)
p := wire_trafficPacket{} p := wire_trafficPacket{}
if !p.decode(packet) { if !p.decode(packet) {
return return
} }
sinfo, isIn := r.sessions.getSessionForHandle(&p.Handle) sinfo, isIn := r.sessions.getSessionForHandle(&p.Handle)
if !isIn { if !isIn {
util.PutBytes(p.Payload)
return return
} }
sinfo.recv(r, &p) sinfo.recv(r, &p)
@ -231,7 +228,6 @@ func (r *router) _handleProto(packet []byte) {
case wire_DHTLookupResponse: case wire_DHTLookupResponse:
r._handleDHTRes(bs, &p.FromKey) r._handleDHTRes(bs, &p.FromKey)
default: default:
util.PutBytes(packet)
} }
} }

View File

@ -448,12 +448,9 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
select { select {
case <-sinfo.init: case <-sinfo.init:
default: default:
// TODO find a better way to drop things until initialized
util.PutBytes(p.Payload)
return return
} }
if !sinfo._nonceIsOK(&p.Nonce) { if !sinfo._nonceIsOK(&p.Nonce) {
util.PutBytes(p.Payload)
return return
} }
k := sinfo.sharedSesKey k := sinfo.sharedSesKey
@ -463,11 +460,9 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
poolFunc := func() { poolFunc := func() {
bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce) bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
callback := func() { callback := func() {
util.PutBytes(p.Payload)
if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) { if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) {
// Either we failed to decrypt, or the session was updated, or we // Either we failed to decrypt, or the session was updated, or we
// received this packet in the mean time // received this packet in the mean time
util.PutBytes(bs)
return return
} }
sinfo._updateNonce(&p.Nonce) sinfo._updateNonce(&p.Nonce)
@ -485,8 +480,6 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
select { select {
case <-sinfo.init: case <-sinfo.init:
default: default:
// TODO find a better way to drop things until initialized
util.PutBytes(msg.Message)
return return
} }
sinfo.bytesSent += uint64(len(msg.Message)) sinfo.bytesSent += uint64(len(msg.Message))
@ -505,14 +498,8 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
ch := make(chan func(), 1) ch := make(chan func(), 1)
poolFunc := func() { poolFunc := func() {
p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce) p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce)
callback := func() {
// Encoding may block on a util.GetBytes(), so kept out of the worker pool
packet := p.encode() packet := p.encode()
// Cleanup callback := func() {
util.PutBytes(msg.Message)
util.PutBytes(p.Payload)
// Send the packet
// TODO replace this with a send to the peer struct if that becomes an actor
sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.Act(sinfo, func() {
sinfo.sessions.router.out(packet) sinfo.sessions.router.out(packet)
}) })

View File

@ -6,8 +6,6 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
// Test that this matches the interface we expect // Test that this matches the interface we expect
@ -46,6 +44,9 @@ func (s *stream) writeMsgs(bss [][]byte) (int, error) {
} }
s.outputBuffer = buf[:0] // So we can reuse the same underlying array later s.outputBuffer = buf[:0] // So we can reuse the same underlying array later
_, err := buf.WriteTo(s.rwc) _, err := buf.WriteTo(s.rwc)
for _, bs := range bss {
pool_putBytes(bs)
}
// TODO only include number of bytes from bs *successfully* written? // TODO only include number of bytes from bs *successfully* written?
return written, err return written, err
} }
@ -112,7 +113,7 @@ func (s *stream) readMsgFromBuffer() ([]byte, error) {
if msgLen > streamMsgSize { if msgLen > streamMsgSize {
return nil, errors.New("oversized message") return nil, errors.New("oversized message")
} }
msg := util.ResizeBytes(util.GetBytes(), int(msgLen)) msg := pool_getBytes(int(msgLen))
_, err = io.ReadFull(s.inputBuffer, msg) _, err = io.ReadFull(s.inputBuffer, msg)
return msg, err return msg, err
} }

View File

@ -9,7 +9,6 @@ package yggdrasil
import ( import (
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
const ( const (
@ -230,8 +229,9 @@ type wire_trafficPacket struct {
} }
// Encodes a wire_trafficPacket into its wire format. // Encodes a wire_trafficPacket into its wire format.
// The returned slice was taken from the pool.
func (p *wire_trafficPacket) encode() []byte { func (p *wire_trafficPacket) encode() []byte {
bs := util.GetBytes() bs := pool_getBytes(0)
bs = wire_put_uint64(wire_Traffic, bs) bs = wire_put_uint64(wire_Traffic, bs)
bs = wire_put_coords(p.Coords, bs) bs = wire_put_coords(p.Coords, bs)
bs = append(bs, p.Handle[:]...) bs = append(bs, p.Handle[:]...)
@ -241,7 +241,9 @@ func (p *wire_trafficPacket) encode() []byte {
} }
// Decodes an encoded wire_trafficPacket into the struct, returning true if successful. // Decodes an encoded wire_trafficPacket into the struct, returning true if successful.
// Either way, the argument slice is added to the pool.
func (p *wire_trafficPacket) decode(bs []byte) bool { func (p *wire_trafficPacket) decode(bs []byte) bool {
defer pool_putBytes(bs)
var pType uint64 var pType uint64
switch { switch {
case !wire_chop_uint64(&pType, &bs): case !wire_chop_uint64(&pType, &bs):
@ -255,7 +257,7 @@ func (p *wire_trafficPacket) decode(bs []byte) bool {
case !wire_chop_slice(p.Nonce[:], &bs): case !wire_chop_slice(p.Nonce[:], &bs):
return false return false
} }
p.Payload = append(util.GetBytes(), bs...) p.Payload = append(p.Payload, bs...)
return true return true
} }