bugfixes related to peer timeouts in the DHT, significantly improve DHT bootstrap speed

This commit is contained in:
Arceliar 2018-05-17 21:20:31 -05:00
parent 8d9887294c
commit fe518f4e3f
3 changed files with 57 additions and 39 deletions

View File

@ -93,7 +93,7 @@ func (t *dht) handleReq(req *dhtReq) {
key: t.core.boxPub, key: t.core.boxPub,
coords: coords, coords: coords,
dest: req.dest, dest: req.dest,
infos: t.lookup(&req.dest), infos: t.lookup(&req.dest, false),
} }
t.sendRes(&res, req) t.sendRes(&res, req)
// Also (possibly) add them to our DHT // Also (possibly) add them to our DHT
@ -152,7 +152,7 @@ func (t *dht) handleRes(res *dhtRes) {
} }
} }
func (t *dht) lookup(nodeID *NodeID) []*dhtInfo { func (t *dht) lookup(nodeID *NodeID, allowCloser bool) []*dhtInfo {
// FIXME this allocates a bunch, sorts, and keeps the part it likes // FIXME this allocates a bunch, sorts, and keeps the part it likes
// It would be better to only track the part it likes to begin with // It would be better to only track the part it likes to begin with
addInfos := func(res []*dhtInfo, infos []*dhtInfo) []*dhtInfo { addInfos := func(res []*dhtInfo, infos []*dhtInfo) []*dhtInfo {
@ -160,7 +160,7 @@ func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
if info == nil { if info == nil {
panic("Should never happen!") panic("Should never happen!")
} }
if dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) { if allowCloser || dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) {
res = append(res, info) res = append(res, info)
} }
} }
@ -197,13 +197,15 @@ func (t *dht) nBuckets() int {
func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) { func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) {
//fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords) //fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords)
// Insert a peer if and only if the bucket doesn't already contain it // Always inserts peers, inserts other nodes if not already present
if !t.shouldInsert(info) { if isPeer || t.shouldInsert(info) {
return // We've never heard this node before
// TODO is there a better time than "now" to set send/recv to?
// (Is there another "natural" choice that bootstraps faster?)
info.send = time.Now()
info.recv = info.send
t.insert(info, isPeer)
} }
info.send = time.Now()
info.recv = info.send
t.insert(info, isPeer)
} }
func (t *dht) insert(info *dhtInfo, isPeer bool) { func (t *dht) insert(info *dhtInfo, isPeer bool) {
@ -217,6 +219,11 @@ func (t *dht) insert(info *dhtInfo, isPeer bool) {
return return
} }
b := t.getBucket(bidx) b := t.getBucket(bidx)
if !isPeer && !b.containsOther(info) {
// This is a new entry, give it an old age so it's pinged sooner
// This speeds up bootstrapping
info.recv = info.recv.Add(-time.Minute)
}
// First drop any existing entry from the bucket // First drop any existing entry from the bucket
b.drop(&info.key) b.drop(&info.key)
// Now add to the *end* of the bucket // Now add to the *end* of the bucket
@ -243,36 +250,45 @@ func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) {
return t.nBuckets(), false return t.nBuckets(), false
} }
func (b *bucket) contains(ninfo *dhtInfo) bool { func dht_bucket_check(newInfo *dhtInfo, infos []*dhtInfo) bool {
// Compares if key and coords match // Compares if key and coords match
var found bool if newInfo == nil {
check := func(infos []*dhtInfo) { panic("Should never happen")
for _, info := range infos { }
if info == nil { for _, info := range infos {
panic("Should never happen") if info == nil {
} panic("Should never happen")
if info.key != info.key { }
continue if info.key != newInfo.key {
} continue
if len(info.coords) != len(ninfo.coords) { }
continue if len(info.coords) != len(newInfo.coords) {
} continue
match := true }
for idx := 0; idx < len(info.coords); idx++ { match := true
if info.coords[idx] != ninfo.coords[idx] { for idx := 0; idx < len(info.coords); idx++ {
match = false if info.coords[idx] != newInfo.coords[idx] {
break match = false
}
}
if match {
found = true
break break
} }
} }
if match {
return true
}
} }
check(b.peers) return false
check(b.other) }
return found
func (b *bucket) containsPeer(info *dhtInfo) bool {
return dht_bucket_check(info, b.peers)
}
func (b *bucket) containsOther(info *dhtInfo) bool {
return dht_bucket_check(info, b.other)
}
func (b *bucket) contains(info *dhtInfo) bool {
return b.containsPeer(info) || b.containsOther(info)
} }
func (b *bucket) drop(key *boxPubKey) { func (b *bucket) drop(key *boxPubKey) {
@ -436,13 +452,14 @@ func (t *dht) doMaintenance() {
t.offset = 0 t.offset = 0
} }
target := t.getTarget(t.offset) target := t.getTarget(t.offset)
for _, info := range t.lookup(target) { for _, info := range t.lookup(target, true) {
if time.Since(info.recv) > time.Minute { if time.Since(info.recv) > time.Minute {
t.addToMill(info, target) t.addToMill(info, target)
t.offset++
break break
} }
} }
t.offset++ //t.offset++
} }
for len(t.rumorMill) > 0 { for len(t.rumorMill) > 0 {
var rumor dht_rumor var rumor dht_rumor
@ -466,7 +483,7 @@ func (t *dht) shouldInsert(info *dhtInfo) bool {
return false return false
} }
b := t.getBucket(bidx) b := t.getBucket(bidx)
if b.contains(info) { if b.containsOther(info) {
return false return false
} }
if len(b.other) < dht_bucket_size { if len(b.other) < dht_bucket_size {

View File

@ -77,7 +77,8 @@ func (r *router) mainLoop() {
case p := <-r.send: case p := <-r.send:
r.sendPacket(p) r.sendPacket(p)
case info := <-r.core.dht.peers: case info := <-r.core.dht.peers:
r.core.dht.insertIfNew(info, true) r.core.dht.insertIfNew(info, false) // Insert as a normal node
r.core.dht.insertIfNew(info, true) // Insert as a peer
case <-r.reset: case <-r.reset:
r.core.sessions.resetInits() r.core.sessions.resetInits()
r.core.dht.reset() r.core.dht.reset()

View File

@ -84,7 +84,7 @@ func (s *searches) sendSearch(info *searchInfo) {
} }
func (s *searches) handleSearchReq(req *searchReq) { func (s *searches) handleSearchReq(req *searchReq) {
lookup := s.core.dht.lookup(&req.dest) lookup := s.core.dht.lookup(&req.dest, false)
sent := false sent := false
//fmt.Println("DEBUG len:", len(lookup)) //fmt.Println("DEBUG len:", len(lookup))
for _, info := range lookup { for _, info := range lookup {