diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index 9b2d0af5..8ea06596 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -413,10 +413,10 @@ func main() { } fmt.Println("Test") Util_testAddrIDMask() - //idxstore := makeStoreSquareGrid(4) + idxstore := makeStoreSquareGrid(4) //idxstore := makeStoreStar(256) //idxstore := loadGraph("misc/sim/hype-2016-09-19.list") - idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt") + //idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt") //idxstore := loadGraph("skitter") kstore := getKeyedStore(idxstore) //* diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 42d2b471..a987fbdf 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -35,6 +35,8 @@ func (c *Core) Init() { bpub, bpriv := newBoxKeys() spub, spriv := newSigKeys() c.init(bpub, bpriv, spub, spriv) + c.router.start() + c.switchTable.start() } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 5e61d419..e59017a4 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -106,6 +106,7 @@ func (t *dht) handleReq(req *dhtReq) { } func (t *dht) handleRes(res *dhtRes) { + t.core.searches.handleDHTRes(res) reqs, isIn := t.reqs[res.Key] if !isIn { return diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 75d85d14..7934b78a 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -128,12 +128,12 @@ func (r *router) sendPacket(bs []byte) { } sinfo, isIn := r.core.searches.searches[*nodeID] if !isIn { - sinfo = r.core.searches.createSearch(nodeID, mask) + sinfo = r.core.searches.newIterSearch(nodeID, mask) } if packet != nil { sinfo.packet = packet } - r.core.searches.sendSearch(sinfo) + r.core.searches.continueSearch(sinfo) } var sinfo *sessionInfo var isIn bool diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 6d7216d0..2aa3a858 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -4,27 +4,28 @@ package yggdrasil // The basic idea is as follows: // We may know a NodeID (with a mask) and want to connect -// We forward a searchReq packet through the dht -// The last person in the dht will respond with a searchRes -// If the responders nodeID is close enough to the requested key, it matches -// The "close enough" is handled by a bitmask, set when the request is sent -// For testing in the sim, it must match exactly -// For the real world, the mask would need to map it to the desired IPv6 -// This is also where we store the temporary keys used to send a request -// Would go in sessions, but can't open one without knowing perm key -// This is largely to avoid using an iterative DHT lookup approach -// The iterative parallel lookups from kad can skip over some DHT blackholes -// This hides bugs, which I don't want to do right now +// We begin a search by initializing a list of all nodes in our DHT, sorted by closest to the destination +// We then iteratively ping nodes from the search, marking each pinged node as visited +// We add any unvisited nodes from ping responses to the search, truncating to some maximum search size +// This stops when we either run out of nodes to ping (we hit a dead end where we can't make progress without going back), or we reach the destination +// A new search packet is sent immediately after receiving a response +// A new search packet is sent periodically, once per second, in case a packet was dropped (this slowly causes the search to become parallel if the search doesn't timeout but also doesn't finish within 1 second for whatever reason) +import "sort" import "time" //import "fmt" +const search_MAX_SEARCH_SIZE = 16 +const search_RETRY_TIME = time.Second + type searchInfo struct { - dest *NodeID - mask *NodeID - time time.Time - packet []byte + dest NodeID + mask NodeID + time time.Time + packet []byte + toVisit []*dhtInfo + visited map[NodeID]bool } type searches struct { @@ -45,8 +46,8 @@ func (s *searches) createSearch(dest *NodeID, mask *NodeID) *searchInfo { } } info := searchInfo{ - dest: dest, - mask: mask, + dest: *dest, + mask: *mask, time: now.Add(-time.Second), } s.searches[*dest] = &info @@ -55,6 +56,122 @@ func (s *searches) createSearch(dest *NodeID, mask *NodeID) *searchInfo { //////////////////////////////////////////////////////////////////////////////// +func (s *searches) handleDHTRes(res *dhtRes) { + sinfo, isIn := s.searches[res.Dest] + if !isIn || s.checkDHTRes(sinfo, res) { + // Either we don't recognize this search, or we just finished it + return + } else { + // Add to the search and continue + s.addToSearch(sinfo, res) + s.doSearchStep(sinfo) + } +} + +func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { + // Add responses to toVisit if closer to dest than the res node + from := dhtInfo{key: res.Key, coords: res.Coords} + for _, info := range res.Infos { + if sinfo.visited[*info.getNodeID()] { + continue + } + if dht_firstCloserThanThird(info.getNodeID(), &res.Dest, from.getNodeID()) { + sinfo.toVisit = append(sinfo.toVisit, info) + } + } + // Deduplicate + vMap := make(map[NodeID]*dhtInfo) + for _, info := range sinfo.toVisit { + vMap[*info.getNodeID()] = info + } + sinfo.toVisit = sinfo.toVisit[:0] + for _, info := range vMap { + sinfo.toVisit = append(sinfo.toVisit, info) + } + // Sort + sort.SliceStable(sinfo.toVisit, func(i, j int) bool { + return dht_firstCloserThanThird(sinfo.toVisit[i].getNodeID(), &res.Dest, sinfo.toVisit[j].getNodeID()) + }) + // Truncate to some maximum size + if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE { + sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE] + } +} + +func (s *searches) doSearchStep(sinfo *searchInfo) { + if len(sinfo.toVisit) == 0 { + // Dead end, do cleanup + delete(s.searches, sinfo.dest) + return + } else { + // Send to the next search target + var next *dhtInfo + next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] + s.core.dht.ping(next, &sinfo.dest) + sinfo.visited[*next.getNodeID()] = true + } +} + +func (s *searches) continueSearch(sinfo *searchInfo) { + if time.Since(sinfo.time) < search_RETRY_TIME { + return + } + sinfo.time = time.Now() + s.doSearchStep(sinfo) + // In case the search dies, try to spawn another thread later + // Note that this will spawn multiple parallel searches as time passes + // Any that die aren't restarted, but a new one will start later + retryLater := func() { + newSearchInfo := s.searches[sinfo.dest] + if newSearchInfo != sinfo { + return + } + s.continueSearch(sinfo) + } + go func() { + time.Sleep(search_RETRY_TIME) + s.core.router.admin <- retryLater + }() +} + +func (s *searches) newIterSearch(dest *NodeID, mask *NodeID) *searchInfo { + sinfo := s.createSearch(dest, mask) + sinfo.toVisit = s.core.dht.lookup(dest, true) + sinfo.visited = make(map[NodeID]bool) + return sinfo +} + +func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool { + them := getNodeID(&res.Key) + var destMasked NodeID + var themMasked NodeID + for idx := 0; idx < NodeIDLen; idx++ { + destMasked[idx] = info.dest[idx] & info.mask[idx] + themMasked[idx] = them[idx] & info.mask[idx] + } + if themMasked != destMasked { + return false + } + // They match, so create a session and send a sessionRequest + sinfo, isIn := s.core.sessions.getByTheirPerm(&res.Key) + if !isIn { + sinfo = s.core.sessions.createSession(&res.Key) + _, isIn := s.core.sessions.getByTheirPerm(&res.Key) + if !isIn { + panic("This should never happen") + } + } + // FIXME (!) replay attacks could mess with coords? Give it a handle (tstamp)? + sinfo.coords = res.Coords + sinfo.packet = info.packet + s.core.sessions.ping(sinfo) + // Cleanup + delete(s.searches, res.Dest) + return true +} + +//////////////////////////////////////////////////////////////////////////////// + type searchReq struct { key boxPubKey // Who I am coords []byte // Where I am @@ -77,7 +194,7 @@ func (s *searches) sendSearch(info *searchInfo) { req := searchReq{ key: s.core.boxPub, coords: coords, - dest: *info.dest, + dest: info.dest, } info.time = time.Now() s.handleSearchReq(&req) diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 1f6f422c..71aa0b22 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -430,8 +430,8 @@ func (p *sessionPing) decode(bs []byte) bool { return false case pType != wire_SessionPing && pType != wire_SessionPong: return false - //p.sendPermPub used in top level (crypto), so skipped here -case !wire_chop_slice(p.Handle[:], &bs): + //p.sendPermPub used in top level (crypto), so skipped here + case !wire_chop_slice(p.Handle[:], &bs): return false case !wire_chop_slice(p.SendSesPub[:], &bs): return false