Merge pull request #109 from Arceliar/dhtbackoff

Exponential DHT backoff
This commit is contained in:
Neil Alexander 2018-06-12 14:04:39 +01:00 committed by GitHub
commit bc82d035db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -44,7 +44,8 @@ type dhtInfo struct {
send time.Time // When we last sent a message
recv time.Time // When we last received a message
pings int // Decide when to drop
throttle uint8 // Number of seconds to wait before pinging a node to bootstrap buckets, gradually increases up to 1 minute
throttle time.Duration // Time to wait before pinging a node to bootstrap buckets, increases exponentially from 1 second to 1 minute
bootstrapSend time.Time // The time checked/updated as part of throttle checks
}
// Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times.
@ -141,12 +142,14 @@ func (t *dht) handleRes(res *dhtRes) {
if !isIn {
return
}
now := time.Now()
rinfo := dhtInfo{
key: res.Key,
coords: res.Coords,
send: time.Now(), // Technically wrong but should be OK...
recv: time.Now(),
throttle: 1,
send: now, // Technically wrong but should be OK...
recv: now,
throttle: time.Second,
bootstrapSend: now,
}
// If they're already in the table, then keep the correct send time
bidx, isOK := t.getBucketIndex(rinfo.getNodeID())
@ -157,13 +160,15 @@ func (t *dht) handleRes(res *dhtRes) {
for _, oldinfo := range b.peers {
if oldinfo.key == rinfo.key {
rinfo.send = oldinfo.send
rinfo.throttle += oldinfo.throttle
rinfo.throttle = oldinfo.throttle
rinfo.bootstrapSend = oldinfo.bootstrapSend
}
}
for _, oldinfo := range b.other {
if oldinfo.key == rinfo.key {
rinfo.send = oldinfo.send
rinfo.throttle += oldinfo.throttle
rinfo.throttle = oldinfo.throttle
rinfo.bootstrapSend = oldinfo.bootstrapSend
}
}
// Insert into table
@ -266,8 +271,8 @@ func (t *dht) insert(info *dhtInfo, isPeer bool) {
// This speeds up bootstrapping
info.recv = info.recv.Add(-time.Hour)
}
if isPeer || info.throttle > 60 {
info.throttle = 60
if isPeer || info.throttle > time.Minute {
info.throttle = time.Minute
}
// First drop any existing entry from the bucket
b.drop(&info.key)
@ -512,20 +517,39 @@ func (t *dht) doMaintenance() {
}
}
if oldest != nil && time.Since(oldest.recv) > time.Minute {
// Ping the oldest node in the DHT, but don't ping nodes that have been checked within the last minute
t.addToMill(oldest, nil)
} // if the DHT isn't empty
}
// Refresh buckets
if t.offset > last {
t.offset = 0
}
target := t.getTarget(t.offset)
for _, info := range t.lookup(target, true) {
if time.Since(info.recv) > time.Duration(info.throttle)*time.Second {
func() {
closer := t.lookup(target, false)
for _, info := range closer {
// Throttled ping of a node that's closer to the destination
if time.Since(info.recv) > info.throttle {
t.addToMill(info, target)
t.offset++
break
info.bootstrapSend = time.Now()
info.throttle *= 2
if info.throttle > time.Minute {
info.throttle = time.Minute
}
return
}
}
if len(closer) == 0 {
// If we don't know of anyone closer at all, then there's a hole in our dht
// Ping the closest node we know and ignore the throttle, to try to fill it
for _, info := range t.lookup(target, true) {
t.addToMill(info, target)
t.offset++
return
}
}
}()
//t.offset++
}
for len(t.rumorMill) > 0 {