Close long-dead connections in link.go instead of in switch.go. This is important in case a connection opens but never bothers to send even one switch message.

This commit is contained in:
Arceliar 2019-02-24 13:24:55 -06:00
parent bb3edd5e55
commit 654407dc6d
2 changed files with 11 additions and 24 deletions

View File

@ -229,11 +229,14 @@ func (intf *linkInterface) handler() error {
var sendTimerRunning bool var sendTimerRunning bool
var recvTimerRunning bool var recvTimerRunning bool
recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed
closeTime := 2 * switch_timeout // TODO or maybe this makes more sense for ReadTimeout?...
sendTime := time.Second sendTime := time.Second
sendTimer := time.NewTimer(sendTime) sendTimer := time.NewTimer(sendTime)
defer util.TimerStop(sendTimer) defer util.TimerStop(sendTimer)
recvTimer := time.NewTimer(recvTime) recvTimer := time.NewTimer(recvTime)
defer util.TimerStop(recvTimer) defer util.TimerStop(recvTimer)
closeTimer := time.NewTimer(closeTime)
defer util.TimerStop(closeTimer)
for { for {
//intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t", //intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t",
// strings.ToUpper(intf.info.linkType), themString, intf.info.local, // strings.ToUpper(intf.info.linkType), themString, intf.info.local,
@ -243,6 +246,7 @@ func (intf *linkInterface) handler() error {
if !ok { if !ok {
return return
} }
util.TimerStop(closeTimer)
util.TimerStop(recvTimer) util.TimerStop(recvTimer)
recvTimerRunning = false recvTimerRunning = false
isAlive = true isAlive = true
@ -274,6 +278,8 @@ func (intf *linkInterface) handler() error {
// Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem // Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem
util.TimerStop(recvTimer) util.TimerStop(recvTimer)
recvTimer.Reset(recvTime) recvTimer.Reset(recvTime)
util.TimerStop(closeTimer)
closeTimer.Reset(closeTime)
recvTimerRunning = true recvTimerRunning = true
} }
case _, ok := <-signalReady: case _, ok := <-signalReady:
@ -297,6 +303,10 @@ func (intf *linkInterface) handler() error {
case <-recvTimer.C: case <-recvTimer.C:
// We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding // We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding
isAlive = false isAlive = false
case <-closeTimer.C:
// We haven't received anything in a really long time, so things have died at the switch level and then some...
// Just close the connection at this point...
intf.msgIO.close()
} }
} }
}() }()

View File

@ -215,7 +215,6 @@ func (t *switchTable) doMaintenance() {
defer t.mutex.Unlock() // Release lock when we're done defer t.mutex.Unlock() // Release lock when we're done
t.cleanRoot() t.cleanRoot()
t.cleanDropped() t.cleanDropped()
t.cleanPeers()
} }
// Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out. // Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out.
@ -272,28 +271,6 @@ func (t *switchTable) forgetPeer(port switchPort) {
} }
} }
// cleanPeers removes every peer whose last recorded time (peer.time) is older than
// switch_timeout+switch_throttle from t.data.peers, and asks the peers subsystem to remove it too.
// Needed in case a non-parent peer keeps the connection open but stops sending updates.
// If the current parent is no longer in the table afterwards, the parent is cleared and the
// stored message of each remaining peer is replayed so that a new parent can be chosen.
// NOTE(review): an earlier version of this comment said the map is copied to reclaim space;
// this body only deletes entries in place and never copies the map.
// NOTE(review): doMaintenance calls this while holding t.mutex; presumably that is required
// for the unlocked* helper used below, so confirm before calling it from anywhere else.
func (t *switchTable) cleanPeers() {
// Take a single timestamp so every peer is judged against the same instant.
now := time.Now()
for port, peer := range t.data.peers {
if now.Sub(peer.time) > switch_timeout+switch_throttle {
// Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding.
delete(t.data.peers, port)
// The removal is done on a separate goroutine rather than inline.
go t.core.peers.removePeer(port) // TODO figure out if it's safe to do this without a goroutine, or make it safe
}
}
// Check whether the current parent survived the sweep above.
if _, isIn := t.data.peers[t.parent]; !isIn {
// The root timestamp would probably time out before this happens, but better safe than sorry.
// We removed the current parent, so find a new one.
t.parent = 0
// Replay each remaining peer's last stored message through the message handler.
// NOTE(review): the meaning of the final `true` argument is not visible here; see unlockedHandleMsg.
for _, peer := range t.data.peers {
t.unlockedHandleMsg(&peer.msg, peer.port, true)
}
}
}
// Dropped is a list of roots that are better than the current root, but stopped sending new timestamps. // Dropped is a list of roots that are better than the current root, but stopped sending new timestamps.
// If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos. // If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos.
// This function is called periodically to do that cleanup. // This function is called periodically to do that cleanup.