update switch blockPeer/unblockPeer logic and dht reset when coords change

This commit is contained in:
Arceliar 2020-05-16 09:25:57 -05:00
parent 433e392bdf
commit dc128121e5
4 changed files with 38 additions and 23 deletions

View File

@ -89,6 +89,11 @@ func (t *dht) reconfigure() {
// Resets the DHT in response to coord changes. // Resets the DHT in response to coord changes.
// This empties all info from the DHT and drops outstanding requests. // This empties all info from the DHT and drops outstanding requests.
func (t *dht) reset() { func (t *dht) reset() {
for _, info := range t.table {
if t.isImportant(info) {
t.ping(info, nil)
}
}
t.reqs = make(map[dhtReqKey]time.Time) t.reqs = make(map[dhtReqKey]time.Time)
t.table = make(map[crypto.NodeID]*dhtInfo) t.table = make(map[crypto.NodeID]*dhtInfo)
t.imp = nil t.imp = nil
@ -144,12 +149,8 @@ func (t *dht) insert(info *dhtInfo) {
// Insert a peer into the table if it hasn't been pinged lately, to keep peers from dropping // Insert a peer into the table if it hasn't been pinged lately, to keep peers from dropping
func (t *dht) insertPeer(info *dhtInfo) { func (t *dht) insertPeer(info *dhtInfo) {
oldInfo, isIn := t.table[*info.getNodeID()] t.insert(info) // FIXME this resets timers / ping counts / etc, so it seems kind of dangerous
if !isIn || time.Since(oldInfo.recv) > dht_max_delay+30*time.Second { t.ping(info, nil) // This is a quick fix to the above, ping them immediately...
// TODO? also check coords?
newInfo := *info // Insert a copy
t.insert(&newInfo)
}
} }
// Return true if first/second/third are (partially) ordered correctly. // Return true if first/second/third are (partially) ordered correctly.

View File

@ -376,6 +376,7 @@ func (intf *linkInterface) notifyRead(size int) {
if size > 0 && intf.stallTimer == nil { if size > 0 && intf.stallTimer == nil {
intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
} }
intf.link.core.switchTable.unblockPeer(intf, intf.peer.port)
}) })
} }

View File

@ -194,21 +194,20 @@ func (ps *peers) sendSwitchMsgs(from phony.Actor) {
}) })
} }
// This must be launched in a separate goroutine by whatever sets up the peer struct. func (ps *peers) updateDHT(from phony.Actor) {
// It handles link protocol traffic. ps.Act(from, func() {
func (p *peer) start() { for _, peer := range ps.ports {
var updateDHT func() p := peer
updateDHT = func() { if p.port == 0 {
phony.Block(p, func() { continue
select {
case <-p.done:
default:
p._updateDHT()
time.AfterFunc(time.Second, updateDHT)
} }
}) p.Act(ps, p._updateDHT)
} }
updateDHT() })
}
// This must be launched in a separate goroutine by whatever sets up the peer struct.
func (p *peer) start() {
// Just for good measure, immediately send a switch message to this peer when we start // Just for good measure, immediately send a switch message to this peer when we start
p.Act(nil, p._sendSwitchMsg) p.Act(nil, p._sendSwitchMsg)
} }

View File

@ -239,11 +239,12 @@ func (t *switchTable) _cleanRoot() {
func (t *switchTable) blockPeer(from phony.Actor, port switchPort) { func (t *switchTable) blockPeer(from phony.Actor, port switchPort) {
t.Act(from, func() { t.Act(from, func() {
peer, isIn := t.data.peers[port] peer, isIn := t.data.peers[port]
if !isIn { if !isIn || peer.blocked {
return return
} }
peer.blocked = true peer.blocked = true
t.data.peers[port] = peer t.data.peers[port] = peer
t._updateTable()
if port != t.parent { if port != t.parent {
return return
} }
@ -258,6 +259,18 @@ func (t *switchTable) blockPeer(from phony.Actor, port switchPort) {
}) })
} }
func (t *switchTable) unblockPeer(from phony.Actor, port switchPort) {
t.Act(from, func() {
peer, isIn := t.data.peers[port]
if !isIn || !peer.blocked {
return
}
peer.blocked = false
t.data.peers[port] = peer
t._updateTable()
})
}
// Removes a peer. // Removes a peer.
// Must be called by the router actor with a lambda that calls this. // Must be called by the router actor with a lambda that calls this.
// If the removed peer was this node's parent, it immediately tries to find a new parent. // If the removed peer was this node's parent, it immediately tries to find a new parent.
@ -482,11 +495,12 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
// The timestamp was updated, so we need to update locally and send to our peers. // The timestamp was updated, so we need to update locally and send to our peers.
updateRoot = true updateRoot = true
} }
// Note that we depend on the LIFO order of the stack of defers here...
if updateRoot { if updateRoot {
if !equiv(&sender.locator, &t.data.locator) { if !equiv(&sender.locator, &t.data.locator) {
doUpdate = true doUpdate = true
t.data.seq++ t.data.seq++
t.core.router.reset(nil) defer t.core.router.reset(t)
} }
if t.data.locator.tstamp != sender.locator.tstamp { if t.data.locator.tstamp != sender.locator.tstamp {
t.time = now t.time = now
@ -495,7 +509,7 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
t.parent = sender.port t.parent = sender.port
defer t.core.peers.sendSwitchMsgs(t) defer t.core.peers.sendSwitchMsgs(t)
} }
if true || doUpdate { if doUpdate {
defer t._updateTable() defer t._updateTable()
} }
return return