From aa30c6cc98071cb4c953049baf3ff8e19e213044 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Aug 2019 10:36:09 -0500 Subject: [PATCH] upgrade phony dependency and switch to its new interface --- go.mod | 2 +- go.sum | 4 ++-- src/tuntap/conn.go | 4 ++-- src/tuntap/tun.go | 2 +- src/yggdrasil/conn.go | 24 ++++++++++++------------ src/yggdrasil/link.go | 10 +++++----- src/yggdrasil/peer.go | 18 +++++++++--------- src/yggdrasil/router.go | 14 +++++++------- src/yggdrasil/session.go | 24 ++++++++++++------------ src/yggdrasil/switch.go | 6 +++--- 10 files changed, 54 insertions(+), 54 deletions(-) diff --git a/go.mod b/go.mod index a71eb4bc..f3d8417f 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/yggdrasil-network/yggdrasil-go require ( - github.com/Arceliar/phony v0.0.0-20190825032731-f8ba56f9093e + github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 github.com/hashicorp/go-syslog v1.0.0 github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 diff --git a/go.sum b/go.sum index 0aba11ef..29e0dfec 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/Arceliar/phony v0.0.0-20190825032731-f8ba56f9093e h1:qsNZzfxSvlSE4JZ3OpmDmAeqCRpOO3RI9eJ7U6z23Gk= -github.com/Arceliar/phony v0.0.0-20190825032731-f8ba56f9093e/go.mod h1:+/sVcxsqK1Sjm3Vd+yCfMAohJOfTRyNh24apkxhqU3Q= +github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe h1:U5bediuXjZ1y6bByIXXraoE319yFp9kx1z8K6el7Ftc= +github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE= diff --git a/src/tuntap/conn.go b/src/tuntap/conn.go index 744ca9fc..0e0dd461 100644 --- a/src/tuntap/conn.go +++ b/src/tuntap/conn.go @@ -17,7 +17,7 @@ import ( const tunConnTimeout = 2 * time.Minute type tunConn struct { - phony.Actor + phony.Inbox tun *TunAdapter conn *yggdrasil.Conn addr address.Address @@ -198,7 +198,7 @@ func (s *tunConn) _write(bs []byte) (err error) { // No point in wasting resources to send back an error if there was none return } - s.EnqueueFrom(s.conn, func() { + s.RecvFrom(s.conn, func() { if e, eok := err.(yggdrasil.ConnError); !eok { if e.Closed() { s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err) diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index 895f893b..dfc343df 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -257,7 +257,7 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) { tun.subnetToConn[s.snet] = &s // Set the read callback and start the timeout goroutine conn.SetReadCallback(func(bs []byte) { - s.EnqueueFrom(conn, func() { + s.RecvFrom(conn, func() { s._read(bs) }) }) diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 2f69139e..b0c26867 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -54,7 +54,7 @@ func (e *ConnError) Closed() bool { } type Conn struct { - phony.Actor + phony.Inbox core *Core readDeadline *time.Time writeDeadline *time.Time @@ -84,8 +84,8 @@ func (c *Conn) String() string { return s } -func (c *Conn) setMTU(from phony.IActor, mtu uint16) { - c.EnqueueFrom(from, func() { c.mtu = mtu }) +func (c *Conn) setMTU(from phony.Actor, mtu uint16) { + c.RecvFrom(from, func() { c.mtu = mtu }) } // This should never be called from the router goroutine, used in the dial functions @@ -143,7 +143,7 @@ func (c *Conn) doSearch() { sinfo.continueSearch() } } - c.core.router.EnqueueFrom(c.session, routerWork) + c.core.router.RecvFrom(c.session, routerWork) } func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) { @@ -159,7 +159,7 @@ func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) // SetReadCallback sets a callback which will be called whenever a packet is received. func (c *Conn) SetReadCallback(callback func([]byte)) { - c.EnqueueFrom(nil, func() { + c.RecvFrom(nil, func() { c.readCallback = callback c._drainReadBuffer() }) @@ -172,14 +172,14 @@ func (c *Conn) _drainReadBuffer() { select { case bs := <-c.readBuffer: c.readCallback(bs) - c.EnqueueFrom(nil, c._drainReadBuffer) // In case there's more + c.RecvFrom(nil, c._drainReadBuffer) // In case there's more default: } } // Called by the session to pass a new message to the Conn -func (c *Conn) recvMsg(from phony.IActor, msg []byte) { - c.EnqueueFrom(from, func() { +func (c *Conn) recvMsg(from phony.Actor, msg []byte) { + c.RecvFrom(from, func() { if c.readCallback != nil { c.readCallback(msg) } else { @@ -234,7 +234,7 @@ func (c *Conn) _write(msg FlowKeyMessage) error { if len(msg.Message) > int(c.mtu) { return ConnError{errors.New("packet too big"), true, false, false, int(c.mtu)} } - c.session.EnqueueFrom(c, func() { + c.session.RecvFrom(c, func() { // Send the packet c.session._send(msg) // Session keep-alive, while we wait for the crypto workers from send @@ -254,11 +254,11 @@ func (c *Conn) _write(msg FlowKeyMessage) error { return nil } -// WriteFrom should be called by a phony.IActor, and tells the Conn to send a message. +// WriteFrom should be called by a phony.Actor, and tells the Conn to send a message. // This is used internaly by WriteNoCopy and Write. // If the callback is called with a non-nil value, then it is safe to reuse the argument FlowKeyMessage. -func (c *Conn) WriteFrom(from phony.IActor, msg FlowKeyMessage, callback func(error)) { - c.EnqueueFrom(from, func() { +func (c *Conn) WriteFrom(from phony.Actor, msg FlowKeyMessage, callback func(error)) { + c.RecvFrom(from, func() { callback(c._write(msg)) }) } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 98f44bc7..d4779ead 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -320,7 +320,7 @@ func (intf *linkInterface) handler() error { isAlive = true if !isReady { // (Re-)enable in the switch - intf.link.core.switchTable.EnqueueFrom(nil, func() { + intf.link.core.switchTable.RecvFrom(nil, func() { intf.link.core.switchTable._idleIn(intf.peer.port) }) isReady = true @@ -359,7 +359,7 @@ func (intf *linkInterface) handler() error { isReady = false } else { // Keep enabled in the switch - intf.link.core.switchTable.EnqueueFrom(nil, func() { + intf.link.core.switchTable.RecvFrom(nil, func() { intf.link.core.switchTable._idleIn(intf.peer.port) }) isReady = true @@ -390,7 +390,7 @@ func (intf *linkInterface) handler() error { } }() // Run reader loop - var helper phony.Actor + var helper phony.Inbox done := make(chan struct{}) var helperFunc func() helperFunc = func() { @@ -417,10 +417,10 @@ func (intf *linkInterface) handler() error { default: } // Now try to read again - helper.EnqueueFrom(nil, helperFunc) + helper.RecvFrom(nil, helperFunc) } // Start the read loop - helper.EnqueueFrom(nil, helperFunc) + helper.RecvFrom(nil, helperFunc) <-done // Wait for the helper to exit //////////////////////////////////////////////////////////////////////////////// // Remember to set `err` to something useful before returning diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 7fe33ecd..aa31bb1b 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -96,7 +96,7 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) { // Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic type peer struct { - phony.Actor + phony.Inbox core *Core intf *linkInterface port switchPort @@ -175,13 +175,13 @@ func (ps *peers) removePeer(port switchPort) { // If called, sends a notification to each peer that they should send a new switch message. // Mainly called by the switch after an update. -func (ps *peers) sendSwitchMsgs(from phony.IActor) { +func (ps *peers) sendSwitchMsgs(from phony.Actor) { ports := ps.getPorts() for _, p := range ports { if p.port == 0 { continue } - p.EnqueueFrom(from, p._sendSwitchMsg) + p.RecvFrom(from, p._sendSwitchMsg) } } @@ -207,8 +207,8 @@ func (p *peer) _updateDHT() { } } -func (p *peer) handlePacketFrom(from phony.IActor, packet []byte) { - p.EnqueueFrom(from, func() { +func (p *peer) handlePacketFrom(from phony.Actor, packet []byte) { + p.RecvFrom(from, func() { p._handlePacket(packet) }) } @@ -245,8 +245,8 @@ func (p *peer) _handleTraffic(packet []byte) { p.core.switchTable.packetInFrom(p, packet) } -func (p *peer) sendPacketsFrom(from phony.IActor, packets [][]byte) { - p.EnqueueFrom(from, func() { +func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { + p.RecvFrom(from, func() { p._sendPackets(packets) }) } @@ -263,7 +263,7 @@ func (p *peer) _sendPackets(packets [][]byte) { p.out(packets) } -var peerLinkOutHelper phony.Actor +var peerLinkOutHelper phony.Inbox // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // It sends it to p.linkOut, which bypasses the usual packet queues. @@ -281,7 +281,7 @@ func (p *peer) _sendLinkPacket(packet []byte) { } packet = linkPacket.encode() // TODO replace this with a message send if/when the link becomes an actor - peerLinkOutHelper.EnqueueFrom(nil, func() { + peerLinkOutHelper.RecvFrom(nil, func() { select { case p.linkOut <- packet: case <-p.done: diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index adf1b1d4..7b6a9b80 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -37,7 +37,7 @@ import ( // The router struct has channels to/from the adapter device and a self peer (0), which is how messages are passed between this node and the peers/switch layer. // The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions. type router struct { - phony.Actor + phony.Inbox core *Core reconfigure chan chan error addr address.Address @@ -83,8 +83,8 @@ func (r *router) start() error { } // In practice, the switch will call this with 1 packet -func (r *router) handlePackets(from phony.IActor, packets [][]byte) { - r.EnqueueFrom(from, func() { +func (r *router) handlePackets(from phony.Actor, packets [][]byte) { + r.RecvFrom(from, func() { for _, packet := range packets { r._handlePacket(packet) } @@ -92,15 +92,15 @@ func (r *router) handlePackets(from phony.IActor, packets [][]byte) { } // Insert a peer info into the dht, TODO? make the dht a separate actor -func (r *router) insertPeer(from phony.IActor, info *dhtInfo) { - r.EnqueueFrom(from, func() { +func (r *router) insertPeer(from phony.Actor, info *dhtInfo) { + r.RecvFrom(from, func() { r.dht.insertPeer(info) }) } // Reset sessions and DHT after the switch sees our coords change -func (r *router) reset(from phony.IActor) { - r.EnqueueFrom(from, func() { +func (r *router) reset(from phony.Actor) { + r.RecvFrom(from, func() { r.sessions.reset() r.dht.reset() }) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 5243fb0f..f1263379 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -38,7 +38,7 @@ func (h nonceHeap) peek() *crypto.BoxNonce { return &h[0] } // All the information we know about an active session. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { - phony.Actor // Protects all of the below, use it any time you read/change the contents of a session + phony.Inbox // Protects all of the below, use it any time you read/change the contents of a session sessions *sessions // reconfigure chan chan error // theirAddr address.Address // @@ -342,8 +342,8 @@ func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey, } // Sends a session ping by calling sendPingPong in ping mode. -func (sinfo *sessionInfo) ping(from phony.IActor) { - sinfo.EnqueueFrom(from, func() { +func (sinfo *sessionInfo) ping(from phony.Actor) { + sinfo.RecvFrom(from, func() { sinfo._sendPingPong(false) }) } @@ -364,14 +364,14 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) { } packet := p.encode() // TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first - sinfo.sessions.router.EnqueueFrom(sinfo, func() { sinfo.sessions.router.out(packet) }) + sinfo.sessions.router.RecvFrom(sinfo, func() { sinfo.sessions.router.out(packet) }) if sinfo.pingTime.Before(sinfo.time) { sinfo.pingTime = time.Now() } } -func (sinfo *sessionInfo) setConn(from phony.IActor, conn *Conn) { - sinfo.EnqueueFrom(from, func() { +func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) { + sinfo.RecvFrom(from, func() { sinfo.conn = conn sinfo.conn.setMTU(sinfo, sinfo._getMTU()) }) @@ -406,7 +406,7 @@ func (ss *sessions) handlePing(ping *sessionPing) { ss.listenerMutex.Unlock() } if sinfo != nil { - sinfo.EnqueueFrom(ss.router, func() { + sinfo.RecvFrom(ss.router, func() { // Update the session if !sinfo._update(ping) { /*panic("Should not happen in testing")*/ return @@ -474,7 +474,7 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) { // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. func (ss *sessions) reset() { for _, sinfo := range ss.sinfos { - sinfo.EnqueueFrom(ss.router, func() { + sinfo.RecvFrom(ss.router, func() { sinfo.reset = true }) } @@ -489,8 +489,8 @@ type FlowKeyMessage struct { Message []byte } -func (sinfo *sessionInfo) recv(from phony.IActor, packet *wire_trafficPacket) { - sinfo.EnqueueFrom(from, func() { +func (sinfo *sessionInfo) recv(from phony.Actor, packet *wire_trafficPacket) { + sinfo.RecvFrom(from, func() { sinfo._recvPacket(packet) }) } @@ -564,7 +564,7 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { util.PutBytes(p.Payload) // Send the packet // TODO replace this with a send to the peer struct if that becomes an actor - sinfo.sessions.router.EnqueueFrom(sinfo, func() { + sinfo.sessions.router.RecvFrom(sinfo, func() { sinfo.sessions.router.out(packet) }) } @@ -576,7 +576,7 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { } func (sinfo *sessionInfo) checkCallbacks() { - sinfo.EnqueueFrom(nil, func() { + sinfo.RecvFrom(nil, func() { if len(sinfo.callbacks) > 0 { select { case callback := <-sinfo.callbacks[0]: diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index b6fc26f8..db63a845 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -174,7 +174,7 @@ type switchTable struct { data switchData // updater atomic.Value // *sync.Once table atomic.Value // lookupTable - phony.Actor // Owns the below + phony.Inbox // Owns the below queues switch_buffers // Queues - not atomic so ONLY use through the actor idle map[switchPort]time.Time // idle peers - not atomic so ONLY use through the actor } @@ -828,8 +828,8 @@ func (t *switchTable) _handleIdle(port switchPort) bool { return false } -func (t *switchTable) packetInFrom(from phony.IActor, bytes []byte) { - t.EnqueueFrom(from, func() { +func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) { + t.RecvFrom(from, func() { t._packetIn(bytes) }) }