From 09baad48e3ec3a013eba2de58087c8c7d8c3a7ab Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Sat, 2 Jun 2018 14:57:06 -0500
Subject: [PATCH] retry failed iterative searches, possibly becoming parallel
 if things are just slow, and keep track of / skip nodes that were already
 visited in the search

---
 src/yggdrasil/search.go | 33 ++++++++++++++++++++++++++++-----
 1 file changed, 28 insertions(+), 5 deletions(-)

diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go
index f7ef3a6c..4aa6c86c 100644
--- a/src/yggdrasil/search.go
+++ b/src/yggdrasil/search.go
@@ -21,12 +21,16 @@ import "time"
 
 //import "fmt"
 
+const search_MAX_SEARCH_SIZE = 16
+const search_RETRY_TIME = time.Second
+
 type searchInfo struct {
 	dest    *NodeID
 	mask    *NodeID
 	time    time.Time
 	packet  []byte
 	toVisit []*dhtInfo
+	visited map[NodeID]bool
 }
 
 type searches struct {
@@ -73,6 +77,9 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
 	// Add responses to toVisit if closer to dest than the res node
 	from := dhtInfo{key: res.key, coords: res.coords}
 	for _, info := range res.infos {
+		if sinfo.visited[*info.getNodeID()] {
+			continue
+		}
 		if dht_firstCloserThanThird(info.getNodeID(), &res.dest, from.getNodeID()) {
 			sinfo.toVisit = append(sinfo.toVisit, info)
 		}
 	}
@@ -91,14 +98,14 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
 		return dht_firstCloserThanThird(sinfo.toVisit[i].getNodeID(), &res.dest, sinfo.toVisit[j].getNodeID())
 	})
 	// Truncate to some maximum size
-	if len(sinfo.toVisit) > 16 {
-		sinfo.toVisit = sinfo.toVisit[:16]
+	if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE {
+		sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE]
 	}
 }
 
 func (s *searches) doSearchStep(sinfo *searchInfo) {
-	if len(sinfo.toVisit) == 0 || time.Since(sinfo.time) > 6*time.Second {
-		// Dead end or timeout, do cleanup
+	if len(sinfo.toVisit) == 0 {
+		// Dead end, do cleanup
 		delete(s.searches, *sinfo.dest)
 		return
 	} else {
@@ -106,20 +113,36 @@
 		// Send to the next search target
 		var next *dhtInfo
 		next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
 		s.core.dht.ping(next, sinfo.dest)
+		sinfo.visited[*next.getNodeID()] = true
 	}
 }
 
 func (s *searches) continueSearch(sinfo *searchInfo) {
-	if time.Since(sinfo.time) < time.Second {
+	if time.Since(sinfo.time) < search_RETRY_TIME {
 		return
 	}
 	sinfo.time = time.Now()
 	s.doSearchStep(sinfo)
+	// In case the search dies, try to spawn another thread later
+	// Note that this will spawn multiple parallel searches as time passes
+	// Any that die aren't restarted, but a new one will start later
+	retryLater := func() {
+		newSearchInfo := s.searches[*sinfo.dest]
+		if newSearchInfo != sinfo {
+			return
+		}
+		s.continueSearch(sinfo)
+	}
+	go func() {
+		time.Sleep(search_RETRY_TIME)
+		s.core.router.admin <- retryLater
+	}()
 }
 
 func (s *searches) newIterSearch(dest *NodeID, mask *NodeID) *searchInfo {
 	sinfo := s.createSearch(dest, mask)
 	sinfo.toVisit = s.core.dht.lookup(dest, false)
+	sinfo.visited = make(map[NodeID]bool)
 	return sinfo
 }
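
Reviewer note on the retry scheduling: the retryLater/go func() pair at the end of continueSearch is the core of the change, and it can be easier to follow as a standalone program. The sketch below is a minimal, self-contained approximation rather than Yggdrasil code: the node type, its actions channel, searchState, and retryTime are illustrative stand-ins for the router's admin channel, the searches map, and search_RETRY_TIME. A timer goroutine sleeps for the retry interval and then hands a closure back to the single worker goroutine that owns the search state; the closure re-checks, by pointer comparison, that the search it captured is still the active one for that destination before continuing, so retries for finished or replaced searches become no-ops.

// Minimal sketch (illustrative names, not Yggdrasil code) of the pattern:
// retry by handing a closure back to the worker goroutine via a channel.
package main

import (
	"fmt"
	"time"
)

const retryTime = 100 * time.Millisecond // stand-in for search_RETRY_TIME

// searchState stands in for searchInfo: one active search per destination.
type searchState struct {
	dest string
}

// node stands in for the part of the core that owns the searches map.
// Everything in searches is only touched from the worker goroutine that
// drains the actions channel (the role played by s.core.router.admin).
type node struct {
	actions  chan func()
	searches map[string]*searchState
}

// continueSearch does one (fake) search step, then schedules a retry.
// It must only be called from the worker goroutine.
func (n *node) continueSearch(sinfo *searchState) {
	fmt.Println("search step for", sinfo.dest)
	retryLater := func() {
		// Runs on the worker goroutine later. If the search finished or
		// was replaced in the meantime, the map no longer points at this
		// exact sinfo, and the retry becomes a no-op.
		if n.searches[sinfo.dest] != sinfo {
			return
		}
		n.continueSearch(sinfo)
	}
	go func() {
		// Timer goroutine: it never touches shared state directly, it only
		// hands the closure back to the worker after the retry interval.
		time.Sleep(retryTime)
		n.actions <- retryLater
	}()
}

func main() {
	n := &node{
		actions:  make(chan func()),
		searches: make(map[string]*searchState),
	}
	go func() { // worker loop: sole owner of n.searches
		for f := range n.actions {
			f()
		}
	}()

	dest := "example destination"
	n.actions <- func() { // start a search, on the worker goroutine
		sinfo := &searchState{dest: dest}
		n.searches[dest] = sinfo
		n.continueSearch(sinfo)
	}

	time.Sleep(3 * retryTime)
	n.actions <- func() { delete(n.searches, dest) } // "search finished"
	time.Sleep(2 * retryTime)                        // pending retry is now a no-op
}

Funneling the retry through the worker channel keeps every read and write of the searches map on one goroutine, so no extra locking is needed; and, as the patch's own comments note, a search that is merely slow (rather than dead) can accumulate more than one retry chain in flight over time, while any chain that hits a finished or replaced search simply stops.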