From ecf7e490d75fb55cd6515e7f690c6973102b6798 Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Fri, 1 Jun 2018 23:34:21 -0500
Subject: [PATCH 1/7] start of iterative dht searches

---
 src/yggdrasil/admin.go   |  6 +--
 src/yggdrasil/core.go    |  2 +-
 src/yggdrasil/dht.go     |  1 +
 src/yggdrasil/release.go |  2 +-
 src/yggdrasil/router.go  |  4 +-
 src/yggdrasil/search.go  | 97 ++++++++++++++++++++++++++++++++++++++--
 6 files changed, 101 insertions(+), 11 deletions(-)

diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go
index acc25d7f..07b74310 100644
--- a/src/yggdrasil/admin.go
+++ b/src/yggdrasil/admin.go
@@ -583,9 +583,9 @@ func (a *admin) getResponse_dot() []byte {
 			infos[info.key] = info
 		}
 	}
-	addInfo(dht, "fillcolor=\"#ffffff\" style=filled", "Known in DHT") // white
-	addInfo(sessions, "fillcolor=\"#acf3fd\" style=filled", "Open session") // blue
-	addInfo(peers, "fillcolor=\"#ffffb5\" style=filled", "Connected peer") // yellow
+	addInfo(dht, "fillcolor=\"#ffffff\" style=filled", "Known in DHT")                              // white
+	addInfo(sessions, "fillcolor=\"#acf3fd\" style=filled", "Open session")                         // blue
+	addInfo(peers, "fillcolor=\"#ffffb5\" style=filled", "Connected peer")                          // yellow
 	addInfo(append([]admin_nodeInfo(nil), *self), "fillcolor=\"#a5ff8a\" style=filled", "This node") // green
 	// Get coords as a slice of strings, FIXME? this looks very fragile
 	coordSlice := func(coords string) []string {
diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go
index 9938266e..ff9c3602 100644
--- a/src/yggdrasil/core.go
+++ b/src/yggdrasil/core.go
@@ -172,7 +172,7 @@ func (c *Core) GetAddress() *net.IP {
 func (c *Core) GetSubnet() *net.IPNet {
 	subnet := address_subnetForNodeID(c.GetNodeID())[:]
 	subnet = append(subnet, 0, 0, 0, 0, 0, 0, 0, 0)
-	return &net.IPNet{ IP: subnet, Mask: net.CIDRMask(64, 128) }
+	return &net.IPNet{IP: subnet, Mask: net.CIDRMask(64, 128)}
 }
 
 // Sets the output logger of the Yggdrasil node after startup. This may be
diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go
index 1f03bee5..94a93f30 100644
--- a/src/yggdrasil/dht.go
+++ b/src/yggdrasil/dht.go
@@ -106,6 +106,7 @@ func (t *dht) handleReq(req *dhtReq) {
 }
 
 func (t *dht) handleRes(res *dhtRes) {
+	t.core.searches.handleDHTRes(res)
 	reqs, isIn := t.reqs[res.key]
 	if !isIn {
 		return
 	}
diff --git a/src/yggdrasil/release.go b/src/yggdrasil/release.go
index fdbe8d5f..1b5e9caf 100644
--- a/src/yggdrasil/release.go
+++ b/src/yggdrasil/release.go
@@ -8,5 +8,5 @@ import "log"
 // Starts the function profiler. This is only supported when built with
 // '-tags build'.
 func StartProfiler(_ *log.Logger) error {
-	return errors.New("Release builds do not support -pprof, build using '-tags debug'")
+	return errors.New("Release builds do not support -pprof, build using '-tags debug'")
 }
diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go
index 545a15be..d4e0c8e6 100644
--- a/src/yggdrasil/router.go
+++ b/src/yggdrasil/router.go
@@ -128,12 +128,12 @@ func (r *router) sendPacket(bs []byte) {
 		}
 		sinfo, isIn := r.core.searches.searches[*nodeID]
 		if !isIn {
-			sinfo = r.core.searches.createSearch(nodeID, mask)
+			sinfo = r.core.searches.newIterSearch(nodeID, mask)
 		}
 		if packet != nil {
 			sinfo.packet = packet
 		}
-		r.core.searches.sendSearch(sinfo)
+		r.core.searches.continueSearch(sinfo)
 	}
 	var sinfo *sessionInfo
 	var isIn bool
diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go
index d4406615..ff5dd54b 100644
--- a/src/yggdrasil/search.go
+++ b/src/yggdrasil/search.go
@@ -16,15 +16,17 @@ package yggdrasil
 // The iterative parallel lookups from kad can skip over some DHT blackholes
 // This hides bugs, which I don't want to do right now
 
+import "sort"
 import "time"
 
 //import "fmt"
 
 type searchInfo struct {
-	dest   *NodeID
-	mask   *NodeID
-	time   time.Time
-	packet []byte
+	dest    *NodeID
+	mask    *NodeID
+	time    time.Time
+	packet  []byte
+	toVisit []*dhtInfo
 }
 
 type searches struct {
@@ -55,6 +57,93 @@ func (s *searches) createSearch(dest *NodeID, mask *NodeID) *searchInfo {
 
 ////////////////////////////////////////////////////////////////////////////////
 
+func (s *searches) handleDHTRes(res *dhtRes) {
+	if s.checkDHTRes(res) {
+		return
+	}
+	s.addToSearch(res)
+}
+
+func (s *searches) addToSearch(res *dhtRes) {
+	// TODO
+	sinfo, isIn := s.searches[res.dest]
+	if !isIn {
+		return
+	}
+	from := dhtInfo{key: res.key, coords: res.coords}
+	for _, info := range res.infos {
+		if dht_firstCloserThanThird(info.getNodeID(), &res.dest, from.getNodeID()) {
+			sinfo.toVisit = append(sinfo.toVisit, info)
+		}
+	}
+	sort.SliceStable(sinfo.toVisit, func(i, j int) bool {
+		return dht_firstCloserThanThird(sinfo.toVisit[i].getNodeID(), &res.dest, sinfo.toVisit[j].getNodeID())
+	})
+	s.doSearchStep(sinfo)
+}
+
+func (s *searches) doSearchStep(sinfo *searchInfo) {
+	if len(sinfo.toVisit) == 0 || time.Since(sinfo.time) > 6*time.Second {
+		// Dead end or timeout, do cleanup
+		delete(s.searches, *sinfo.dest)
+		return
+	} else {
+		// Send to the next search target
+		var next *dhtInfo
+		next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
+		s.core.dht.ping(next, sinfo.dest)
+	}
+}
+
+func (s *searches) continueSearch(sinfo *searchInfo) {
+	if time.Since(sinfo.time) < time.Second {
+		return
+	}
+	sinfo.time = time.Now()
+	s.doSearchStep(sinfo)
+}
+
+func (s *searches) newIterSearch(dest *NodeID, mask *NodeID) *searchInfo {
+	sinfo := s.createSearch(dest, mask)
+	sinfo.toVisit = s.core.dht.lookup(dest, false)
+	return sinfo
+}
+
+func (s *searches) checkDHTRes(res *dhtRes) bool {
+	info, isIn := s.searches[res.dest]
+	if !isIn {
+		return false
+	}
+	them := getNodeID(&res.key)
+	var destMasked NodeID
+	var themMasked NodeID
+	for idx := 0; idx < NodeIDLen; idx++ {
+		destMasked[idx] = info.dest[idx] & info.mask[idx]
+		themMasked[idx] = them[idx] & info.mask[idx]
+	}
+	if themMasked != destMasked {
+		return false
+	}
+	// They match, so create a session and send a sessionRequest
+	sinfo, isIn := s.core.sessions.getByTheirPerm(&res.key)
+	if !isIn {
+		sinfo = s.core.sessions.createSession(&res.key)
+		_, isIn := s.core.sessions.getByTheirPerm(&res.key)
+		if !isIn {
+			panic("This should never happen")
+		}
+	}
+	// FIXME (!) replay attacks could mess with coords? Give it a handle (tstamp)?
+	sinfo.coords = res.coords
+	sinfo.packet = info.packet
+	s.core.sessions.ping(sinfo)
+	// Cleanup
+	delete(s.searches, res.dest)
+	return true
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
 type searchReq struct {
 	key    boxPubKey // Who I am
 	coords []byte    // Where I am

From 10a72444e37544f2f2adb5c862c159d389a3ae00 Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Sat, 2 Jun 2018 00:16:47 -0500
Subject: [PATCH 2/7] get itersearch to run in the sim

---
 misc/sim/treesim.go     |  4 ++--
 src/yggdrasil/debug.go  |  2 ++
 src/yggdrasil/search.go | 23 ++++++++++-------------
 3 files changed, 14 insertions(+), 15 deletions(-)

diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go
index 9b2d0af5..8ea06596 100644
--- a/misc/sim/treesim.go
+++ b/misc/sim/treesim.go
@@ -413,10 +413,10 @@ func main() {
 	}
 	fmt.Println("Test")
 	Util_testAddrIDMask()
-	//idxstore := makeStoreSquareGrid(4)
+	idxstore := makeStoreSquareGrid(4)
 	//idxstore := makeStoreStar(256)
 	//idxstore := loadGraph("misc/sim/hype-2016-09-19.list")
-	idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt")
+	//idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt")
 	//idxstore := loadGraph("skitter")
 	kstore := getKeyedStore(idxstore)
 	//*
diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go
index 42d2b471..a987fbdf 100644
--- a/src/yggdrasil/debug.go
+++ b/src/yggdrasil/debug.go
@@ -35,6 +35,8 @@ func (c *Core) Init() {
 	bpub, bpriv := newBoxKeys()
 	spub, spriv := newSigKeys()
 	c.init(bpub, bpriv, spub, spriv)
+	c.router.start()
+	c.switchTable.start()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go
index ff5dd54b..3355f090 100644
--- a/src/yggdrasil/search.go
+++ b/src/yggdrasil/search.go
@@ -58,18 +58,20 @@ func (s *searches) createSearch(dest *NodeID, mask *NodeID) *searchInfo {
 ////////////////////////////////////////////////////////////////////////////////
 
 func (s *searches) handleDHTRes(res *dhtRes) {
-	if s.checkDHTRes(res) {
+	sinfo, isIn := s.searches[res.dest]
+	if !isIn || s.checkDHTRes(sinfo, res) {
+		// Either we don't recognize this search, or we just finished it
 		return
+	} else {
+		// Add to the search and continue
+		s.addToSearch(sinfo, res)
+		s.doSearchStep(sinfo)
 	}
-	s.addToSearch(res)
 }
 
-func (s *searches) addToSearch(res *dhtRes) {
+func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
 	// TODO
-	sinfo, isIn := s.searches[res.dest]
-	if !isIn {
-		return
-	}
+
 	from := dhtInfo{key: res.key, coords: res.coords}
 	for _, info := range res.infos {
 		if dht_firstCloserThanThird(info.getNodeID(), &res.dest, from.getNodeID()) {
@@ -79,7 +81,6 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
 	sort.SliceStable(sinfo.toVisit, func(i, j int) bool {
 		return dht_firstCloserThanThird(sinfo.toVisit[i].getNodeID(), &res.dest, sinfo.toVisit[j].getNodeID())
 	})
-	s.doSearchStep(sinfo)
 }
 
 func (s *searches) doSearchStep(sinfo *searchInfo) {
@@ -109,11 +110,7 @@ func (s *searches) newIterSearch(dest *NodeID, mask *NodeID) *searchInfo {
 	return sinfo
 }
 
-func (s *searches) checkDHTRes(res *dhtRes) bool {
-	info, isIn := s.searches[res.dest]
-	if !isIn {
-		return false
-	}
+func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool {
 	them := getNodeID(&res.key)
 	var destMasked NodeID
 	var themMasked NodeID

From ed6c9c2a5477332ddaadc0ad5fae57db07499198 Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Sat, 2 Jun 2018 00:29:36 -0500
Subject: [PATCH 3/7] deduplicate dht responses when adding them to the search, limit the search toVisit size

---
 src/yggdrasil/search.go | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go
index 3355f090..f7ef3a6c 100644
--- a/src/yggdrasil/search.go
+++ b/src/yggdrasil/search.go
@@ -70,17 +70,30 @@ func (s *searches) handleDHTRes(res *dhtRes) {
 }
 
 func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
-	// TODO
-
+	// Add responses to toVisit if closer to dest than the res node
 	from := dhtInfo{key: res.key, coords: res.coords}
 	for _, info := range res.infos {
 		if dht_firstCloserThanThird(info.getNodeID(), &res.dest, from.getNodeID()) {
 			sinfo.toVisit = append(sinfo.toVisit, info)
 		}
 	}
+	// Deduplicate
+	vMap := make(map[NodeID]*dhtInfo)
+	for _, info := range sinfo.toVisit {
+		vMap[*info.getNodeID()] = info
+	}
+	sinfo.toVisit = sinfo.toVisit[:0]
+	for _, info := range vMap {
+		sinfo.toVisit = append(sinfo.toVisit, info)
+	}
+	// Sort
 	sort.SliceStable(sinfo.toVisit, func(i, j int) bool {
 		return dht_firstCloserThanThird(sinfo.toVisit[i].getNodeID(), &res.dest, sinfo.toVisit[j].getNodeID())
 	})
+	// Truncate to some maximum size
+	if len(sinfo.toVisit) > 16 {
+		sinfo.toVisit = sinfo.toVisit[:16]
+	}
 }
 
 func (s *searches) doSearchStep(sinfo *searchInfo) {

From 09baad48e3ec3a013eba2de58087c8c7d8c3a7ab Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Sat, 2 Jun 2018 14:57:06 -0500
Subject: [PATCH 4/7] retry failed iterative searches, possibly becoming parallel if things are just slow, and keep track of / skip nodes that were already visited in the search

---
 src/yggdrasil/search.go | 33 ++++++++++++++++++++++++++++-----
 1 file changed, 28 insertions(+), 5 deletions(-)

diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go
index f7ef3a6c..4aa6c86c 100644
--- a/src/yggdrasil/search.go
+++ b/src/yggdrasil/search.go
@@ -21,12 +21,16 @@ import "time"
 
 //import "fmt"
 
+const search_MAX_SEARCH_SIZE = 16
+const search_RETRY_TIME = time.Second
+
 type searchInfo struct {
 	dest    *NodeID
 	mask    *NodeID
 	time    time.Time
 	packet  []byte
 	toVisit []*dhtInfo
+	visited map[NodeID]bool
 }
 
 type searches struct {
@@ -73,6 +77,9 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
 	// Add responses to toVisit if closer to dest than the res node
 	from := dhtInfo{key: res.key, coords: res.coords}
 	for _, info := range res.infos {
+		if sinfo.visited[*info.getNodeID()] {
+			continue
+		}
 		if dht_firstCloserThanThird(info.getNodeID(), &res.dest, from.getNodeID()) {
 			sinfo.toVisit = append(sinfo.toVisit, info)
 		}
@@ -91,14 +98,14 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
 		return dht_firstCloserThanThird(sinfo.toVisit[i].getNodeID(), &res.dest, sinfo.toVisit[j].getNodeID())
 	})
 	// Truncate to some maximum size
-	if len(sinfo.toVisit) > 16 {
-		sinfo.toVisit = sinfo.toVisit[:16]
+	if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE {
+		sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE]
 	}
 }
 
 func (s *searches) doSearchStep(sinfo *searchInfo) {
-	if len(sinfo.toVisit) == 0 || time.Since(sinfo.time) > 6*time.Second {
-		// Dead end or timeout, do cleanup
+	if len(sinfo.toVisit) == 0 {
+		// Dead end, do cleanup
 		delete(s.searches, *sinfo.dest)
 		return
 	} else {
@@ -106,20 +113,36 @@ func (s *searches) doSearchStep(sinfo *searchInfo) {
 		var next *dhtInfo
 		next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
 		s.core.dht.ping(next, sinfo.dest)
+		sinfo.visited[*next.getNodeID()] = true
 	}
 }
 
 func (s *searches) continueSearch(sinfo *searchInfo) {
-	if time.Since(sinfo.time) < time.Second {
+	if time.Since(sinfo.time) < search_RETRY_TIME {
 		return
 	}
 	sinfo.time = time.Now()
 	s.doSearchStep(sinfo)
+	// In case the search dies, try to spawn another thread later
+	// Note that this will spawn multiple parallel searches as time passes
+	// Any that die aren't restarted, but a new one will start later
+	retryLater := func() {
+		newSearchInfo := s.searches[*sinfo.dest]
+		if newSearchInfo != sinfo {
+			return
+		}
+		s.continueSearch(sinfo)
+	}
+	go func() {
+		time.Sleep(search_RETRY_TIME)
+		s.core.router.admin <- retryLater
+	}()
 }
 
 func (s *searches) newIterSearch(dest *NodeID, mask *NodeID) *searchInfo {
 	sinfo := s.createSearch(dest, mask)
 	sinfo.toVisit = s.core.dht.lookup(dest, false)
+	sinfo.visited = make(map[NodeID]bool)
 	return sinfo
 }

From 45abfafbba629a89ffc6ad8c2302d64c7218468b Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Sat, 2 Jun 2018 16:33:58 -0500
Subject: [PATCH 5/7] value instead of pointer types for search dest/mask

---
 src/yggdrasil/search.go | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go
index 88479d05..de386947 100644
--- a/src/yggdrasil/search.go
+++ b/src/yggdrasil/search.go
@@ -25,8 +25,8 @@ const search_MAX_SEARCH_SIZE = 16
 const search_RETRY_TIME = time.Second
 
 type searchInfo struct {
-	dest    *NodeID
-	mask    *NodeID
+	dest    NodeID
+	mask    NodeID
 	time    time.Time
 	packet  []byte
 	toVisit []*dhtInfo
@@ -51,8 +51,8 @@ func (s *searches) createSearch(dest *NodeID, mask *NodeID) *searchInfo {
 		}
 	}
 	info := searchInfo{
-		dest: dest,
-		mask: mask,
+		dest: *dest,
+		mask: *mask,
 		time: now.Add(-time.Second),
 	}
 	s.searches[*dest] = &info
@@ -106,13 +106,13 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
 func (s *searches) doSearchStep(sinfo *searchInfo) {
 	if len(sinfo.toVisit) == 0 {
 		// Dead end, do cleanup
-		delete(s.searches, *sinfo.dest)
+		delete(s.searches, sinfo.dest)
 		return
 	} else {
 		// Send to the next search target
 		var next *dhtInfo
 		next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
-		s.core.dht.ping(next, sinfo.dest)
+		s.core.dht.ping(next, &sinfo.dest)
 		sinfo.visited[*next.getNodeID()] = true
 	}
 }
@@ -127,7 +127,7 @@ func (s *searches) continueSearch(sinfo *searchInfo) {
 	// Note that this will spawn multiple parallel searches as time passes
 	// Any that die aren't restarted, but a new one will start later
 	retryLater := func() {
-		newSearchInfo := s.searches[*sinfo.dest]
+		newSearchInfo := s.searches[sinfo.dest]
 		if newSearchInfo != sinfo {
 			return
 		}
@@ -199,7 +199,7 @@ func (s *searches) sendSearch(info *searchInfo) {
 	req := searchReq{
 		key:    s.core.boxPub,
 		coords: coords,
-		dest:   *info.dest,
+		dest:   info.dest,
 	}
 	info.time = time.Now()
 	s.handleSearchReq(&req)

From 3e1ac818548ca8af76c935b677aa1e8f95aaa741 Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Sat, 2 Jun 2018 16:39:34 -0500
Subject: [PATCH 6/7] allow searches to start with nodes further from the destination than ourself

---
 src/yggdrasil/search.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go
index de386947..588d2e02 100644
--- a/src/yggdrasil/search.go
+++ b/src/yggdrasil/search.go
@@ -141,7 +141,7 @@ func (s *searches) continueSearch(sinfo *searchInfo) {
 
 func (s *searches) newIterSearch(dest *NodeID, mask *NodeID) *searchInfo {
 	sinfo := s.createSearch(dest, mask)
-	sinfo.toVisit = s.core.dht.lookup(dest, false)
+	sinfo.toVisit = s.core.dht.lookup(dest, true)
 	sinfo.visited = make(map[NodeID]bool)
 	return sinfo
 }

From b9ea5350c699b34fe26ae5e8f684b6d0f7ba9b4c Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Sat, 2 Jun 2018 16:45:45 -0500
Subject: [PATCH 7/7] update search.go comments to describe the iterative approach

---
 src/yggdrasil/search.go | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go
index 588d2e02..2aa3a858 100644
--- a/src/yggdrasil/search.go
+++ b/src/yggdrasil/search.go
@@ -4,17 +4,12 @@ package yggdrasil
 
 // The basic idea is as follows:
 // We may know a NodeID (with a mask) and want to connect
-// We forward a searchReq packet through the dht
-// The last person in the dht will respond with a searchRes
-// If the responders nodeID is close enough to the requested key, it matches
-// The "close enough" is handled by a bitmask, set when the request is sent
-// For testing in the sim, it must match exactly
-// For the real world, the mask would need to map it to the desired IPv6
-// This is also where we store the temporary keys used to send a request
-// Would go in sessions, but can't open one without knowing perm key
-// This is largely to avoid using an iterative DHT lookup approach
-// The iterative parallel lookups from kad can skip over some DHT blackholes
-// This hides bugs, which I don't want to do right now
+// We begin a search by initializing a list of all nodes in our DHT, sorted by closest to the destination
+// We then iteratively ping nodes from the search, marking each pinged node as visited
+// We add any unvisited nodes from ping responses to the search, truncating to some maximum search size
+// This stops when we either run out of nodes to ping (we hit a dead end where we can't make progress without going back), or we reach the destination
+// A new search packet is sent immediately after receiving a response
+// A new search packet is sent periodically, once per second, in case a packet was dropped (this slowly causes the search to become parallel if the search doesn't timeout but also doesn't finish within 1 second for whatever reason)
 
 import "sort"
 import "time"
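
To summarize the loop that the final patch's comments describe, here is a minimal, self-contained sketch of the same iterative search: keep a candidate list sorted by closeness to the destination, visit the closest unvisited candidate, merge unvisited nodes from each response back into the list, deduplicate, and truncate. The NodeID type, the XOR-based closer() metric, and the synchronous ping callback below are simplified stand-ins for illustration only; they are not the actual yggdrasil types, its dht_firstCloserThanThird metric, or its asynchronous DHT requests.

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// NodeID stands in for yggdrasil's NodeID (the real one is a sha512sum).
type NodeID [8]byte

// closer reports whether a is closer to dest than b, using the big-endian
// magnitude of the XOR distance. This is a simplification of the DHT's
// actual closeness metric.
func closer(a, b, dest NodeID) bool {
	var da, db NodeID
	for i := range dest {
		da[i] = a[i] ^ dest[i]
		db[i] = b[i] ^ dest[i]
	}
	return bytes.Compare(da[:], db[:]) < 0
}

const maxSearchSize = 16 // mirrors search_MAX_SEARCH_SIZE

// search mirrors the shape of the patched searchInfo: a sorted toVisit
// list plus a visited set.
type search struct {
	dest    NodeID
	toVisit []NodeID
	visited map[NodeID]bool
}

// addCandidates drops visited nodes, deduplicates, sorts by closeness to
// dest, and truncates, like addToSearch in the patches.
func (s *search) addCandidates(nodes []NodeID) {
	merged := append(append([]NodeID{}, s.toVisit...), nodes...)
	seen := make(map[NodeID]bool)
	s.toVisit = s.toVisit[:0]
	for _, n := range merged {
		if s.visited[n] || seen[n] {
			continue
		}
		seen[n] = true
		s.toVisit = append(s.toVisit, n)
	}
	sort.SliceStable(s.toVisit, func(i, j int) bool {
		return closer(s.toVisit[i], s.toVisit[j], s.dest)
	})
	if len(s.toVisit) > maxSearchSize {
		s.toVisit = s.toVisit[:maxSearchSize]
	}
}

// step visits the closest remaining candidate and feeds its reply back in,
// like doSearchStep; it returns false on a dead end (empty toVisit).
func (s *search) step(ping func(NodeID) []NodeID) bool {
	if len(s.toVisit) == 0 {
		return false
	}
	next := s.toVisit[0]
	s.toVisit = s.toVisit[1:]
	s.visited[next] = true
	s.addCandidates(ping(next))
	return true
}

func main() {
	s := &search{dest: NodeID{0xff}, visited: make(map[NodeID]bool)}
	s.addCandidates([]NodeID{{0x10}, {0x80}}) // seed, like dht.lookup in newIterSearch
	for s.step(func(n NodeID) []NodeID {
		fmt.Printf("pinging %x\n", n)
		return nil // a real response would return that node's closest known peers
	}) {
	}
}

In the patches themselves, the equivalent state lives in searchInfo (toVisit, visited), the ping is an asynchronous DHT request whose response re-enters through handleDHTRes, and continueSearch re-fires the step once per second so a dropped packet slowly turns the search parallel instead of stalling it.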