mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2024-12-22 16:07:31 +00:00
upgrade phony dependency and switch to its new interface
This commit is contained in:
parent
cff1366146
commit
aa30c6cc98
2
go.mod
2
go.mod
@ -1,7 +1,7 @@
|
|||||||
module github.com/yggdrasil-network/yggdrasil-go
|
module github.com/yggdrasil-network/yggdrasil-go
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/Arceliar/phony v0.0.0-20190825032731-f8ba56f9093e
|
github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe
|
||||||
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8
|
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8
|
||||||
github.com/hashicorp/go-syslog v1.0.0
|
github.com/hashicorp/go-syslog v1.0.0
|
||||||
github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222
|
github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222
|
||||||
|
4
go.sum
4
go.sum
@ -1,5 +1,5 @@
|
|||||||
github.com/Arceliar/phony v0.0.0-20190825032731-f8ba56f9093e h1:qsNZzfxSvlSE4JZ3OpmDmAeqCRpOO3RI9eJ7U6z23Gk=
|
github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe h1:U5bediuXjZ1y6bByIXXraoE319yFp9kx1z8K6el7Ftc=
|
||||||
github.com/Arceliar/phony v0.0.0-20190825032731-f8ba56f9093e/go.mod h1:+/sVcxsqK1Sjm3Vd+yCfMAohJOfTRyNh24apkxhqU3Q=
|
github.com/Arceliar/phony v0.0.0-20190825152505-180ac75690fe/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI=
|
||||||
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY=
|
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY=
|
||||||
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U=
|
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U=
|
||||||
github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE=
|
github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE=
|
||||||
|
@ -17,7 +17,7 @@ import (
|
|||||||
const tunConnTimeout = 2 * time.Minute
|
const tunConnTimeout = 2 * time.Minute
|
||||||
|
|
||||||
type tunConn struct {
|
type tunConn struct {
|
||||||
phony.Actor
|
phony.Inbox
|
||||||
tun *TunAdapter
|
tun *TunAdapter
|
||||||
conn *yggdrasil.Conn
|
conn *yggdrasil.Conn
|
||||||
addr address.Address
|
addr address.Address
|
||||||
@ -198,7 +198,7 @@ func (s *tunConn) _write(bs []byte) (err error) {
|
|||||||
// No point in wasting resources to send back an error if there was none
|
// No point in wasting resources to send back an error if there was none
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.EnqueueFrom(s.conn, func() {
|
s.RecvFrom(s.conn, func() {
|
||||||
if e, eok := err.(yggdrasil.ConnError); !eok {
|
if e, eok := err.(yggdrasil.ConnError); !eok {
|
||||||
if e.Closed() {
|
if e.Closed() {
|
||||||
s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err)
|
s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err)
|
||||||
|
@ -257,7 +257,7 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) {
|
|||||||
tun.subnetToConn[s.snet] = &s
|
tun.subnetToConn[s.snet] = &s
|
||||||
// Set the read callback and start the timeout goroutine
|
// Set the read callback and start the timeout goroutine
|
||||||
conn.SetReadCallback(func(bs []byte) {
|
conn.SetReadCallback(func(bs []byte) {
|
||||||
s.EnqueueFrom(conn, func() {
|
s.RecvFrom(conn, func() {
|
||||||
s._read(bs)
|
s._read(bs)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -54,7 +54,7 @@ func (e *ConnError) Closed() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Conn struct {
|
type Conn struct {
|
||||||
phony.Actor
|
phony.Inbox
|
||||||
core *Core
|
core *Core
|
||||||
readDeadline *time.Time
|
readDeadline *time.Time
|
||||||
writeDeadline *time.Time
|
writeDeadline *time.Time
|
||||||
@ -84,8 +84,8 @@ func (c *Conn) String() string {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) setMTU(from phony.IActor, mtu uint16) {
|
func (c *Conn) setMTU(from phony.Actor, mtu uint16) {
|
||||||
c.EnqueueFrom(from, func() { c.mtu = mtu })
|
c.RecvFrom(from, func() { c.mtu = mtu })
|
||||||
}
|
}
|
||||||
|
|
||||||
// This should never be called from the router goroutine, used in the dial functions
|
// This should never be called from the router goroutine, used in the dial functions
|
||||||
@ -143,7 +143,7 @@ func (c *Conn) doSearch() {
|
|||||||
sinfo.continueSearch()
|
sinfo.continueSearch()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.core.router.EnqueueFrom(c.session, routerWork)
|
c.core.router.RecvFrom(c.session, routerWork)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) {
|
func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) {
|
||||||
@ -159,7 +159,7 @@ func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool)
|
|||||||
|
|
||||||
// SetReadCallback sets a callback which will be called whenever a packet is received.
|
// SetReadCallback sets a callback which will be called whenever a packet is received.
|
||||||
func (c *Conn) SetReadCallback(callback func([]byte)) {
|
func (c *Conn) SetReadCallback(callback func([]byte)) {
|
||||||
c.EnqueueFrom(nil, func() {
|
c.RecvFrom(nil, func() {
|
||||||
c.readCallback = callback
|
c.readCallback = callback
|
||||||
c._drainReadBuffer()
|
c._drainReadBuffer()
|
||||||
})
|
})
|
||||||
@ -172,14 +172,14 @@ func (c *Conn) _drainReadBuffer() {
|
|||||||
select {
|
select {
|
||||||
case bs := <-c.readBuffer:
|
case bs := <-c.readBuffer:
|
||||||
c.readCallback(bs)
|
c.readCallback(bs)
|
||||||
c.EnqueueFrom(nil, c._drainReadBuffer) // In case there's more
|
c.RecvFrom(nil, c._drainReadBuffer) // In case there's more
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Called by the session to pass a new message to the Conn
|
// Called by the session to pass a new message to the Conn
|
||||||
func (c *Conn) recvMsg(from phony.IActor, msg []byte) {
|
func (c *Conn) recvMsg(from phony.Actor, msg []byte) {
|
||||||
c.EnqueueFrom(from, func() {
|
c.RecvFrom(from, func() {
|
||||||
if c.readCallback != nil {
|
if c.readCallback != nil {
|
||||||
c.readCallback(msg)
|
c.readCallback(msg)
|
||||||
} else {
|
} else {
|
||||||
@ -234,7 +234,7 @@ func (c *Conn) _write(msg FlowKeyMessage) error {
|
|||||||
if len(msg.Message) > int(c.mtu) {
|
if len(msg.Message) > int(c.mtu) {
|
||||||
return ConnError{errors.New("packet too big"), true, false, false, int(c.mtu)}
|
return ConnError{errors.New("packet too big"), true, false, false, int(c.mtu)}
|
||||||
}
|
}
|
||||||
c.session.EnqueueFrom(c, func() {
|
c.session.RecvFrom(c, func() {
|
||||||
// Send the packet
|
// Send the packet
|
||||||
c.session._send(msg)
|
c.session._send(msg)
|
||||||
// Session keep-alive, while we wait for the crypto workers from send
|
// Session keep-alive, while we wait for the crypto workers from send
|
||||||
@ -254,11 +254,11 @@ func (c *Conn) _write(msg FlowKeyMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteFrom should be called by a phony.IActor, and tells the Conn to send a message.
|
// WriteFrom should be called by a phony.Actor, and tells the Conn to send a message.
|
||||||
// This is used internaly by WriteNoCopy and Write.
|
// This is used internaly by WriteNoCopy and Write.
|
||||||
// If the callback is called with a non-nil value, then it is safe to reuse the argument FlowKeyMessage.
|
// If the callback is called with a non-nil value, then it is safe to reuse the argument FlowKeyMessage.
|
||||||
func (c *Conn) WriteFrom(from phony.IActor, msg FlowKeyMessage, callback func(error)) {
|
func (c *Conn) WriteFrom(from phony.Actor, msg FlowKeyMessage, callback func(error)) {
|
||||||
c.EnqueueFrom(from, func() {
|
c.RecvFrom(from, func() {
|
||||||
callback(c._write(msg))
|
callback(c._write(msg))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -320,7 +320,7 @@ func (intf *linkInterface) handler() error {
|
|||||||
isAlive = true
|
isAlive = true
|
||||||
if !isReady {
|
if !isReady {
|
||||||
// (Re-)enable in the switch
|
// (Re-)enable in the switch
|
||||||
intf.link.core.switchTable.EnqueueFrom(nil, func() {
|
intf.link.core.switchTable.RecvFrom(nil, func() {
|
||||||
intf.link.core.switchTable._idleIn(intf.peer.port)
|
intf.link.core.switchTable._idleIn(intf.peer.port)
|
||||||
})
|
})
|
||||||
isReady = true
|
isReady = true
|
||||||
@ -359,7 +359,7 @@ func (intf *linkInterface) handler() error {
|
|||||||
isReady = false
|
isReady = false
|
||||||
} else {
|
} else {
|
||||||
// Keep enabled in the switch
|
// Keep enabled in the switch
|
||||||
intf.link.core.switchTable.EnqueueFrom(nil, func() {
|
intf.link.core.switchTable.RecvFrom(nil, func() {
|
||||||
intf.link.core.switchTable._idleIn(intf.peer.port)
|
intf.link.core.switchTable._idleIn(intf.peer.port)
|
||||||
})
|
})
|
||||||
isReady = true
|
isReady = true
|
||||||
@ -390,7 +390,7 @@ func (intf *linkInterface) handler() error {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
// Run reader loop
|
// Run reader loop
|
||||||
var helper phony.Actor
|
var helper phony.Inbox
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
var helperFunc func()
|
var helperFunc func()
|
||||||
helperFunc = func() {
|
helperFunc = func() {
|
||||||
@ -417,10 +417,10 @@ func (intf *linkInterface) handler() error {
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
// Now try to read again
|
// Now try to read again
|
||||||
helper.EnqueueFrom(nil, helperFunc)
|
helper.RecvFrom(nil, helperFunc)
|
||||||
}
|
}
|
||||||
// Start the read loop
|
// Start the read loop
|
||||||
helper.EnqueueFrom(nil, helperFunc)
|
helper.RecvFrom(nil, helperFunc)
|
||||||
<-done // Wait for the helper to exit
|
<-done // Wait for the helper to exit
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
// Remember to set `err` to something useful before returning
|
// Remember to set `err` to something useful before returning
|
||||||
|
@ -96,7 +96,7 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) {
|
|||||||
|
|
||||||
// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
|
// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
|
||||||
type peer struct {
|
type peer struct {
|
||||||
phony.Actor
|
phony.Inbox
|
||||||
core *Core
|
core *Core
|
||||||
intf *linkInterface
|
intf *linkInterface
|
||||||
port switchPort
|
port switchPort
|
||||||
@ -175,13 +175,13 @@ func (ps *peers) removePeer(port switchPort) {
|
|||||||
|
|
||||||
// If called, sends a notification to each peer that they should send a new switch message.
|
// If called, sends a notification to each peer that they should send a new switch message.
|
||||||
// Mainly called by the switch after an update.
|
// Mainly called by the switch after an update.
|
||||||
func (ps *peers) sendSwitchMsgs(from phony.IActor) {
|
func (ps *peers) sendSwitchMsgs(from phony.Actor) {
|
||||||
ports := ps.getPorts()
|
ports := ps.getPorts()
|
||||||
for _, p := range ports {
|
for _, p := range ports {
|
||||||
if p.port == 0 {
|
if p.port == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
p.EnqueueFrom(from, p._sendSwitchMsg)
|
p.RecvFrom(from, p._sendSwitchMsg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -207,8 +207,8 @@ func (p *peer) _updateDHT() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) handlePacketFrom(from phony.IActor, packet []byte) {
|
func (p *peer) handlePacketFrom(from phony.Actor, packet []byte) {
|
||||||
p.EnqueueFrom(from, func() {
|
p.RecvFrom(from, func() {
|
||||||
p._handlePacket(packet)
|
p._handlePacket(packet)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -245,8 +245,8 @@ func (p *peer) _handleTraffic(packet []byte) {
|
|||||||
p.core.switchTable.packetInFrom(p, packet)
|
p.core.switchTable.packetInFrom(p, packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) sendPacketsFrom(from phony.IActor, packets [][]byte) {
|
func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) {
|
||||||
p.EnqueueFrom(from, func() {
|
p.RecvFrom(from, func() {
|
||||||
p._sendPackets(packets)
|
p._sendPackets(packets)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -263,7 +263,7 @@ func (p *peer) _sendPackets(packets [][]byte) {
|
|||||||
p.out(packets)
|
p.out(packets)
|
||||||
}
|
}
|
||||||
|
|
||||||
var peerLinkOutHelper phony.Actor
|
var peerLinkOutHelper phony.Inbox
|
||||||
|
|
||||||
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
|
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
|
||||||
// It sends it to p.linkOut, which bypasses the usual packet queues.
|
// It sends it to p.linkOut, which bypasses the usual packet queues.
|
||||||
@ -281,7 +281,7 @@ func (p *peer) _sendLinkPacket(packet []byte) {
|
|||||||
}
|
}
|
||||||
packet = linkPacket.encode()
|
packet = linkPacket.encode()
|
||||||
// TODO replace this with a message send if/when the link becomes an actor
|
// TODO replace this with a message send if/when the link becomes an actor
|
||||||
peerLinkOutHelper.EnqueueFrom(nil, func() {
|
peerLinkOutHelper.RecvFrom(nil, func() {
|
||||||
select {
|
select {
|
||||||
case p.linkOut <- packet:
|
case p.linkOut <- packet:
|
||||||
case <-p.done:
|
case <-p.done:
|
||||||
|
@ -37,7 +37,7 @@ import (
|
|||||||
// The router struct has channels to/from the adapter device and a self peer (0), which is how messages are passed between this node and the peers/switch layer.
|
// The router struct has channels to/from the adapter device and a self peer (0), which is how messages are passed between this node and the peers/switch layer.
|
||||||
// The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions.
|
// The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions.
|
||||||
type router struct {
|
type router struct {
|
||||||
phony.Actor
|
phony.Inbox
|
||||||
core *Core
|
core *Core
|
||||||
reconfigure chan chan error
|
reconfigure chan chan error
|
||||||
addr address.Address
|
addr address.Address
|
||||||
@ -83,8 +83,8 @@ func (r *router) start() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// In practice, the switch will call this with 1 packet
|
// In practice, the switch will call this with 1 packet
|
||||||
func (r *router) handlePackets(from phony.IActor, packets [][]byte) {
|
func (r *router) handlePackets(from phony.Actor, packets [][]byte) {
|
||||||
r.EnqueueFrom(from, func() {
|
r.RecvFrom(from, func() {
|
||||||
for _, packet := range packets {
|
for _, packet := range packets {
|
||||||
r._handlePacket(packet)
|
r._handlePacket(packet)
|
||||||
}
|
}
|
||||||
@ -92,15 +92,15 @@ func (r *router) handlePackets(from phony.IActor, packets [][]byte) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Insert a peer info into the dht, TODO? make the dht a separate actor
|
// Insert a peer info into the dht, TODO? make the dht a separate actor
|
||||||
func (r *router) insertPeer(from phony.IActor, info *dhtInfo) {
|
func (r *router) insertPeer(from phony.Actor, info *dhtInfo) {
|
||||||
r.EnqueueFrom(from, func() {
|
r.RecvFrom(from, func() {
|
||||||
r.dht.insertPeer(info)
|
r.dht.insertPeer(info)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset sessions and DHT after the switch sees our coords change
|
// Reset sessions and DHT after the switch sees our coords change
|
||||||
func (r *router) reset(from phony.IActor) {
|
func (r *router) reset(from phony.Actor) {
|
||||||
r.EnqueueFrom(from, func() {
|
r.RecvFrom(from, func() {
|
||||||
r.sessions.reset()
|
r.sessions.reset()
|
||||||
r.dht.reset()
|
r.dht.reset()
|
||||||
})
|
})
|
||||||
|
@ -38,7 +38,7 @@ func (h nonceHeap) peek() *crypto.BoxNonce { return &h[0] }
|
|||||||
// All the information we know about an active session.
|
// All the information we know about an active session.
|
||||||
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
|
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
|
||||||
type sessionInfo struct {
|
type sessionInfo struct {
|
||||||
phony.Actor // Protects all of the below, use it any time you read/change the contents of a session
|
phony.Inbox // Protects all of the below, use it any time you read/change the contents of a session
|
||||||
sessions *sessions //
|
sessions *sessions //
|
||||||
reconfigure chan chan error //
|
reconfigure chan chan error //
|
||||||
theirAddr address.Address //
|
theirAddr address.Address //
|
||||||
@ -342,8 +342,8 @@ func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Sends a session ping by calling sendPingPong in ping mode.
|
// Sends a session ping by calling sendPingPong in ping mode.
|
||||||
func (sinfo *sessionInfo) ping(from phony.IActor) {
|
func (sinfo *sessionInfo) ping(from phony.Actor) {
|
||||||
sinfo.EnqueueFrom(from, func() {
|
sinfo.RecvFrom(from, func() {
|
||||||
sinfo._sendPingPong(false)
|
sinfo._sendPingPong(false)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -364,14 +364,14 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) {
|
|||||||
}
|
}
|
||||||
packet := p.encode()
|
packet := p.encode()
|
||||||
// TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first
|
// TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first
|
||||||
sinfo.sessions.router.EnqueueFrom(sinfo, func() { sinfo.sessions.router.out(packet) })
|
sinfo.sessions.router.RecvFrom(sinfo, func() { sinfo.sessions.router.out(packet) })
|
||||||
if sinfo.pingTime.Before(sinfo.time) {
|
if sinfo.pingTime.Before(sinfo.time) {
|
||||||
sinfo.pingTime = time.Now()
|
sinfo.pingTime = time.Now()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sinfo *sessionInfo) setConn(from phony.IActor, conn *Conn) {
|
func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) {
|
||||||
sinfo.EnqueueFrom(from, func() {
|
sinfo.RecvFrom(from, func() {
|
||||||
sinfo.conn = conn
|
sinfo.conn = conn
|
||||||
sinfo.conn.setMTU(sinfo, sinfo._getMTU())
|
sinfo.conn.setMTU(sinfo, sinfo._getMTU())
|
||||||
})
|
})
|
||||||
@ -406,7 +406,7 @@ func (ss *sessions) handlePing(ping *sessionPing) {
|
|||||||
ss.listenerMutex.Unlock()
|
ss.listenerMutex.Unlock()
|
||||||
}
|
}
|
||||||
if sinfo != nil {
|
if sinfo != nil {
|
||||||
sinfo.EnqueueFrom(ss.router, func() {
|
sinfo.RecvFrom(ss.router, func() {
|
||||||
// Update the session
|
// Update the session
|
||||||
if !sinfo._update(ping) { /*panic("Should not happen in testing")*/
|
if !sinfo._update(ping) { /*panic("Should not happen in testing")*/
|
||||||
return
|
return
|
||||||
@ -474,7 +474,7 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) {
|
|||||||
// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
|
// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
|
||||||
func (ss *sessions) reset() {
|
func (ss *sessions) reset() {
|
||||||
for _, sinfo := range ss.sinfos {
|
for _, sinfo := range ss.sinfos {
|
||||||
sinfo.EnqueueFrom(ss.router, func() {
|
sinfo.RecvFrom(ss.router, func() {
|
||||||
sinfo.reset = true
|
sinfo.reset = true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -489,8 +489,8 @@ type FlowKeyMessage struct {
|
|||||||
Message []byte
|
Message []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sinfo *sessionInfo) recv(from phony.IActor, packet *wire_trafficPacket) {
|
func (sinfo *sessionInfo) recv(from phony.Actor, packet *wire_trafficPacket) {
|
||||||
sinfo.EnqueueFrom(from, func() {
|
sinfo.RecvFrom(from, func() {
|
||||||
sinfo._recvPacket(packet)
|
sinfo._recvPacket(packet)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -564,7 +564,7 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
|
|||||||
util.PutBytes(p.Payload)
|
util.PutBytes(p.Payload)
|
||||||
// Send the packet
|
// Send the packet
|
||||||
// TODO replace this with a send to the peer struct if that becomes an actor
|
// TODO replace this with a send to the peer struct if that becomes an actor
|
||||||
sinfo.sessions.router.EnqueueFrom(sinfo, func() {
|
sinfo.sessions.router.RecvFrom(sinfo, func() {
|
||||||
sinfo.sessions.router.out(packet)
|
sinfo.sessions.router.out(packet)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -576,7 +576,7 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sinfo *sessionInfo) checkCallbacks() {
|
func (sinfo *sessionInfo) checkCallbacks() {
|
||||||
sinfo.EnqueueFrom(nil, func() {
|
sinfo.RecvFrom(nil, func() {
|
||||||
if len(sinfo.callbacks) > 0 {
|
if len(sinfo.callbacks) > 0 {
|
||||||
select {
|
select {
|
||||||
case callback := <-sinfo.callbacks[0]:
|
case callback := <-sinfo.callbacks[0]:
|
||||||
|
@ -174,7 +174,7 @@ type switchTable struct {
|
|||||||
data switchData //
|
data switchData //
|
||||||
updater atomic.Value // *sync.Once
|
updater atomic.Value // *sync.Once
|
||||||
table atomic.Value // lookupTable
|
table atomic.Value // lookupTable
|
||||||
phony.Actor // Owns the below
|
phony.Inbox // Owns the below
|
||||||
queues switch_buffers // Queues - not atomic so ONLY use through the actor
|
queues switch_buffers // Queues - not atomic so ONLY use through the actor
|
||||||
idle map[switchPort]time.Time // idle peers - not atomic so ONLY use through the actor
|
idle map[switchPort]time.Time // idle peers - not atomic so ONLY use through the actor
|
||||||
}
|
}
|
||||||
@ -828,8 +828,8 @@ func (t *switchTable) _handleIdle(port switchPort) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *switchTable) packetInFrom(from phony.IActor, bytes []byte) {
|
func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) {
|
||||||
t.EnqueueFrom(from, func() {
|
t.RecvFrom(from, func() {
|
||||||
t._packetIn(bytes)
|
t._packetIn(bytes)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user