From e0a3055c2ff306463f515b0d7c3a8bafb2e09113 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 27 Jul 2019 18:10:32 -0500 Subject: [PATCH 1/5] get rid of session workers, new util.PutBytes/GetBytes logic --- go.sum | 5 ++ src/util/cancellation.go | 38 ++++++-------- src/util/util.go | 31 ++++++------ src/yggdrasil/api.go | 2 +- src/yggdrasil/conn.go | 104 +++++++++++++++++++++------------------ src/yggdrasil/session.go | 29 +++-------- 6 files changed, 100 insertions(+), 109 deletions(-) diff --git a/go.sum b/go.sum index 059d8459..050a3e5e 100644 --- a/go.sum +++ b/go.sum @@ -18,25 +18,30 @@ github.com/yggdrasil-network/water v0.0.0-20190719211521-a76871ea954b/go.mod h1: github.com/yggdrasil-network/water v0.0.0-20190719213007-b160316e362e/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= github.com/yggdrasil-network/water v0.0.0-20190720101301-5db94379a5eb/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= github.com/yggdrasil-network/water v0.0.0-20190720145626-28ccb9101d55/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= +github.com/yggdrasil-network/water v0.0.0-20190725073841-250edb919f8a h1:mQ0mPD+dyB/vaDPyVkCBiXUQu9Or7/cRSTjPlV8tXvw= github.com/yggdrasil-network/water v0.0.0-20190725073841-250edb919f8a/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20181207154023-610586996380 h1:zPQexyRtNYBc7bcHmehl1dH6TB3qn8zytv8cBGLDNY0= golang.org/x/net v0.0.0-20181207154023-610586996380/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e h1:njOxP/wVblhCLIUhjHXf6X+dzTt5OQ3vMQo9mkOIKIo= golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI= golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190719005602-e377ae9d6386/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= diff --git a/src/util/cancellation.go b/src/util/cancellation.go index 2a78c19d..af4721bb 100644 --- a/src/util/cancellation.go +++ b/src/util/cancellation.go @@ -13,33 +13,25 @@ type Cancellation interface { Error() error } +var CancellationFinalized = errors.New("finalizer called") +var CancellationTimeoutError = errors.New("timeout") + func CancellationFinalizer(c Cancellation) { - c.Cancel(errors.New("finalizer called")) + c.Cancel(CancellationFinalized) } type cancellation struct { - signal chan error cancel chan struct{} - errMtx sync.RWMutex + mutex sync.RWMutex err error -} - -func (c *cancellation) worker() { - // Launch this in a separate goroutine when creating a cancellation - err := <-c.signal - c.errMtx.Lock() - c.err = err - c.errMtx.Unlock() - close(c.cancel) + done bool } func NewCancellation() Cancellation { c := cancellation{ - signal: make(chan error), cancel: make(chan struct{}), } runtime.SetFinalizer(&c, CancellationFinalizer) - go c.worker() return &c } @@ -48,18 +40,22 @@ func (c *cancellation) Finished() <-chan struct{} { } func (c *cancellation) Cancel(err error) error { - select { - case c.signal <- err: + c.mutex.Lock() + defer c.mutex.Unlock() + if c.done { + return c.err + } else { + c.err = err + c.done = true + close(c.cancel) return nil - case <-c.cancel: - return c.Error() } } func (c *cancellation) Error() error { - c.errMtx.RLock() + c.mutex.RLock() err := c.err - c.errMtx.RUnlock() + c.mutex.RUnlock() return err } @@ -75,8 +71,6 @@ func CancellationChild(parent Cancellation) Cancellation { return child } -var CancellationTimeoutError = errors.New("timeout") - func CancellationWithTimeout(parent Cancellation, timeout time.Duration) Cancellation { child := CancellationChild(parent) go func() { diff --git a/src/util/util.go b/src/util/util.go index 94bd5d6a..4596474e 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -3,6 +3,7 @@ package util // These are misc. utility functions that didn't really fit anywhere else import "runtime" +import "sync" import "time" // A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere. @@ -21,29 +22,27 @@ func UnlockThread() { } // This is used to buffer recently used slices of bytes, to prevent allocations in the hot loops. -// It's used like a sync.Pool, but with a fixed size and typechecked without type casts to/from interface{} (which were making the profiles look ugly). -var byteStore chan []byte +var byteStoreMutex sync.Mutex +var byteStore [][]byte -func init() { - byteStore = make(chan []byte, 32) -} - -// Gets an empty slice from the byte store, if one is available, or else returns a new nil slice. +// Gets an empty slice from the byte store. func GetBytes() []byte { - select { - case bs := <-byteStore: - return bs[:0] - default: + byteStoreMutex.Lock() + defer byteStoreMutex.Unlock() + if len(byteStore) > 0 { + var bs []byte + bs, byteStore = byteStore[len(byteStore)-1][:0], byteStore[:len(byteStore)-1] + return bs + } else { return nil } } -// Puts a slice in the store, if there's room, or else returns and lets the slice get collected. +// Puts a slice in the store. func PutBytes(bs []byte) { - select { - case byteStore <- bs: - default: - } + byteStoreMutex.Lock() + defer byteStoreMutex.Unlock() + byteStore = append(byteStore, bs) } // This is a workaround to go's broken timer implementation diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 1bec9836..2bad5a0c 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -230,7 +230,7 @@ func (c *Core) GetSessions() []Session { skip = true } }() - sinfo.doWorker(workerFunc) + sinfo.doFunc(workerFunc) }() if skip { continue diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index bc884fb3..ee9cbb30 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -145,9 +145,9 @@ func (c *Conn) Read(b []byte) (int, error) { } defer util.PutBytes(p.Payload) var err error - done := make(chan struct{}) + //done := make(chan struct{}) workerFunc := func() { - defer close(done) + //defer close(done) // If the nonce is bad then drop the packet and return an error if !sinfo.nonceIsOK(&p.Nonce) { err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0} @@ -167,33 +167,36 @@ func (c *Conn) Read(b []byte) (int, error) { sinfo.time = time.Now() sinfo.bytesRecvd += uint64(len(bs)) } - // Hand over to the session worker - defer func() { - if recover() != nil { - err = ConnError{errors.New("read failed, session already closed"), false, false, true, 0} - close(done) + sinfo.doFunc(workerFunc) + /* + // Hand over to the session worker + defer func() { + if recover() != nil { + err = ConnError{errors.New("read failed, session already closed"), false, false, true, 0} + close(done) + } + }() // In case we're racing with a close + // Send to worker + select { + case sinfo.worker <- workerFunc: + case <-cancel.Finished(): + if cancel.Error() == util.CancellationTimeoutError { + return 0, ConnError{errors.New("read timeout"), true, false, false, 0} + } else { + return 0, ConnError{errors.New("session closed"), false, false, true, 0} + } } - }() // In case we're racing with a close - // Send to worker - select { - case sinfo.worker <- workerFunc: - case <-cancel.Finished(): - if cancel.Error() == util.CancellationTimeoutError { - return 0, ConnError{errors.New("read timeout"), true, false, false, 0} - } else { - return 0, ConnError{errors.New("session closed"), false, false, true, 0} + // Wait for the worker to finish + select { + case <-done: // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff) + case <-cancel.Finished(): + if cancel.Error() == util.CancellationTimeoutError { + return 0, ConnError{errors.New("read timeout"), true, false, false, 0} + } else { + return 0, ConnError{errors.New("session closed"), false, false, true, 0} + } } - } - // Wait for the worker to finish - select { - case <-done: // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff) - case <-cancel.Finished(): - if cancel.Error() == util.CancellationTimeoutError { - return 0, ConnError{errors.New("read timeout"), true, false, false, 0} - } else { - return 0, ConnError{errors.New("session closed"), false, false, true, 0} - } - } + */ // Something went wrong in the session worker so abort if err != nil { if ce, ok := err.(*ConnError); ok && ce.Temporary() { @@ -214,10 +217,10 @@ func (c *Conn) Read(b []byte) (int, error) { func (c *Conn) Write(b []byte) (bytesWritten int, err error) { sinfo := c.session var packet []byte - done := make(chan struct{}) + //done := make(chan struct{}) written := len(b) workerFunc := func() { - defer close(done) + //defer close(done) // Does the packet exceed the permitted size for the session? if uint16(len(b)) > sinfo.getMTU() { written, err = 0, ConnError{errors.New("packet too big"), true, false, false, int(sinfo.getMTU())} @@ -264,27 +267,30 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { default: // Don't do anything, to keep traffic throttled } } - // Set up a timer so this doesn't block forever - cancel := c.getDeadlineCancellation(&c.writeDeadline) - defer cancel.Cancel(nil) - // Hand over to the session worker - defer func() { - if recover() != nil { - err = ConnError{errors.New("write failed, session already closed"), false, false, true, 0} - close(done) + sinfo.doFunc(workerFunc) + /* + // Set up a timer so this doesn't block forever + cancel := c.getDeadlineCancellation(&c.writeDeadline) + defer cancel.Cancel(nil) + // Hand over to the session worker + defer func() { + if recover() != nil { + err = ConnError{errors.New("write failed, session already closed"), false, false, true, 0} + close(done) + } + }() // In case we're racing with a close + select { // Send to worker + case sinfo.worker <- workerFunc: + case <-cancel.Finished(): + if cancel.Error() == util.CancellationTimeoutError { + return 0, ConnError{errors.New("write timeout"), true, false, false, 0} + } else { + return 0, ConnError{errors.New("session closed"), false, false, true, 0} + } } - }() // In case we're racing with a close - select { // Send to worker - case sinfo.worker <- workerFunc: - case <-cancel.Finished(): - if cancel.Error() == util.CancellationTimeoutError { - return 0, ConnError{errors.New("write timeout"), true, false, false, 0} - } else { - return 0, ConnError{errors.New("session closed"), false, false, true, 0} - } - } - // Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff) - <-done + // Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff) + <-done + */ // Give the packet to the router if written > 0 { sinfo.core.router.out(packet) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 53836c38..eca3bb00 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -16,6 +16,7 @@ import ( // All the information we know about an active session. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { + mutex sync.Mutex // Protects all of the below, use it any time you read/chance the contents of a session core *Core // reconfigure chan chan error // theirAddr address.Address // @@ -43,24 +44,14 @@ type sessionInfo struct { tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation bytesSent uint64 // Bytes of real traffic sent in this session bytesRecvd uint64 // Bytes of real traffic received in this session - worker chan func() // Channel to send work to the session worker recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use } -func (sinfo *sessionInfo) doWorker(f func()) { - done := make(chan struct{}) - sinfo.worker <- func() { - f() - close(done) - } - <-done -} - -func (sinfo *sessionInfo) workerMain() { - for f := range sinfo.worker { - f() - } +func (sinfo *sessionInfo) doFunc(f func()) { + sinfo.mutex.Lock() + defer sinfo.mutex.Unlock() + f() } // Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. @@ -231,11 +222,9 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.myHandle = *crypto.NewHandle() sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) - sinfo.worker = make(chan func(), 1) sinfo.recv = make(chan *wire_trafficPacket, 32) ss.sinfos[sinfo.myHandle] = &sinfo ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle - go sinfo.workerMain() return &sinfo } @@ -267,14 +256,12 @@ func (ss *sessions) cleanup() { ss.lastCleanup = time.Now() } -// Closes a session, removing it from sessions maps and killing the worker goroutine. +// Closes a session, removing it from sessions maps. func (sinfo *sessionInfo) close() { if s := sinfo.core.sessions.sinfos[sinfo.myHandle]; s == sinfo { delete(sinfo.core.sessions.sinfos, sinfo.myHandle) delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub) } - defer func() { recover() }() - close(sinfo.worker) } // Returns a session ping appropriate for the given session info. @@ -372,7 +359,7 @@ func (ss *sessions) handlePing(ping *sessionPing) { } ss.listenerMutex.Unlock() } - sinfo.doWorker(func() { + sinfo.doFunc(func() { // Update the session if !sinfo.update(ping) { /*panic("Should not happen in testing")*/ return @@ -426,7 +413,7 @@ func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) { // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. func (ss *sessions) reset() { for _, sinfo := range ss.sinfos { - sinfo.doWorker(func() { + sinfo.doFunc(func() { sinfo.reset = true }) } From 9e118884d48d924b396c81943a2ef3f9aafe8891 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 27 Jul 2019 18:12:06 -0500 Subject: [PATCH 2/5] remove some commented code --- src/yggdrasil/conn.go | 56 ------------------------------------------- 1 file changed, 56 deletions(-) diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index ee9cbb30..e04108f6 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -145,9 +145,7 @@ func (c *Conn) Read(b []byte) (int, error) { } defer util.PutBytes(p.Payload) var err error - //done := make(chan struct{}) workerFunc := func() { - //defer close(done) // If the nonce is bad then drop the packet and return an error if !sinfo.nonceIsOK(&p.Nonce) { err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0} @@ -168,35 +166,6 @@ func (c *Conn) Read(b []byte) (int, error) { sinfo.bytesRecvd += uint64(len(bs)) } sinfo.doFunc(workerFunc) - /* - // Hand over to the session worker - defer func() { - if recover() != nil { - err = ConnError{errors.New("read failed, session already closed"), false, false, true, 0} - close(done) - } - }() // In case we're racing with a close - // Send to worker - select { - case sinfo.worker <- workerFunc: - case <-cancel.Finished(): - if cancel.Error() == util.CancellationTimeoutError { - return 0, ConnError{errors.New("read timeout"), true, false, false, 0} - } else { - return 0, ConnError{errors.New("session closed"), false, false, true, 0} - } - } - // Wait for the worker to finish - select { - case <-done: // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff) - case <-cancel.Finished(): - if cancel.Error() == util.CancellationTimeoutError { - return 0, ConnError{errors.New("read timeout"), true, false, false, 0} - } else { - return 0, ConnError{errors.New("session closed"), false, false, true, 0} - } - } - */ // Something went wrong in the session worker so abort if err != nil { if ce, ok := err.(*ConnError); ok && ce.Temporary() { @@ -217,10 +186,8 @@ func (c *Conn) Read(b []byte) (int, error) { func (c *Conn) Write(b []byte) (bytesWritten int, err error) { sinfo := c.session var packet []byte - //done := make(chan struct{}) written := len(b) workerFunc := func() { - //defer close(done) // Does the packet exceed the permitted size for the session? if uint16(len(b)) > sinfo.getMTU() { written, err = 0, ConnError{errors.New("packet too big"), true, false, false, int(sinfo.getMTU())} @@ -268,29 +235,6 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { } } sinfo.doFunc(workerFunc) - /* - // Set up a timer so this doesn't block forever - cancel := c.getDeadlineCancellation(&c.writeDeadline) - defer cancel.Cancel(nil) - // Hand over to the session worker - defer func() { - if recover() != nil { - err = ConnError{errors.New("write failed, session already closed"), false, false, true, 0} - close(done) - } - }() // In case we're racing with a close - select { // Send to worker - case sinfo.worker <- workerFunc: - case <-cancel.Finished(): - if cancel.Error() == util.CancellationTimeoutError { - return 0, ConnError{errors.New("write timeout"), true, false, false, 0} - } else { - return 0, ConnError{errors.New("session closed"), false, false, true, 0} - } - } - // Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff) - <-done - */ // Give the packet to the router if written > 0 { sinfo.core.router.out(packet) From b66bea813b05e49c441593fb4c57e23b526d7b01 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 27 Jul 2019 18:23:55 -0500 Subject: [PATCH 3/5] rename a couple of things and move a PutBytes so it happens sooner --- src/yggdrasil/conn.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index e04108f6..30519f6f 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -143,9 +143,9 @@ func (c *Conn) Read(b []byte) (int, error) { if !ok { return 0, ConnError{errors.New("session closed"), false, false, true, 0} } - defer util.PutBytes(p.Payload) var err error - workerFunc := func() { + sessionFunc := func() { + defer util.PutBytes(p.Payload) // If the nonce is bad then drop the packet and return an error if !sinfo.nonceIsOK(&p.Nonce) { err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0} @@ -165,7 +165,7 @@ func (c *Conn) Read(b []byte) (int, error) { sinfo.time = time.Now() sinfo.bytesRecvd += uint64(len(bs)) } - sinfo.doFunc(workerFunc) + sinfo.doFunc(sessionFunc) // Something went wrong in the session worker so abort if err != nil { if ce, ok := err.(*ConnError); ok && ce.Temporary() { @@ -187,7 +187,7 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { sinfo := c.session var packet []byte written := len(b) - workerFunc := func() { + sessionFunc := func() { // Does the packet exceed the permitted size for the session? if uint16(len(b)) > sinfo.getMTU() { written, err = 0, ConnError{errors.New("packet too big"), true, false, false, int(sinfo.getMTU())} @@ -234,7 +234,7 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { default: // Don't do anything, to keep traffic throttled } } - sinfo.doFunc(workerFunc) + sinfo.doFunc(sessionFunc) // Give the packet to the router if written > 0 { sinfo.core.router.out(packet) From 38e1503b28562a04865807f2bf11dd98c0c75954 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 27 Jul 2019 20:09:43 -0500 Subject: [PATCH 4/5] split up some of the tun reader logic into a separate worker, so the main loop can be mostly just syscalls --- src/tuntap/iface.go | 313 ++++++++++++++++++++++---------------------- 1 file changed, 160 insertions(+), 153 deletions(-) diff --git a/src/tuntap/iface.go b/src/tuntap/iface.go index 637715d6..00cf2dfd 100644 --- a/src/tuntap/iface.go +++ b/src/tuntap/iface.go @@ -112,6 +112,164 @@ func (tun *TunAdapter) writer() error { func (tun *TunAdapter) reader() error { recvd := make([]byte, 65535+tun_ETHER_HEADER_LENGTH) + toWorker := make(chan []byte, 32) + defer close(toWorker) + go func() { + for bs := range toWorker { + // If we detect an ICMP packet then hand it to the ICMPv6 module + if bs[6] == 58 { + // Found an ICMPv6 packet - we need to make sure to give ICMPv6 the full + // Ethernet frame rather than just the IPv6 packet as this is needed for + // NDP to work correctly + if err := tun.icmpv6.ParsePacket(recvd); err == nil { + // We acted on the packet in the ICMPv6 module so don't forward or do + // anything else with it + continue + } + } + // From the IP header, work out what our source and destination addresses + // and node IDs are. We will need these in order to work out where to send + // the packet + var srcAddr address.Address + var dstAddr address.Address + var dstNodeID *crypto.NodeID + var dstNodeIDMask *crypto.NodeID + var dstSnet address.Subnet + var addrlen int + n := len(bs) + // Check the IP protocol - if it doesn't match then we drop the packet and + // do nothing with it + if bs[0]&0xf0 == 0x60 { + // Check if we have a fully-sized IPv6 header + if len(bs) < 40 { + continue + } + // Check the packet size + if n-tun_IPv6_HEADER_LENGTH != 256*int(bs[4])+int(bs[5]) { + continue + } + // IPv6 address + addrlen = 16 + copy(srcAddr[:addrlen], bs[8:]) + copy(dstAddr[:addrlen], bs[24:]) + copy(dstSnet[:addrlen/2], bs[24:]) + } else if bs[0]&0xf0 == 0x40 { + // Check if we have a fully-sized IPv4 header + if len(bs) < 20 { + continue + } + // Check the packet size + if n != 256*int(bs[2])+int(bs[3]) { + continue + } + // IPv4 address + addrlen = 4 + copy(srcAddr[:addrlen], bs[12:]) + copy(dstAddr[:addrlen], bs[16:]) + } else { + // Unknown address length or protocol, so drop the packet and ignore it + tun.log.Traceln("Unknown packet type, dropping") + continue + } + if tun.ckr.isEnabled() && !tun.ckr.isValidSource(srcAddr, addrlen) { + // The packet had a source address that doesn't belong to us or our + // configured crypto-key routing source subnets + continue + } + if !dstAddr.IsValid() && !dstSnet.IsValid() { + if key, err := tun.ckr.getPublicKeyForAddress(dstAddr, addrlen); err == nil { + // A public key was found, get the node ID for the search + dstNodeID = crypto.GetNodeID(&key) + // Do a quick check to ensure that the node ID refers to a vaild + // Yggdrasil address or subnet - this might be superfluous + addr := *address.AddrForNodeID(dstNodeID) + copy(dstAddr[:], addr[:]) + copy(dstSnet[:], addr[:]) + // Are we certain we looked up a valid node? + if !dstAddr.IsValid() && !dstSnet.IsValid() { + continue + } + } else { + // No public key was found in the CKR table so we've exhausted our options + continue + } + } + // Do we have an active connection for this node address? + tun.mutex.RLock() + session, isIn := tun.addrToConn[dstAddr] + if !isIn || session == nil { + session, isIn = tun.subnetToConn[dstSnet] + if !isIn || session == nil { + // Neither an address nor a subnet mapping matched, therefore populate + // the node ID and mask to commence a search + if dstAddr.IsValid() { + dstNodeID, dstNodeIDMask = dstAddr.GetNodeIDandMask() + } else { + dstNodeID, dstNodeIDMask = dstSnet.GetNodeIDandMask() + } + } + } + tun.mutex.RUnlock() + // If we don't have a connection then we should open one + if !isIn || session == nil { + // Check we haven't been given empty node ID, really this shouldn't ever + // happen but just to be sure... + if dstNodeID == nil || dstNodeIDMask == nil { + panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen") + } + // Dial to the remote node + packet := bs + go func() { + // FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes + tun.mutex.Lock() + _, known := tun.dials[*dstNodeID] + tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], packet) + for len(tun.dials[*dstNodeID]) > 32 { + util.PutBytes(tun.dials[*dstNodeID][0]) + tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:] + } + tun.mutex.Unlock() + if known { + return + } + var tc *tunConn + if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { + // We've been given a connection so prepare the session wrapper + if tc, err = tun.wrap(conn); err != nil { + // Something went wrong when storing the connection, typically that + // something already exists for this address or subnet + tun.log.Debugln("TUN/TAP iface wrap:", err) + } + } + tun.mutex.Lock() + packets := tun.dials[*dstNodeID] + delete(tun.dials, *dstNodeID) + tun.mutex.Unlock() + if tc != nil { + for _, packet := range packets { + select { + case tc.send <- packet: + default: + util.PutBytes(packet) + } + } + } + }() + // While the dial is going on we can't do much else + // continuing this iteration - skip to the next one + continue + } + // If we have a connection now, try writing to it + if isIn && session != nil { + packet := bs + select { + case session.send <- packet: + default: + util.PutBytes(packet) + } + } + } + }() for { // Wait for a packet to be delivered to us through the TUN/TAP adapter n, err := tun.iface.Read(recvd) @@ -137,158 +295,7 @@ func (tun *TunAdapter) reader() error { } // Offset the buffer from now on so that we can ignore ethernet frames if // they are present - bs := recvd[offset : offset+n] - n -= offset - // If we detect an ICMP packet then hand it to the ICMPv6 module - if bs[6] == 58 { - // Found an ICMPv6 packet - we need to make sure to give ICMPv6 the full - // Ethernet frame rather than just the IPv6 packet as this is needed for - // NDP to work correctly - if err := tun.icmpv6.ParsePacket(recvd); err == nil { - // We acted on the packet in the ICMPv6 module so don't forward or do - // anything else with it - continue - } - } - // From the IP header, work out what our source and destination addresses - // and node IDs are. We will need these in order to work out where to send - // the packet - var srcAddr address.Address - var dstAddr address.Address - var dstNodeID *crypto.NodeID - var dstNodeIDMask *crypto.NodeID - var dstSnet address.Subnet - var addrlen int - // Check the IP protocol - if it doesn't match then we drop the packet and - // do nothing with it - if bs[0]&0xf0 == 0x60 { - // Check if we have a fully-sized IPv6 header - if len(bs) < 40 { - continue - } - // Check the packet size - if n-tun_IPv6_HEADER_LENGTH != 256*int(bs[4])+int(bs[5]) { - continue - } - // IPv6 address - addrlen = 16 - copy(srcAddr[:addrlen], bs[8:]) - copy(dstAddr[:addrlen], bs[24:]) - copy(dstSnet[:addrlen/2], bs[24:]) - } else if bs[0]&0xf0 == 0x40 { - // Check if we have a fully-sized IPv4 header - if len(bs) < 20 { - continue - } - // Check the packet size - if n != 256*int(bs[2])+int(bs[3]) { - continue - } - // IPv4 address - addrlen = 4 - copy(srcAddr[:addrlen], bs[12:]) - copy(dstAddr[:addrlen], bs[16:]) - } else { - // Unknown address length or protocol, so drop the packet and ignore it - tun.log.Traceln("Unknown packet type, dropping") - continue - } - if tun.ckr.isEnabled() && !tun.ckr.isValidSource(srcAddr, addrlen) { - // The packet had a source address that doesn't belong to us or our - // configured crypto-key routing source subnets - continue - } - if !dstAddr.IsValid() && !dstSnet.IsValid() { - if key, err := tun.ckr.getPublicKeyForAddress(dstAddr, addrlen); err == nil { - // A public key was found, get the node ID for the search - dstNodeID = crypto.GetNodeID(&key) - // Do a quick check to ensure that the node ID refers to a vaild - // Yggdrasil address or subnet - this might be superfluous - addr := *address.AddrForNodeID(dstNodeID) - copy(dstAddr[:], addr[:]) - copy(dstSnet[:], addr[:]) - // Are we certain we looked up a valid node? - if !dstAddr.IsValid() && !dstSnet.IsValid() { - continue - } - } else { - // No public key was found in the CKR table so we've exhausted our options - continue - } - } - // Do we have an active connection for this node address? - tun.mutex.RLock() - session, isIn := tun.addrToConn[dstAddr] - if !isIn || session == nil { - session, isIn = tun.subnetToConn[dstSnet] - if !isIn || session == nil { - // Neither an address nor a subnet mapping matched, therefore populate - // the node ID and mask to commence a search - if dstAddr.IsValid() { - dstNodeID, dstNodeIDMask = dstAddr.GetNodeIDandMask() - } else { - dstNodeID, dstNodeIDMask = dstSnet.GetNodeIDandMask() - } - } - } - tun.mutex.RUnlock() - // If we don't have a connection then we should open one - if !isIn || session == nil { - // Check we haven't been given empty node ID, really this shouldn't ever - // happen but just to be sure... - if dstNodeID == nil || dstNodeIDMask == nil { - panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen") - } - // Dial to the remote node - packet := append(util.GetBytes(), bs[:n]...) - go func() { - // FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes - tun.mutex.Lock() - _, known := tun.dials[*dstNodeID] - tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], packet) - for len(tun.dials[*dstNodeID]) > 32 { - util.PutBytes(tun.dials[*dstNodeID][0]) - tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:] - } - tun.mutex.Unlock() - if known { - return - } - var tc *tunConn - if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { - // We've been given a connection so prepare the session wrapper - if tc, err = tun.wrap(conn); err != nil { - // Something went wrong when storing the connection, typically that - // something already exists for this address or subnet - tun.log.Debugln("TUN/TAP iface wrap:", err) - } - } - tun.mutex.Lock() - packets := tun.dials[*dstNodeID] - delete(tun.dials, *dstNodeID) - tun.mutex.Unlock() - if tc != nil { - for _, packet := range packets { - select { - case tc.send <- packet: - default: - util.PutBytes(packet) - } - } - } - }() - // While the dial is going on we can't do much else - // continuing this iteration - skip to the next one - continue - } - // If we have a connection now, try writing to it - if isIn && session != nil { - packet := append(util.GetBytes(), bs[:n]...) - select { - case session.send <- packet: - default: - util.PutBytes(packet) - } - } + bs := append(util.GetBytes(), recvd[offset:offset+n]...) + toWorker <- bs } } From 406e143f7ff8cafb36d19daf5a155487f83b71cb Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 28 Jul 2019 23:33:04 -0500 Subject: [PATCH 5/5] move some logic from TunAdapter.reader into a new function, TunAdapter.readerPacketHandler --- src/tuntap/iface.go | 351 ++++++++++++++++++++++---------------------- 1 file changed, 178 insertions(+), 173 deletions(-) diff --git a/src/tuntap/iface.go b/src/tuntap/iface.go index 00cf2dfd..a95dfae4 100644 --- a/src/tuntap/iface.go +++ b/src/tuntap/iface.go @@ -110,178 +110,10 @@ func (tun *TunAdapter) writer() error { } } -func (tun *TunAdapter) reader() error { - recvd := make([]byte, 65535+tun_ETHER_HEADER_LENGTH) - toWorker := make(chan []byte, 32) - defer close(toWorker) - go func() { - for bs := range toWorker { - // If we detect an ICMP packet then hand it to the ICMPv6 module - if bs[6] == 58 { - // Found an ICMPv6 packet - we need to make sure to give ICMPv6 the full - // Ethernet frame rather than just the IPv6 packet as this is needed for - // NDP to work correctly - if err := tun.icmpv6.ParsePacket(recvd); err == nil { - // We acted on the packet in the ICMPv6 module so don't forward or do - // anything else with it - continue - } - } - // From the IP header, work out what our source and destination addresses - // and node IDs are. We will need these in order to work out where to send - // the packet - var srcAddr address.Address - var dstAddr address.Address - var dstNodeID *crypto.NodeID - var dstNodeIDMask *crypto.NodeID - var dstSnet address.Subnet - var addrlen int - n := len(bs) - // Check the IP protocol - if it doesn't match then we drop the packet and - // do nothing with it - if bs[0]&0xf0 == 0x60 { - // Check if we have a fully-sized IPv6 header - if len(bs) < 40 { - continue - } - // Check the packet size - if n-tun_IPv6_HEADER_LENGTH != 256*int(bs[4])+int(bs[5]) { - continue - } - // IPv6 address - addrlen = 16 - copy(srcAddr[:addrlen], bs[8:]) - copy(dstAddr[:addrlen], bs[24:]) - copy(dstSnet[:addrlen/2], bs[24:]) - } else if bs[0]&0xf0 == 0x40 { - // Check if we have a fully-sized IPv4 header - if len(bs) < 20 { - continue - } - // Check the packet size - if n != 256*int(bs[2])+int(bs[3]) { - continue - } - // IPv4 address - addrlen = 4 - copy(srcAddr[:addrlen], bs[12:]) - copy(dstAddr[:addrlen], bs[16:]) - } else { - // Unknown address length or protocol, so drop the packet and ignore it - tun.log.Traceln("Unknown packet type, dropping") - continue - } - if tun.ckr.isEnabled() && !tun.ckr.isValidSource(srcAddr, addrlen) { - // The packet had a source address that doesn't belong to us or our - // configured crypto-key routing source subnets - continue - } - if !dstAddr.IsValid() && !dstSnet.IsValid() { - if key, err := tun.ckr.getPublicKeyForAddress(dstAddr, addrlen); err == nil { - // A public key was found, get the node ID for the search - dstNodeID = crypto.GetNodeID(&key) - // Do a quick check to ensure that the node ID refers to a vaild - // Yggdrasil address or subnet - this might be superfluous - addr := *address.AddrForNodeID(dstNodeID) - copy(dstAddr[:], addr[:]) - copy(dstSnet[:], addr[:]) - // Are we certain we looked up a valid node? - if !dstAddr.IsValid() && !dstSnet.IsValid() { - continue - } - } else { - // No public key was found in the CKR table so we've exhausted our options - continue - } - } - // Do we have an active connection for this node address? - tun.mutex.RLock() - session, isIn := tun.addrToConn[dstAddr] - if !isIn || session == nil { - session, isIn = tun.subnetToConn[dstSnet] - if !isIn || session == nil { - // Neither an address nor a subnet mapping matched, therefore populate - // the node ID and mask to commence a search - if dstAddr.IsValid() { - dstNodeID, dstNodeIDMask = dstAddr.GetNodeIDandMask() - } else { - dstNodeID, dstNodeIDMask = dstSnet.GetNodeIDandMask() - } - } - } - tun.mutex.RUnlock() - // If we don't have a connection then we should open one - if !isIn || session == nil { - // Check we haven't been given empty node ID, really this shouldn't ever - // happen but just to be sure... - if dstNodeID == nil || dstNodeIDMask == nil { - panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen") - } - // Dial to the remote node - packet := bs - go func() { - // FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes - tun.mutex.Lock() - _, known := tun.dials[*dstNodeID] - tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], packet) - for len(tun.dials[*dstNodeID]) > 32 { - util.PutBytes(tun.dials[*dstNodeID][0]) - tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:] - } - tun.mutex.Unlock() - if known { - return - } - var tc *tunConn - if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { - // We've been given a connection so prepare the session wrapper - if tc, err = tun.wrap(conn); err != nil { - // Something went wrong when storing the connection, typically that - // something already exists for this address or subnet - tun.log.Debugln("TUN/TAP iface wrap:", err) - } - } - tun.mutex.Lock() - packets := tun.dials[*dstNodeID] - delete(tun.dials, *dstNodeID) - tun.mutex.Unlock() - if tc != nil { - for _, packet := range packets { - select { - case tc.send <- packet: - default: - util.PutBytes(packet) - } - } - } - }() - // While the dial is going on we can't do much else - // continuing this iteration - skip to the next one - continue - } - // If we have a connection now, try writing to it - if isIn && session != nil { - packet := bs - select { - case session.send <- packet: - default: - util.PutBytes(packet) - } - } - } - }() - for { - // Wait for a packet to be delivered to us through the TUN/TAP adapter - n, err := tun.iface.Read(recvd) - if err != nil { - if !tun.isOpen { - return err - } - panic(err) - } - if n == 0 { - continue - } +// Run in a separate goroutine by the reader +// Does all of the per-packet ICMP checks, passes packets to the right Conn worker +func (tun *TunAdapter) readerPacketHandler(ch chan []byte) { + for recvd := range ch { // If it's a TAP adapter, update the buffer slice so that we no longer // include the ethernet headers offset := 0 @@ -295,7 +127,180 @@ func (tun *TunAdapter) reader() error { } // Offset the buffer from now on so that we can ignore ethernet frames if // they are present - bs := append(util.GetBytes(), recvd[offset:offset+n]...) + bs := recvd[offset:] + // If we detect an ICMP packet then hand it to the ICMPv6 module + if bs[6] == 58 { + // Found an ICMPv6 packet - we need to make sure to give ICMPv6 the full + // Ethernet frame rather than just the IPv6 packet as this is needed for + // NDP to work correctly + if err := tun.icmpv6.ParsePacket(recvd); err == nil { + // We acted on the packet in the ICMPv6 module so don't forward or do + // anything else with it + continue + } + } + // Shift forward to avoid leaking bytes off the front of the slide when we eventually store it + bs = append(recvd[:0], bs...) + // From the IP header, work out what our source and destination addresses + // and node IDs are. We will need these in order to work out where to send + // the packet + var srcAddr address.Address + var dstAddr address.Address + var dstNodeID *crypto.NodeID + var dstNodeIDMask *crypto.NodeID + var dstSnet address.Subnet + var addrlen int + n := len(bs) + // Check the IP protocol - if it doesn't match then we drop the packet and + // do nothing with it + if bs[0]&0xf0 == 0x60 { + // Check if we have a fully-sized IPv6 header + if len(bs) < 40 { + continue + } + // Check the packet size + if n-tun_IPv6_HEADER_LENGTH != 256*int(bs[4])+int(bs[5]) { + continue + } + // IPv6 address + addrlen = 16 + copy(srcAddr[:addrlen], bs[8:]) + copy(dstAddr[:addrlen], bs[24:]) + copy(dstSnet[:addrlen/2], bs[24:]) + } else if bs[0]&0xf0 == 0x40 { + // Check if we have a fully-sized IPv4 header + if len(bs) < 20 { + continue + } + // Check the packet size + if n != 256*int(bs[2])+int(bs[3]) { + continue + } + // IPv4 address + addrlen = 4 + copy(srcAddr[:addrlen], bs[12:]) + copy(dstAddr[:addrlen], bs[16:]) + } else { + // Unknown address length or protocol, so drop the packet and ignore it + tun.log.Traceln("Unknown packet type, dropping") + continue + } + if tun.ckr.isEnabled() && !tun.ckr.isValidSource(srcAddr, addrlen) { + // The packet had a source address that doesn't belong to us or our + // configured crypto-key routing source subnets + continue + } + if !dstAddr.IsValid() && !dstSnet.IsValid() { + if key, err := tun.ckr.getPublicKeyForAddress(dstAddr, addrlen); err == nil { + // A public key was found, get the node ID for the search + dstNodeID = crypto.GetNodeID(&key) + // Do a quick check to ensure that the node ID refers to a vaild + // Yggdrasil address or subnet - this might be superfluous + addr := *address.AddrForNodeID(dstNodeID) + copy(dstAddr[:], addr[:]) + copy(dstSnet[:], addr[:]) + // Are we certain we looked up a valid node? + if !dstAddr.IsValid() && !dstSnet.IsValid() { + continue + } + } else { + // No public key was found in the CKR table so we've exhausted our options + continue + } + } + // Do we have an active connection for this node address? + tun.mutex.RLock() + session, isIn := tun.addrToConn[dstAddr] + if !isIn || session == nil { + session, isIn = tun.subnetToConn[dstSnet] + if !isIn || session == nil { + // Neither an address nor a subnet mapping matched, therefore populate + // the node ID and mask to commence a search + if dstAddr.IsValid() { + dstNodeID, dstNodeIDMask = dstAddr.GetNodeIDandMask() + } else { + dstNodeID, dstNodeIDMask = dstSnet.GetNodeIDandMask() + } + } + } + tun.mutex.RUnlock() + // If we don't have a connection then we should open one + if !isIn || session == nil { + // Check we haven't been given empty node ID, really this shouldn't ever + // happen but just to be sure... + if dstNodeID == nil || dstNodeIDMask == nil { + panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen") + } + // Dial to the remote node + go func() { + // FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes + tun.mutex.Lock() + _, known := tun.dials[*dstNodeID] + tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], bs) + for len(tun.dials[*dstNodeID]) > 32 { + util.PutBytes(tun.dials[*dstNodeID][0]) + tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:] + } + tun.mutex.Unlock() + if known { + return + } + var tc *tunConn + if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { + // We've been given a connection so prepare the session wrapper + if tc, err = tun.wrap(conn); err != nil { + // Something went wrong when storing the connection, typically that + // something already exists for this address or subnet + tun.log.Debugln("TUN/TAP iface wrap:", err) + } + } + tun.mutex.Lock() + packets := tun.dials[*dstNodeID] + delete(tun.dials, *dstNodeID) + tun.mutex.Unlock() + if tc != nil { + for _, packet := range packets { + select { + case tc.send <- packet: + default: + util.PutBytes(packet) + } + } + } + }() + // While the dial is going on we can't do much else + // continuing this iteration - skip to the next one + continue + } + // If we have a connection now, try writing to it + if isIn && session != nil { + select { + case session.send <- bs: + default: + util.PutBytes(bs) + } + } + } +} + +func (tun *TunAdapter) reader() error { + recvd := make([]byte, 65535+tun_ETHER_HEADER_LENGTH) + toWorker := make(chan []byte, 32) + defer close(toWorker) + go tun.readerPacketHandler(toWorker) + for { + // Wait for a packet to be delivered to us through the TUN/TAP adapter + n, err := tun.iface.Read(recvd) + if err != nil { + if !tun.isOpen { + return err + } + panic(err) + } + if n == 0 { + continue + } + bs := append(util.GetBytes(), recvd[:n]...) toWorker <- bs } }