mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2024-11-27 12:05:23 +00:00
more link migration
This commit is contained in:
parent
b5b179904b
commit
bd3eaefb72
@ -554,7 +554,8 @@ func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) {
|
|||||||
func DEBUG_simLinkPeers(p, q *peer) {
|
func DEBUG_simLinkPeers(p, q *peer) {
|
||||||
// Sets q.out() to point to p and starts p.start()
|
// Sets q.out() to point to p and starts p.start()
|
||||||
goWorkers := func(source, dest *peer) {
|
goWorkers := func(source, dest *peer) {
|
||||||
source.linkOut = make(chan []byte, 1)
|
linkOut := make(chan []byte, 1)
|
||||||
|
source.linkOut = func(bs []byte) { linkOut <- bs }
|
||||||
send := make(chan []byte, 1)
|
send := make(chan []byte, 1)
|
||||||
source.out = func(bss [][]byte) {
|
source.out = func(bss [][]byte) {
|
||||||
for _, bs := range bss {
|
for _, bs := range bss {
|
||||||
@ -566,7 +567,7 @@ func DEBUG_simLinkPeers(p, q *peer) {
|
|||||||
var packets [][]byte
|
var packets [][]byte
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case packet := <-source.linkOut:
|
case packet := <-linkOut:
|
||||||
packets = append(packets, packet)
|
packets = append(packets, packet)
|
||||||
continue
|
continue
|
||||||
case packet := <-send:
|
case packet := <-send:
|
||||||
@ -583,7 +584,7 @@ func DEBUG_simLinkPeers(p, q *peer) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case packet := <-source.linkOut:
|
case packet := <-linkOut:
|
||||||
packets = append(packets, packet)
|
packets = append(packets, packet)
|
||||||
case packet := <-send:
|
case packet := <-send:
|
||||||
packets = append(packets, packet)
|
packets = append(packets, packet)
|
||||||
|
@ -46,23 +46,23 @@ type linkInterfaceMsgIO interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type linkInterface struct {
|
type linkInterface struct {
|
||||||
name string
|
name string
|
||||||
link *link
|
link *link
|
||||||
peer *peer
|
peer *peer
|
||||||
msgIO linkInterfaceMsgIO
|
msgIO linkInterfaceMsgIO
|
||||||
info linkInfo
|
info linkInfo
|
||||||
incoming bool
|
incoming bool
|
||||||
force bool
|
force bool
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch
|
reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch
|
||||||
writer linkWriter // Writes packets, notifies this linkInterface
|
writer linkWriter // Writes packets, notifies this linkInterface
|
||||||
phony.Inbox // Protects the below
|
phony.Inbox // Protects the below
|
||||||
sendTimer *time.Timer // Fires to signal that sending is blocked
|
sendTimer *time.Timer // Fires to signal that sending is blocked
|
||||||
stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
|
stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
|
||||||
recvTimer *time.Timer // Fires to send keep-alive traffic
|
recvTimer *time.Timer // Fires to send keep-alive traffic
|
||||||
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
|
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
|
||||||
inSwitch bool // True if the switch is tracking this link
|
inSwitch bool // True if the switch is tracking this link
|
||||||
stalled bool // True if we haven't been receiving any response traffic
|
stalled bool // True if we haven't been receiving any response traffic
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) init(c *Core) error {
|
func (l *link) init(c *Core) error {
|
||||||
@ -133,9 +133,9 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st
|
|||||||
incoming: incoming,
|
incoming: incoming,
|
||||||
force: force,
|
force: force,
|
||||||
}
|
}
|
||||||
intf.writer.intf = &intf
|
intf.writer.intf = &intf
|
||||||
intf.reader.intf = &intf
|
intf.reader.intf = &intf
|
||||||
intf.reader.err = make(chan error)
|
intf.reader.err = make(chan error)
|
||||||
return &intf, nil
|
return &intf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -199,7 +199,7 @@ func (intf *linkInterface) handler() error {
|
|||||||
intf.link.mutex.Lock()
|
intf.link.mutex.Lock()
|
||||||
delete(intf.link.interfaces, intf.info)
|
delete(intf.link.interfaces, intf.info)
|
||||||
intf.link.mutex.Unlock()
|
intf.link.mutex.Unlock()
|
||||||
//close(intf.closed)
|
close(intf.closed)
|
||||||
}()
|
}()
|
||||||
intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name)
|
intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name)
|
||||||
}
|
}
|
||||||
@ -214,427 +214,184 @@ func (intf *linkInterface) handler() error {
|
|||||||
// More cleanup can go here
|
// More cleanup can go here
|
||||||
intf.link.core.peers.removePeer(intf.peer.port)
|
intf.link.core.peers.removePeer(intf.peer.port)
|
||||||
}()
|
}()
|
||||||
// Finish setting up the peer struct
|
|
||||||
/*
|
|
||||||
out := make(chan [][]byte, 1)
|
|
||||||
defer close(out)
|
|
||||||
intf.peer.out = func(msgs [][]byte) {
|
intf.peer.out = func(msgs [][]byte) {
|
||||||
defer func() { recover() }()
|
intf.writer.sendFrom(intf.peer, msgs, false)
|
||||||
out <- msgs
|
}
|
||||||
|
intf.peer.linkOut = func(bs []byte) {
|
||||||
|
intf.writer.sendFrom(intf.peer, [][]byte{bs}, true)
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
intf.peer.out = func(msgs [][]byte) {
|
|
||||||
intf.writer.sendFrom(intf.peer, msgs, false)
|
|
||||||
}
|
|
||||||
intf.peer.linkOut = make(chan []byte, 1)
|
|
||||||
go func() {
|
|
||||||
// TODO fix this
|
|
||||||
for bs := range intf.peer.linkOut {
|
|
||||||
intf.writer.sendFrom(intf.peer, [][]byte{bs}, true)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
|
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
|
||||||
themAddrString := net.IP(themAddr[:]).String()
|
themAddrString := net.IP(themAddr[:]).String()
|
||||||
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
|
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
|
||||||
intf.link.core.log.Infof("Connected %s: %s, source %s",
|
intf.link.core.log.Infof("Connected %s: %s, source %s",
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||||
// Start the link loop
|
// Start things
|
||||||
go intf.peer.start()
|
go intf.peer.start()
|
||||||
// Start the writer
|
intf.reader.RecvFrom(nil, intf.reader._read)
|
||||||
/*
|
// Wait for the reader to finish
|
||||||
signalReady := make(chan struct{}, 1)
|
err = <-intf.reader.err
|
||||||
signalSent := make(chan bool, 1)
|
if err != nil {
|
||||||
sendAck := make(chan struct{}, 1)
|
|
||||||
sendBlocked := time.NewTimer(time.Second)
|
|
||||||
defer util.TimerStop(sendBlocked)
|
|
||||||
util.TimerStop(sendBlocked)
|
|
||||||
go func() {
|
|
||||||
defer close(signalReady)
|
|
||||||
defer close(signalSent)
|
|
||||||
interval := 4 * time.Second
|
|
||||||
tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp
|
|
||||||
defer util.TimerStop(tcpTimer)
|
|
||||||
send := func(bss [][]byte) {
|
|
||||||
sendBlocked.Reset(time.Second)
|
|
||||||
size, _ := intf.msgIO.writeMsgs(bss)
|
|
||||||
util.TimerStop(sendBlocked)
|
|
||||||
select {
|
|
||||||
case signalSent <- size > 0:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for {
|
|
||||||
// First try to send any link protocol traffic
|
|
||||||
select {
|
|
||||||
case msg := <-intf.peer.linkOut:
|
|
||||||
send([][]byte{msg})
|
|
||||||
continue
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
// No protocol traffic to send, so reset the timer
|
|
||||||
util.TimerStop(tcpTimer)
|
|
||||||
tcpTimer.Reset(interval)
|
|
||||||
// Now block until something is ready or the timer triggers keepalive traffic
|
|
||||||
select {
|
|
||||||
case <-tcpTimer.C:
|
|
||||||
intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s",
|
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
|
||||||
send([][]byte{nil})
|
|
||||||
case <-sendAck:
|
|
||||||
intf.link.core.log.Tracef("Sending ack to %s: %s, source %s",
|
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
|
||||||
send([][]byte{nil})
|
|
||||||
case msg := <-intf.peer.linkOut:
|
|
||||||
send([][]byte{msg})
|
|
||||||
case msgs, ok := <-out:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
send(msgs)
|
|
||||||
for _, msg := range msgs {
|
|
||||||
util.PutBytes(msg)
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case signalReady <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
//intf.link.core.log.Tracef("Sending packet to %s: %s, source %s",
|
|
||||||
// strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
//intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle
|
|
||||||
// Used to enable/disable activity in the switch
|
|
||||||
signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive
|
|
||||||
defer close(signalAlive)
|
|
||||||
ret := make(chan error, 1) // How we signal the return value when multiple goroutines are involved
|
|
||||||
go func() {
|
|
||||||
var isAlive bool
|
|
||||||
var isReady bool
|
|
||||||
var sendTimerRunning bool
|
|
||||||
var recvTimerRunning bool
|
|
||||||
recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed
|
|
||||||
closeTime := 2 * switch_timeout // TODO or maybe this makes more sense for ReadTimeout?...
|
|
||||||
sendTime := time.Second
|
|
||||||
sendTimer := time.NewTimer(sendTime)
|
|
||||||
defer util.TimerStop(sendTimer)
|
|
||||||
recvTimer := time.NewTimer(recvTime)
|
|
||||||
defer util.TimerStop(recvTimer)
|
|
||||||
closeTimer := time.NewTimer(closeTime)
|
|
||||||
defer util.TimerStop(closeTimer)
|
|
||||||
for {
|
|
||||||
//intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t",
|
|
||||||
// strings.ToUpper(intf.info.linkType), themString, intf.info.local,
|
|
||||||
// isAlive, isReady, sendTimerRunning, recvTimerRunning)
|
|
||||||
select {
|
|
||||||
case gotMsg, ok := <-signalAlive:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
util.TimerStop(closeTimer)
|
|
||||||
closeTimer.Reset(closeTime)
|
|
||||||
util.TimerStop(recvTimer)
|
|
||||||
recvTimerRunning = false
|
|
||||||
isAlive = true
|
|
||||||
if !isReady {
|
|
||||||
// (Re-)enable in the switch
|
|
||||||
intf.link.core.switchTable.RecvFrom(nil, func() {
|
|
||||||
intf.link.core.switchTable._idleIn(intf.peer.port)
|
|
||||||
})
|
|
||||||
isReady = true
|
|
||||||
}
|
|
||||||
if gotMsg && !sendTimerRunning {
|
|
||||||
// We got a message
|
|
||||||
// Start a timer, if it expires then send a 0-sized ack to let them know we're alive
|
|
||||||
util.TimerStop(sendTimer)
|
|
||||||
sendTimer.Reset(sendTime)
|
|
||||||
sendTimerRunning = true
|
|
||||||
}
|
|
||||||
if !gotMsg {
|
|
||||||
intf.link.core.log.Tracef("Received ack from %s: %s, source %s",
|
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
|
||||||
}
|
|
||||||
case sentMsg, ok := <-signalSent:
|
|
||||||
// Stop any running ack timer
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
util.TimerStop(sendTimer)
|
|
||||||
sendTimerRunning = false
|
|
||||||
if sentMsg && !recvTimerRunning {
|
|
||||||
// We sent a message
|
|
||||||
// Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem
|
|
||||||
util.TimerStop(recvTimer)
|
|
||||||
recvTimer.Reset(recvTime)
|
|
||||||
recvTimerRunning = true
|
|
||||||
}
|
|
||||||
case _, ok := <-signalReady:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !isAlive {
|
|
||||||
// Disable in the switch
|
|
||||||
isReady = false
|
|
||||||
} else {
|
|
||||||
// Keep enabled in the switch
|
|
||||||
intf.link.core.switchTable.RecvFrom(nil, func() {
|
|
||||||
intf.link.core.switchTable._idleIn(intf.peer.port)
|
|
||||||
})
|
|
||||||
isReady = true
|
|
||||||
}
|
|
||||||
case <-sendBlocked.C:
|
|
||||||
// We blocked while trying to send something
|
|
||||||
isReady = false
|
|
||||||
intf.link.core.switchTable.blockPeer(intf.peer.port)
|
|
||||||
case <-sendTimer.C:
|
|
||||||
// We haven't sent anything, so signal a send of a 0 packet to let them know we're alive
|
|
||||||
select {
|
|
||||||
case sendAck <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
case <-recvTimer.C:
|
|
||||||
// We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding
|
|
||||||
isAlive = false
|
|
||||||
intf.link.core.switchTable.blockPeer(intf.peer.port)
|
|
||||||
case <-closeTimer.C:
|
|
||||||
// We haven't received anything in a really long time, so things have died at the switch level and then some...
|
|
||||||
// Just close the connection at this point...
|
|
||||||
select {
|
|
||||||
case ret <- errors.New("timeout"):
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
intf.msgIO.close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
// Run reader loop
|
|
||||||
var helper phony.Inbox
|
|
||||||
done := make(chan struct{})
|
|
||||||
var helperFunc func()
|
|
||||||
helperFunc = func() {
|
|
||||||
// The helper reads in a loop and sends to the peer
|
|
||||||
// It loops by sending itself a message, which can be delayed by backpressure
|
|
||||||
// So if the peer is busy, backpressure will pause reading until the peer catches up
|
|
||||||
msg, err := intf.msgIO.readMsg()
|
|
||||||
if len(msg) > 0 {
|
|
||||||
// TODO rewrite this if the link becomes an actor
|
|
||||||
intf.peer.handlePacketFrom(&helper, msg)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
if err != io.EOF {
|
|
||||||
select {
|
|
||||||
case ret <- err:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
close(done)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case signalAlive <- len(msg) > 0:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
// Now try to read again
|
|
||||||
helper.RecvFrom(nil, helperFunc)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start the read loop
|
|
||||||
helper.RecvFrom(nil, helperFunc)
|
|
||||||
<-done // Wait for the helper to exit
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
// Remember to set `err` to something useful before returning
|
|
||||||
select {
|
|
||||||
case err = <-ret:
|
|
||||||
intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
|
intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
|
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
|
||||||
default:
|
} else {
|
||||||
err = nil
|
|
||||||
intf.link.core.log.Infof("Disconnected %s: %s, source %s",
|
intf.link.core.log.Infof("Disconnected %s: %s, source %s",
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
// Start the reader
|
|
||||||
intf.reader.RecvFrom(nil, intf.reader._read)
|
|
||||||
// Wait for the reader to finish
|
|
||||||
err = <- intf.reader.err
|
|
||||||
if err != nil {
|
|
||||||
intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
|
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
|
|
||||||
} else {
|
|
||||||
intf.link.core.log.Infof("Disconnected %s: %s, source %s",
|
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
phony.Inbox // Protects the below
|
|
||||||
sendTimer *time.Timer // Fires to signal that sending is blocked
|
|
||||||
stallTimer *time.Time // Fires to signal that no incoming traffic (including keep-alive) has been seen
|
|
||||||
recvTimer *time.Timer // Fires to send keep-alive traffic
|
|
||||||
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
|
|
||||||
inSwitch bool // True if the switch is tracking this link
|
|
||||||
stalled bool // True if we haven't been receiving any response traffic
|
|
||||||
*/
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
sendBlockedTime = time.Second // How long to wait before deciding a send is blocked
|
sendBlockedTime = time.Second // How long to wait before deciding a send is blocked
|
||||||
keepAliveTime = 2*time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send
|
keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send
|
||||||
stallTime = 6*time.Second // How long to wait for response traffic before deciding the connection has stalled
|
stallTime = 6 * time.Second // How long to wait for response traffic before deciding the connection has stalled
|
||||||
closeTime = 2*switch_timeout // How long to wait before closing the link
|
closeTime = 2 * switch_timeout // How long to wait before closing the link
|
||||||
)
|
)
|
||||||
|
|
||||||
// notify the intf that we're currently sending
|
// notify the intf that we're currently sending
|
||||||
func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
|
func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
|
||||||
intf.RecvFrom(nil, func() {
|
intf.RecvFrom(nil, func() {
|
||||||
if !isLinkTraffic && size > 0 {
|
if !isLinkTraffic && size > 0 {
|
||||||
intf.inSwitch = false
|
intf.inSwitch = false
|
||||||
}
|
}
|
||||||
intf.sendTimer = time.AfterFunc(sendBlockedTime, intf.notifyBlockedSend)
|
intf.sendTimer = time.AfterFunc(sendBlockedTime, intf.notifyBlockedSend)
|
||||||
intf._cancelRecvTimer()
|
intf._cancelRecvTimer()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// we just sent something, so cancel any pending timer to send keep-alive traffic
|
// we just sent something, so cancel any pending timer to send keep-alive traffic
|
||||||
func (intf *linkInterface) _cancelRecvTimer() {
|
func (intf *linkInterface) _cancelRecvTimer() {
|
||||||
intf.RecvFrom(nil, func() {
|
intf.RecvFrom(nil, func() {
|
||||||
if intf.recvTimer != nil {
|
if intf.recvTimer != nil {
|
||||||
intf.recvTimer.Stop()
|
intf.recvTimer.Stop()
|
||||||
intf.recvTimer = nil
|
intf.recvTimer = nil
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// called by an AfterFunc if we appear to have timed out
|
// called by an AfterFunc if we appear to have timed out
|
||||||
func (intf *linkInterface) notifyBlockedSend() {
|
func (intf *linkInterface) notifyBlockedSend() {
|
||||||
intf.RecvFrom(nil, func() {
|
intf.RecvFrom(nil, func() {
|
||||||
if intf.sendTimer != nil {
|
if intf.sendTimer != nil {
|
||||||
//As far as we know, we're still trying to send, and the timer fired.
|
//As far as we know, we're still trying to send, and the timer fired.
|
||||||
intf.link.core.switchTable.blockPeer(intf.peer.port)
|
intf.link.core.switchTable.blockPeer(intf.peer.port)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// notify the intf that we've finished sending, returning the peer to the switch
|
// notify the intf that we've finished sending, returning the peer to the switch
|
||||||
func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
|
func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
|
||||||
intf.RecvFrom(nil, func() {
|
intf.RecvFrom(nil, func() {
|
||||||
intf.sendTimer.Stop()
|
intf.sendTimer.Stop()
|
||||||
intf.sendTimer = nil
|
intf.sendTimer = nil
|
||||||
if !isLinkTraffic {
|
if !isLinkTraffic {
|
||||||
intf._notifySwitch()
|
intf._notifySwitch()
|
||||||
}
|
}
|
||||||
if size > 0 && intf.stallTimer == nil {
|
if size > 0 && intf.stallTimer == nil {
|
||||||
intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
|
intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify the switch that we're ready for more traffic, assuming we're not in a stalled state
|
// Notify the switch that we're ready for more traffic, assuming we're not in a stalled state
|
||||||
func (intf *linkInterface) _notifySwitch() {
|
func (intf *linkInterface) _notifySwitch() {
|
||||||
if !intf.inSwitch && !intf.stalled {
|
if !intf.inSwitch && !intf.stalled {
|
||||||
intf.inSwitch = true
|
intf.inSwitch = true
|
||||||
intf.link.core.switchTable.RecvFrom(intf, func() {
|
intf.link.core.switchTable.RecvFrom(intf, func() {
|
||||||
intf.link.core.switchTable._idleIn(intf.peer.port)
|
intf.link.core.switchTable._idleIn(intf.peer.port)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
|
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
|
||||||
func (intf *linkInterface) notifyStalled() {
|
func (intf *linkInterface) notifyStalled() {
|
||||||
intf.RecvFrom(nil, func() {
|
intf.RecvFrom(nil, func() {
|
||||||
if intf.stallTimer != nil {
|
if intf.stallTimer != nil {
|
||||||
intf.stallTimer = nil
|
intf.stallTimer = nil
|
||||||
intf.stalled = true
|
intf.stalled = true
|
||||||
intf.link.core.switchTable.blockPeer(intf.peer.port)
|
intf.link.core.switchTable.blockPeer(intf.peer.port)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// reset the close timer
|
// reset the close timer
|
||||||
func (intf *linkInterface) notifyReading(from phony.Actor) {
|
func (intf *linkInterface) notifyReading(from phony.Actor) {
|
||||||
intf.RecvFrom(from, func() {
|
intf.RecvFrom(from, func() {
|
||||||
if intf.closeTimer != nil {
|
if intf.closeTimer != nil {
|
||||||
intf.closeTimer.Stop()
|
intf.closeTimer.Stop()
|
||||||
}
|
}
|
||||||
intf.closeTimer = time.AfterFunc(closeTime, func() { intf.msgIO.close() })
|
intf.closeTimer = time.AfterFunc(closeTime, func() { intf.msgIO.close() })
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic
|
// wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic
|
||||||
func (intf *linkInterface) notifyReadFrom(from phony.Actor, size int) {
|
func (intf *linkInterface) notifyReadFrom(from phony.Actor, size int) {
|
||||||
intf.RecvFrom(from, func() {
|
intf.RecvFrom(from, func() {
|
||||||
intf.link.core.log.Printf("DEBUG notifyReadFrom: inSwitch %v, stalled %v\n", intf.inSwitch, intf.stalled)
|
if intf.stallTimer != nil {
|
||||||
if intf.stallTimer != nil {
|
intf.stallTimer.Stop()
|
||||||
intf.stallTimer.Stop()
|
intf.stallTimer = nil
|
||||||
intf.stallTimer = nil
|
}
|
||||||
}
|
intf.stalled = false
|
||||||
intf.stalled = false
|
intf._notifySwitch()
|
||||||
intf._notifySwitch()
|
if size > 0 && intf.recvTimer == nil {
|
||||||
if size > 0 && intf.recvTimer == nil {
|
intf.recvTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
|
||||||
intf.recvTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
|
}
|
||||||
}
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We need to send keep-alive traffic now
|
// We need to send keep-alive traffic now
|
||||||
func (intf *linkInterface) notifyDoKeepAlive() {
|
func (intf *linkInterface) notifyDoKeepAlive() {
|
||||||
intf.RecvFrom(nil, func() {
|
intf.RecvFrom(nil, func() {
|
||||||
if intf.recvTimer != nil {
|
if intf.recvTimer != nil {
|
||||||
intf.recvTimer.Stop()
|
intf.recvTimer.Stop()
|
||||||
intf.recvTimer = nil
|
intf.recvTimer = nil
|
||||||
intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic
|
intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
type linkWriter struct {
|
type linkWriter struct {
|
||||||
phony.Inbox
|
phony.Inbox
|
||||||
intf *linkInterface
|
intf *linkInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) {
|
func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) {
|
||||||
w.RecvFrom(from, func() {
|
w.RecvFrom(from, func() {
|
||||||
var size int
|
var size int
|
||||||
for _, bs := range bss {
|
for _, bs := range bss {
|
||||||
size += len(bs)
|
size += len(bs)
|
||||||
}
|
}
|
||||||
w.intf.notifySending(size, isLinkTraffic)
|
w.intf.notifySending(size, isLinkTraffic)
|
||||||
w.intf.msgIO.writeMsgs(bss)
|
w.intf.msgIO.writeMsgs(bss)
|
||||||
w.intf.notifySent(size, isLinkTraffic)
|
w.intf.notifySent(size, isLinkTraffic)
|
||||||
w.intf.link.core.log.Println("DEBUG: wrote something, size:", size, "isLinkTraffic:", isLinkTraffic)
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
type linkReader struct {
|
type linkReader struct {
|
||||||
phony.Inbox
|
phony.Inbox
|
||||||
intf *linkInterface
|
intf *linkInterface
|
||||||
err chan error
|
err chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *linkReader) _read() {
|
func (r *linkReader) _read() {
|
||||||
r.intf.notifyReading(r)
|
r.intf.notifyReading(r)
|
||||||
msg, err := r.intf.msgIO.readMsg()
|
msg, err := r.intf.msgIO.readMsg()
|
||||||
r.intf.link.core.log.Println("DEBUG read something")
|
r.intf.notifyReadFrom(r, len(msg))
|
||||||
r.intf.notifyReadFrom(r, len(msg))
|
if len(msg) > 0 {
|
||||||
if len(msg) > 0 {
|
r.intf.peer.handlePacketFrom(r, msg)
|
||||||
r.intf.peer.handlePacketFrom(r, msg)
|
}
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
r.err <- err
|
||||||
}
|
}
|
||||||
if err != nil {
|
close(r.err)
|
||||||
if err != io.EOF {
|
return
|
||||||
r.err<-err
|
}
|
||||||
}
|
// Now try to read again
|
||||||
close(r.err)
|
r.RecvFrom(nil, r._read)
|
||||||
return
|
|
||||||
}
|
|
||||||
// Now try to read again
|
|
||||||
r.RecvFrom(nil, r._read)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,7 +103,7 @@ type peer struct {
|
|||||||
linkShared crypto.BoxSharedKey
|
linkShared crypto.BoxSharedKey
|
||||||
endpoint string
|
endpoint string
|
||||||
firstSeen time.Time // To track uptime for getPeers
|
firstSeen time.Time // To track uptime for getPeers
|
||||||
linkOut (chan []byte) // used for protocol traffic (to bypass queues)
|
linkOut func([]byte) // used for protocol traffic (bypasses the switch)
|
||||||
dinfo *dhtInfo // used to keep the DHT working
|
dinfo *dhtInfo // used to keep the DHT working
|
||||||
out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
|
out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
|
||||||
done (chan struct{}) // closed to exit the linkLoop
|
done (chan struct{}) // closed to exit the linkLoop
|
||||||
@ -263,8 +263,6 @@ func (p *peer) _sendPackets(packets [][]byte) {
|
|||||||
p.out(packets)
|
p.out(packets)
|
||||||
}
|
}
|
||||||
|
|
||||||
var peerLinkOutHelper phony.Inbox
|
|
||||||
|
|
||||||
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
|
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
|
||||||
// It sends it to p.linkOut, which bypasses the usual packet queues.
|
// It sends it to p.linkOut, which bypasses the usual packet queues.
|
||||||
func (p *peer) _sendLinkPacket(packet []byte) {
|
func (p *peer) _sendLinkPacket(packet []byte) {
|
||||||
@ -280,13 +278,7 @@ func (p *peer) _sendLinkPacket(packet []byte) {
|
|||||||
Payload: bs,
|
Payload: bs,
|
||||||
}
|
}
|
||||||
packet = linkPacket.encode()
|
packet = linkPacket.encode()
|
||||||
// TODO replace this with a message send if/when the link becomes an actor
|
p.linkOut(packet)
|
||||||
peerLinkOutHelper.RecvFrom(nil, func() {
|
|
||||||
select {
|
|
||||||
case p.linkOut <- packet:
|
|
||||||
case <-p.done:
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.
|
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.
|
||||||
|
@ -250,7 +250,7 @@ func (t *switchTable) cleanRoot() {
|
|||||||
t.core.router.reset(nil)
|
t.core.router.reset(nil)
|
||||||
}
|
}
|
||||||
t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
|
t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
|
||||||
t.core.peers.sendSwitchMsgs(nil) // TODO update if/when the switch becomes an actor
|
t.core.peers.sendSwitchMsgs(t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -517,7 +517,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
|
|||||||
}
|
}
|
||||||
t.data.locator = sender.locator
|
t.data.locator = sender.locator
|
||||||
t.parent = sender.port
|
t.parent = sender.port
|
||||||
t.core.peers.sendSwitchMsgs(nil) // TODO update if/when the switch becomes an actor
|
t.core.peers.sendSwitchMsgs(t)
|
||||||
}
|
}
|
||||||
if doUpdate {
|
if doUpdate {
|
||||||
t.updater.Store(&sync.Once{})
|
t.updater.Store(&sync.Once{})
|
||||||
|
Loading…
Reference in New Issue
Block a user