diff --git a/cmd/yggdrasilctl/main.go b/cmd/yggdrasilctl/main.go index 94d90842..697a3faa 100644 --- a/cmd/yggdrasilctl/main.go +++ b/cmd/yggdrasilctl/main.go @@ -25,14 +25,20 @@ import ( type admin_info map[string]interface{} func main() { + // makes sure we can use defer and still return an error code to the OS + os.Exit(run()) +} + +func run() int { logbuffer := &bytes.Buffer{} logger := log.New(logbuffer, "", log.Flags()) - defer func() { + defer func() int { if r := recover(); r != nil { logger.Println("Fatal error:", r) fmt.Print(logbuffer) - os.Exit(1) + return 1 } + return 0 }() endpoint := defaults.GetDefaults().DefaultAdminListen @@ -62,12 +68,12 @@ func main() { fmt.Println("Build name:", version.BuildName()) fmt.Println("Build version:", version.BuildVersion()) fmt.Println("To get the version number of the running Yggdrasil node, run", os.Args[0], "getSelf") - return + return 0 } if len(args) == 0 { flag.Usage() - return + return 0 } if *server == endpoint { @@ -176,15 +182,15 @@ func main() { } else { fmt.Println("Admin socket returned an error but didn't specify any error text") } - os.Exit(1) + return 1 } if _, ok := recv["request"]; !ok { fmt.Println("Missing request in response (malformed response?)") - os.Exit(1) + return 1 } if _, ok := recv["response"]; !ok { fmt.Println("Missing response body (malformed response?)") - os.Exit(1) + return 1 } req := recv["request"].(map[string]interface{}) res := recv["response"].(map[string]interface{}) @@ -193,7 +199,7 @@ func main() { if json, err := json.MarshalIndent(res, "", " "); err == nil { fmt.Println(string(json)) } - os.Exit(0) + return 0 } switch strings.ToLower(req["request"].(string)) { @@ -434,7 +440,7 @@ func main() { } if v, ok := recv["status"]; ok && v != "success" { - os.Exit(1) + return 1 } - os.Exit(0) + return 0 } diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 39b2a52c..82d0aa93 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -398,6 +398,7 @@ func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool) } }) c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false) + phony.Block(&c.router.nodeinfo, func() {}) // Wait for sendNodeInfo before starting timer timer := time.AfterFunc(6*time.Second, func() { close(response) }) defer timer.Stop() for res := range response { diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 72165948..013fd1e4 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -387,6 +387,8 @@ func (t *dht) getImportant() []*dhtInfo { if dist < minDist { minDist = dist important = append(important, info) + } else if len(important) < 2 { + important = append(important, info) } } var temp []*dhtInfo @@ -397,6 +399,8 @@ func (t *dht) getImportant() []*dhtInfo { if dist < minDist { minDist = dist temp = append(temp, info) + } else if len(temp) < 2 { + temp = append(temp, info) } } for idx := len(temp) - 1; idx >= 0; idx-- { diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go index c3e9a274..fc6250d6 100644 --- a/src/yggdrasil/nodeinfo.go +++ b/src/yggdrasil/nodeinfo.go @@ -41,7 +41,7 @@ type nodeinfoReqRes struct { // Initialises the nodeinfo cache/callback maps, and starts a goroutine to keep // the cache/callback maps clean of stale entries func (m *nodeinfo) init(core *Core) { - m.Act(m, func() { + m.Act(nil, func() { m._init(core) }) } @@ -66,13 +66,13 @@ func (m *nodeinfo) _cleanup() { } } time.AfterFunc(time.Second*30, func() { - m.Act(m, m._cleanup) + m.Act(nil, m._cleanup) }) } // Add a callback for a nodeinfo lookup func (m *nodeinfo) addCallback(sender crypto.BoxPubKey, call func(nodeinfo *NodeInfoPayload)) { - m.Act(m, func() { + m.Act(nil, func() { m._addCallback(sender, call) }) } @@ -164,8 +164,8 @@ func (m *nodeinfo) _getCachedNodeInfo(key crypto.BoxPubKey) (NodeInfoPayload, er } // Handles a nodeinfo request/response - called from the router -func (m *nodeinfo) handleNodeInfo(nodeinfo *nodeinfoReqRes) { - m.Act(m, func() { +func (m *nodeinfo) handleNodeInfo(from phony.Actor, nodeinfo *nodeinfoReqRes) { + m.Act(from, func() { m._handleNodeInfo(nodeinfo) }) } @@ -181,7 +181,7 @@ func (m *nodeinfo) _handleNodeInfo(nodeinfo *nodeinfoReqRes) { // Send nodeinfo request or response - called from the router func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) { - m.Act(m, func() { + m.Act(nil, func() { m._sendNodeInfo(key, coords, isResponse) }) } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index bd6eefdf..b08a12d3 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -250,5 +250,5 @@ func (r *router) _handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) { return } req.SendPermPub = *fromKey - r.nodeinfo.handleNodeInfo(&req) + r.nodeinfo.handleNodeInfo(r, &req) } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index b0fdbb51..92fc68b6 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -16,7 +16,6 @@ package yggdrasil import ( "errors" - "sort" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" @@ -25,10 +24,8 @@ import ( // This defines the maximum number of dhtInfo that we keep track of for nodes to query in an ongoing search. const search_MAX_SEARCH_SIZE = 16 -// This defines the time after which we send a new search packet. -// Search packets are sent automatically immediately after a response is received. -// So this allows for timeouts and for long searches to become increasingly parallel. -const search_RETRY_TIME = time.Second +// This defines the time after which we time out a search (so it can restart). +const search_RETRY_TIME = 3 * time.Second // Information about an ongoing search. // Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited. @@ -38,7 +35,7 @@ type searchInfo struct { mask crypto.NodeID time time.Time toVisit []*dhtInfo - visited map[crypto.NodeID]bool + visited crypto.NodeID // Closest address visited so far callback func(*sessionInfo, error) // TODO context.Context for timeout and cancellation } @@ -94,34 +91,20 @@ func (sinfo *searchInfo) handleDHTRes(res *dhtRes) { func (sinfo *searchInfo) addToSearch(res *dhtRes) { // Add responses to toVisit if closer to dest than the res node from := dhtInfo{key: res.Key, coords: res.Coords} - sinfo.visited[*from.getNodeID()] = true + if dht_ordered(&sinfo.dest, from.getNodeID(), &sinfo.visited) { + // Closer to the destination, so update visited + sinfo.visited = *from.getNodeID() + } for _, info := range res.Infos { - if *info.getNodeID() == sinfo.searches.router.dht.nodeID || sinfo.visited[*info.getNodeID()] { + if *info.getNodeID() == sinfo.visited { + // dht_ordered could return true here, but we want to skip it in this case continue } - if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) { + if dht_ordered(&sinfo.dest, info.getNodeID(), &sinfo.visited) { // Response is closer to the destination sinfo.toVisit = append(sinfo.toVisit, info) } } - // Deduplicate - vMap := make(map[crypto.NodeID]*dhtInfo) - for _, info := range sinfo.toVisit { - vMap[*info.getNodeID()] = info - } - sinfo.toVisit = sinfo.toVisit[:0] - for _, info := range vMap { - sinfo.toVisit = append(sinfo.toVisit, info) - } - // Sort - sort.SliceStable(sinfo.toVisit, func(i, j int) bool { - // Should return true if i is closer to the destination than j - return dht_ordered(&res.Dest, sinfo.toVisit[i].getNodeID(), sinfo.toVisit[j].getNodeID()) - }) - // Truncate to some maximum size - if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE { - sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE] - } } // If there are no nodes left toVisit, then this cleans up the search. @@ -136,12 +119,13 @@ func (sinfo *searchInfo) doSearchStep() { return } // Send to the next search target - var next *dhtInfo - next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] - rq := dhtReqKey{next.key, sinfo.dest} - sinfo.searches.router.dht.addCallback(&rq, sinfo.handleDHTRes) - sinfo.searches.router.dht.ping(next, &sinfo.dest) - sinfo.time = time.Now() + for _, next := range sinfo.toVisit { + rq := dhtReqKey{next.key, sinfo.dest} + sinfo.searches.router.dht.addCallback(&rq, sinfo.handleDHTRes) + sinfo.searches.router.dht.ping(next, &sinfo.dest) + sinfo.time = time.Now() + } + sinfo.toVisit = sinfo.toVisit[:0] } // If we've recently sent a ping for this search, do nothing. @@ -166,7 +150,7 @@ func (sinfo *searchInfo) continueSearch() { // Calls create search, and initializes the iterative search parts of the struct before returning it. func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { sinfo := s.createSearch(dest, mask, callback) - sinfo.visited = make(map[crypto.NodeID]bool) + sinfo.visited = s.router.dht.nodeID loc := s.router.core.switchTable.getLocator() sinfo.toVisit = append(sinfo.toVisit, &dhtInfo{ key: s.router.core.boxPub,