From 3845f81357dae0f29062e7caa60b1a723efddf05 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Tue, 27 Aug 2019 19:43:54 -0500 Subject: [PATCH] update to latest phony, adjust interface use accordingly --- go.mod | 2 +- go.sum | 4 ++-- src/tuntap/conn.go | 6 +++--- src/tuntap/iface.go | 10 +++++----- src/tuntap/tun.go | 12 ++++++------ src/yggdrasil/api.go | 6 ++++-- src/yggdrasil/conn.go | 28 ++++++++++++++-------------- src/yggdrasil/debug.go | 12 +++++++----- src/yggdrasil/link.go | 22 +++++++++++----------- src/yggdrasil/peer.go | 10 +++++----- src/yggdrasil/router.go | 14 +++++++------- src/yggdrasil/search.go | 2 +- src/yggdrasil/session.go | 22 +++++++++++----------- src/yggdrasil/switch.go | 6 +++--- 14 files changed, 80 insertions(+), 76 deletions(-) diff --git a/go.mod b/go.mod index f3d8417f..9ff6031a 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/yggdrasil-network/yggdrasil-go require ( - github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe + github.com/Arceliar/phony v0.0.0-20190828002416-0337564e2c44 github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 github.com/hashicorp/go-syslog v1.0.0 github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 diff --git a/go.sum b/go.sum index 29e0dfec..60388964 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe h1:U5bediuXjZ1y6bByIXXraoE319yFp9kx1z8K6el7Ftc= -github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI= +github.com/Arceliar/phony v0.0.0-20190828002416-0337564e2c44 h1:8EiuIp65v8aLkLc4LWxtn4NTH+P2LwDmrKKWdBzn9cI= +github.com/Arceliar/phony v0.0.0-20190828002416-0337564e2c44/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE= diff --git a/src/tuntap/conn.go b/src/tuntap/conn.go index afb0f09e..31875d94 100644 --- a/src/tuntap/conn.go +++ b/src/tuntap/conn.go @@ -27,7 +27,7 @@ type tunConn struct { } func (s *tunConn) close() { - s.tun.RecvFrom(s, s._close_from_tun) + s.tun.Act(s, s._close_from_tun) } func (s *tunConn) _close_from_tun() { @@ -117,7 +117,7 @@ func (s *tunConn) _read(bs []byte) (err error) { } func (s *tunConn) writeFrom(from phony.Actor, bs []byte) { - s.RecvFrom(from, func() { + s.Act(from, func() { s._write(bs) }) } @@ -197,7 +197,7 @@ func (s *tunConn) _write(bs []byte) (err error) { // No point in wasting resources to send back an error if there was none return } - s.RecvFrom(s.conn, func() { + s.Act(s.conn, func() { if e, eok := err.(yggdrasil.ConnError); !eok { if e.Closed() { s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err) diff --git a/src/tuntap/iface.go b/src/tuntap/iface.go index da419369..92ba36ab 100644 --- a/src/tuntap/iface.go +++ b/src/tuntap/iface.go @@ -19,7 +19,7 @@ type tunWriter struct { } func (w *tunWriter) writeFrom(from phony.Actor, b []byte) { - w.RecvFrom(from, func() { + w.Act(from, func() { w._write(b) }) } @@ -90,7 +90,7 @@ func (w *tunWriter) _write(b []byte) { util.PutBytes(b) } if err != nil { - w.tun.RecvFrom(w, func() { + w.tun.Act(w, func() { if !w.tun.isOpen { w.tun.log.Errorln("TUN/TAP iface write error:", err) } @@ -118,12 +118,12 @@ func (r *tunReader) _read() { } if err == nil { // Now read again - r.RecvFrom(nil, r._read) + r.Act(nil, r._read) } } func (tun *TunAdapter) handlePacketFrom(from phony.Actor, packet []byte, err error) { - tun.RecvFrom(from, func() { + tun.Act(from, func() { tun._handlePacket(packet, err) }) } @@ -248,7 +248,7 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) { if !known { go func() { conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask) - tun.RecvFrom(nil, func() { + tun.Act(nil, func() { packets := tun.dials[*dstNodeID] delete(tun.dials, *dstNodeID) if err != nil { diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index 2b163e3b..6317459c 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -125,7 +125,7 @@ func (tun *TunAdapter) Init(config *config.NodeState, log *log.Logger, listener // reader actor to handle packets on that interface. func (tun *TunAdapter) Start() error { var err error - <-tun.SyncExec(func() { + phony.Block(tun, func() { err = tun._start() }) return err @@ -167,7 +167,7 @@ func (tun *TunAdapter) _start() error { } }() go tun.handler() - tun.reader.RecvFrom(nil, tun.reader._read) // Start the reader + tun.reader.Act(nil, tun.reader._read) // Start the reader tun.icmpv6.Init(tun) if iftapmode { go tun.icmpv6.Solicit(tun.addr) @@ -180,7 +180,7 @@ func (tun *TunAdapter) _start() error { // read/write goroutines to handle packets on that interface. func (tun *TunAdapter) Stop() error { var err error - <-tun.SyncExec(func() { + phony.Block(tun, func() { err = tun._stop() }) return err @@ -233,7 +233,7 @@ func (tun *TunAdapter) handler() error { tun.log.Errorln("TUN/TAP connection accept error:", err) return err } - <-tun.SyncExec(func() { + phony.Block(tun, func() { if _, err := tun._wrap(conn); err != nil { // Something went wrong when storing the connection, typically that // something already exists for this address or subnet @@ -273,11 +273,11 @@ func (tun *TunAdapter) _wrap(conn *yggdrasil.Conn) (c *tunConn, err error) { tun.subnetToConn[s.snet] = &s // Set the read callback and start the timeout conn.SetReadCallback(func(bs []byte) { - s.RecvFrom(conn, func() { + s.Act(conn, func() { s._read(bs) }) }) - s.RecvFrom(nil, s.stillAlive) + s.Act(nil, s.stillAlive) // Return return c, err } diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index c6c15e24..44cbad1d 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -11,6 +11,8 @@ import ( "github.com/gologme/log" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + + "github.com/Arceliar/phony" ) // Peer represents a single peer object. This contains information from the @@ -106,7 +108,7 @@ func (c *Core) GetPeers() []Peer { for _, port := range ps { p := ports[port] var info Peer - <-p.SyncExec(func() { + phony.Block(p, func() { info = Peer{ Endpoint: p.intf.name, BytesSent: p.bytesSent, @@ -138,7 +140,7 @@ func (c *Core) GetSwitchPeers() []SwitchPeer { } coords := elem.locator.getCoords() var info SwitchPeer - <-peer.SyncExec(func() { + phony.Block(peer, func() { info = SwitchPeer{ Coords: append([]uint64{}, wire_coordsBytestoUint64s(coords)...), BytesSent: peer.bytesSent, diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index efc0c81e..c56c2a90 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -80,12 +80,12 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session func (c *Conn) String() string { var s string - <-c.SyncExec(func() { s = fmt.Sprintf("conn=%p", c) }) + phony.Block(c, func() { s = fmt.Sprintf("conn=%p", c) }) return s } func (c *Conn) setMTU(from phony.Actor, mtu uint16) { - c.RecvFrom(from, func() { c.mtu = mtu }) + c.Act(from, func() { c.mtu = mtu }) } // This should never be called from the router goroutine, used in the dial functions @@ -143,7 +143,7 @@ func (c *Conn) doSearch() { sinfo.continueSearch() } } - c.core.router.RecvFrom(c.session, routerWork) + c.core.router.Act(c.session, routerWork) } func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) { @@ -159,7 +159,7 @@ func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) // SetReadCallback sets a callback which will be called whenever a packet is received. func (c *Conn) SetReadCallback(callback func([]byte)) { - c.RecvFrom(nil, func() { + c.Act(nil, func() { c.readCallback = callback c._drainReadBuffer() }) @@ -172,14 +172,14 @@ func (c *Conn) _drainReadBuffer() { select { case bs := <-c.readBuffer: c.readCallback(bs) - c.RecvFrom(nil, c._drainReadBuffer) // In case there's more + c.Act(nil, c._drainReadBuffer) // In case there's more default: } } // Called by the session to pass a new message to the Conn func (c *Conn) recvMsg(from phony.Actor, msg []byte) { - c.RecvFrom(from, func() { + c.Act(from, func() { if c.readCallback != nil { c.readCallback(msg) } else { @@ -195,7 +195,7 @@ func (c *Conn) recvMsg(from phony.Actor, msg []byte) { func (c *Conn) ReadNoCopy() ([]byte, error) { var cancel util.Cancellation var doCancel bool - <-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.readDeadline) }) + phony.Block(c, func() { cancel, doCancel = c._getDeadlineCancellation(c.readDeadline) }) if doCancel { defer cancel.Cancel(nil) } @@ -234,7 +234,7 @@ func (c *Conn) _write(msg FlowKeyMessage) error { if len(msg.Message) > int(c.mtu) { return ConnError{errors.New("packet too big"), true, false, false, int(c.mtu)} } - c.session.RecvFrom(c, func() { + c.session.Act(c, func() { // Send the packet c.session._send(msg) // Session keep-alive, while we wait for the crypto workers from send @@ -258,7 +258,7 @@ func (c *Conn) _write(msg FlowKeyMessage) error { // This is used internaly by WriteNoCopy and Write. // If the callback is called with a non-nil value, then it is safe to reuse the argument FlowKeyMessage. func (c *Conn) WriteFrom(from phony.Actor, msg FlowKeyMessage, callback func(error)) { - c.RecvFrom(from, func() { + c.Act(from, func() { callback(c._write(msg)) }) } @@ -268,7 +268,7 @@ func (c *Conn) WriteFrom(from phony.Actor, msg FlowKeyMessage, callback func(err func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error { var cancel util.Cancellation var doCancel bool - <-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) }) + phony.Block(c, func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) }) var err error select { case <-cancel.Finished(): @@ -299,7 +299,7 @@ func (c *Conn) Write(b []byte) (int, error) { } func (c *Conn) Close() (err error) { - <-c.SyncExec(func() { + phony.Block(c, func() { if c.session != nil { // Close the session, if it hasn't been closed already if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil { @@ -319,7 +319,7 @@ func (c *Conn) LocalAddr() crypto.NodeID { func (c *Conn) RemoteAddr() crypto.NodeID { // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... var n crypto.NodeID - <-c.SyncExec(func() { n = *c.nodeID }) + phony.Block(c, func() { n = *c.nodeID }) return n } @@ -331,12 +331,12 @@ func (c *Conn) SetDeadline(t time.Time) error { func (c *Conn) SetReadDeadline(t time.Time) error { // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... - <-c.SyncExec(func() { c.readDeadline = &t }) + phony.Block(c, func() { c.readDeadline = &t }) return nil } func (c *Conn) SetWriteDeadline(t time.Time) error { // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... - <-c.SyncExec(func() { c.writeDeadline = &t }) + phony.Block(c, func() { c.writeDeadline = &t }) return nil } diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 1cc7eae4..9d2199d0 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -29,6 +29,8 @@ import "github.com/yggdrasil-network/yggdrasil-go/src/config" import "github.com/yggdrasil-network/yggdrasil-go/src/crypto" import "github.com/yggdrasil-network/yggdrasil-go/src/defaults" +import "github.com/Arceliar/phony" + // Start the profiler in debug builds, if the required environment variable is set. func init() { envVarName := "PPROFLISTEN" @@ -572,14 +574,14 @@ func DEBUG_simLinkPeers(p, q *peer) { continue case packet := <-send: packets = append(packets, packet) - <-source.core.switchTable.SyncExec(func() { + phony.Block(&source.core.switchTable, func() { source.core.switchTable._idleIn(source.port) }) continue default: } if len(packets) > 0 { - <-dest.SyncExec(func() { dest._handlePacket(packets[0]) }) + phony.Block(dest, func() { dest._handlePacket(packets[0]) }) packets = packets[1:] continue } @@ -588,7 +590,7 @@ func DEBUG_simLinkPeers(p, q *peer) { packets = append(packets, packet) case packet := <-send: packets = append(packets, packet) - <-source.core.switchTable.SyncExec(func() { + phony.Block(&source.core.switchTable, func() { source.core.switchTable._idleIn(source.port) }) } @@ -597,10 +599,10 @@ func DEBUG_simLinkPeers(p, q *peer) { } goWorkers(p, q) goWorkers(q, p) - <-p.core.switchTable.SyncExec(func() { + phony.Block(&p.core.switchTable, func() { p.core.switchTable._idleIn(p.port) }) - <-q.core.switchTable.SyncExec(func() { + phony.Block(&q.core.switchTable, func() { q.core.switchTable._idleIn(q.port) }) } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 6b7f446f..e65055bc 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -227,7 +227,7 @@ func (intf *linkInterface) handler() error { strings.ToUpper(intf.info.linkType), themString, intf.info.local) // Start things go intf.peer.start() - intf.reader.RecvFrom(nil, intf.reader._read) + intf.reader.Act(nil, intf.reader._read) // Wait for the reader to finish err = <-intf.reader.err if err != nil { @@ -251,7 +251,7 @@ const ( // notify the intf that we're currently sending func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { - intf.RecvFrom(&intf.writer, func() { + intf.Act(&intf.writer, func() { if !isLinkTraffic { intf.inSwitch = false } @@ -270,7 +270,7 @@ func (intf *linkInterface) _cancelStallTimer() { // called by an AfterFunc if we appear to have timed out func (intf *linkInterface) notifyBlockedSend() { - intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc + intf.Act(nil, func() { // Sent from a time.AfterFunc if intf.sendTimer != nil { //As far as we know, we're still trying to send, and the timer fired. intf.link.core.switchTable.blockPeer(intf.peer.port) @@ -280,7 +280,7 @@ func (intf *linkInterface) notifyBlockedSend() { // notify the intf that we've finished sending, returning the peer to the switch func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { - intf.RecvFrom(&intf.writer, func() { + intf.Act(&intf.writer, func() { intf.sendTimer.Stop() intf.sendTimer = nil if !isLinkTraffic { @@ -296,7 +296,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { func (intf *linkInterface) _notifySwitch() { if !intf.inSwitch && !intf.stalled { intf.inSwitch = true - intf.link.core.switchTable.RecvFrom(intf, func() { + intf.link.core.switchTable.Act(intf, func() { intf.link.core.switchTable._idleIn(intf.peer.port) }) } @@ -304,7 +304,7 @@ func (intf *linkInterface) _notifySwitch() { // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds func (intf *linkInterface) notifyStalled() { - intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc + intf.Act(nil, func() { // Sent from a time.AfterFunc if intf.stallTimer != nil { intf.stallTimer.Stop() intf.stallTimer = nil @@ -316,7 +316,7 @@ func (intf *linkInterface) notifyStalled() { // reset the close timer func (intf *linkInterface) notifyReading() { - intf.RecvFrom(&intf.reader, func() { + intf.Act(&intf.reader, func() { if intf.closeTimer != nil { intf.closeTimer.Stop() } @@ -326,7 +326,7 @@ func (intf *linkInterface) notifyReading() { // wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic func (intf *linkInterface) notifyRead(size int) { - intf.RecvFrom(&intf.reader, func() { + intf.Act(&intf.reader, func() { if intf.stallTimer != nil { intf.stallTimer.Stop() intf.stallTimer = nil @@ -341,7 +341,7 @@ func (intf *linkInterface) notifyRead(size int) { // We need to send keep-alive traffic now func (intf *linkInterface) notifyDoKeepAlive() { - intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc + intf.Act(nil, func() { // Sent from a time.AfterFunc if intf.stallTimer != nil { intf.stallTimer.Stop() intf.stallTimer = nil @@ -358,7 +358,7 @@ type linkWriter struct { } func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { - w.RecvFrom(from, func() { + w.Act(from, func() { var size int for _, bs := range bss { size += len(bs) @@ -396,5 +396,5 @@ func (r *linkReader) _read() { return } // Now try to read again - r.RecvFrom(nil, r._read) + r.Act(nil, r._read) } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 4cf0068b..d33c413a 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -178,7 +178,7 @@ func (ps *peers) sendSwitchMsgs(from phony.Actor) { if p.port == 0 { continue } - p.RecvFrom(from, p._sendSwitchMsg) + p.Act(from, p._sendSwitchMsg) } } @@ -187,7 +187,7 @@ func (ps *peers) sendSwitchMsgs(from phony.Actor) { func (p *peer) start() { var updateDHT func() updateDHT = func() { - <-p.SyncExec(func() { + phony.Block(p, func() { select { case <-p.done: default: @@ -198,7 +198,7 @@ func (p *peer) start() { } updateDHT() // Just for good measure, immediately send a switch message to this peer when we start - <-p.SyncExec(p._sendSwitchMsg) + p.Act(nil, p._sendSwitchMsg) } func (p *peer) _updateDHT() { @@ -208,7 +208,7 @@ func (p *peer) _updateDHT() { } func (p *peer) handlePacketFrom(from phony.Actor, packet []byte) { - p.RecvFrom(from, func() { + p.Act(from, func() { p._handlePacket(packet) }) } @@ -246,7 +246,7 @@ func (p *peer) _handleTraffic(packet []byte) { } func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { - p.RecvFrom(from, func() { + p.Act(from, func() { p._sendPackets(packets) }) } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index fec55b7c..bbc87c51 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -77,7 +77,7 @@ func (r *router) reconfigure(e chan error) { defer close(e) var errs []error // Reconfigure the router - <-r.SyncExec(func() { + phony.Block(r, func() { current := r.core.config.GetCurrent() err := r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy) if err != nil { @@ -98,7 +98,7 @@ func (r *router) start() error { // In practice, the switch will call this with 1 packet func (r *router) handlePackets(from phony.Actor, packets [][]byte) { - r.RecvFrom(from, func() { + r.Act(from, func() { for _, packet := range packets { r._handlePacket(packet) } @@ -107,14 +107,14 @@ func (r *router) handlePackets(from phony.Actor, packets [][]byte) { // Insert a peer info into the dht, TODO? make the dht a separate actor func (r *router) insertPeer(from phony.Actor, info *dhtInfo) { - r.RecvFrom(from, func() { + r.Act(from, func() { r.dht.insertPeer(info) }) } // Reset sessions and DHT after the switch sees our coords change func (r *router) reset(from phony.Actor) { - r.RecvFrom(from, func() { + r.Act(from, func() { r.sessions.reset() r.dht.reset() }) @@ -123,7 +123,7 @@ func (r *router) reset(from phony.Actor) { // TODO remove reconfigure so this is just a ticker loop // and then find something better than a ticker loop to schedule things... func (r *router) doMaintenance() { - <-r.SyncExec(func() { + phony.Block(r, func() { // Any periodic maintenance stuff goes here r.core.switchTable.doMaintenance() r.dht.doMaintenance() @@ -252,7 +252,7 @@ func (r *router) _handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) { r.nodeinfo.handleNodeInfo(&req) } -// TODO remove this, have things either be actors that send message or else call SyncExec directly +// TODO remove this, have things either be actors that send message or else call Block directly func (r *router) doAdmin(f func()) { - <-r.SyncExec(f) + phony.Block(r, f) } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 9cf5f06e..ca357cc2 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -153,7 +153,7 @@ func (sinfo *searchInfo) continueSearch() { // Note that this will spawn multiple parallel searches as time passes // Any that die aren't restarted, but a new one will start later time.AfterFunc(search_RETRY_TIME, func() { - sinfo.searches.router.RecvFrom(nil, func() { + sinfo.searches.router.Act(nil, func() { // FIXME this keeps the search alive forever if not for the searches map, fix that newSearchInfo := sinfo.searches.searches[sinfo.dest] if newSearchInfo != sinfo { diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 0fc7ec80..06780e65 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -80,7 +80,7 @@ func (sinfo *sessionInfo) reconfigure(e chan error) { // TODO remove this, call SyncExec directly func (sinfo *sessionInfo) doFunc(f func()) { - <-sinfo.SyncExec(f) + phony.Block(sinfo, f) } // Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. @@ -164,7 +164,7 @@ func (ss *sessions) init(r *router) { func (ss *sessions) reconfigure(e chan error) { defer close(e) responses := make(map[crypto.Handle]chan error) - <-ss.router.SyncExec(func() { + phony.Block(ss.router, func() { for index, session := range ss.sinfos { responses[index] = make(chan error) go session.reconfigure(responses[index]) @@ -287,7 +287,7 @@ func (ss *sessions) cleanup() { } func (sinfo *sessionInfo) doRemove() { - sinfo.sessions.router.RecvFrom(nil, func() { + sinfo.sessions.router.Act(nil, func() { sinfo.sessions.removeSession(sinfo) }) } @@ -341,7 +341,7 @@ func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey, // Sends a session ping by calling sendPingPong in ping mode. func (sinfo *sessionInfo) ping(from phony.Actor) { - sinfo.RecvFrom(from, func() { + sinfo.Act(from, func() { sinfo._sendPingPong(false) }) } @@ -362,14 +362,14 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) { } packet := p.encode() // TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first - sinfo.sessions.router.RecvFrom(sinfo, func() { sinfo.sessions.router.out(packet) }) + sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) }) if sinfo.pingTime.Before(sinfo.time) { sinfo.pingTime = time.Now() } } func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) { - sinfo.RecvFrom(from, func() { + sinfo.Act(from, func() { sinfo.conn = conn sinfo.conn.setMTU(sinfo, sinfo._getMTU()) }) @@ -404,7 +404,7 @@ func (ss *sessions) handlePing(ping *sessionPing) { ss.listenerMutex.Unlock() } if sinfo != nil { - sinfo.RecvFrom(ss.router, func() { + sinfo.Act(ss.router, func() { // Update the session if !sinfo._update(ping) { /*panic("Should not happen in testing")*/ return @@ -472,7 +472,7 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) { // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. func (ss *sessions) reset() { for _, sinfo := range ss.sinfos { - sinfo.RecvFrom(ss.router, func() { + sinfo.Act(ss.router, func() { sinfo.reset = true }) } @@ -488,7 +488,7 @@ type FlowKeyMessage struct { } func (sinfo *sessionInfo) recv(from phony.Actor, packet *wire_trafficPacket) { - sinfo.RecvFrom(from, func() { + sinfo.Act(from, func() { sinfo._recvPacket(packet) }) } @@ -562,7 +562,7 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { util.PutBytes(p.Payload) // Send the packet // TODO replace this with a send to the peer struct if that becomes an actor - sinfo.sessions.router.RecvFrom(sinfo, func() { + sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) }) } @@ -574,7 +574,7 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { } func (sinfo *sessionInfo) checkCallbacks() { - sinfo.RecvFrom(nil, func() { + sinfo.Act(nil, func() { if len(sinfo.callbacks) > 0 { select { case callback := <-sinfo.callbacks[0]: diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index b30e59d5..93d79d29 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -192,7 +192,7 @@ func (t *switchTable) init(core *Core) { t.updater.Store(&sync.Once{}) t.table.Store(lookupTable{}) t.drop = make(map[crypto.SigPubKey]int64) - <-t.SyncExec(func() { + phony.Block(t, func() { t.queues.totalMaxSize = SwitchQueueTotalMinSize t.queues.bufs = make(map[string]switch_buffer) t.idle = make(map[switchPort]time.Time) @@ -827,7 +827,7 @@ func (t *switchTable) _handleIdle(port switchPort) bool { } func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) { - t.RecvFrom(from, func() { + t.Act(from, func() { t._packetIn(bytes) }) } @@ -870,5 +870,5 @@ func (t *switchTable) _idleIn(port switchPort) { // Passed a function to call. // This will send the function to t.admin and block until it finishes. func (t *switchTable) doAdmin(f func()) { - <-t.SyncExec(f) + phony.Block(t, f) }