diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 7091cf7b..f6229036 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -130,8 +130,8 @@ func (c *Conn) search() error { close(done) } } - sinfo, infos := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) - sinfo.continueSearch(infos) + sinfo := c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) + sinfo.startSearch() } else { err = errors.New("search already exists") close(done) @@ -152,11 +152,10 @@ func (c *Conn) doSearch() { if !isIn { // Nothing was found, so create a new search searchCompleted := func(sinfo *sessionInfo, e error) {} - var infos []*dhtInfo - sinfo, infos = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) + sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) // Start the search - sinfo.continueSearch(infos) + sinfo.startSearch() } } c.core.router.Act(c.session, routerWork) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index ce78fd1f..bf8c7819 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -83,20 +83,15 @@ func (sinfo *searchInfo) handleDHTRes(res *dhtRes) { // Use results to start an additional search thread infos := append([]*dhtInfo(nil), res.Infos...) infos = sinfo.getAllowedInfos(infos) - sinfo.continueSearch(infos) + if len(infos) > 0 { + sinfo.continueSearch(infos) + } } } // If there has been no response in too long, then this cleans up the search. // Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. func (sinfo *searchInfo) doSearchStep(infos []*dhtInfo) { - if time.Since(sinfo.time) > search_RETRY_TIME { - // Dead end and no response in too long, do cleanup - // FIXME we should really let all the parallel search threads exist when info is empty, and then delete when no threads are left, instead of keeping them all around until things time out or exit successfully - delete(sinfo.searches.searches, sinfo.dest) - sinfo.callback(nil, errors.New("search reached dead end")) - return - } if len(infos) > 0 { // Send to the next search target next := infos[0] @@ -124,16 +119,12 @@ func (sinfo *searchInfo) getAllowedInfos(infos []*dhtInfo) []*dhtInfo { return infos } -// If we've recently sent a ping for this search, do nothing. -// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. +// Run doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. +// Must not be called with an empty list of infos func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) { sinfo.doSearchStep(infos) - if len(infos) > 0 { - infos = infos[1:] // Remove the node we just tried - } - // In case the search dies, try to spawn another thread later - // Note that this will spawn multiple parallel searches as time passes - // Any that die aren't restarted, but a new one will start later + infos = infos[1:] // Remove the node we just tried + // In case there's no response, try the next node in infos later time.AfterFunc(search_STEP_TIME, func() { sinfo.searches.router.Act(nil, func() { // FIXME this keeps the search alive forever if not for the searches map, fix that @@ -143,7 +134,9 @@ func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) { } // Get good infos here instead of at the top, to make sure we can always start things off with a continueSearch call to ourself infos = sinfo.getAllowedInfos(infos) - sinfo.continueSearch(infos) + if len(infos) > 0 { + sinfo.continueSearch(infos) + } }) }) } @@ -156,40 +149,34 @@ func (sinfo *searchInfo) startSearch() { key: sinfo.searches.router.core.boxPub, coords: loc.getCoords(), }) - // Start the search by asking ourself, useful if we're the destination - sinfo.continueSearch(infos) - // Start a timer to clean up the search if everything times out - var cleanupFunc func() - cleanupFunc = func() { + // Start the search by asking ourself, useful if we're the destination + sinfo.continueSearch(infos) + // Start a timer to clean up the search if everything times out + var cleanupFunc func() + cleanupFunc = func() { sinfo.searches.router.Act(nil, func() { // FIXME this keeps the search alive forever if not for the searches map, fix that newSearchInfo := sinfo.searches.searches[sinfo.dest] if newSearchInfo != sinfo { return } - elapsed := time.Since(sinfo.time) - if elapsed > search_RETRY_TIME { - // cleanup - delete(sinfo.searches.searches, sinfo.dest) - sinfo.callback(nil, errors.New("search reached dead end")) - return - } - time.AfterFunc(search_RETRY_TIME - elapsed, cleanupFunc) - }) - } + elapsed := time.Since(sinfo.time) + if elapsed > search_RETRY_TIME { + // cleanup + delete(sinfo.searches.searches, sinfo.dest) + sinfo.callback(nil, errors.New("search reached dead end")) + return + } + time.AfterFunc(search_RETRY_TIME-elapsed, cleanupFunc) + }) + } } // Calls create search, and initializes the iterative search parts of the struct before returning it. -func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) (*searchInfo, []*dhtInfo) { +func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { sinfo := s.createSearch(dest, mask, callback) sinfo.visited = s.router.dht.nodeID - loc := s.router.core.switchTable.getLocator() - var infos []*dhtInfo - infos = append(infos, &dhtInfo{ - key: s.router.core.boxPub, - coords: loc.getCoords(), - }) // Start the search by asking ourself, useful if we're the destination - return sinfo, infos + return sinfo } // Checks if a dhtRes is good (called by handleDHTRes).