update to latest phony, adjust interface use accordingly

This commit is contained in:
Arceliar 2019-08-27 19:43:54 -05:00
parent 4d9c6342a7
commit 3845f81357
14 changed files with 80 additions and 76 deletions

2
go.mod
View File

@ -1,7 +1,7 @@
module github.com/yggdrasil-network/yggdrasil-go module github.com/yggdrasil-network/yggdrasil-go
require ( require (
github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe github.com/Arceliar/phony v0.0.0-20190828002416-0337564e2c44
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8
github.com/hashicorp/go-syslog v1.0.0 github.com/hashicorp/go-syslog v1.0.0
github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222

4
go.sum
View File

@ -1,5 +1,5 @@
github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe h1:U5bediuXjZ1y6bByIXXraoE319yFp9kx1z8K6el7Ftc= github.com/Arceliar/phony v0.0.0-20190828002416-0337564e2c44 h1:8EiuIp65v8aLkLc4LWxtn4NTH+P2LwDmrKKWdBzn9cI=
github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI= github.com/Arceliar/phony v0.0.0-20190828002416-0337564e2c44/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI=
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY=
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U=
github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE= github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE=

View File

@ -27,7 +27,7 @@ type tunConn struct {
} }
func (s *tunConn) close() { func (s *tunConn) close() {
s.tun.RecvFrom(s, s._close_from_tun) s.tun.Act(s, s._close_from_tun)
} }
func (s *tunConn) _close_from_tun() { func (s *tunConn) _close_from_tun() {
@ -117,7 +117,7 @@ func (s *tunConn) _read(bs []byte) (err error) {
} }
func (s *tunConn) writeFrom(from phony.Actor, bs []byte) { func (s *tunConn) writeFrom(from phony.Actor, bs []byte) {
s.RecvFrom(from, func() { s.Act(from, func() {
s._write(bs) s._write(bs)
}) })
} }
@ -197,7 +197,7 @@ func (s *tunConn) _write(bs []byte) (err error) {
// No point in wasting resources to send back an error if there was none // No point in wasting resources to send back an error if there was none
return return
} }
s.RecvFrom(s.conn, func() { s.Act(s.conn, func() {
if e, eok := err.(yggdrasil.ConnError); !eok { if e, eok := err.(yggdrasil.ConnError); !eok {
if e.Closed() { if e.Closed() {
s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err) s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err)

View File

@ -19,7 +19,7 @@ type tunWriter struct {
} }
func (w *tunWriter) writeFrom(from phony.Actor, b []byte) { func (w *tunWriter) writeFrom(from phony.Actor, b []byte) {
w.RecvFrom(from, func() { w.Act(from, func() {
w._write(b) w._write(b)
}) })
} }
@ -90,7 +90,7 @@ func (w *tunWriter) _write(b []byte) {
util.PutBytes(b) util.PutBytes(b)
} }
if err != nil { if err != nil {
w.tun.RecvFrom(w, func() { w.tun.Act(w, func() {
if !w.tun.isOpen { if !w.tun.isOpen {
w.tun.log.Errorln("TUN/TAP iface write error:", err) w.tun.log.Errorln("TUN/TAP iface write error:", err)
} }
@ -118,12 +118,12 @@ func (r *tunReader) _read() {
} }
if err == nil { if err == nil {
// Now read again // Now read again
r.RecvFrom(nil, r._read) r.Act(nil, r._read)
} }
} }
func (tun *TunAdapter) handlePacketFrom(from phony.Actor, packet []byte, err error) { func (tun *TunAdapter) handlePacketFrom(from phony.Actor, packet []byte, err error) {
tun.RecvFrom(from, func() { tun.Act(from, func() {
tun._handlePacket(packet, err) tun._handlePacket(packet, err)
}) })
} }
@ -248,7 +248,7 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) {
if !known { if !known {
go func() { go func() {
conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask) conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask)
tun.RecvFrom(nil, func() { tun.Act(nil, func() {
packets := tun.dials[*dstNodeID] packets := tun.dials[*dstNodeID]
delete(tun.dials, *dstNodeID) delete(tun.dials, *dstNodeID)
if err != nil { if err != nil {

View File

@ -125,7 +125,7 @@ func (tun *TunAdapter) Init(config *config.NodeState, log *log.Logger, listener
// reader actor to handle packets on that interface. // reader actor to handle packets on that interface.
func (tun *TunAdapter) Start() error { func (tun *TunAdapter) Start() error {
var err error var err error
<-tun.SyncExec(func() { phony.Block(tun, func() {
err = tun._start() err = tun._start()
}) })
return err return err
@ -167,7 +167,7 @@ func (tun *TunAdapter) _start() error {
} }
}() }()
go tun.handler() go tun.handler()
tun.reader.RecvFrom(nil, tun.reader._read) // Start the reader tun.reader.Act(nil, tun.reader._read) // Start the reader
tun.icmpv6.Init(tun) tun.icmpv6.Init(tun)
if iftapmode { if iftapmode {
go tun.icmpv6.Solicit(tun.addr) go tun.icmpv6.Solicit(tun.addr)
@ -180,7 +180,7 @@ func (tun *TunAdapter) _start() error {
// read/write goroutines to handle packets on that interface. // read/write goroutines to handle packets on that interface.
func (tun *TunAdapter) Stop() error { func (tun *TunAdapter) Stop() error {
var err error var err error
<-tun.SyncExec(func() { phony.Block(tun, func() {
err = tun._stop() err = tun._stop()
}) })
return err return err
@ -233,7 +233,7 @@ func (tun *TunAdapter) handler() error {
tun.log.Errorln("TUN/TAP connection accept error:", err) tun.log.Errorln("TUN/TAP connection accept error:", err)
return err return err
} }
<-tun.SyncExec(func() { phony.Block(tun, func() {
if _, err := tun._wrap(conn); err != nil { if _, err := tun._wrap(conn); err != nil {
// Something went wrong when storing the connection, typically that // Something went wrong when storing the connection, typically that
// something already exists for this address or subnet // something already exists for this address or subnet
@ -273,11 +273,11 @@ func (tun *TunAdapter) _wrap(conn *yggdrasil.Conn) (c *tunConn, err error) {
tun.subnetToConn[s.snet] = &s tun.subnetToConn[s.snet] = &s
// Set the read callback and start the timeout // Set the read callback and start the timeout
conn.SetReadCallback(func(bs []byte) { conn.SetReadCallback(func(bs []byte) {
s.RecvFrom(conn, func() { s.Act(conn, func() {
s._read(bs) s._read(bs)
}) })
}) })
s.RecvFrom(nil, s.stillAlive) s.Act(nil, s.stillAlive)
// Return // Return
return c, err return c, err
} }

View File

@ -11,6 +11,8 @@ import (
"github.com/gologme/log" "github.com/gologme/log"
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/Arceliar/phony"
) )
// Peer represents a single peer object. This contains information from the // Peer represents a single peer object. This contains information from the
@ -106,7 +108,7 @@ func (c *Core) GetPeers() []Peer {
for _, port := range ps { for _, port := range ps {
p := ports[port] p := ports[port]
var info Peer var info Peer
<-p.SyncExec(func() { phony.Block(p, func() {
info = Peer{ info = Peer{
Endpoint: p.intf.name, Endpoint: p.intf.name,
BytesSent: p.bytesSent, BytesSent: p.bytesSent,
@ -138,7 +140,7 @@ func (c *Core) GetSwitchPeers() []SwitchPeer {
} }
coords := elem.locator.getCoords() coords := elem.locator.getCoords()
var info SwitchPeer var info SwitchPeer
<-peer.SyncExec(func() { phony.Block(peer, func() {
info = SwitchPeer{ info = SwitchPeer{
Coords: append([]uint64{}, wire_coordsBytestoUint64s(coords)...), Coords: append([]uint64{}, wire_coordsBytestoUint64s(coords)...),
BytesSent: peer.bytesSent, BytesSent: peer.bytesSent,

View File

@ -80,12 +80,12 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session
func (c *Conn) String() string { func (c *Conn) String() string {
var s string var s string
<-c.SyncExec(func() { s = fmt.Sprintf("conn=%p", c) }) phony.Block(c, func() { s = fmt.Sprintf("conn=%p", c) })
return s return s
} }
func (c *Conn) setMTU(from phony.Actor, mtu uint16) { func (c *Conn) setMTU(from phony.Actor, mtu uint16) {
c.RecvFrom(from, func() { c.mtu = mtu }) c.Act(from, func() { c.mtu = mtu })
} }
// This should never be called from the router goroutine, used in the dial functions // This should never be called from the router goroutine, used in the dial functions
@ -143,7 +143,7 @@ func (c *Conn) doSearch() {
sinfo.continueSearch() sinfo.continueSearch()
} }
} }
c.core.router.RecvFrom(c.session, routerWork) c.core.router.Act(c.session, routerWork)
} }
func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) { func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) {
@ -159,7 +159,7 @@ func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool)
// SetReadCallback sets a callback which will be called whenever a packet is received. // SetReadCallback sets a callback which will be called whenever a packet is received.
func (c *Conn) SetReadCallback(callback func([]byte)) { func (c *Conn) SetReadCallback(callback func([]byte)) {
c.RecvFrom(nil, func() { c.Act(nil, func() {
c.readCallback = callback c.readCallback = callback
c._drainReadBuffer() c._drainReadBuffer()
}) })
@ -172,14 +172,14 @@ func (c *Conn) _drainReadBuffer() {
select { select {
case bs := <-c.readBuffer: case bs := <-c.readBuffer:
c.readCallback(bs) c.readCallback(bs)
c.RecvFrom(nil, c._drainReadBuffer) // In case there's more c.Act(nil, c._drainReadBuffer) // In case there's more
default: default:
} }
} }
// Called by the session to pass a new message to the Conn // Called by the session to pass a new message to the Conn
func (c *Conn) recvMsg(from phony.Actor, msg []byte) { func (c *Conn) recvMsg(from phony.Actor, msg []byte) {
c.RecvFrom(from, func() { c.Act(from, func() {
if c.readCallback != nil { if c.readCallback != nil {
c.readCallback(msg) c.readCallback(msg)
} else { } else {
@ -195,7 +195,7 @@ func (c *Conn) recvMsg(from phony.Actor, msg []byte) {
func (c *Conn) ReadNoCopy() ([]byte, error) { func (c *Conn) ReadNoCopy() ([]byte, error) {
var cancel util.Cancellation var cancel util.Cancellation
var doCancel bool var doCancel bool
<-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.readDeadline) }) phony.Block(c, func() { cancel, doCancel = c._getDeadlineCancellation(c.readDeadline) })
if doCancel { if doCancel {
defer cancel.Cancel(nil) defer cancel.Cancel(nil)
} }
@ -234,7 +234,7 @@ func (c *Conn) _write(msg FlowKeyMessage) error {
if len(msg.Message) > int(c.mtu) { if len(msg.Message) > int(c.mtu) {
return ConnError{errors.New("packet too big"), true, false, false, int(c.mtu)} return ConnError{errors.New("packet too big"), true, false, false, int(c.mtu)}
} }
c.session.RecvFrom(c, func() { c.session.Act(c, func() {
// Send the packet // Send the packet
c.session._send(msg) c.session._send(msg)
// Session keep-alive, while we wait for the crypto workers from send // Session keep-alive, while we wait for the crypto workers from send
@ -258,7 +258,7 @@ func (c *Conn) _write(msg FlowKeyMessage) error {
// This is used internaly by WriteNoCopy and Write. // This is used internaly by WriteNoCopy and Write.
// If the callback is called with a non-nil value, then it is safe to reuse the argument FlowKeyMessage. // If the callback is called with a non-nil value, then it is safe to reuse the argument FlowKeyMessage.
func (c *Conn) WriteFrom(from phony.Actor, msg FlowKeyMessage, callback func(error)) { func (c *Conn) WriteFrom(from phony.Actor, msg FlowKeyMessage, callback func(error)) {
c.RecvFrom(from, func() { c.Act(from, func() {
callback(c._write(msg)) callback(c._write(msg))
}) })
} }
@ -268,7 +268,7 @@ func (c *Conn) WriteFrom(from phony.Actor, msg FlowKeyMessage, callback func(err
func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error { func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error {
var cancel util.Cancellation var cancel util.Cancellation
var doCancel bool var doCancel bool
<-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) }) phony.Block(c, func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) })
var err error var err error
select { select {
case <-cancel.Finished(): case <-cancel.Finished():
@ -299,7 +299,7 @@ func (c *Conn) Write(b []byte) (int, error) {
} }
func (c *Conn) Close() (err error) { func (c *Conn) Close() (err error) {
<-c.SyncExec(func() { phony.Block(c, func() {
if c.session != nil { if c.session != nil {
// Close the session, if it hasn't been closed already // Close the session, if it hasn't been closed already
if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil { if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil {
@ -319,7 +319,7 @@ func (c *Conn) LocalAddr() crypto.NodeID {
func (c *Conn) RemoteAddr() crypto.NodeID { func (c *Conn) RemoteAddr() crypto.NodeID {
// TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors...
var n crypto.NodeID var n crypto.NodeID
<-c.SyncExec(func() { n = *c.nodeID }) phony.Block(c, func() { n = *c.nodeID })
return n return n
} }
@ -331,12 +331,12 @@ func (c *Conn) SetDeadline(t time.Time) error {
func (c *Conn) SetReadDeadline(t time.Time) error { func (c *Conn) SetReadDeadline(t time.Time) error {
// TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors...
<-c.SyncExec(func() { c.readDeadline = &t }) phony.Block(c, func() { c.readDeadline = &t })
return nil return nil
} }
func (c *Conn) SetWriteDeadline(t time.Time) error { func (c *Conn) SetWriteDeadline(t time.Time) error {
// TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors...
<-c.SyncExec(func() { c.writeDeadline = &t }) phony.Block(c, func() { c.writeDeadline = &t })
return nil return nil
} }

View File

@ -29,6 +29,8 @@ import "github.com/yggdrasil-network/yggdrasil-go/src/config"
import "github.com/yggdrasil-network/yggdrasil-go/src/crypto" import "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
import "github.com/yggdrasil-network/yggdrasil-go/src/defaults" import "github.com/yggdrasil-network/yggdrasil-go/src/defaults"
import "github.com/Arceliar/phony"
// Start the profiler in debug builds, if the required environment variable is set. // Start the profiler in debug builds, if the required environment variable is set.
func init() { func init() {
envVarName := "PPROFLISTEN" envVarName := "PPROFLISTEN"
@ -572,14 +574,14 @@ func DEBUG_simLinkPeers(p, q *peer) {
continue continue
case packet := <-send: case packet := <-send:
packets = append(packets, packet) packets = append(packets, packet)
<-source.core.switchTable.SyncExec(func() { phony.Block(&source.core.switchTable, func() {
source.core.switchTable._idleIn(source.port) source.core.switchTable._idleIn(source.port)
}) })
continue continue
default: default:
} }
if len(packets) > 0 { if len(packets) > 0 {
<-dest.SyncExec(func() { dest._handlePacket(packets[0]) }) phony.Block(dest, func() { dest._handlePacket(packets[0]) })
packets = packets[1:] packets = packets[1:]
continue continue
} }
@ -588,7 +590,7 @@ func DEBUG_simLinkPeers(p, q *peer) {
packets = append(packets, packet) packets = append(packets, packet)
case packet := <-send: case packet := <-send:
packets = append(packets, packet) packets = append(packets, packet)
<-source.core.switchTable.SyncExec(func() { phony.Block(&source.core.switchTable, func() {
source.core.switchTable._idleIn(source.port) source.core.switchTable._idleIn(source.port)
}) })
} }
@ -597,10 +599,10 @@ func DEBUG_simLinkPeers(p, q *peer) {
} }
goWorkers(p, q) goWorkers(p, q)
goWorkers(q, p) goWorkers(q, p)
<-p.core.switchTable.SyncExec(func() { phony.Block(&p.core.switchTable, func() {
p.core.switchTable._idleIn(p.port) p.core.switchTable._idleIn(p.port)
}) })
<-q.core.switchTable.SyncExec(func() { phony.Block(&q.core.switchTable, func() {
q.core.switchTable._idleIn(q.port) q.core.switchTable._idleIn(q.port)
}) })
} }

View File

@ -227,7 +227,7 @@ func (intf *linkInterface) handler() error {
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
// Start things // Start things
go intf.peer.start() go intf.peer.start()
intf.reader.RecvFrom(nil, intf.reader._read) intf.reader.Act(nil, intf.reader._read)
// Wait for the reader to finish // Wait for the reader to finish
err = <-intf.reader.err err = <-intf.reader.err
if err != nil { if err != nil {
@ -251,7 +251,7 @@ const (
// notify the intf that we're currently sending // notify the intf that we're currently sending
func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
intf.RecvFrom(&intf.writer, func() { intf.Act(&intf.writer, func() {
if !isLinkTraffic { if !isLinkTraffic {
intf.inSwitch = false intf.inSwitch = false
} }
@ -270,7 +270,7 @@ func (intf *linkInterface) _cancelStallTimer() {
// called by an AfterFunc if we appear to have timed out // called by an AfterFunc if we appear to have timed out
func (intf *linkInterface) notifyBlockedSend() { func (intf *linkInterface) notifyBlockedSend() {
intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc intf.Act(nil, func() { // Sent from a time.AfterFunc
if intf.sendTimer != nil { if intf.sendTimer != nil {
//As far as we know, we're still trying to send, and the timer fired. //As far as we know, we're still trying to send, and the timer fired.
intf.link.core.switchTable.blockPeer(intf.peer.port) intf.link.core.switchTable.blockPeer(intf.peer.port)
@ -280,7 +280,7 @@ func (intf *linkInterface) notifyBlockedSend() {
// notify the intf that we've finished sending, returning the peer to the switch // notify the intf that we've finished sending, returning the peer to the switch
func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
intf.RecvFrom(&intf.writer, func() { intf.Act(&intf.writer, func() {
intf.sendTimer.Stop() intf.sendTimer.Stop()
intf.sendTimer = nil intf.sendTimer = nil
if !isLinkTraffic { if !isLinkTraffic {
@ -296,7 +296,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
func (intf *linkInterface) _notifySwitch() { func (intf *linkInterface) _notifySwitch() {
if !intf.inSwitch && !intf.stalled { if !intf.inSwitch && !intf.stalled {
intf.inSwitch = true intf.inSwitch = true
intf.link.core.switchTable.RecvFrom(intf, func() { intf.link.core.switchTable.Act(intf, func() {
intf.link.core.switchTable._idleIn(intf.peer.port) intf.link.core.switchTable._idleIn(intf.peer.port)
}) })
} }
@ -304,7 +304,7 @@ func (intf *linkInterface) _notifySwitch() {
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
func (intf *linkInterface) notifyStalled() { func (intf *linkInterface) notifyStalled() {
intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc intf.Act(nil, func() { // Sent from a time.AfterFunc
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
@ -316,7 +316,7 @@ func (intf *linkInterface) notifyStalled() {
// reset the close timer // reset the close timer
func (intf *linkInterface) notifyReading() { func (intf *linkInterface) notifyReading() {
intf.RecvFrom(&intf.reader, func() { intf.Act(&intf.reader, func() {
if intf.closeTimer != nil { if intf.closeTimer != nil {
intf.closeTimer.Stop() intf.closeTimer.Stop()
} }
@ -326,7 +326,7 @@ func (intf *linkInterface) notifyReading() {
// wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic // wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic
func (intf *linkInterface) notifyRead(size int) { func (intf *linkInterface) notifyRead(size int) {
intf.RecvFrom(&intf.reader, func() { intf.Act(&intf.reader, func() {
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
@ -341,7 +341,7 @@ func (intf *linkInterface) notifyRead(size int) {
// We need to send keep-alive traffic now // We need to send keep-alive traffic now
func (intf *linkInterface) notifyDoKeepAlive() { func (intf *linkInterface) notifyDoKeepAlive() {
intf.RecvFrom(nil, func() { // Sent from a time.AfterFunc intf.Act(nil, func() { // Sent from a time.AfterFunc
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
@ -358,7 +358,7 @@ type linkWriter struct {
} }
func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) {
w.RecvFrom(from, func() { w.Act(from, func() {
var size int var size int
for _, bs := range bss { for _, bs := range bss {
size += len(bs) size += len(bs)
@ -396,5 +396,5 @@ func (r *linkReader) _read() {
return return
} }
// Now try to read again // Now try to read again
r.RecvFrom(nil, r._read) r.Act(nil, r._read)
} }

View File

@ -178,7 +178,7 @@ func (ps *peers) sendSwitchMsgs(from phony.Actor) {
if p.port == 0 { if p.port == 0 {
continue continue
} }
p.RecvFrom(from, p._sendSwitchMsg) p.Act(from, p._sendSwitchMsg)
} }
} }
@ -187,7 +187,7 @@ func (ps *peers) sendSwitchMsgs(from phony.Actor) {
func (p *peer) start() { func (p *peer) start() {
var updateDHT func() var updateDHT func()
updateDHT = func() { updateDHT = func() {
<-p.SyncExec(func() { phony.Block(p, func() {
select { select {
case <-p.done: case <-p.done:
default: default:
@ -198,7 +198,7 @@ func (p *peer) start() {
} }
updateDHT() updateDHT()
// Just for good measure, immediately send a switch message to this peer when we start // Just for good measure, immediately send a switch message to this peer when we start
<-p.SyncExec(p._sendSwitchMsg) p.Act(nil, p._sendSwitchMsg)
} }
func (p *peer) _updateDHT() { func (p *peer) _updateDHT() {
@ -208,7 +208,7 @@ func (p *peer) _updateDHT() {
} }
func (p *peer) handlePacketFrom(from phony.Actor, packet []byte) { func (p *peer) handlePacketFrom(from phony.Actor, packet []byte) {
p.RecvFrom(from, func() { p.Act(from, func() {
p._handlePacket(packet) p._handlePacket(packet)
}) })
} }
@ -246,7 +246,7 @@ func (p *peer) _handleTraffic(packet []byte) {
} }
func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) {
p.RecvFrom(from, func() { p.Act(from, func() {
p._sendPackets(packets) p._sendPackets(packets)
}) })
} }

View File

@ -77,7 +77,7 @@ func (r *router) reconfigure(e chan error) {
defer close(e) defer close(e)
var errs []error var errs []error
// Reconfigure the router // Reconfigure the router
<-r.SyncExec(func() { phony.Block(r, func() {
current := r.core.config.GetCurrent() current := r.core.config.GetCurrent()
err := r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy) err := r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy)
if err != nil { if err != nil {
@ -98,7 +98,7 @@ func (r *router) start() error {
// In practice, the switch will call this with 1 packet // In practice, the switch will call this with 1 packet
func (r *router) handlePackets(from phony.Actor, packets [][]byte) { func (r *router) handlePackets(from phony.Actor, packets [][]byte) {
r.RecvFrom(from, func() { r.Act(from, func() {
for _, packet := range packets { for _, packet := range packets {
r._handlePacket(packet) r._handlePacket(packet)
} }
@ -107,14 +107,14 @@ func (r *router) handlePackets(from phony.Actor, packets [][]byte) {
// Insert a peer info into the dht, TODO? make the dht a separate actor // Insert a peer info into the dht, TODO? make the dht a separate actor
func (r *router) insertPeer(from phony.Actor, info *dhtInfo) { func (r *router) insertPeer(from phony.Actor, info *dhtInfo) {
r.RecvFrom(from, func() { r.Act(from, func() {
r.dht.insertPeer(info) r.dht.insertPeer(info)
}) })
} }
// Reset sessions and DHT after the switch sees our coords change // Reset sessions and DHT after the switch sees our coords change
func (r *router) reset(from phony.Actor) { func (r *router) reset(from phony.Actor) {
r.RecvFrom(from, func() { r.Act(from, func() {
r.sessions.reset() r.sessions.reset()
r.dht.reset() r.dht.reset()
}) })
@ -123,7 +123,7 @@ func (r *router) reset(from phony.Actor) {
// TODO remove reconfigure so this is just a ticker loop // TODO remove reconfigure so this is just a ticker loop
// and then find something better than a ticker loop to schedule things... // and then find something better than a ticker loop to schedule things...
func (r *router) doMaintenance() { func (r *router) doMaintenance() {
<-r.SyncExec(func() { phony.Block(r, func() {
// Any periodic maintenance stuff goes here // Any periodic maintenance stuff goes here
r.core.switchTable.doMaintenance() r.core.switchTable.doMaintenance()
r.dht.doMaintenance() r.dht.doMaintenance()
@ -252,7 +252,7 @@ func (r *router) _handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) {
r.nodeinfo.handleNodeInfo(&req) r.nodeinfo.handleNodeInfo(&req)
} }
// TODO remove this, have things either be actors that send message or else call SyncExec directly // TODO remove this, have things either be actors that send message or else call Block directly
func (r *router) doAdmin(f func()) { func (r *router) doAdmin(f func()) {
<-r.SyncExec(f) phony.Block(r, f)
} }

View File

@ -153,7 +153,7 @@ func (sinfo *searchInfo) continueSearch() {
// Note that this will spawn multiple parallel searches as time passes // Note that this will spawn multiple parallel searches as time passes
// Any that die aren't restarted, but a new one will start later // Any that die aren't restarted, but a new one will start later
time.AfterFunc(search_RETRY_TIME, func() { time.AfterFunc(search_RETRY_TIME, func() {
sinfo.searches.router.RecvFrom(nil, func() { sinfo.searches.router.Act(nil, func() {
// FIXME this keeps the search alive forever if not for the searches map, fix that // FIXME this keeps the search alive forever if not for the searches map, fix that
newSearchInfo := sinfo.searches.searches[sinfo.dest] newSearchInfo := sinfo.searches.searches[sinfo.dest]
if newSearchInfo != sinfo { if newSearchInfo != sinfo {

View File

@ -80,7 +80,7 @@ func (sinfo *sessionInfo) reconfigure(e chan error) {
// TODO remove this, call SyncExec directly // TODO remove this, call SyncExec directly
func (sinfo *sessionInfo) doFunc(f func()) { func (sinfo *sessionInfo) doFunc(f func()) {
<-sinfo.SyncExec(f) phony.Block(sinfo, f)
} }
// Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. // Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU.
@ -164,7 +164,7 @@ func (ss *sessions) init(r *router) {
func (ss *sessions) reconfigure(e chan error) { func (ss *sessions) reconfigure(e chan error) {
defer close(e) defer close(e)
responses := make(map[crypto.Handle]chan error) responses := make(map[crypto.Handle]chan error)
<-ss.router.SyncExec(func() { phony.Block(ss.router, func() {
for index, session := range ss.sinfos { for index, session := range ss.sinfos {
responses[index] = make(chan error) responses[index] = make(chan error)
go session.reconfigure(responses[index]) go session.reconfigure(responses[index])
@ -287,7 +287,7 @@ func (ss *sessions) cleanup() {
} }
func (sinfo *sessionInfo) doRemove() { func (sinfo *sessionInfo) doRemove() {
sinfo.sessions.router.RecvFrom(nil, func() { sinfo.sessions.router.Act(nil, func() {
sinfo.sessions.removeSession(sinfo) sinfo.sessions.removeSession(sinfo)
}) })
} }
@ -341,7 +341,7 @@ func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey,
// Sends a session ping by calling sendPingPong in ping mode. // Sends a session ping by calling sendPingPong in ping mode.
func (sinfo *sessionInfo) ping(from phony.Actor) { func (sinfo *sessionInfo) ping(from phony.Actor) {
sinfo.RecvFrom(from, func() { sinfo.Act(from, func() {
sinfo._sendPingPong(false) sinfo._sendPingPong(false)
}) })
} }
@ -362,14 +362,14 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) {
} }
packet := p.encode() packet := p.encode()
// TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first // TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first
sinfo.sessions.router.RecvFrom(sinfo, func() { sinfo.sessions.router.out(packet) }) sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) })
if sinfo.pingTime.Before(sinfo.time) { if sinfo.pingTime.Before(sinfo.time) {
sinfo.pingTime = time.Now() sinfo.pingTime = time.Now()
} }
} }
func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) { func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) {
sinfo.RecvFrom(from, func() { sinfo.Act(from, func() {
sinfo.conn = conn sinfo.conn = conn
sinfo.conn.setMTU(sinfo, sinfo._getMTU()) sinfo.conn.setMTU(sinfo, sinfo._getMTU())
}) })
@ -404,7 +404,7 @@ func (ss *sessions) handlePing(ping *sessionPing) {
ss.listenerMutex.Unlock() ss.listenerMutex.Unlock()
} }
if sinfo != nil { if sinfo != nil {
sinfo.RecvFrom(ss.router, func() { sinfo.Act(ss.router, func() {
// Update the session // Update the session
if !sinfo._update(ping) { /*panic("Should not happen in testing")*/ if !sinfo._update(ping) { /*panic("Should not happen in testing")*/
return return
@ -472,7 +472,7 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) {
// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
func (ss *sessions) reset() { func (ss *sessions) reset() {
for _, sinfo := range ss.sinfos { for _, sinfo := range ss.sinfos {
sinfo.RecvFrom(ss.router, func() { sinfo.Act(ss.router, func() {
sinfo.reset = true sinfo.reset = true
}) })
} }
@ -488,7 +488,7 @@ type FlowKeyMessage struct {
} }
func (sinfo *sessionInfo) recv(from phony.Actor, packet *wire_trafficPacket) { func (sinfo *sessionInfo) recv(from phony.Actor, packet *wire_trafficPacket) {
sinfo.RecvFrom(from, func() { sinfo.Act(from, func() {
sinfo._recvPacket(packet) sinfo._recvPacket(packet)
}) })
} }
@ -562,7 +562,7 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
util.PutBytes(p.Payload) util.PutBytes(p.Payload)
// Send the packet // Send the packet
// TODO replace this with a send to the peer struct if that becomes an actor // TODO replace this with a send to the peer struct if that becomes an actor
sinfo.sessions.router.RecvFrom(sinfo, func() { sinfo.sessions.router.Act(sinfo, func() {
sinfo.sessions.router.out(packet) sinfo.sessions.router.out(packet)
}) })
} }
@ -574,7 +574,7 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
} }
func (sinfo *sessionInfo) checkCallbacks() { func (sinfo *sessionInfo) checkCallbacks() {
sinfo.RecvFrom(nil, func() { sinfo.Act(nil, func() {
if len(sinfo.callbacks) > 0 { if len(sinfo.callbacks) > 0 {
select { select {
case callback := <-sinfo.callbacks[0]: case callback := <-sinfo.callbacks[0]:

View File

@ -192,7 +192,7 @@ func (t *switchTable) init(core *Core) {
t.updater.Store(&sync.Once{}) t.updater.Store(&sync.Once{})
t.table.Store(lookupTable{}) t.table.Store(lookupTable{})
t.drop = make(map[crypto.SigPubKey]int64) t.drop = make(map[crypto.SigPubKey]int64)
<-t.SyncExec(func() { phony.Block(t, func() {
t.queues.totalMaxSize = SwitchQueueTotalMinSize t.queues.totalMaxSize = SwitchQueueTotalMinSize
t.queues.bufs = make(map[string]switch_buffer) t.queues.bufs = make(map[string]switch_buffer)
t.idle = make(map[switchPort]time.Time) t.idle = make(map[switchPort]time.Time)
@ -827,7 +827,7 @@ func (t *switchTable) _handleIdle(port switchPort) bool {
} }
func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) { func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) {
t.RecvFrom(from, func() { t.Act(from, func() {
t._packetIn(bytes) t._packetIn(bytes)
}) })
} }
@ -870,5 +870,5 @@ func (t *switchTable) _idleIn(port switchPort) {
// Passed a function to call. // Passed a function to call.
// This will send the function to t.admin and block until it finishes. // This will send the function to t.admin and block until it finishes.
func (t *switchTable) doAdmin(f func()) { func (t *switchTable) doAdmin(f func()) {
<-t.SyncExec(f) phony.Block(t, f)
} }