From 533da351f9b0f0b1ffa6a2d459aeece7c05d2b2c Mon Sep 17 00:00:00 2001
From: Arceliar <Arceliar@users.noreply.github.com>
Date: Fri, 23 Aug 2019 22:23:01 -0500
Subject: [PATCH] fix actor EnqueueFrom stack overflow (use nil now to send
 from self) and replace session send/recv workers with actor functions

---
 go.mod                   |   2 +-
 go.sum                   |   4 +-
 src/yggdrasil/conn.go    |   4 +-
 src/yggdrasil/peer.go    |   2 +-
 src/yggdrasil/router.go  |   4 +-
 src/yggdrasil/session.go | 129 ++++++++++++++++++++++++++++++++++++---
 src/yggdrasil/switch.go  |   4 +-
 7 files changed, 132 insertions(+), 17 deletions(-)

diff --git a/go.mod b/go.mod
index 3f5cae88..fff4ae6f 100644
--- a/go.mod
+++ b/go.mod
@@ -1,7 +1,7 @@
 module github.com/yggdrasil-network/yggdrasil-go
 
 require (
-	github.com/Arceliar/phony v0.0.0-20190821233739-c7f353f14438
+	github.com/Arceliar/phony v0.0.0-20190824031448-b53e115f69b5
 	github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8
 	github.com/hashicorp/go-syslog v1.0.0
 	github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222
diff --git a/go.sum b/go.sum
index 22276c2c..29854cbf 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,5 @@
-github.com/Arceliar/phony v0.0.0-20190821233739-c7f353f14438 h1:t4tRgrItIq2ap4O31yOuWm17lUiyzf8gf/P+bEfgmrw=
-github.com/Arceliar/phony v0.0.0-20190821233739-c7f353f14438/go.mod h1:2Q9yJvg2PlMrnOEa3RTEy9hElWAICo/D8HTUDqAHUAo=
+github.com/Arceliar/phony v0.0.0-20190824031448-b53e115f69b5 h1:D2Djqo/q7mftrtHLCpW4Rpplm8jj+Edc9jNz8Ll6E0A=
+github.com/Arceliar/phony v0.0.0-20190824031448-b53e115f69b5/go.mod h1:2Q9yJvg2PlMrnOEa3RTEy9hElWAICo/D8HTUDqAHUAo=
 github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY=
 github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U=
 github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE=
diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go
index 4ba0b2aa..1828b556 100644
--- a/src/yggdrasil/conn.go
+++ b/src/yggdrasil/conn.go
@@ -162,7 +162,7 @@ func (c *Conn) ReadNoCopy() ([]byte, error) {
 		} else {
 			return nil, ConnError{errors.New("session closed"), false, false, true, 0}
 		}
-	case bs := <-c.session.recv:
+	case bs := <-c.session.toConn:
 		return bs, nil
 	}
 }
@@ -221,7 +221,7 @@ func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error {
 			} else {
 				err = ConnError{errors.New("session closed"), false, false, true, 0}
 			}
-		case c.session.send <- msg:
+		case <-c.session.SyncExec(func() { c.session._send(msg) }):
 		}
 	}
 	return err
diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go
index fcd9364c..8869bd2a 100644
--- a/src/yggdrasil/peer.go
+++ b/src/yggdrasil/peer.go
@@ -210,7 +210,7 @@ func (p *peer) linkLoop() {
 		case dinfo = <-p.dinfo:
 		case _ = <-tick.C:
 			if dinfo != nil {
-				p.core.router.insertPeer(&p.core.router, dinfo)
+				p.core.router.insertPeer(nil, dinfo)
 			}
 		}
 	}
diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go
index 25f8e800..b5df107e 100644
--- a/src/yggdrasil/router.go
+++ b/src/yggdrasil/router.go
@@ -66,7 +66,7 @@ func (r *router) init(core *Core) {
 	p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
 	p.out = func(packets [][]byte) {
 		// TODO make peers and/or the switch into actors, have them pass themselves as the from field
-		r.handlePackets(r, packets)
+		r.handlePackets(nil, packets)
 	}
 	r.out = p.handlePacket // TODO if the peer becomes its own actor, then send a message here
 	r.nodeinfo.init(r.core)
@@ -160,6 +160,8 @@ func (r *router) _handleTraffic(packet []byte) {
 		util.PutBytes(p.Payload)
 		return
 	}
+	sinfo.recv(r, &p)
+	return
 	select {
 	case sinfo.fromRouter <- p:
 	case <-sinfo.cancel.Finished():
diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go
index 13f64424..84ea92af 100644
--- a/src/yggdrasil/session.go
+++ b/src/yggdrasil/session.go
@@ -72,8 +72,9 @@ type sessionInfo struct {
 	init           chan struct{}                 // Closed when the first session pong arrives, used to signal that the session is ready for initial use
 	cancel         util.Cancellation             // Used to terminate workers
 	fromRouter     chan wire_trafficPacket       // Received packets go here, to be decrypted by the session
-	recv           chan []byte                   // Decrypted packets go here, picked up by the associated Conn
-	send           chan FlowKeyMessage           // Packets with optional flow key go here, to be encrypted and sent
+	toConn         chan []byte                   // Decrypted packets go here, picked up by the associated Conn
+	fromConn       chan FlowKeyMessage           // Packets with optional flow key go here, to be encrypted and sent
+	callbacks      []chan func()                 // Finished work from crypto workers
 }
 
 // TODO remove this, call SyncExec directly
@@ -253,8 +254,8 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
 	sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
 	sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
 	sinfo.fromRouter = make(chan wire_trafficPacket, 1)
-	sinfo.recv = make(chan []byte, 32)
-	sinfo.send = make(chan FlowKeyMessage, 32)
+	sinfo.toConn = make(chan []byte, 32)
+	sinfo.fromConn = make(chan FlowKeyMessage, 32)
 	ss.sinfos[sinfo.myHandle] = &sinfo
 	ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
 	go func() {
@@ -264,7 +265,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
 			sinfo.sessions.removeSession(&sinfo)
 		})
 	}()
-	go sinfo.startWorkers()
+	//go sinfo.startWorkers()
 	return &sinfo
 }
 
@@ -539,7 +540,7 @@ func (sinfo *sessionInfo) recvWorker() {
 					select {
 					case <-sinfo.cancel.Finished():
 						util.PutBytes(bs)
-					case sinfo.recv <- bs:
+					case sinfo.toConn <- bs:
 					}
 				}
 			}
@@ -664,15 +665,127 @@ func (sinfo *sessionInfo) sendWorker() {
 				f()
 			case <-sinfo.cancel.Finished():
 				return
-			case msg := <-sinfo.send:
+			case msg := <-sinfo.fromConn:
 				doSend(msg)
 			}
 		}
 		select {
 		case <-sinfo.cancel.Finished():
 			return
-		case bs := <-sinfo.send:
+		case bs := <-sinfo.fromConn:
 			doSend(bs)
 		}
 	}
 }
+
+////////////////////////////////////////////////////////////////////////////////
+
+func (sinfo *sessionInfo) recv(from phony.IActor, packet *wire_trafficPacket) {
+	sinfo.EnqueueFrom(from, func() {
+		sinfo._recvPacket(packet)
+	})
+}
+
+func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
+	select {
+	case <-sinfo.init:
+	default:
+		// TODO find a better way to drop things until initialized
+		util.PutBytes(p.Payload)
+		return
+	}
+	switch {
+	case sinfo._nonceIsOK(&p.Nonce):
+	case len(sinfo.toConn) < cap(sinfo.toConn):
+	default:
+		// We're either full or don't like this nonce
+		util.PutBytes(p.Payload)
+		return
+	}
+
+	k := sinfo.sharedSesKey
+	var isOK bool
+	var bs []byte
+	ch := make(chan func(), 1)
+	poolFunc := func() {
+		bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
+		callback := func() {
+			util.PutBytes(p.Payload)
+			if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) {
+				// Either we failed to decrypt, or the session was updated, or we received this packet in the mean time
+				util.PutBytes(bs)
+				return
+			}
+			sinfo._updateNonce(&p.Nonce)
+			sinfo.time = time.Now()
+			sinfo.bytesRecvd += uint64(len(bs))
+			select {
+			case sinfo.toConn <- bs:
+			default:
+				// We seem to have filled up the buffer in the mean time, so drop it
+				util.PutBytes(bs)
+			}
+		}
+		ch <- callback
+		sinfo.checkCallbacks()
+	}
+	sinfo.callbacks = append(sinfo.callbacks, ch)
+	util.WorkerGo(poolFunc)
+}
+
+func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
+	select {
+	case <-sinfo.init:
+	default:
+		// TODO find a better way to drop things until initialized
+		util.PutBytes(msg.Message)
+		return
+	}
+	sinfo.bytesSent += uint64(len(msg.Message))
+	coords := append([]byte(nil), sinfo.coords...)
+	if msg.FlowKey != 0 {
+		coords = append(coords, 0)
+		coords = append(coords, wire_encode_uint64(msg.FlowKey)...)
+	}
+	p := wire_trafficPacket{
+		Coords: coords,
+		Handle: sinfo.theirHandle,
+		Nonce:  sinfo.myNonce,
+	}
+	sinfo.myNonce.Increment()
+	k := sinfo.sharedSesKey
+	ch := make(chan func(), 1)
+	poolFunc := func() {
+		p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce)
+		callback := func() {
+			// Encoding may block on a util.GetBytes(), so kept out of the worker pool
+			packet := p.encode()
+			// Cleanup
+			util.PutBytes(msg.Message)
+			util.PutBytes(p.Payload)
+			// Send the packet
+			// TODO replace this with a send to the peer struct if that becomes an actor
+			sinfo.sessions.router.EnqueueFrom(sinfo, func() {
+				sinfo.sessions.router.out(packet)
+			})
+		}
+		ch <- callback
+		sinfo.checkCallbacks()
+	}
+	sinfo.callbacks = append(sinfo.callbacks, ch)
+	util.WorkerGo(poolFunc)
+}
+
+func (sinfo *sessionInfo) checkCallbacks() {
+	sinfo.EnqueueFrom(nil, func() {
+		if len(sinfo.callbacks) > 0 {
+			select {
+			case callback := <-sinfo.callbacks[0]:
+				sinfo.callbacks = sinfo.callbacks[1:]
+				callback()
+				sinfo.checkCallbacks()
+			default:
+			}
+		}
+	})
+}
diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go
index 86ae102b..1fa75a6c 100644
--- a/src/yggdrasil/switch.go
+++ b/src/yggdrasil/switch.go
@@ -245,7 +245,7 @@ func (t *switchTable) cleanRoot() {
 		if t.data.locator.root != t.key {
 			t.data.seq++
 			t.updater.Store(&sync.Once{})
-			t.core.router.reset(&t.core.router)
+			t.core.router.reset(nil)
 		}
 		t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
 		t.core.peers.sendSwitchMsgs()
@@ -508,7 +508,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
 		if !equiv(&sender.locator, &t.data.locator) {
 			doUpdate = true
 			t.data.seq++
-			t.core.router.reset(&t.core.router)
+			t.core.router.reset(nil)
 		}
 		if t.data.locator.tstamp != sender.locator.tstamp {
 			t.time = now