diff --git a/go.mod b/go.mod index 3f5cae88..fff4ae6f 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/yggdrasil-network/yggdrasil-go require ( - github.com/Arceliar/phony v0.0.0-20190821233739-c7f353f14438 + github.com/Arceliar/phony v0.0.0-20190824031448-b53e115f69b5 github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 github.com/hashicorp/go-syslog v1.0.0 github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 diff --git a/go.sum b/go.sum index 22276c2c..29854cbf 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/Arceliar/phony v0.0.0-20190821233739-c7f353f14438 h1:t4tRgrItIq2ap4O31yOuWm17lUiyzf8gf/P+bEfgmrw= -github.com/Arceliar/phony v0.0.0-20190821233739-c7f353f14438/go.mod h1:2Q9yJvg2PlMrnOEa3RTEy9hElWAICo/D8HTUDqAHUAo= +github.com/Arceliar/phony v0.0.0-20190824031448-b53e115f69b5 h1:D2Djqo/q7mftrtHLCpW4Rpplm8jj+Edc9jNz8Ll6E0A= +github.com/Arceliar/phony v0.0.0-20190824031448-b53e115f69b5/go.mod h1:2Q9yJvg2PlMrnOEa3RTEy9hElWAICo/D8HTUDqAHUAo= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE= diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 4ba0b2aa..1828b556 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -162,7 +162,7 @@ func (c *Conn) ReadNoCopy() ([]byte, error) { } else { return nil, ConnError{errors.New("session closed"), false, false, true, 0} } - case bs := <-c.session.recv: + case bs := <-c.session.toConn: return bs, nil } } @@ -221,7 +221,7 @@ func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error { } else { err = ConnError{errors.New("session closed"), false, false, true, 0} } - case c.session.send <- msg: + case <-c.session.SyncExec(func() { c.session._send(msg) }): } } return err diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index fcd9364c..8869bd2a 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -210,7 +210,7 @@ func (p *peer) linkLoop() { case dinfo = <-p.dinfo: case _ = <-tick.C: if dinfo != nil { - p.core.router.insertPeer(&p.core.router, dinfo) + p.core.router.insertPeer(nil, dinfo) } } } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 25f8e800..b5df107e 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -66,7 +66,7 @@ func (r *router) init(core *Core) { p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) p.out = func(packets [][]byte) { // TODO make peers and/or the switch into actors, have them pass themselves as the from field - r.handlePackets(r, packets) + r.handlePackets(nil, packets) } r.out = p.handlePacket // TODO if the peer becomes its own actor, then send a message here r.nodeinfo.init(r.core) @@ -160,6 +160,8 @@ func (r *router) _handleTraffic(packet []byte) { util.PutBytes(p.Payload) return } + sinfo.recv(r, &p) + return select { case sinfo.fromRouter <- p: case <-sinfo.cancel.Finished(): diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 13f64424..84ea92af 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -72,8 +72,9 @@ type sessionInfo struct { init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use cancel util.Cancellation // Used to terminate workers fromRouter chan wire_trafficPacket // Received packets go here, to be decrypted by the session - recv chan []byte // Decrypted packets go here, picked up by the associated Conn - send chan FlowKeyMessage // Packets with optional flow key go here, to be encrypted and sent + toConn chan []byte // Decrypted packets go here, picked up by the associated Conn + fromConn chan FlowKeyMessage // Packets with optional flow key go here, to be encrypted and sent + callbacks []chan func() // Finished work from crypto workers } // TODO remove this, call SyncExec directly @@ -253,8 +254,8 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.fromRouter = make(chan wire_trafficPacket, 1) - sinfo.recv = make(chan []byte, 32) - sinfo.send = make(chan FlowKeyMessage, 32) + sinfo.toConn = make(chan []byte, 32) + sinfo.fromConn = make(chan FlowKeyMessage, 32) ss.sinfos[sinfo.myHandle] = &sinfo ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle go func() { @@ -264,7 +265,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.sessions.removeSession(&sinfo) }) }() - go sinfo.startWorkers() + //go sinfo.startWorkers() return &sinfo } @@ -539,7 +540,7 @@ func (sinfo *sessionInfo) recvWorker() { select { case <-sinfo.cancel.Finished(): util.PutBytes(bs) - case sinfo.recv <- bs: + case sinfo.toConn <- bs: } } } @@ -664,15 +665,127 @@ func (sinfo *sessionInfo) sendWorker() { f() case <-sinfo.cancel.Finished(): return - case msg := <-sinfo.send: + case msg := <-sinfo.fromConn: doSend(msg) } } select { case <-sinfo.cancel.Finished(): return - case bs := <-sinfo.send: + case bs := <-sinfo.fromConn: doSend(bs) } } } + +//////////////////////////////////////////////////////////////////////////////// + +func (sinfo *sessionInfo) recv(from phony.IActor, packet *wire_trafficPacket) { + sinfo.EnqueueFrom(from, func() { + sinfo._recvPacket(packet) + }) +} + +func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { + select { + case <-sinfo.init: + default: + // TODO find a better way to drop things until initialized + util.PutBytes(p.Payload) + return + } + switch { + case sinfo._nonceIsOK(&p.Nonce): + case len(sinfo.toConn) < cap(sinfo.toConn): + default: + // We're either full or don't like this nonce + util.PutBytes(p.Payload) + return + } + + k := sinfo.sharedSesKey + var isOK bool + var bs []byte + ch := make(chan func(), 1) + poolFunc := func() { + bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce) + callback := func() { + util.PutBytes(p.Payload) + if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) { + // Either we failed to decrypt, or the session was updated, or we received this packet in the mean time + util.PutBytes(bs) + return + } + sinfo._updateNonce(&p.Nonce) + sinfo.time = time.Now() + sinfo.bytesRecvd += uint64(len(bs)) + select { + case sinfo.toConn <- bs: + default: + // We seem to have filled up the buffer in the mean time, so drop it + util.PutBytes(bs) + } + } + ch <- callback + sinfo.checkCallbacks() + } + sinfo.callbacks = append(sinfo.callbacks, ch) + util.WorkerGo(poolFunc) +} + +func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { + select { + case <-sinfo.init: + default: + // TODO find a better way to drop things until initialized + util.PutBytes(msg.Message) + return + } + sinfo.bytesSent += uint64(len(msg.Message)) + coords := append([]byte(nil), sinfo.coords...) + if msg.FlowKey != 0 { + coords = append(coords, 0) + coords = append(coords, wire_encode_uint64(msg.FlowKey)...) + } + p := wire_trafficPacket{ + Coords: coords, + Handle: sinfo.theirHandle, + Nonce: sinfo.myNonce, + } + sinfo.myNonce.Increment() + k := sinfo.sharedSesKey + ch := make(chan func(), 1) + poolFunc := func() { + p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce) + callback := func() { + // Encoding may block on a util.GetBytes(), so kept out of the worker pool + packet := p.encode() + // Cleanup + util.PutBytes(msg.Message) + util.PutBytes(p.Payload) + // Send the packet + // TODO replace this with a send to the peer struct if that becomes an actor + sinfo.sessions.router.EnqueueFrom(sinfo, func() { + sinfo.sessions.router.out(packet) + }) + } + ch <- callback + sinfo.checkCallbacks() + } + sinfo.callbacks = append(sinfo.callbacks, ch) + util.WorkerGo(poolFunc) +} + +func (sinfo *sessionInfo) checkCallbacks() { + sinfo.EnqueueFrom(nil, func() { + if len(sinfo.callbacks) > 0 { + select { + case callback := <-sinfo.callbacks[0]: + sinfo.callbacks = sinfo.callbacks[1:] + callback() + sinfo.checkCallbacks() + default: + } + } + }) +} diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 86ae102b..1fa75a6c 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -245,7 +245,7 @@ func (t *switchTable) cleanRoot() { if t.data.locator.root != t.key { t.data.seq++ t.updater.Store(&sync.Once{}) - t.core.router.reset(&t.core.router) + t.core.router.reset(nil) } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} t.core.peers.sendSwitchMsgs() @@ -508,7 +508,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep if !equiv(&sender.locator, &t.data.locator) { doUpdate = true t.data.seq++ - t.core.router.reset(&t.core.router) + t.core.router.reset(nil) } if t.data.locator.tstamp != sender.locator.tstamp { t.time = now