try to fix some possible races with how peers are added/removed and how they're blocked in the switch when they enter a bad state

This commit is contained in:
Arceliar 2020-06-06 12:30:54 -05:00
parent aec82d7a39
commit 3fded209df
2 changed files with 58 additions and 34 deletions

View File

@ -64,8 +64,9 @@ type link struct {
keepAliveTimer *time.Timer // Fires to send keep-alive traffic keepAliveTimer *time.Timer // Fires to send keep-alive traffic
stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
isSending bool // True between a notifySending and a notifySent readUnblocked bool // True if we've sent a read message unblocking this peer in the switch
blocked bool // True if we've blocked the peer in the switch writeUnblocked bool // True if we've sent a write message unblocking this peer in the swithc
shutdown bool // True if we're shutting down, avoids sending some messages that could race with new peers being crated in the same port
} }
type linkOptions struct { type linkOptions struct {
@ -285,7 +286,10 @@ func (intf *link) handler() error {
} }
defer func() { defer func() {
// More cleanup can go here // More cleanup can go here
intf.peer.Act(nil, intf.peer._removeSelf) intf.Act(nil, func() {
intf.shutdown = true
intf.peer.Act(intf, intf.peer._removeSelf)
})
}() }()
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
themAddrString := net.IP(themAddr[:]).String() themAddrString := net.IP(themAddr[:]).String()
@ -385,7 +389,6 @@ const (
// notify the intf that we're currently sending // notify the intf that we're currently sending
func (intf *link) notifySending(size int) { func (intf *link) notifySending(size int) {
intf.Act(&intf.writer, func() { intf.Act(&intf.writer, func() {
intf.isSending = true
intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
if intf.keepAliveTimer != nil { if intf.keepAliveTimer != nil {
intf.keepAliveTimer.Stop() intf.keepAliveTimer.Stop()
@ -400,10 +403,14 @@ func (intf *link) notifySending(size int) {
// through other links, if alternatives exist // through other links, if alternatives exist
func (intf *link) notifyBlockedSend() { func (intf *link) notifyBlockedSend() {
intf.Act(nil, func() { intf.Act(nil, func() {
if intf.sendTimer != nil && !intf.blocked { if intf.sendTimer != nil {
//As far as we know, we're still trying to send, and the timer fired. //As far as we know, we're still trying to send, and the timer fired.
intf.blocked = true intf.sendTimer.Stop()
intf.links.core.switchTable.blockPeer(intf, intf.peer.port) intf.sendTimer = nil
if !intf.shutdown && intf.writeUnblocked {
intf.writeUnblocked = false
intf.links.core.switchTable.blockPeer(intf, intf.peer.port, true)
}
} }
}) })
} }
@ -421,10 +428,13 @@ func (intf *link) notifySent(size int) {
intf.keepAliveTimer = nil intf.keepAliveTimer = nil
} }
intf._notifyIdle() intf._notifyIdle()
intf.isSending = false
if size > 0 && intf.stallTimer == nil { if size > 0 && intf.stallTimer == nil {
intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
} }
if !intf.shutdown && !intf.writeUnblocked {
intf.writeUnblocked = true
intf.links.core.switchTable.unblockPeer(intf, intf.peer.port, true)
}
}) })
} }
@ -439,9 +449,9 @@ func (intf *link) notifyStalled() {
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
if !intf.blocked { if !intf.shutdown && intf.readUnblocked {
intf.blocked = true intf.readUnblocked = false
intf.links.core.switchTable.blockPeer(intf, intf.peer.port) intf.links.core.switchTable.blockPeer(intf, intf.peer.port, false)
} }
} }
}) })
@ -467,9 +477,9 @@ func (intf *link) notifyRead(size int) {
if size > 0 && intf.keepAliveTimer == nil { if size > 0 && intf.keepAliveTimer == nil {
intf.keepAliveTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) intf.keepAliveTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
} }
if intf.blocked { if !intf.shutdown && !intf.readUnblocked {
intf.blocked = false intf.readUnblocked = true
intf.links.core.switchTable.unblockPeer(intf, intf.peer.port) intf.links.core.switchTable.unblockPeer(intf, intf.peer.port, false)
} }
}) })
} }

View File

@ -136,14 +136,19 @@ func (x *switchLocator) isAncestorOf(y *switchLocator) bool {
// Information about a peer, used by the switch to build the tree and eventually make routing decisions. // Information about a peer, used by the switch to build the tree and eventually make routing decisions.
type peerInfo struct { type peerInfo struct {
key crypto.SigPubKey // ID of this peer key crypto.SigPubKey // ID of this peer
locator switchLocator // Should be able to respond with signatures upon request locator switchLocator // Should be able to respond with signatures upon request
degree uint64 // Self-reported degree degree uint64 // Self-reported degree
time time.Time // Time this node was last seen time time.Time // Time this node was last seen
faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower
port switchPort // Interface number of this peer port switchPort // Interface number of this peer
msg switchMsg // The wire switchMsg used msg switchMsg // The wire switchMsg used
blocked bool // True if the link is blocked, used to avoid parenting a blocked link readBlock bool // True if the link notified us of a read that blocked too long
writeBlock bool // True of the link notified us of a write that blocked too long
}
func (pinfo *peerInfo) blocked() bool {
return pinfo.readBlock || pinfo.writeBlock
} }
// This is just a uint64 with a named type for clarity reasons. // This is just a uint64 with a named type for clarity reasons.
@ -250,15 +255,19 @@ func (t *switchTable) _cleanRoot() {
} }
// Blocks and, if possible, unparents a peer // Blocks and, if possible, unparents a peer
func (t *switchTable) blockPeer(from phony.Actor, port switchPort) { func (t *switchTable) blockPeer(from phony.Actor, port switchPort, isWrite bool) {
t.Act(from, func() { t.Act(from, func() {
peer, isIn := t.data.peers[port] peer, isIn := t.data.peers[port]
if !isIn || peer.blocked { switch {
case isIn && !isWrite && !peer.readBlock:
peer.readBlock = true
case isIn && isWrite && !peer.writeBlock:
peer.writeBlock = true
default:
return return
} }
peer.blocked = true
t.data.peers[port] = peer t.data.peers[port] = peer
t._updateTable() defer t._updateTable()
if port != t.parent { if port != t.parent {
return return
} }
@ -273,13 +282,17 @@ func (t *switchTable) blockPeer(from phony.Actor, port switchPort) {
}) })
} }
func (t *switchTable) unblockPeer(from phony.Actor, port switchPort) { func (t *switchTable) unblockPeer(from phony.Actor, port switchPort, isWrite bool) {
t.Act(from, func() { t.Act(from, func() {
peer, isIn := t.data.peers[port] peer, isIn := t.data.peers[port]
if !isIn || !peer.blocked { switch {
case isIn && !isWrite && peer.readBlock:
peer.readBlock = false
case isIn && isWrite && peer.writeBlock:
peer.writeBlock = false
default:
return return
} }
peer.blocked = false
t.data.peers[port] = peer t.data.peers[port] = peer
t._updateTable() t._updateTable()
}) })
@ -422,7 +435,8 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
if reprocessing { if reprocessing {
sender.faster = oldSender.faster sender.faster = oldSender.faster
sender.time = oldSender.time sender.time = oldSender.time
sender.blocked = oldSender.blocked sender.readBlock = oldSender.readBlock
sender.writeBlock = oldSender.writeBlock
} else { } else {
sender.faster = make(map[switchPort]uint64, len(oldSender.faster)) sender.faster = make(map[switchPort]uint64, len(oldSender.faster))
for port, peer := range t.data.peers { for port, peer := range t.data.peers {
@ -445,7 +459,7 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
} }
} }
} }
if sender.blocked != oldSender.blocked { if sender.blocked() != oldSender.blocked() {
doUpdate = true doUpdate = true
} }
// Update sender // Update sender
@ -485,10 +499,10 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
case sender.faster[t.parent] >= switch_faster_threshold: case sender.faster[t.parent] >= switch_faster_threshold:
// The is reliably faster than the current parent. // The is reliably faster than the current parent.
updateRoot = true updateRoot = true
case !sender.blocked && oldParent.blocked: case !sender.blocked() && oldParent.blocked():
// Replace a blocked parent // Replace a blocked parent
updateRoot = true updateRoot = true
case reprocessing && sender.blocked && !oldParent.blocked: case reprocessing && sender.blocked() && !oldParent.blocked():
// Don't replace an unblocked parent when reprocessing // Don't replace an unblocked parent when reprocessing
case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]: case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]:
// The sender seems to be reliably faster than the current parent, so switch to them instead. // The sender seems to be reliably faster than the current parent, so switch to them instead.
@ -546,7 +560,7 @@ func (t *switchTable) _updateTable() {
} }
newTable._init() newTable._init()
for _, pinfo := range t.data.peers { for _, pinfo := range t.data.peers {
if pinfo.blocked || pinfo.locator.root != newTable.self.root { if pinfo.blocked() || pinfo.locator.root != newTable.self.root {
continue continue
} }
loc := pinfo.locator.clone() loc := pinfo.locator.clone()