From 03a88fe30420b616022f0fd96495a126922cb572 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 20 Oct 2018 14:48:07 -0500 Subject: [PATCH 01/20] Try using a chord-like DHT instead of a kad-like one, work in progress, but it compiles at least --- misc/sim/treesim.go | 2 +- src/yggdrasil/admin.go | 36 +-- src/yggdrasil/debug.go | 2 + src/yggdrasil/dht.go | 637 ++++++++++----------------------------- src/yggdrasil/router.go | 3 +- src/yggdrasil/search.go | 10 +- src/yggdrasil/session.go | 2 +- 7 files changed, 182 insertions(+), 510 deletions(-) diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index c5c67e79..793ef219 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -386,7 +386,7 @@ func (n *Node) startTCP(listen string) { } func (n *Node) connectTCP(remoteAddr string) { - n.core.AddPeer(remoteAddr) + n.core.AddPeer(remoteAddr, remoteAddr) } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 9d3866f8..2446be51 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -554,26 +554,28 @@ func (a *admin) getData_getSwitchQueues() admin_nodeInfo { // getData_getDHT returns info from Core.dht for an admin response. func (a *admin) getData_getDHT() []admin_nodeInfo { var infos []admin_nodeInfo - now := time.Now() getDHT := func() { - for i := 0; i < a.core.dht.nBuckets(); i++ { - b := a.core.dht.getBucket(i) - getInfo := func(vs []*dhtInfo, isPeer bool) { - for _, v := range vs { - addr := *address_addrForNodeID(v.getNodeID()) - info := admin_nodeInfo{ - {"ip", net.IP(addr[:]).String()}, - {"coords", fmt.Sprint(v.coords)}, - {"bucket", i}, - {"peer_only", isPeer}, - {"last_seen", int(now.Sub(v.recv).Seconds())}, + /* TODO fix this + now := time.Now() + for i := 0; i < a.core.dht.nBuckets(); i++ { + b := a.core.dht.getBucket(i) + getInfo := func(vs []*dhtInfo, isPeer bool) { + for _, v := range vs { + addr := *address_addrForNodeID(v.getNodeID()) + info := admin_nodeInfo{ + {"ip", net.IP(addr[:]).String()}, + {"coords", fmt.Sprint(v.coords)}, + {"bucket", i}, + {"peer_only", isPeer}, + {"last_seen", int(now.Sub(v.recv).Seconds())}, + } + infos = append(infos, info) + } } - infos = append(infos, info) + getInfo(b.other, false) + getInfo(b.peers, true) } - } - getInfo(b.other, false) - getInfo(b.peers, true) - } + */ } a.core.router.doAdmin(getDHT) return infos diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 892529b6..f614fece 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -230,11 +230,13 @@ func DEBUG_wire_encode_coords(coords []byte) []byte { func (c *Core) DEBUG_getDHTSize() int { total := 0 + /* FIXME for bidx := 0; bidx < c.dht.nBuckets(); bidx++ { b := c.dht.getBucket(bidx) total += len(b.peers) total += len(b.other) } + */ return total } diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 955ef839..068a4f37 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -1,38 +1,10 @@ package yggdrasil -/* - -This part has the (kademlia-like) distributed hash table - -It's used to look up coords for a NodeID - -Every node participates in the DHT, and the DHT stores no real keys/values -(Only the peer relationships / lookups are needed) - -This version is intentionally fragile, by being recursive instead of iterative -(it's also not parallel, as a result) -This is to make sure that DHT black holes are visible if they exist -(the iterative parallel approach tends to get around them sometimes) -I haven't seen this get stuck on blackholes, but I also haven't proven it can't -Slight changes *do* make it blackhole hard, bootstrapping isn't an easy problem - -*/ - import ( "sort" "time" ) -// Number of DHT buckets, equal to the number of bits in a NodeID. -// Note that, in practice, nearly all of these will be empty. -const dht_bucket_number = 8 * NodeIDLen - -// Number of nodes to keep in each DHT bucket. -// Additional entries may be kept for peers, for bootstrapping reasons, if they don't already have an entry in the bucket. -const dht_bucket_size = 2 - -// Number of responses to include in a lookup. -// If extras are given, they will be truncated from the response handler to prevent abuse. const dht_lookup_size = 16 // dhtInfo represents everything we know about a node in the DHT. @@ -41,11 +13,11 @@ type dhtInfo struct { nodeID_hidden *NodeID key boxPubKey coords []byte - send time.Time // When we last sent a message - recv time.Time // When we last received a message - pings int // Decide when to drop - throttle time.Duration // Time to wait before pinging a node to bootstrap buckets, increases exponentially from 1 second to 1 minute - bootstrapSend time.Time // The time checked/updated as part of throttle checks + send time.Time // When we last sent a message + recv time.Time // When we last received a message + //pings int // Decide when to drop + //throttle time.Duration // Time to wait before pinging a node to bootstrap buckets, increases exponentially from 1 second to 1 minute + //bootstrapSend time.Time // The time checked/updated as part of throttle checks } // Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times. @@ -56,12 +28,6 @@ func (info *dhtInfo) getNodeID() *NodeID { return info.nodeID_hidden } -// The nodes we known in a bucket (a region of keyspace with a matching prefix of some length). -type bucket struct { - peers []*dhtInfo - other []*dhtInfo -} - // Request for a node to do a lookup. // Includes our key and coords so they can send a response back, and the destination NodeID we want to ask about. type dhtReq struct { @@ -74,42 +40,94 @@ type dhtReq struct { // Includes the key and coords of the node that's responding, and the destination they were asked about. // The main part is Infos []*dhtInfo, the lookup response. type dhtRes struct { - Key boxPubKey // key to respond to - Coords []byte // coords to respond to + Key boxPubKey // key of the sender + Coords []byte // coords of the sender Dest NodeID Infos []*dhtInfo // response } -// Information about a node, either taken from our table or from a lookup response. -// Used to schedule pings at a later time (they're throttled to 1/second for background maintenance traffic). -type dht_rumor struct { - info *dhtInfo - target *NodeID -} - // The main DHT struct. -// Includes a slice of buckets, to organize known nodes based on their region of keyspace. -// Also includes information about outstanding DHT requests and the rumor mill of nodes to ping at some point. type dht struct { - core *Core - nodeID NodeID - buckets_hidden [dht_bucket_number]bucket // Extra is for the self-bucket - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[boxPubKey]map[NodeID]time.Time - offset int - rumorMill []dht_rumor + core *Core + nodeID NodeID + table map[NodeID]*dhtInfo + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[boxPubKey]map[NodeID]time.Time + //rumorMill []dht_rumor } -// Initializes the DHT. func (t *dht) init(c *Core) { + // TODO t.core = c t.nodeID = *t.core.GetNodeID() t.peers = make(chan *dhtInfo, 1024) - t.reqs = make(map[boxPubKey]map[NodeID]time.Time) + t.reset() +} + +func (t *dht) reset() { + t.table = make(map[NodeID]*dhtInfo) +} + +func (t *dht) lookup(nodeID *NodeID, allowWorse bool) []*dhtInfo { + return nil + var results []*dhtInfo + var successor *dhtInfo + sTarget := t.nodeID.next() + for infoID, info := range t.table { + if allowWorse || dht_ordered(&t.nodeID, &infoID, nodeID) { + results = append(results, info) + } else { + if successor == nil || dht_ordered(&sTarget, &infoID, successor.getNodeID()) { + successor = info + } + } + } + sort.SliceStable(results, func(i, j int) bool { + return dht_ordered(results[j].getNodeID(), results[i].getNodeID(), nodeID) + }) + if successor != nil { + results = append([]*dhtInfo{successor}, results...) + } + if len(results) > dht_lookup_size { + results = results[:dht_lookup_size] + } + return results +} + +// Insert into table, preserving the time we last sent a packet if the node was already in the table, otherwise setting that time to now +func (t *dht) insert(info *dhtInfo) { + info.recv = time.Now() + if oldInfo, isIn := t.table[*info.getNodeID()]; isIn { + info.send = oldInfo.send + } else { + info.send = info.recv + } + t.table[*info.getNodeID()] = info +} + +// Return true if first/second/third are (partially) ordered correctly +// FIXME? maybe total ordering makes more sense +func dht_ordered(first, second, third *NodeID) bool { + var ordered bool + for idx := 0; idx < NodeIDLen; idx++ { + f, s, t := first[idx], second[idx], third[idx] + switch { + case f == s && s == t: + continue + case f <= s && s <= t: + ordered = true // nothing wrapped around 0 + case t <= f && f <= s: + ordered = true // 0 is between second and third + case s <= t && t <= f: + ordered = true // 0 is between first and second + } + break + } + return ordered } // Reads a request, performs a lookup, and responds. -// If the node that sent the request isn't in our DHT, but should be, then we add them. +// Update info about the node that sent the request. func (t *dht) handleReq(req *dhtReq) { // Send them what they asked for loc := t.core.switchTable.getLocator() @@ -129,11 +147,50 @@ func (t *dht) handleReq(req *dhtReq) { // For bootstrapping to work, we need to add these nodes to the table // Using insertIfNew, they can lie about coords, but searches will route around them // Using the mill would mean trying to block off the mill becomes an attack vector - t.insertIfNew(&info, false) + t.insert(&info) +} + +// Sends a lookup response to the specified node. +func (t *dht) sendRes(res *dhtRes, req *dhtReq) { + // Send a reply for a dhtReq + bs := res.encode() + shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.Key) + payload, nonce := boxSeal(shared, bs, nil) + p := wire_protoTrafficPacket{ + Coords: req.Coords, + ToKey: req.Key, + FromKey: t.core.boxPub, + Nonce: *nonce, + Payload: payload, + } + packet := p.encode() + t.core.router.out(packet) +} + +// Returns nodeID + 1 +func (nodeID NodeID) next() NodeID { + for idx := len(nodeID); idx >= 0; idx-- { + nodeID[idx] += 1 + if nodeID[idx] != 0 { + break + } + } + return nodeID +} + +// Returns nodeID - 1 +func (nodeID NodeID) prev() NodeID { + for idx := len(nodeID); idx >= 0; idx-- { + nodeID[idx] -= 1 + if nodeID[idx] != 0xff { + break + } + } + return nodeID } // Reads a lookup response, checks that we had sent a matching request, and processes the response info. -// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and adding the response info to the rumor mill. +// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses func (t *dht) handleRes(res *dhtRes) { t.core.searches.handleDHTRes(res) reqs, isIn := t.reqs[res.Key] @@ -145,223 +202,36 @@ func (t *dht) handleRes(res *dhtRes) { return } delete(reqs, res.Dest) - now := time.Now() rinfo := dhtInfo{ - key: res.Key, - coords: res.Coords, - send: now, // Technically wrong but should be OK... - recv: now, - throttle: time.Second, - bootstrapSend: now, + key: res.Key, + coords: res.Coords, } - // If they're already in the table, then keep the correct send time - bidx, isOK := t.getBucketIndex(rinfo.getNodeID()) - if !isOK { - return - } - b := t.getBucket(bidx) - for _, oldinfo := range b.peers { - if oldinfo.key == rinfo.key { - rinfo.send = oldinfo.send - rinfo.throttle = oldinfo.throttle - rinfo.bootstrapSend = oldinfo.bootstrapSend + t.insert(&rinfo) // Or at the end, after checking successor/predecessor? + var successor *dhtInfo + var predecessor *dhtInfo + for infoID, info := range t.table { + // Get current successor and predecessor + if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { + successor = info } - } - for _, oldinfo := range b.other { - if oldinfo.key == rinfo.key { - rinfo.send = oldinfo.send - rinfo.throttle = oldinfo.throttle - rinfo.bootstrapSend = oldinfo.bootstrapSend + if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { + predecessor = info } } - // Insert into table - t.insert(&rinfo, false) - if res.Dest == *rinfo.getNodeID() { - return - } // No infinite recursions - if len(res.Infos) > dht_lookup_size { - // Ignore any "extra" lookup results - res.Infos = res.Infos[:dht_lookup_size] - } for _, info := range res.Infos { - if dht_firstCloserThanThird(info.getNodeID(), &res.Dest, rinfo.getNodeID()) { - t.addToMill(info, info.getNodeID()) - } - } -} - -// Does a DHT lookup and returns the results, sorted in ascending order of distance from the destination. -func (t *dht) lookup(nodeID *NodeID, allowCloser bool) []*dhtInfo { - // FIXME this allocates a bunch, sorts, and keeps the part it likes - // It would be better to only track the part it likes to begin with - addInfos := func(res []*dhtInfo, infos []*dhtInfo) []*dhtInfo { - for _, info := range infos { - if info == nil { - panic("Should never happen!") - } - if allowCloser || dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) { - res = append(res, info) - } - } - return res - } - var res []*dhtInfo - for bidx := 0; bidx < t.nBuckets(); bidx++ { - b := t.getBucket(bidx) - res = addInfos(res, b.peers) - res = addInfos(res, b.other) - } - doSort := func(infos []*dhtInfo) { - less := func(i, j int) bool { - return dht_firstCloserThanThird(infos[i].getNodeID(), - nodeID, - infos[j].getNodeID()) - } - sort.SliceStable(infos, less) - } - doSort(res) - if len(res) > dht_lookup_size { - res = res[:dht_lookup_size] - } - return res -} - -// Gets the bucket for a specified matching prefix length. -func (t *dht) getBucket(bidx int) *bucket { - return &t.buckets_hidden[bidx] -} - -// Lists the number of buckets. -func (t *dht) nBuckets() int { - return len(t.buckets_hidden) -} - -// Inserts a node into the DHT if they meet certain requirements. -// In particular, they must either be a peer that's not already in the DHT, or else be someone we should insert into the DHT (see: shouldInsert). -func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) { - // Insert if no "other" entry already exists - nodeID := info.getNodeID() - bidx, isOK := t.getBucketIndex(nodeID) - if !isOK { - return - } - b := t.getBucket(bidx) - if (isPeer && !b.containsOther(info)) || t.shouldInsert(info) { - // We've never heard this node before - // TODO is there a better time than "now" to set send/recv to? - // (Is there another "natural" choice that bootstraps faster?) - info.send = time.Now() - info.recv = info.send - t.insert(info, isPeer) - } -} - -// Adds a node to the DHT, possibly removing another node in the process. -func (t *dht) insert(info *dhtInfo, isPeer bool) { - // First update the time on this info - info.recv = time.Now() - // Get the bucket for this node - nodeID := info.getNodeID() - bidx, isOK := t.getBucketIndex(nodeID) - if !isOK { - return - } - b := t.getBucket(bidx) - if !isPeer && !b.containsOther(info) { - // This is a new entry, give it an old age so it's pinged sooner - // This speeds up bootstrapping - info.recv = info.recv.Add(-time.Hour) - } - if isPeer || info.throttle > time.Minute { - info.throttle = time.Minute - } - // First drop any existing entry from the bucket - b.drop(&info.key) - // Now add to the *end* of the bucket - if isPeer { - // TODO make sure we don't duplicate peers in b.other too - b.peers = append(b.peers, info) - return - } - b.other = append(b.other, info) - // Shrink from the *front* to requied size - for len(b.other) > dht_bucket_size { - b.other = b.other[1:] - } -} - -// Gets the bucket index for the bucket where we would put the given NodeID. -func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) { - for bidx := 0; bidx < t.nBuckets(); bidx++ { - them := nodeID[bidx/8] & (0x80 >> byte(bidx%8)) - me := t.nodeID[bidx/8] & (0x80 >> byte(bidx%8)) - if them != me { - return bidx, true - } - } - return t.nBuckets(), false -} - -// Helper called by containsPeer, containsOther, and contains. -// Returns true if a node with the same ID *and coords* is already in the given part of the bucket. -func dht_bucket_check(newInfo *dhtInfo, infos []*dhtInfo) bool { - // Compares if key and coords match - if newInfo == nil { - panic("Should never happen") - } - for _, info := range infos { - if info == nil { - panic("Should never happen") - } - if info.key != newInfo.key { + if *info.getNodeID() == t.nodeID { continue + } // Skip self + // Send a request to all better successors or predecessors + // We could try sending to only the best, but then packet loss matters more + if successor == nil || dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) { + // ping } - if len(info.coords) != len(newInfo.coords) { - continue - } - match := true - for idx := 0; idx < len(info.coords); idx++ { - if info.coords[idx] != newInfo.coords[idx] { - match = false - break - } - } - if match { - return true + if predecessor == nil || dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) { + // ping } } - return false -} - -// Calls bucket_check over the bucket's peers infos. -func (b *bucket) containsPeer(info *dhtInfo) bool { - return dht_bucket_check(info, b.peers) -} - -// Calls bucket_check over the bucket's other info. -func (b *bucket) containsOther(info *dhtInfo) bool { - return dht_bucket_check(info, b.other) -} - -// returns containsPeer || containsOther -func (b *bucket) contains(info *dhtInfo) bool { - return b.containsPeer(info) || b.containsOther(info) -} - -// Removes a node with the corresponding key, if any, from a bucket. -func (b *bucket) drop(key *boxPubKey) { - clean := func(infos []*dhtInfo) []*dhtInfo { - cleaned := infos[:0] - for _, info := range infos { - if info.key == *key { - continue - } - cleaned = append(cleaned, info) - } - return cleaned - } - b.peers = clean(b.peers) - b.other = clean(b.other) + // TODO add everyting else to a rumor mill for later use? (when/how?) } // Sends a lookup request to the specified node. @@ -390,75 +260,10 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { reqsToDest[req.Dest] = time.Now() } -// Sends a lookup response to the specified node. -func (t *dht) sendRes(res *dhtRes, req *dhtReq) { - // Send a reply for a dhtReq - bs := res.encode() - shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.Key) - payload, nonce := boxSeal(shared, bs, nil) - p := wire_protoTrafficPacket{ - Coords: req.Coords, - ToKey: req.Key, - FromKey: t.core.boxPub, - Nonce: *nonce, - Payload: payload, - } - packet := p.encode() - t.core.router.out(packet) -} - -// Returns true of a bucket contains no peers and no other nodes. -func (b *bucket) isEmpty() bool { - return len(b.peers)+len(b.other) == 0 -} - -// Gets the next node that should be pinged from the bucket. -// There's a cooldown of 6 seconds between ping attempts for each node, to give them time to respond. -// It returns the least recently pinged node, subject to that send cooldown. -func (b *bucket) nextToPing() *dhtInfo { - // Check the nodes in the bucket - // Return whichever one responded least recently - // Delay of 6 seconds between pinging the same node - // Gives them time to respond - // And time between traffic loss from short term congestion in the network - var toPing *dhtInfo - update := func(infos []*dhtInfo) { - for _, next := range infos { - if time.Since(next.send) < 6*time.Second { - continue - } - if toPing == nil || next.recv.Before(toPing.recv) { - toPing = next - } - } - } - update(b.peers) - update(b.other) - return toPing -} - -// Returns a useful target address to ask about for pings. -// Equal to the our node's ID, except for exactly 1 bit at the bucket index. -func (t *dht) getTarget(bidx int) *NodeID { - targetID := t.nodeID - targetID[bidx/8] ^= 0x80 >> byte(bidx%8) - return &targetID -} - -// Sends a ping to a node, or removes the node if it has failed to respond to too many pings. -// If target is nil, we will ask the node about our own NodeID. func (t *dht) ping(info *dhtInfo, target *NodeID) { - if info.pings > 2 { - bidx, isOK := t.getBucketIndex(info.getNodeID()) - if !isOK { - panic("This should never happen") - } - b := t.getBucket(bidx) - b.drop(&info.key) - return - } + // Creates a req for the node at dhtInfo, asking them about the target (if one is given) or themself (if no target is given) if target == nil { - target = &t.nodeID + target = info.getNodeID() } loc := t.core.switchTable.getLocator() coords := loc.getCoords() @@ -467,160 +272,24 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) { Coords: coords, Dest: *target, } - info.pings++ info.send = time.Now() t.sendReq(&req, info) } -// Adds a node info and target to the rumor mill. -// The node will be asked about the target at a later point, if doing so would still be useful at the time. -func (t *dht) addToMill(info *dhtInfo, target *NodeID) { - rumor := dht_rumor{ - info: info, - target: target, - } - t.rumorMill = append(t.rumorMill, rumor) -} - -// Regular periodic maintenance. -// If the mill is empty, it adds two pings to the rumor mill. -// The first is to the node that responded least recently, provided that it's been at least 1 minute, to make sure we eventually detect and remove unresponsive nodes. -// The second is used for bootstrapping, and attempts to fill some bucket, iterating over buckets and resetting after it hits the last non-empty one. -// If the mill is not empty, it pops nodes from the mill until it finds one that would be useful to ping (see: shouldInsert), and then pings it. func (t *dht) doMaintenance() { - // First clean up reqs - for key, reqs := range t.reqs { - for target, timeout := range reqs { - if time.Since(timeout) > time.Minute { - delete(reqs, target) - } - } - if len(reqs) == 0 { - delete(t.reqs, key) + // Ping successor, asking for their predecessor, and clean up old/expired info + var successor *dhtInfo + now := time.Now() + for infoID, info := range t.table { + if now.Sub(info.recv) > time.Minute { + delete(t.table, infoID) + } else if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { + successor = info } } - if len(t.rumorMill) == 0 { - // Ping the least recently contacted node - // This is to make sure we eventually notice when someone times out - var oldest *dhtInfo - last := 0 - for bidx := 0; bidx < t.nBuckets(); bidx++ { - b := t.getBucket(bidx) - if !b.isEmpty() { - last = bidx - toPing := b.nextToPing() - if toPing == nil { - continue - } // We've recently pinged everyone in b - if oldest == nil || toPing.recv.Before(oldest.recv) { - oldest = toPing - } - } - } - if oldest != nil && time.Since(oldest.recv) > time.Minute { - // Ping the oldest node in the DHT, but don't ping nodes that have been checked within the last minute - t.addToMill(oldest, nil) - } - // Refresh buckets - if t.offset > last { - t.offset = 0 - } - target := t.getTarget(t.offset) - func() { - closer := t.lookup(target, false) - for _, info := range closer { - // Throttled ping of a node that's closer to the destination - if time.Since(info.recv) > info.throttle { - t.addToMill(info, target) - t.offset++ - info.bootstrapSend = time.Now() - info.throttle *= 2 - if info.throttle > time.Minute { - info.throttle = time.Minute - } - return - } - } - if len(closer) == 0 { - // If we don't know of anyone closer at all, then there's a hole in our dht - // Ping the closest node we know and ignore the throttle, to try to fill it - for _, info := range t.lookup(target, true) { - t.addToMill(info, target) - t.offset++ - return - } - } - }() - //t.offset++ - } - for len(t.rumorMill) > 0 { - var rumor dht_rumor - rumor, t.rumorMill = t.rumorMill[0], t.rumorMill[1:] - if rumor.target == rumor.info.getNodeID() { - // Note that the above is a pointer comparison, and target can be nil - // This is only for adding new nodes (learned from other lookups) - // It only makes sense to ping if the node isn't already in the table - if !t.shouldInsert(rumor.info) { - continue - } - } - t.ping(rumor.info, rumor.target) - break - } -} - -// Returns true if it would be worth pinging the specified node. -// This requires that the bucket doesn't already contain the node, and that either the bucket isn't full yet or the node is closer to us in keyspace than some other node in that bucket. -func (t *dht) shouldInsert(info *dhtInfo) bool { - bidx, isOK := t.getBucketIndex(info.getNodeID()) - if !isOK { - return false - } - b := t.getBucket(bidx) - if b.containsOther(info) { - return false - } - if len(b.other) < dht_bucket_size { - return true - } - for _, other := range b.other { - if dht_firstCloserThanThird(info.getNodeID(), &t.nodeID, other.getNodeID()) { - return true - } - } - return false -} - -// Returns true if the keyspace distance between the first and second node is smaller than the keyspace distance between the second and third node. -func dht_firstCloserThanThird(first *NodeID, - second *NodeID, - third *NodeID) bool { - for idx := 0; idx < NodeIDLen; idx++ { - f := first[idx] ^ second[idx] - t := third[idx] ^ second[idx] - if f == t { - continue - } - return f < t - } - return false -} - -// Resets the DHT in response to coord changes. -// This empties all buckets, resets the bootstrapping cycle to 0, and empties the rumor mill. -// It adds all old "other" node info to the rumor mill, so they'll be pinged quickly. -// If those nodes haven't also changed coords, then this is a relatively quick way to notify those nodes of our new coords and re-add them to our own DHT if they respond. -func (t *dht) reset() { - // This is mostly so bootstrapping will reset to resend coords into the network - t.offset = 0 - t.rumorMill = nil // reset mill - for _, b := range t.buckets_hidden { - b.peers = b.peers[:0] - for _, info := range b.other { - // Add other nodes to the rumor mill so they'll be pinged soon - // This will hopefully tell them our coords and re-learn theirs quickly if they haven't changed - t.addToMill(info, info.getNodeID()) - } - b.other = b.other[:0] + if successor != nil && + now.Sub(successor.recv) > 30*time.Second && + now.Sub(successor.send) > 6*time.Second { + t.ping(successor, nil) } } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index d2a8c43b..1027eabd 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -91,8 +91,7 @@ func (r *router) mainLoop() { case p := <-r.send: r.sendPacket(p) case info := <-r.core.dht.peers: - r.core.dht.insertIfNew(info, false) // Insert as a normal node - r.core.dht.insertIfNew(info, true) // Insert as a peer + r.core.dht.insert(info) case <-r.reset: r.core.sessions.resetInits() r.core.dht.reset() diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 1b72a63f..1be18bae 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -92,7 +92,7 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { if sinfo.visited[*info.getNodeID()] { continue } - if dht_firstCloserThanThird(info.getNodeID(), &res.Dest, from.getNodeID()) { + if dht_ordered(from.getNodeID(), info.getNodeID(), &res.Dest) { sinfo.toVisit = append(sinfo.toVisit, info) } } @@ -107,7 +107,7 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { } // Sort sort.SliceStable(sinfo.toVisit, func(i, j int) bool { - return dht_firstCloserThanThird(sinfo.toVisit[i].getNodeID(), &res.Dest, sinfo.toVisit[j].getNodeID()) + return dht_ordered(sinfo.toVisit[j].getNodeID(), sinfo.toVisit[i].getNodeID(), &res.Dest) }) // Truncate to some maximum size if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE { @@ -126,10 +126,10 @@ func (s *searches) doSearchStep(sinfo *searchInfo) { // Send to the next search target var next *dhtInfo next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] - var oldPings int - oldPings, next.pings = next.pings, 0 + //var oldPings int + //oldPings, next.pings = next.pings, 0 s.core.dht.ping(next, &sinfo.dest) - next.pings = oldPings // Don't evict a node for searching with it too much + //next.pings = oldPings // Don't evict a node for searching with it too much sinfo.visited[*next.getNodeID()] = true } } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 0bc27a12..662b4aea 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -358,7 +358,7 @@ func (ss *sessions) getSharedKey(myPriv *boxPrivKey, return skey } // First do some cleanup - const maxKeys = dht_bucket_number * dht_bucket_size + const maxKeys = 1024 for key := range ss.permShared { // Remove a random key until the store is small enough if len(ss.permShared) < maxKeys { From 1720dff4769dea7e8572b5cfe6bdc8182da02eb3 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 20 Oct 2018 15:21:40 -0500 Subject: [PATCH 02/20] add some debug output and get things to start working in the sim --- src/yggdrasil/debug.go | 12 ++++-------- src/yggdrasil/dht.go | 27 ++++++++++++++++++--------- src/yggdrasil/search.go | 2 +- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index f614fece..638bd8f4 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -229,14 +229,10 @@ func DEBUG_wire_encode_coords(coords []byte) []byte { // DHT, via core func (c *Core) DEBUG_getDHTSize() int { - total := 0 - /* FIXME - for bidx := 0; bidx < c.dht.nBuckets(); bidx++ { - b := c.dht.getBucket(bidx) - total += len(b.peers) - total += len(b.other) - } - */ + var total int + c.router.doAdmin(func() { + total = len(c.dht.table) + }) return total } diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 068a4f37..ea614b07 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -1,6 +1,7 @@ package yggdrasil import ( + "fmt" "sort" "time" ) @@ -65,11 +66,11 @@ func (t *dht) init(c *Core) { } func (t *dht) reset() { + t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.table = make(map[NodeID]*dhtInfo) } func (t *dht) lookup(nodeID *NodeID, allowWorse bool) []*dhtInfo { - return nil var results []*dhtInfo var successor *dhtInfo sTarget := t.nodeID.next() @@ -96,6 +97,11 @@ func (t *dht) lookup(nodeID *NodeID, allowWorse bool) []*dhtInfo { // Insert into table, preserving the time we last sent a packet if the node was already in the table, otherwise setting that time to now func (t *dht) insert(info *dhtInfo) { + if *info.getNodeID() == t.nodeID { + // This shouldn't happen, but don't crash or add it in case it does + return + panic("FIXME") + } info.recv = time.Now() if oldInfo, isIn := t.table[*info.getNodeID()]; isIn { info.send = oldInfo.send @@ -139,14 +145,12 @@ func (t *dht) handleReq(req *dhtReq) { Infos: t.lookup(&req.Dest, false), } t.sendRes(&res, req) - // Also (possibly) add them to our DHT + // Also add them to our DHT info := dhtInfo{ key: req.Key, coords: req.Coords, } // For bootstrapping to work, we need to add these nodes to the table - // Using insertIfNew, they can lie about coords, but searches will route around them - // Using the mill would mean trying to block off the mill becomes an attack vector t.insert(&info) } @@ -169,7 +173,7 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) { // Returns nodeID + 1 func (nodeID NodeID) next() NodeID { - for idx := len(nodeID); idx >= 0; idx-- { + for idx := len(nodeID) - 1; idx >= 0; idx-- { nodeID[idx] += 1 if nodeID[idx] != 0 { break @@ -180,7 +184,7 @@ func (nodeID NodeID) next() NodeID { // Returns nodeID - 1 func (nodeID NodeID) prev() NodeID { - for idx := len(nodeID); idx >= 0; idx-- { + for idx := len(nodeID) - 1; idx >= 0; idx-- { nodeID[idx] -= 1 if nodeID[idx] != 0xff { break @@ -222,13 +226,19 @@ func (t *dht) handleRes(res *dhtRes) { if *info.getNodeID() == t.nodeID { continue } // Skip self + if _, isIn := t.table[*info.getNodeID()]; isIn { + // TODO? don't skip if coords are different? + continue + } // Send a request to all better successors or predecessors // We could try sending to only the best, but then packet loss matters more if successor == nil || dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) { - // ping + t.ping(info, &t.nodeID) + fmt.Println("pinging new successor", t.nodeID[:4], info.getNodeID()[:4], successor) } if predecessor == nil || dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) { - // ping + t.ping(info, &t.nodeID) + fmt.Println("pinging new predecessor", t.nodeID[:4], info.getNodeID()[:4], predecessor) } } // TODO add everyting else to a rumor mill for later use? (when/how?) @@ -288,7 +298,6 @@ func (t *dht) doMaintenance() { } } if successor != nil && - now.Sub(successor.recv) > 30*time.Second && now.Sub(successor.send) > 6*time.Second { t.ping(successor, nil) } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 1be18bae..3f7ff388 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -89,7 +89,7 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { // Add responses to toVisit if closer to dest than the res node from := dhtInfo{key: res.Key, coords: res.Coords} for _, info := range res.Infos { - if sinfo.visited[*info.getNodeID()] { + if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { continue } if dht_ordered(from.getNodeID(), info.getNodeID(), &res.Dest) { From 02f0611ddeb6d92960a188d980ffbce756ea007f Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 20 Oct 2018 16:27:01 -0500 Subject: [PATCH 03/20] more debugging --- misc/sim/treesim.go | 4 ++-- src/yggdrasil/debug.go | 1 + src/yggdrasil/dht.go | 35 ++++++++++++++++++++++++++++++----- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index 793ef219..30ac2835 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -415,10 +415,10 @@ func main() { } fmt.Println("Test") Util_testAddrIDMask() - idxstore := makeStoreSquareGrid(4) + //idxstore := makeStoreSquareGrid(4) //idxstore := makeStoreStar(256) //idxstore := loadGraph("misc/sim/hype-2016-09-19.list") - //idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt") + idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt") //idxstore := loadGraph("skitter") kstore := getKeyedStore(idxstore) //* diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 638bd8f4..c71c2e5e 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -233,6 +233,7 @@ func (c *Core) DEBUG_getDHTSize() int { c.router.doAdmin(func() { total = len(c.dht.table) }) + fmt.Println("DEBUG_getDHTSize():", total) return total } diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index ea614b07..55121be1 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -66,6 +66,7 @@ func (t *dht) init(c *Core) { } func (t *dht) reset() { + fmt.Println("Resetting table:", t.nodeID) t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.table = make(map[NodeID]*dhtInfo) } @@ -98,7 +99,7 @@ func (t *dht) lookup(nodeID *NodeID, allowWorse bool) []*dhtInfo { // Insert into table, preserving the time we last sent a packet if the node was already in the table, otherwise setting that time to now func (t *dht) insert(info *dhtInfo) { if *info.getNodeID() == t.nodeID { - // This shouldn't happen, but don't crash or add it in case it does + // This shouldn't happen, but don't add it in case it does return panic("FIXME") } @@ -152,6 +153,7 @@ func (t *dht) handleReq(req *dhtReq) { } // For bootstrapping to work, we need to add these nodes to the table t.insert(&info) + info.send = info.send.Add(-time.Minute) } // Sends a lookup response to the specified node. @@ -234,11 +236,19 @@ func (t *dht) handleRes(res *dhtRes) { // We could try sending to only the best, but then packet loss matters more if successor == nil || dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) { t.ping(info, &t.nodeID) - fmt.Println("pinging new successor", t.nodeID[:4], info.getNodeID()[:4], successor) + if successor != nil { + fmt.Println("pinging better successor", t.nodeID[:4], info.getNodeID()[:4], successor.getNodeID()[:4], len(t.table)) + } else { + fmt.Println("pinging new successor", t.nodeID[:4], info.getNodeID()[:4], successor) + } } if predecessor == nil || dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) { t.ping(info, &t.nodeID) - fmt.Println("pinging new predecessor", t.nodeID[:4], info.getNodeID()[:4], predecessor) + if predecessor != nil { + fmt.Println("pinging better predecessor", t.nodeID[:4], info.getNodeID()[:4], predecessor.getNodeID()[:4], len(t.table)) + } else { + fmt.Println("pinging new predecessor", t.nodeID[:4], info.getNodeID()[:4]) + } } } // TODO add everyting else to a rumor mill for later use? (when/how?) @@ -290,15 +300,30 @@ func (t *dht) doMaintenance() { // Ping successor, asking for their predecessor, and clean up old/expired info var successor *dhtInfo now := time.Now() + size := len(t.table) for infoID, info := range t.table { + /* + if now.Sub(info.recv) > time.Minute { + delete(t.table, infoID) + } else if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { + successor = info + } + */ + if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { + successor = info + } if now.Sub(info.recv) > time.Minute { delete(t.table, infoID) - } else if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { - successor = info } } if successor != nil && now.Sub(successor.send) > 6*time.Second { t.ping(successor, nil) } + if successor != nil && t.table[*successor.getNodeID()] == nil { + fmt.Println("DEBUG: successor timed out:", t.nodeID[:4], successor.getNodeID()[:4]) + } + if len(t.table) != size { + fmt.Println("DEBUG: timeouts:", t.nodeID[:4], size, len(t.table)) + } } From 5a85d3515d29febe5586c66cc58583d5f8a1b5a6 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 20 Oct 2018 17:32:54 -0500 Subject: [PATCH 04/20] cleanup --- misc/sim/treesim.go | 4 ++-- src/yggdrasil/dht.go | 38 ++++---------------------------------- 2 files changed, 6 insertions(+), 36 deletions(-) diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index 30ac2835..793ef219 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -415,10 +415,10 @@ func main() { } fmt.Println("Test") Util_testAddrIDMask() - //idxstore := makeStoreSquareGrid(4) + idxstore := makeStoreSquareGrid(4) //idxstore := makeStoreStar(256) //idxstore := loadGraph("misc/sim/hype-2016-09-19.list") - idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt") + //idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt") //idxstore := loadGraph("skitter") kstore := getKeyedStore(idxstore) //* diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 55121be1..5b43d066 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -1,7 +1,6 @@ package yggdrasil import ( - "fmt" "sort" "time" ) @@ -16,9 +15,6 @@ type dhtInfo struct { coords []byte send time.Time // When we last sent a message recv time.Time // When we last received a message - //pings int // Decide when to drop - //throttle time.Duration // Time to wait before pinging a node to bootstrap buckets, increases exponentially from 1 second to 1 minute - //bootstrapSend time.Time // The time checked/updated as part of throttle checks } // Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times. @@ -54,7 +50,6 @@ type dht struct { table map[NodeID]*dhtInfo peers chan *dhtInfo // other goroutines put incoming dht updates here reqs map[boxPubKey]map[NodeID]time.Time - //rumorMill []dht_rumor } func (t *dht) init(c *Core) { @@ -66,7 +61,7 @@ func (t *dht) init(c *Core) { } func (t *dht) reset() { - fmt.Println("Resetting table:", t.nodeID) + //fmt.Println("Resetting table:", t.nodeID) t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.table = make(map[NodeID]*dhtInfo) } @@ -99,7 +94,7 @@ func (t *dht) lookup(nodeID *NodeID, allowWorse bool) []*dhtInfo { // Insert into table, preserving the time we last sent a packet if the node was already in the table, otherwise setting that time to now func (t *dht) insert(info *dhtInfo) { if *info.getNodeID() == t.nodeID { - // This shouldn't happen, but don't add it in case it does + // This shouldn't happen, but don't add it if it does return panic("FIXME") } @@ -236,19 +231,9 @@ func (t *dht) handleRes(res *dhtRes) { // We could try sending to only the best, but then packet loss matters more if successor == nil || dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) { t.ping(info, &t.nodeID) - if successor != nil { - fmt.Println("pinging better successor", t.nodeID[:4], info.getNodeID()[:4], successor.getNodeID()[:4], len(t.table)) - } else { - fmt.Println("pinging new successor", t.nodeID[:4], info.getNodeID()[:4], successor) - } } if predecessor == nil || dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) { t.ping(info, &t.nodeID) - if predecessor != nil { - fmt.Println("pinging better predecessor", t.nodeID[:4], info.getNodeID()[:4], predecessor.getNodeID()[:4], len(t.table)) - } else { - fmt.Println("pinging new predecessor", t.nodeID[:4], info.getNodeID()[:4]) - } } } // TODO add everyting else to a rumor mill for later use? (when/how?) @@ -300,30 +285,15 @@ func (t *dht) doMaintenance() { // Ping successor, asking for their predecessor, and clean up old/expired info var successor *dhtInfo now := time.Now() - size := len(t.table) for infoID, info := range t.table { - /* - if now.Sub(info.recv) > time.Minute { - delete(t.table, infoID) - } else if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { - successor = info - } - */ - if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { - successor = info - } if now.Sub(info.recv) > time.Minute { delete(t.table, infoID) + } else if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { + successor = info } } if successor != nil && now.Sub(successor.send) > 6*time.Second { t.ping(successor, nil) } - if successor != nil && t.table[*successor.getNodeID()] == nil { - fmt.Println("DEBUG: successor timed out:", t.nodeID[:4], successor.getNodeID()[:4]) - } - if len(t.table) != size { - fmt.Println("DEBUG: timeouts:", t.nodeID[:4], size, len(t.table)) - } } From f3ec8c5b37d718b1dee249cfa8e528e256bac1e2 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 20 Oct 2018 17:58:54 -0500 Subject: [PATCH 05/20] fix admin dht function, more cleanup, and slowly throttle back dht traffic when idle --- src/yggdrasil/admin.go | 38 +++++++++++++++++--------------------- src/yggdrasil/dht.go | 7 +++++++ src/yggdrasil/search.go | 3 --- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 2446be51..cfbef94c 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -555,27 +555,23 @@ func (a *admin) getData_getSwitchQueues() admin_nodeInfo { func (a *admin) getData_getDHT() []admin_nodeInfo { var infos []admin_nodeInfo getDHT := func() { - /* TODO fix this - now := time.Now() - for i := 0; i < a.core.dht.nBuckets(); i++ { - b := a.core.dht.getBucket(i) - getInfo := func(vs []*dhtInfo, isPeer bool) { - for _, v := range vs { - addr := *address_addrForNodeID(v.getNodeID()) - info := admin_nodeInfo{ - {"ip", net.IP(addr[:]).String()}, - {"coords", fmt.Sprint(v.coords)}, - {"bucket", i}, - {"peer_only", isPeer}, - {"last_seen", int(now.Sub(v.recv).Seconds())}, - } - infos = append(infos, info) - } - } - getInfo(b.other, false) - getInfo(b.peers, true) - } - */ + now := time.Now() + var dhtInfos []*dhtInfo + for _, v := range a.core.dht.table { + dhtInfos = append(dhtInfos, v) + } + sort.SliceStable(dhtInfos, func(i, j int) bool { + return dht_ordered(&a.core.dht.nodeID, dhtInfos[i].getNodeID(), dhtInfos[j].getNodeID()) + }) + for _, v := range dhtInfos { + addr := *address_addrForNodeID(v.getNodeID()) + info := admin_nodeInfo{ + {"ip", net.IP(addr[:]).String()}, + {"coords", fmt.Sprint(v.coords)}, + {"last_seen", int(now.Sub(v.recv).Seconds())}, + } + infos = append(infos, info) + } } a.core.router.doAdmin(getDHT) return infos diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 5b43d066..0516aa5b 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -15,6 +15,7 @@ type dhtInfo struct { coords []byte send time.Time // When we last sent a message recv time.Time // When we last received a message + throttle time.Duration } // Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times. @@ -101,9 +102,14 @@ func (t *dht) insert(info *dhtInfo) { info.recv = time.Now() if oldInfo, isIn := t.table[*info.getNodeID()]; isIn { info.send = oldInfo.send + info.throttle = oldInfo.throttle } else { info.send = info.recv } + info.throttle += time.Second + if info.throttle > 30*time.Second { + info.throttle = 30 * time.Second + } t.table[*info.getNodeID()] = info } @@ -293,6 +299,7 @@ func (t *dht) doMaintenance() { } } if successor != nil && + now.Sub(successor.recv) > successor.throttle && now.Sub(successor.send) > 6*time.Second { t.ping(successor, nil) } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 3f7ff388..dd6b09cf 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -126,10 +126,7 @@ func (s *searches) doSearchStep(sinfo *searchInfo) { // Send to the next search target var next *dhtInfo next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] - //var oldPings int - //oldPings, next.pings = next.pings, 0 s.core.dht.ping(next, &sinfo.dest) - //next.pings = oldPings // Don't evict a node for searching with it too much sinfo.visited[*next.getNodeID()] = true } } From 63d6ab425124537de9a671bc17b0323ac85c97c3 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 20 Oct 2018 18:12:34 -0500 Subject: [PATCH 06/20] more cleanup, comments, and dht reset() changes --- src/yggdrasil/dht.go | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 0516aa5b..6682eccf 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -53,20 +53,42 @@ type dht struct { reqs map[boxPubKey]map[NodeID]time.Time } +// Initializes the DHT func (t *dht) init(c *Core) { - // TODO t.core = c t.nodeID = *t.core.GetNodeID() t.peers = make(chan *dhtInfo, 1024) t.reset() } +// Resets the DHT in response to coord changes +// This empties all info from the DHT and drops outstanding requests +// It sends a ping to the old successor and predecessor, in case they're still around func (t *dht) reset() { - //fmt.Println("Resetting table:", t.nodeID) + var successor *dhtInfo + var predecessor *dhtInfo + for infoID, info := range t.table { + // Get current successor and predecessor + if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { + successor = info + } + if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { + predecessor = info + } + } t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.table = make(map[NodeID]*dhtInfo) + if successor != nil { + t.ping(successor, &t.nodeID) + } + if predecessor != nil { + t.ping(predecessor, &t.nodeID) + } } +// Does a DHT lookup and returns up to dht_lookup_size results +// If allowWorse = true, begins with best know predecessor for ID and works backwards, even if these nodes are worse predecessors than we are, to be used when intializing searches +// If allowWorse = false, begins with the best known successor for ID and works backwards (next is predecessor, etc, inclusive of the ID if it's a known node) func (t *dht) lookup(nodeID *NodeID, allowWorse bool) []*dhtInfo { var results []*dhtInfo var successor *dhtInfo From d851d9afe7d29d1bd13cfc23946f022f77c0a55a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 20 Oct 2018 18:31:11 -0500 Subject: [PATCH 07/20] add max pings before timing out a successor --- src/yggdrasil/dht.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 6682eccf..4ac425c9 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -16,6 +16,7 @@ type dhtInfo struct { send time.Time // When we last sent a message recv time.Time // When we last received a message throttle time.Duration + pings int // Time out if at least 3 consecutive maintenance pings drop } // Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times. @@ -314,7 +315,7 @@ func (t *dht) doMaintenance() { var successor *dhtInfo now := time.Now() for infoID, info := range t.table { - if now.Sub(info.recv) > time.Minute { + if now.Sub(info.recv) > time.Minute || info.pings > 3 { delete(t.table, infoID) } else if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { successor = info @@ -324,5 +325,6 @@ func (t *dht) doMaintenance() { now.Sub(successor.recv) > successor.throttle && now.Sub(successor.send) > 6*time.Second { t.ping(successor, nil) + successor.pings++ } } From 3dbffae99f7aee6231e4a0e7294c1afac10b9454 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 20 Oct 2018 19:09:25 -0500 Subject: [PATCH 08/20] add search for successor, via parent, to the dht maintenance cycle --- src/yggdrasil/dht.go | 33 ++++++++++++++++++++++++++++++++- src/yggdrasil/switch.go | 2 +- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 4ac425c9..da3ed75a 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -52,6 +52,7 @@ type dht struct { table map[NodeID]*dhtInfo peers chan *dhtInfo // other goroutines put incoming dht updates here reqs map[boxPubKey]map[NodeID]time.Time + search time.Time } // Initializes the DHT @@ -79,6 +80,7 @@ func (t *dht) reset() { } t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.table = make(map[NodeID]*dhtInfo) + t.search = time.Now().Add(-time.Minute) if successor != nil { t.ping(successor, &t.nodeID) } @@ -323,8 +325,37 @@ func (t *dht) doMaintenance() { } if successor != nil && now.Sub(successor.recv) > successor.throttle && - now.Sub(successor.send) > 6*time.Second { + now.Sub(successor.send) > 3*time.Second { t.ping(successor, nil) successor.pings++ + if now.Sub(t.search) > time.Minute { + // Start a search for our successor, beginning at this node's parent + // This should (hopefully) help bootstrap + t.core.switchTable.mutex.RLock() + parentPort := t.core.switchTable.parent + t.core.switchTable.mutex.RUnlock() + ports := t.core.peers.getPorts() + if parent, isIn := ports[parentPort]; isIn { + t.search = now + target := successor.getNodeID().prev() + sinfo, isIn := t.core.searches.searches[target] + if !isIn { + var mask NodeID + for idx := range mask { + mask[idx] = 0xff + } + sinfo = t.core.searches.newIterSearch(&target, &mask) + toVisit := sinfo.toVisit + parentNodeID := getNodeID(&parent.box) + for _, ninfo := range toVisit { + if *ninfo.getNodeID() == *parentNodeID { + toVisit = append(toVisit, ninfo) + } + } + sinfo.toVisit = toVisit + } + t.core.searches.continueSearch(sinfo) + } + } } } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 72f17ed2..e877880f 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -158,9 +158,9 @@ type switchTable struct { core *Core key sigPubKey // Our own key time time.Time // Time when locator.tstamp was last updated - parent switchPort // Port of whatever peer is our parent, or self if we're root drop map[sigPubKey]int64 // Tstamp associated with a dropped root mutex sync.RWMutex // Lock for reads/writes of switchData + parent switchPort // Port of whatever peer is our parent, or self if we're root data switchData // updater atomic.Value // *sync.Once table atomic.Value // lookupTable From 8825494d5964aa96336ab0b0fe68cbe4b1793009 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 20 Oct 2018 20:11:32 -0500 Subject: [PATCH 09/20] remove maintenance searches and throttle logic, to focus on debugging in this simpler case first --- src/yggdrasil/dht.go | 49 ++------------------------------------------ 1 file changed, 2 insertions(+), 47 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index da3ed75a..17d0a235 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -13,10 +13,8 @@ type dhtInfo struct { nodeID_hidden *NodeID key boxPubKey coords []byte - send time.Time // When we last sent a message recv time.Time // When we last received a message - throttle time.Duration - pings int // Time out if at least 3 consecutive maintenance pings drop + pings int // Time out if at least 3 consecutive maintenance pings drop } // Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times. @@ -125,16 +123,6 @@ func (t *dht) insert(info *dhtInfo) { panic("FIXME") } info.recv = time.Now() - if oldInfo, isIn := t.table[*info.getNodeID()]; isIn { - info.send = oldInfo.send - info.throttle = oldInfo.throttle - } else { - info.send = info.recv - } - info.throttle += time.Second - if info.throttle > 30*time.Second { - info.throttle = 30 * time.Second - } t.table[*info.getNodeID()] = info } @@ -179,7 +167,6 @@ func (t *dht) handleReq(req *dhtReq) { } // For bootstrapping to work, we need to add these nodes to the table t.insert(&info) - info.send = info.send.Add(-time.Minute) } // Sends a lookup response to the specified node. @@ -308,7 +295,6 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) { Coords: coords, Dest: *target, } - info.send = time.Now() t.sendReq(&req, info) } @@ -323,39 +309,8 @@ func (t *dht) doMaintenance() { successor = info } } - if successor != nil && - now.Sub(successor.recv) > successor.throttle && - now.Sub(successor.send) > 3*time.Second { + if successor != nil { t.ping(successor, nil) successor.pings++ - if now.Sub(t.search) > time.Minute { - // Start a search for our successor, beginning at this node's parent - // This should (hopefully) help bootstrap - t.core.switchTable.mutex.RLock() - parentPort := t.core.switchTable.parent - t.core.switchTable.mutex.RUnlock() - ports := t.core.peers.getPorts() - if parent, isIn := ports[parentPort]; isIn { - t.search = now - target := successor.getNodeID().prev() - sinfo, isIn := t.core.searches.searches[target] - if !isIn { - var mask NodeID - for idx := range mask { - mask[idx] = 0xff - } - sinfo = t.core.searches.newIterSearch(&target, &mask) - toVisit := sinfo.toVisit - parentNodeID := getNodeID(&parent.box) - for _, ninfo := range toVisit { - if *ninfo.getNodeID() == *parentNodeID { - toVisit = append(toVisit, ninfo) - } - } - sinfo.toVisit = toVisit - } - t.core.searches.continueSearch(sinfo) - } - } } } From 95201669fe7475fa463ec0014eae1133edb1b258 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 20 Oct 2018 22:06:36 -0500 Subject: [PATCH 10/20] reintroduce (better) dht throttling --- src/yggdrasil/dht.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 17d0a235..b04d2585 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -15,6 +15,7 @@ type dhtInfo struct { coords []byte recv time.Time // When we last received a message pings int // Time out if at least 3 consecutive maintenance pings drop + throttle time.Duration } // Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times. @@ -123,6 +124,22 @@ func (t *dht) insert(info *dhtInfo) { panic("FIXME") } info.recv = time.Now() + if oldInfo, isIn := t.table[*info.getNodeID()]; isIn { + sameCoords := true + if len(info.coords) != len(oldInfo.coords) { + sameCoords = false + } else { + for idx := 0; idx < len(info.coords); idx++ { + if info.coords[idx] != oldInfo.coords[idx] { + sameCoords = false + break + } + } + } + if sameCoords { + info.throttle = oldInfo.throttle + } + } t.table[*info.getNodeID()] = info } @@ -309,8 +326,13 @@ func (t *dht) doMaintenance() { successor = info } } - if successor != nil { + if successor != nil && + now.Sub(successor.recv) > successor.throttle { t.ping(successor, nil) successor.pings++ + successor.throttle += time.Second + if successor.throttle > 30*time.Second { + successor.throttle = 30 * time.Second + } } } From 6c59ae862a8de95419961a35067385c891523e3d Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 21 Oct 2018 00:05:04 -0500 Subject: [PATCH 11/20] more debugging --- src/yggdrasil/dht.go | 68 ++++++++++++++++++++++++++++++++--------- src/yggdrasil/peer.go | 6 ++-- src/yggdrasil/router.go | 1 + 3 files changed, 59 insertions(+), 16 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index b04d2585..2a97ec03 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -1,6 +1,12 @@ package yggdrasil +// TODO signal to predecessor when we replace them? +// Sending a ping with an extra 0 at the end of our coords should be enough to reset our throttle in their table +// That should encorage them to ping us again sooner, and then we can reply with new info +// Maybe remember old predecessor and check this during maintenance? + import ( + "fmt" "sort" "time" ) @@ -146,22 +152,32 @@ func (t *dht) insert(info *dhtInfo) { // Return true if first/second/third are (partially) ordered correctly // FIXME? maybe total ordering makes more sense func dht_ordered(first, second, third *NodeID) bool { - var ordered bool - for idx := 0; idx < NodeIDLen; idx++ { - f, s, t := first[idx], second[idx], third[idx] - switch { - case f == s && s == t: - continue - case f <= s && s <= t: - ordered = true // nothing wrapped around 0 - case t <= f && f <= s: - ordered = true // 0 is between second and third - case s <= t && t <= f: - ordered = true // 0 is between first and second + lessOrEqual := func(first, second *NodeID) bool { + for idx := 0; idx < NodeIDLen; idx++ { + if first[idx] > second[idx] { + return false + } + if first[idx] < second[idx] { + return true + } } - break + return true } - return ordered + firstLessThanSecond := lessOrEqual(first, second) + secondLessThanThird := lessOrEqual(second, third) + thirdLessThanFirst := lessOrEqual(third, first) + switch { + case firstLessThanSecond && secondLessThanThird: + // Nothing wrapped around 0, the easy case + return true + case thirdLessThanFirst && firstLessThanSecond: + // Third wrapped around 0 + return true + case secondLessThanThird && thirdLessThanFirst: + // Second (and third) wrapped around 0 + return true + } + return false } // Reads a request, performs a lookup, and responds. @@ -254,6 +270,9 @@ func (t *dht) handleRes(res *dhtRes) { predecessor = info } } + if len(res.Infos) > dht_lookup_size { + res.Infos = res.Infos[:dht_lookup_size] + } for _, info := range res.Infos { if *info.getNodeID() == t.nodeID { continue @@ -331,8 +350,29 @@ func (t *dht) doMaintenance() { t.ping(successor, nil) successor.pings++ successor.throttle += time.Second + ///// + if now.Sub(t.search) > 30*time.Second { + t.search = now + target := successor.getNodeID().prev() + sinfo, isIn := t.core.searches.searches[target] + if !isIn { + var mask NodeID + for idx := range mask { + mask[idx] = 0xff + } + sinfo = t.core.searches.newIterSearch(&target, &mask) + } + t.core.searches.continueSearch(sinfo) + } + ///// + return + fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", successor.throttle, "nodeID:", successor.getNodeID()[:8], "coords:", successor.coords) + for infoID := range t.table { + fmt.Println("DEBUG other info:", infoID[:8], "ordered", dht_ordered(&t.nodeID, &infoID, successor.getNodeID()), "swapped:", dht_ordered(&t.nodeID, successor.getNodeID(), &infoID)) + } if successor.throttle > 30*time.Second { successor.throttle = 30 * time.Second } + fmt.Println("Table size:", len(t.table)) } } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index de463b43..f9511d5a 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -183,8 +183,10 @@ func (p *peer) linkLoop() { } p.sendSwitchMsg() case _ = <-tick.C: - if p.dinfo != nil { - p.core.dht.peers <- p.dinfo + pdinfo := p.dinfo // FIXME this is a bad workarond NPE on the next line + if pdinfo != nil { + dinfo := *pdinfo + p.core.dht.peers <- &dinfo } } } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 1027eabd..27aad8d7 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -92,6 +92,7 @@ func (r *router) mainLoop() { r.sendPacket(p) case info := <-r.core.dht.peers: r.core.dht.insert(info) + info.throttle = 0 case <-r.reset: r.core.sessions.resetInits() r.core.dht.reset() From efe6cec11a865da58f53e44cdfaea45a384f4862 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 21 Oct 2018 12:28:21 -0500 Subject: [PATCH 12/20] more debugging, trying to understand bootstrap issues --- src/yggdrasil/dht.go | 103 +++++++++++++++++++++++++++++------------- src/yggdrasil/peer.go | 3 +- 2 files changed, 73 insertions(+), 33 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 2a97ec03..b5bf926e 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -57,7 +57,6 @@ type dht struct { table map[NodeID]*dhtInfo peers chan *dhtInfo // other goroutines put incoming dht updates here reqs map[boxPubKey]map[NodeID]time.Time - search time.Time } // Initializes the DHT @@ -85,12 +84,11 @@ func (t *dht) reset() { } t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.table = make(map[NodeID]*dhtInfo) - t.search = time.Now().Add(-time.Minute) if successor != nil { - t.ping(successor, &t.nodeID) + t.ping(successor, nil) } if predecessor != nil { - t.ping(predecessor, &t.nodeID) + t.ping(predecessor, nil) } } @@ -199,7 +197,24 @@ func (t *dht) handleReq(req *dhtReq) { coords: req.Coords, } // For bootstrapping to work, we need to add these nodes to the table - t.insert(&info) + //t.insert(&info) + // FIXME? DEBUG testing what happens if we only add better predecessors/successors + var successor *dhtInfo + var predecessor *dhtInfo + for infoID, v := range t.table { + // Get current successor and predecessor + if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { + successor = v + } + if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { + predecessor = v + } + } + if successor != nil && dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) { + t.insert(&info) + } else if predecessor != nil && dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) { + t.insert(&info) + } } // Sends a lookup response to the specified node. @@ -263,10 +278,10 @@ func (t *dht) handleRes(res *dhtRes) { var predecessor *dhtInfo for infoID, info := range t.table { // Get current successor and predecessor - if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { + if successor != nil && dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { successor = info } - if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { + if predecessor != nil && dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { predecessor = info } } @@ -284,10 +299,9 @@ func (t *dht) handleRes(res *dhtRes) { // Send a request to all better successors or predecessors // We could try sending to only the best, but then packet loss matters more if successor == nil || dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) { - t.ping(info, &t.nodeID) - } - if predecessor == nil || dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) { - t.ping(info, &t.nodeID) + t.ping(info, nil) + } else if predecessor == nil || dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) { + t.ping(info, nil) } } // TODO add everyting else to a rumor mill for later use? (when/how?) @@ -322,7 +336,7 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { func (t *dht) ping(info *dhtInfo, target *NodeID) { // Creates a req for the node at dhtInfo, asking them about the target (if one is given) or themself (if no target is given) if target == nil { - target = info.getNodeID() + target = &t.nodeID } loc := t.core.switchTable.getLocator() coords := loc.getCoords() @@ -337,42 +351,67 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) { func (t *dht) doMaintenance() { // Ping successor, asking for their predecessor, and clean up old/expired info var successor *dhtInfo + var predecessor *dhtInfo + toPing := make(map[NodeID]*dhtInfo) now := time.Now() for infoID, info := range t.table { if now.Sub(info.recv) > time.Minute || info.pings > 3 { delete(t.table, infoID) } else if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { successor = info + } else if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { + predecessor = info } } + ////////////////////////////////////////////////////////////////////////////// + t.core.switchTable.mutex.RLock() + parentPort := t.core.switchTable.parent + parentInfo := t.core.switchTable.data.peers[parentPort] + t.core.switchTable.mutex.RUnlock() + ports := t.core.peers.getPorts() + if parent, isIn := ports[parentPort]; isIn { + loc := parentInfo.locator.clone() + end := len(loc.coords) + if end > 0 { + end -= 1 + } + loc.coords = loc.coords[:end] + pinfo := dhtInfo{key: parent.box, coords: loc.getCoords()} + t.insert(&pinfo) + } + ////////////////////////////////////////////////////////////////////////////// + if successor != nil { + toPing[*successor.getNodeID()] = successor + } + if predecessor != nil { + toPing[*predecessor.getNodeID()] = predecessor + } + for _, info := range toPing { + if now.Sub(info.recv) > info.throttle { + t.ping(info, nil) + info.pings++ + info.throttle += time.Second + if info.throttle > 30*time.Second { + info.throttle = 30 * time.Second + } + //fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords) + } + } + return + ////////////////////////////////////////////////////////////////////////////// if successor != nil && now.Sub(successor.recv) > successor.throttle { t.ping(successor, nil) successor.pings++ successor.throttle += time.Second - ///// - if now.Sub(t.search) > 30*time.Second { - t.search = now - target := successor.getNodeID().prev() - sinfo, isIn := t.core.searches.searches[target] - if !isIn { - var mask NodeID - for idx := range mask { - mask[idx] = 0xff - } - sinfo = t.core.searches.newIterSearch(&target, &mask) - } - t.core.searches.continueSearch(sinfo) - } - ///// - return + //return fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", successor.throttle, "nodeID:", successor.getNodeID()[:8], "coords:", successor.coords) - for infoID := range t.table { - fmt.Println("DEBUG other info:", infoID[:8], "ordered", dht_ordered(&t.nodeID, &infoID, successor.getNodeID()), "swapped:", dht_ordered(&t.nodeID, successor.getNodeID(), &infoID)) - } + //for infoID := range t.table { + // fmt.Println("DEBUG other info:", infoID[:8], "ordered", dht_ordered(&t.nodeID, &infoID, successor.getNodeID()), "swapped:", dht_ordered(&t.nodeID, successor.getNodeID(), &infoID)) + //} if successor.throttle > 30*time.Second { successor.throttle = 30 * time.Second } - fmt.Println("Table size:", len(t.table)) + //fmt.Println("Table size:", len(t.table)) } } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index f9511d5a..eee40fd6 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -183,6 +183,7 @@ func (p *peer) linkLoop() { } p.sendSwitchMsg() case _ = <-tick.C: + break // FIXME disabled the below completely to test something pdinfo := p.dinfo // FIXME this is a bad workarond NPE on the next line if pdinfo != nil { dinfo := *pdinfo @@ -332,7 +333,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { key: p.box, coords: loc.getCoords(), } - p.core.dht.peers <- &dinfo + //p.core.dht.peers <- &dinfo p.dinfo = &dinfo } From bcbd24120d806c0f004fd1d1e3b4ca80aaff2732 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 21 Oct 2018 14:57:04 -0500 Subject: [PATCH 13/20] keep track of all keys we're supposed to care about in the dht, don't give special treatment to successors/predecessors --- src/yggdrasil/dht.go | 146 +++++++++++++++++-------------------------- 1 file changed, 57 insertions(+), 89 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index b5bf926e..c5102f04 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -52,11 +52,23 @@ type dhtRes struct { // The main DHT struct. type dht struct { - core *Core - nodeID NodeID - table map[NodeID]*dhtInfo - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[boxPubKey]map[NodeID]time.Time + core *Core + nodeID NodeID + table map[NodeID]*dhtInfo + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[boxPubKey]map[NodeID]time.Time + targets [NodeIDLen*8 + 1]NodeID +} + +func (nodeID NodeID) add(toAdd *NodeID) NodeID { + var accumulator uint16 + for idx := len(nodeID) - 1; idx >= 0; idx-- { + accumulator += uint16(nodeID[idx]) + accumulator += uint16(toAdd[idx]) + nodeID[idx] = byte(accumulator) + accumulator >>= 8 + } + return nodeID } // Initializes the DHT @@ -64,32 +76,27 @@ func (t *dht) init(c *Core) { t.core = c t.nodeID = *t.core.GetNodeID() t.peers = make(chan *dhtInfo, 1024) + getDist := func(bit int) *NodeID { + nBits := NodeIDLen * 8 + theByte := (nBits - bit) / 8 + theBitmask := uint8(0x80) >> uint8(nBits-bit) + var nid NodeID + //fmt.Println("DEBUG: bit", bit, "theByte", theByte) + nid[theByte] = theBitmask + return &nid + } + for idx := range t.targets { + t.targets[idx] = t.nodeID.add(getDist(idx + 1)) + } + t.targets[len(t.targets)-1] = t.nodeID // Last one wraps around to self t.reset() } // Resets the DHT in response to coord changes // This empties all info from the DHT and drops outstanding requests -// It sends a ping to the old successor and predecessor, in case they're still around func (t *dht) reset() { - var successor *dhtInfo - var predecessor *dhtInfo - for infoID, info := range t.table { - // Get current successor and predecessor - if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { - successor = info - } - if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { - predecessor = info - } - } t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.table = make(map[NodeID]*dhtInfo) - if successor != nil { - t.ping(successor, nil) - } - if predecessor != nil { - t.ping(predecessor, nil) - } } // Does a DHT lookup and returns up to dht_lookup_size results @@ -197,24 +204,7 @@ func (t *dht) handleReq(req *dhtReq) { coords: req.Coords, } // For bootstrapping to work, we need to add these nodes to the table - //t.insert(&info) - // FIXME? DEBUG testing what happens if we only add better predecessors/successors - var successor *dhtInfo - var predecessor *dhtInfo - for infoID, v := range t.table { - // Get current successor and predecessor - if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { - successor = v - } - if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { - predecessor = v - } - } - if successor != nil && dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) { - t.insert(&info) - } else if predecessor != nil && dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) { - t.insert(&info) - } + t.insert(&info) } // Sends a lookup response to the specified node. @@ -274,17 +264,6 @@ func (t *dht) handleRes(res *dhtRes) { coords: res.Coords, } t.insert(&rinfo) // Or at the end, after checking successor/predecessor? - var successor *dhtInfo - var predecessor *dhtInfo - for infoID, info := range t.table { - // Get current successor and predecessor - if successor != nil && dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { - successor = info - } - if predecessor != nil && dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { - predecessor = info - } - } if len(res.Infos) > dht_lookup_size { res.Infos = res.Infos[:dht_lookup_size] } @@ -296,11 +275,7 @@ func (t *dht) handleRes(res *dhtRes) { // TODO? don't skip if coords are different? continue } - // Send a request to all better successors or predecessors - // We could try sending to only the best, but then packet loss matters more - if successor == nil || dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) { - t.ping(info, nil) - } else if predecessor == nil || dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) { + if t.isImportant(info.getNodeID()) { t.ping(info, nil) } } @@ -349,18 +324,13 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) { } func (t *dht) doMaintenance() { - // Ping successor, asking for their predecessor, and clean up old/expired info - var successor *dhtInfo - var predecessor *dhtInfo toPing := make(map[NodeID]*dhtInfo) now := time.Now() for infoID, info := range t.table { if now.Sub(info.recv) > time.Minute || info.pings > 3 { delete(t.table, infoID) - } else if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { - successor = info - } else if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { - predecessor = info + } else if t.isImportant(info.getNodeID()) { + toPing[infoID] = info } } ////////////////////////////////////////////////////////////////////////////// @@ -380,38 +350,36 @@ func (t *dht) doMaintenance() { t.insert(&pinfo) } ////////////////////////////////////////////////////////////////////////////// - if successor != nil { - toPing[*successor.getNodeID()] = successor - } - if predecessor != nil { - toPing[*predecessor.getNodeID()] = predecessor - } for _, info := range toPing { if now.Sub(info.recv) > info.throttle { - t.ping(info, nil) + t.ping(info, info.getNodeID()) info.pings++ info.throttle += time.Second if info.throttle > 30*time.Second { info.throttle = 30 * time.Second } - //fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords) + //continue + fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords) } } - return - ////////////////////////////////////////////////////////////////////////////// - if successor != nil && - now.Sub(successor.recv) > successor.throttle { - t.ping(successor, nil) - successor.pings++ - successor.throttle += time.Second - //return - fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", successor.throttle, "nodeID:", successor.getNodeID()[:8], "coords:", successor.coords) - //for infoID := range t.table { - // fmt.Println("DEBUG other info:", infoID[:8], "ordered", dht_ordered(&t.nodeID, &infoID, successor.getNodeID()), "swapped:", dht_ordered(&t.nodeID, successor.getNodeID(), &infoID)) - //} - if successor.throttle > 30*time.Second { - successor.throttle = 30 * time.Second - } - //fmt.Println("Table size:", len(t.table)) - } +} + +func (t *dht) isImportant(nodeID *NodeID) bool { + // TODO persistently store stuff about best nodes, so we don't need to keep doing this + // Ideally switch to a better data structure... linked list? + for _, target := range t.targets { + // Get the best known node for this target + var best *dhtInfo + for _, info := range t.table { + if best == nil || dht_ordered(best.getNodeID(), info.getNodeID(), &target) { + best = info + } + } + if best != nil && dht_ordered(best.getNodeID(), nodeID, &target) { + // This is an equal or better finger table entry than what we currently have + return true + } + } + // We didn't find anything where this is better, so it must be worse + return false } From f0bd40ff6853943a66494e394979a5a924f0eba9 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 21 Oct 2018 15:10:18 -0500 Subject: [PATCH 14/20] more testing --- src/yggdrasil/dht.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index c5102f04..c89663a4 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -102,12 +102,22 @@ func (t *dht) reset() { // Does a DHT lookup and returns up to dht_lookup_size results // If allowWorse = true, begins with best know predecessor for ID and works backwards, even if these nodes are worse predecessors than we are, to be used when intializing searches // If allowWorse = false, begins with the best known successor for ID and works backwards (next is predecessor, etc, inclusive of the ID if it's a known node) -func (t *dht) lookup(nodeID *NodeID, allowWorse bool) []*dhtInfo { +func (t *dht) lookup(nodeID *NodeID, everything bool) []*dhtInfo { + var results []*dhtInfo + for infoID, info := range t.table { + if everything || t.isImportant(&infoID) { + results = append(results, info) + } + } + return results +} + +func (t *dht) old_lookup(nodeID *NodeID, allowWorse bool) []*dhtInfo { var results []*dhtInfo var successor *dhtInfo sTarget := t.nodeID.next() for infoID, info := range t.table { - if allowWorse || dht_ordered(&t.nodeID, &infoID, nodeID) { + if true || allowWorse || dht_ordered(&t.nodeID, &infoID, nodeID) { results = append(results, info) } else { if successor == nil || dht_ordered(&sTarget, &infoID, successor.getNodeID()) { @@ -122,7 +132,7 @@ func (t *dht) lookup(nodeID *NodeID, allowWorse bool) []*dhtInfo { results = append([]*dhtInfo{successor}, results...) } if len(results) > dht_lookup_size { - results = results[:dht_lookup_size] + //results = results[:dht_lookup_size] //FIXME debug } return results } @@ -265,7 +275,7 @@ func (t *dht) handleRes(res *dhtRes) { } t.insert(&rinfo) // Or at the end, after checking successor/predecessor? if len(res.Infos) > dht_lookup_size { - res.Infos = res.Infos[:dht_lookup_size] + //res.Infos = res.Infos[:dht_lookup_size] //FIXME debug } for _, info := range res.Infos { if *info.getNodeID() == t.nodeID { From 5e3959f1d0d26c0503208a6ac34fa34fc0875b5f Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 21 Oct 2018 17:40:43 -0500 Subject: [PATCH 15/20] yet more debugging --- misc/sim/treesim.go | 2 +- src/yggdrasil/dht.go | 130 +++++++++++++++++++--------------------- src/yggdrasil/peer.go | 2 +- src/yggdrasil/search.go | 11 +++- 4 files changed, 72 insertions(+), 73 deletions(-) diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index 793ef219..fc5e8449 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -437,7 +437,7 @@ func main() { pingNodes(kstore) //pingBench(kstore) // Only after disabling debug output //stressTest(kstore) - //time.Sleep(120*time.Second) + time.Sleep(120 * time.Second) dumpDHTSize(kstore) // note that this uses racey functions to read things... if false { // This connects the sim to the local network diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index c89663a4..bbf08ca8 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -11,7 +11,7 @@ import ( "time" ) -const dht_lookup_size = 16 +const dht_lookup_size = 4 // dhtInfo represents everything we know about a node in the DHT. // This includes its key, a cache of it's NodeID, coords, and timing/ping related info for deciding who/when to ping nodes for maintenance. @@ -52,23 +52,11 @@ type dhtRes struct { // The main DHT struct. type dht struct { - core *Core - nodeID NodeID - table map[NodeID]*dhtInfo - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[boxPubKey]map[NodeID]time.Time - targets [NodeIDLen*8 + 1]NodeID -} - -func (nodeID NodeID) add(toAdd *NodeID) NodeID { - var accumulator uint16 - for idx := len(nodeID) - 1; idx >= 0; idx-- { - accumulator += uint16(nodeID[idx]) - accumulator += uint16(toAdd[idx]) - nodeID[idx] = byte(accumulator) - accumulator >>= 8 - } - return nodeID + core *Core + nodeID NodeID + table map[NodeID]*dhtInfo + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[boxPubKey]map[NodeID]time.Time } // Initializes the DHT @@ -76,19 +64,6 @@ func (t *dht) init(c *Core) { t.core = c t.nodeID = *t.core.GetNodeID() t.peers = make(chan *dhtInfo, 1024) - getDist := func(bit int) *NodeID { - nBits := NodeIDLen * 8 - theByte := (nBits - bit) / 8 - theBitmask := uint8(0x80) >> uint8(nBits-bit) - var nid NodeID - //fmt.Println("DEBUG: bit", bit, "theByte", theByte) - nid[theByte] = theBitmask - return &nid - } - for idx := range t.targets { - t.targets[idx] = t.nodeID.add(getDist(idx + 1)) - } - t.targets[len(t.targets)-1] = t.nodeID // Last one wraps around to self t.reset() } @@ -103,11 +78,15 @@ func (t *dht) reset() { // If allowWorse = true, begins with best know predecessor for ID and works backwards, even if these nodes are worse predecessors than we are, to be used when intializing searches // If allowWorse = false, begins with the best known successor for ID and works backwards (next is predecessor, etc, inclusive of the ID if it's a known node) func (t *dht) lookup(nodeID *NodeID, everything bool) []*dhtInfo { - var results []*dhtInfo - for infoID, info := range t.table { - if everything || t.isImportant(&infoID) { - results = append(results, info) - } + results := make([]*dhtInfo, 0, len(t.table)) + for _, info := range t.table { + results = append(results, info) + } + sort.SliceStable(results, func(i, j int) bool { + return dht_ordered(results[j].getNodeID(), results[i].getNodeID(), nodeID) + }) + if len(results) > dht_lookup_size { + //results = results[:dht_lookup_size] //FIXME debug } return results } @@ -277,6 +256,7 @@ func (t *dht) handleRes(res *dhtRes) { if len(res.Infos) > dht_lookup_size { //res.Infos = res.Infos[:dht_lookup_size] //FIXME debug } + imp := t.getImportant() for _, info := range res.Infos { if *info.getNodeID() == t.nodeID { continue @@ -285,7 +265,7 @@ func (t *dht) handleRes(res *dhtRes) { // TODO? don't skip if coords are different? continue } - if t.isImportant(info.getNodeID()) { + if t.isImportant(info, imp) { t.ping(info, nil) } } @@ -336,30 +316,18 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) { func (t *dht) doMaintenance() { toPing := make(map[NodeID]*dhtInfo) now := time.Now() + imp := t.getImportant() + good := make(map[NodeID]*dhtInfo) + for _, info := range imp { + good[*info.getNodeID()] = info + } for infoID, info := range t.table { if now.Sub(info.recv) > time.Minute || info.pings > 3 { delete(t.table, infoID) - } else if t.isImportant(info.getNodeID()) { + } else if t.isImportant(info, imp) { toPing[infoID] = info } } - ////////////////////////////////////////////////////////////////////////////// - t.core.switchTable.mutex.RLock() - parentPort := t.core.switchTable.parent - parentInfo := t.core.switchTable.data.peers[parentPort] - t.core.switchTable.mutex.RUnlock() - ports := t.core.peers.getPorts() - if parent, isIn := ports[parentPort]; isIn { - loc := parentInfo.locator.clone() - end := len(loc.coords) - if end > 0 { - end -= 1 - } - loc.coords = loc.coords[:end] - pinfo := dhtInfo{key: parent.box, coords: loc.getCoords()} - t.insert(&pinfo) - } - ////////////////////////////////////////////////////////////////////////////// for _, info := range toPing { if now.Sub(info.recv) > info.throttle { t.ping(info, info.getNodeID()) @@ -368,28 +336,52 @@ func (t *dht) doMaintenance() { if info.throttle > 30*time.Second { info.throttle = 30 * time.Second } - //continue + continue fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords) } } } -func (t *dht) isImportant(nodeID *NodeID) bool { - // TODO persistently store stuff about best nodes, so we don't need to keep doing this - // Ideally switch to a better data structure... linked list? - for _, target := range t.targets { - // Get the best known node for this target - var best *dhtInfo - for _, info := range t.table { - if best == nil || dht_ordered(best.getNodeID(), info.getNodeID(), &target) { - best = info - } +func (t *dht) getImportant() []*dhtInfo { + // Get a list of all known nodes + infos := make([]*dhtInfo, 0, len(t.table)) + for _, info := range t.table { + infos = append(infos, info) + } + // Sort them by increasing order in distance along the ring + sort.SliceStable(infos, func(i, j int) bool { + // Sort in order of successors + return dht_ordered(&t.nodeID, infos[i].getNodeID(), infos[j].getNodeID()) + }) + // Keep the ones that are no further than the closest seen so far + minDist := ^uint64(0) + loc := t.core.switchTable.getLocator() + important := infos[:0] + for _, info := range infos { + dist := uint64(loc.dist(info.coords)) + if dist < minDist { + minDist = dist + important = append(important, info) } - if best != nil && dht_ordered(best.getNodeID(), nodeID, &target) { - // This is an equal or better finger table entry than what we currently have + } + return important +} + +func (t *dht) isImportant(ninfo *dhtInfo, important []*dhtInfo) bool { + // Check if ninfo is of equal or greater importance to what we already know + loc := t.core.switchTable.getLocator() + ndist := uint64(loc.dist(ninfo.coords)) + minDist := ^uint64(0) + for _, info := range important { + dist := uint64(loc.dist(info.coords)) + if dist < minDist { + minDist = dist + } + if dht_ordered(&t.nodeID, ninfo.getNodeID(), info.getNodeID()) && ndist <= minDist { + // This node is at least as close in both key space and tree space return true } } - // We didn't find anything where this is better, so it must be worse + // We didn't find any important node that ninfo is better than return false } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index eee40fd6..05351e60 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -183,7 +183,7 @@ func (p *peer) linkLoop() { } p.sendSwitchMsg() case _ = <-tick.C: - break // FIXME disabled the below completely to test something + //break // FIXME disabled the below completely to test something pdinfo := p.dinfo // FIXME this is a bad workarond NPE on the next line if pdinfo != nil { dinfo := *pdinfo diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index dd6b09cf..d27b8c5a 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -12,6 +12,7 @@ package yggdrasil // A new search packet is sent periodically, once per second, in case a packet was dropped (this slowly causes the search to become parallel if the search doesn't timeout but also doesn't finish within 1 second for whatever reason) import ( + "fmt" "sort" "time" ) @@ -73,6 +74,9 @@ func (s *searches) handleDHTRes(res *dhtRes) { sinfo, isIn := s.searches[res.Dest] if !isIn || s.checkDHTRes(sinfo, res) { // Either we don't recognize this search, or we just finished it + if isIn { + fmt.Println("DEBUG: search finished, length:", len(sinfo.visited)) + } return } else { // Add to the search and continue @@ -92,7 +96,7 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { continue } - if dht_ordered(from.getNodeID(), info.getNodeID(), &res.Dest) { + if true || dht_ordered(from.getNodeID(), info.getNodeID(), &res.Dest) { sinfo.toVisit = append(sinfo.toVisit, info) } } @@ -107,11 +111,13 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { } // Sort sort.SliceStable(sinfo.toVisit, func(i, j int) bool { + // Should return true if i is closer to the destination than j + // FIXME for some reason it works better backwards, why?! return dht_ordered(sinfo.toVisit[j].getNodeID(), sinfo.toVisit[i].getNodeID(), &res.Dest) }) // Truncate to some maximum size if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE { - sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE] + //sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE] } } @@ -121,6 +127,7 @@ func (s *searches) doSearchStep(sinfo *searchInfo) { if len(sinfo.toVisit) == 0 { // Dead end, do cleanup delete(s.searches, sinfo.dest) + fmt.Println("DEBUG: search abandoned, length:", len(sinfo.visited)) return } else { // Send to the next search target From 253861ebd39f309f19fe3e3bcd41f3b7120a8814 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 21 Oct 2018 18:15:04 -0500 Subject: [PATCH 16/20] reverse dht ownership order from predecessor to successor, this plays nicer with the default 0 bits in unknown node IDs --- src/yggdrasil/dht.go | 41 ++++++++--------------------------------- src/yggdrasil/search.go | 5 +++-- 2 files changed, 11 insertions(+), 35 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index bbf08ca8..398cdb74 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -11,7 +11,7 @@ import ( "time" ) -const dht_lookup_size = 4 +const dht_lookup_size = 16 // dhtInfo represents everything we know about a node in the DHT. // This includes its key, a cache of it's NodeID, coords, and timing/ping related info for deciding who/when to ping nodes for maintenance. @@ -75,43 +75,16 @@ func (t *dht) reset() { } // Does a DHT lookup and returns up to dht_lookup_size results -// If allowWorse = true, begins with best know predecessor for ID and works backwards, even if these nodes are worse predecessors than we are, to be used when intializing searches -// If allowWorse = false, begins with the best known successor for ID and works backwards (next is predecessor, etc, inclusive of the ID if it's a known node) func (t *dht) lookup(nodeID *NodeID, everything bool) []*dhtInfo { results := make([]*dhtInfo, 0, len(t.table)) for _, info := range t.table { results = append(results, info) } sort.SliceStable(results, func(i, j int) bool { - return dht_ordered(results[j].getNodeID(), results[i].getNodeID(), nodeID) + return dht_ordered(nodeID, results[i].getNodeID(), results[j].getNodeID()) }) if len(results) > dht_lookup_size { - //results = results[:dht_lookup_size] //FIXME debug - } - return results -} - -func (t *dht) old_lookup(nodeID *NodeID, allowWorse bool) []*dhtInfo { - var results []*dhtInfo - var successor *dhtInfo - sTarget := t.nodeID.next() - for infoID, info := range t.table { - if true || allowWorse || dht_ordered(&t.nodeID, &infoID, nodeID) { - results = append(results, info) - } else { - if successor == nil || dht_ordered(&sTarget, &infoID, successor.getNodeID()) { - successor = info - } - } - } - sort.SliceStable(results, func(i, j int) bool { - return dht_ordered(results[j].getNodeID(), results[i].getNodeID(), nodeID) - }) - if successor != nil { - results = append([]*dhtInfo{successor}, results...) - } - if len(results) > dht_lookup_size { - //results = results[:dht_lookup_size] //FIXME debug + results = results[:dht_lookup_size] } return results } @@ -350,8 +323,9 @@ func (t *dht) getImportant() []*dhtInfo { } // Sort them by increasing order in distance along the ring sort.SliceStable(infos, func(i, j int) bool { - // Sort in order of successors - return dht_ordered(&t.nodeID, infos[i].getNodeID(), infos[j].getNodeID()) + // Sort in order of predecessors (!), reverse from chord normal, becuase it plays nicer with zero bits for unknown parts of target addresses + return dht_ordered(infos[j].getNodeID(), infos[i].getNodeID(), &t.nodeID) + //return dht_ordered(&t.nodeID, infos[i].getNodeID(), infos[j].getNodeID()) }) // Keep the ones that are no further than the closest seen so far minDist := ^uint64(0) @@ -377,7 +351,8 @@ func (t *dht) isImportant(ninfo *dhtInfo, important []*dhtInfo) bool { if dist < minDist { minDist = dist } - if dht_ordered(&t.nodeID, ninfo.getNodeID(), info.getNodeID()) && ndist <= minDist { + //if dht_ordered(&t.nodeID, ninfo.getNodeID(), info.getNodeID()) && ndist <= minDist { + if dht_ordered(info.getNodeID(), ninfo.getNodeID(), &t.nodeID) && ndist <= minDist { // This node is at least as close in both key space and tree space return true } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index d27b8c5a..b4bbb123 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -113,11 +113,12 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { sort.SliceStable(sinfo.toVisit, func(i, j int) bool { // Should return true if i is closer to the destination than j // FIXME for some reason it works better backwards, why?! - return dht_ordered(sinfo.toVisit[j].getNodeID(), sinfo.toVisit[i].getNodeID(), &res.Dest) + //return dht_ordered(sinfo.toVisit[j].getNodeID(), sinfo.toVisit[i].getNodeID(), &res.Dest) + return dht_ordered(&res.Dest, sinfo.toVisit[i].getNodeID(), sinfo.toVisit[j].getNodeID()) }) // Truncate to some maximum size if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE { - //sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE] + sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE] } } From c0531627bc4d6e765bec15cc48e7f43179823d16 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 24 Oct 2018 22:03:27 -0500 Subject: [PATCH 17/20] fix some chord dht bootstrapping bugs, no known cases where it now fails --- misc/sim/treesim.go | 1 + src/yggdrasil/debug.go | 1 - src/yggdrasil/dht.go | 92 +++++++++++++++++++++++++++-------------- src/yggdrasil/search.go | 5 ++- 4 files changed, 65 insertions(+), 34 deletions(-) diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index fc5e8449..4aa463dc 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -300,6 +300,7 @@ func pingNodes(store map[[32]byte]*Node) { } case <-ch: sendTo(payload, destAddr) + //dumpDHTSize(store) // note that this uses racey functions to read things... } } ticker.Stop() diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index c71c2e5e..638bd8f4 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -233,7 +233,6 @@ func (c *Core) DEBUG_getDHTSize() int { c.router.doAdmin(func() { total = len(c.dht.table) }) - fmt.Println("DEBUG_getDHTSize():", total) return total } diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 398cdb74..3774b5c4 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -5,6 +5,16 @@ package yggdrasil // That should encorage them to ping us again sooner, and then we can reply with new info // Maybe remember old predecessor and check this during maintenance? +// TODO make sure that, if your peer is your successor or predecessor, you still bother to ping them and ask for better nodes +// Basically, don't automatically reset the dhtInfo.recv to time.Now() whenever updating them from the outside +// But *do* set it to something that won't instantly time them out or make them get pingspammed? +// Could set throttle to 0, but that's imperfect at best... pingspam + +// TODO? cache all nodes we ping (from e.g. searches), not just the important ones +// But only send maintenance pings to the important ones + +// TODO reoptimize search stuff (size, timeouts, etc) to play nicer with DHT churn + import ( "fmt" "sort" @@ -77,8 +87,12 @@ func (t *dht) reset() { // Does a DHT lookup and returns up to dht_lookup_size results func (t *dht) lookup(nodeID *NodeID, everything bool) []*dhtInfo { results := make([]*dhtInfo, 0, len(t.table)) + //imp := t.getImportant() for _, info := range t.table { results = append(results, info) + //if t.isImportant(info, imp) { + // results = append(results, info) + //} } sort.SliceStable(results, func(i, j int) bool { return dht_ordered(nodeID, results[i].getNodeID(), results[j].getNodeID()) @@ -165,8 +179,10 @@ func (t *dht) handleReq(req *dhtReq) { key: req.Key, coords: req.Coords, } - // For bootstrapping to work, we need to add these nodes to the table - t.insert(&info) + imp := t.getImportant() + if t.isImportant(&info, imp) { + t.insert(&info) + } } // Sends a lookup response to the specified node. @@ -186,28 +202,6 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) { t.core.router.out(packet) } -// Returns nodeID + 1 -func (nodeID NodeID) next() NodeID { - for idx := len(nodeID) - 1; idx >= 0; idx-- { - nodeID[idx] += 1 - if nodeID[idx] != 0 { - break - } - } - return nodeID -} - -// Returns nodeID - 1 -func (nodeID NodeID) prev() NodeID { - for idx := len(nodeID) - 1; idx >= 0; idx-- { - nodeID[idx] -= 1 - if nodeID[idx] != 0xff { - break - } - } - return nodeID -} - // Reads a lookup response, checks that we had sent a matching request, and processes the response info. // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses func (t *dht) handleRes(res *dhtRes) { @@ -225,11 +219,14 @@ func (t *dht) handleRes(res *dhtRes) { key: res.Key, coords: res.Coords, } - t.insert(&rinfo) // Or at the end, after checking successor/predecessor? + imp := t.getImportant() + if t.isImportant(&rinfo, imp) { + t.insert(&rinfo) + } + //t.insert(&rinfo) // Or at the end, after checking successor/predecessor? if len(res.Infos) > dht_lookup_size { //res.Infos = res.Infos[:dht_lookup_size] //FIXME debug } - imp := t.getImportant() for _, info := range res.Infos { if *info.getNodeID() == t.nodeID { continue @@ -313,6 +310,14 @@ func (t *dht) doMaintenance() { fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords) } } + return // Skip printing debug info + var out []interface{} + out = append(out, "DEBUG important:") + out = append(out, t.nodeID[:8]) + for _, info := range imp { + out = append(out, info.getNodeID()[:8]) + } + fmt.Println(out...) } func (t *dht) getImportant() []*dhtInfo { @@ -323,9 +328,8 @@ func (t *dht) getImportant() []*dhtInfo { } // Sort them by increasing order in distance along the ring sort.SliceStable(infos, func(i, j int) bool { - // Sort in order of predecessors (!), reverse from chord normal, becuase it plays nicer with zero bits for unknown parts of target addresses + // Sort in order of predecessors (!), reverse from chord normal, because it plays nicer with zero bits for unknown parts of target addresses return dht_ordered(infos[j].getNodeID(), infos[i].getNodeID(), &t.nodeID) - //return dht_ordered(&t.nodeID, infos[i].getNodeID(), infos[j].getNodeID()) }) // Keep the ones that are no further than the closest seen so far minDist := ^uint64(0) @@ -338,6 +342,19 @@ func (t *dht) getImportant() []*dhtInfo { important = append(important, info) } } + var temp []*dhtInfo + minDist = ^uint64(0) + for idx := len(infos) - 1; idx >= 0; idx-- { + info := infos[idx] + dist := uint64(loc.dist(info.coords)) + if dist < minDist { + minDist = dist + temp = append(temp, info) + } + } + for idx := len(temp) - 1; idx >= 0; idx-- { + important = append(important, temp[idx]) + } return important } @@ -347,15 +364,28 @@ func (t *dht) isImportant(ninfo *dhtInfo, important []*dhtInfo) bool { ndist := uint64(loc.dist(ninfo.coords)) minDist := ^uint64(0) for _, info := range important { + if (*info.getNodeID() == *ninfo.getNodeID()) || + (ndist < minDist && dht_ordered(info.getNodeID(), ninfo.getNodeID(), &t.nodeID)) { + // Either the same node, or a better one + return true + } dist := uint64(loc.dist(info.coords)) if dist < minDist { minDist = dist } - //if dht_ordered(&t.nodeID, ninfo.getNodeID(), info.getNodeID()) && ndist <= minDist { - if dht_ordered(info.getNodeID(), ninfo.getNodeID(), &t.nodeID) && ndist <= minDist { - // This node is at least as close in both key space and tree space + } + minDist = ^uint64(0) + for idx := len(important) - 1; idx >= 0; idx-- { + info := important[idx] + if (*info.getNodeID() == *ninfo.getNodeID()) || + (ndist < minDist && dht_ordered(&t.nodeID, ninfo.getNodeID(), info.getNodeID())) { + // Either the same node, or a better one return true } + dist := uint64(loc.dist(info.coords)) + if dist < minDist { + minDist = dist + } } // We didn't find any important node that ninfo is better than return false diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index b4bbb123..9039444f 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -96,7 +96,8 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { continue } - if true || dht_ordered(from.getNodeID(), info.getNodeID(), &res.Dest) { + if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) { + // Response is closer to the destination sinfo.toVisit = append(sinfo.toVisit, info) } } @@ -118,7 +119,7 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { }) // Truncate to some maximum size if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE { - sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE] + sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE] //FIXME debug } } From 671c7f2a479facd407d69f03b888ba55e83a007d Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 28 Oct 2018 15:04:44 -0500 Subject: [PATCH 18/20] don't update recv time for known nodes that ping us or known peers --- src/yggdrasil/dht.go | 2 +- src/yggdrasil/router.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 3774b5c4..a94de5e8 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -180,7 +180,7 @@ func (t *dht) handleReq(req *dhtReq) { coords: req.Coords, } imp := t.getImportant() - if t.isImportant(&info, imp) { + if _, isIn := t.table[*info.getNodeID()]; !isIn || t.isImportant(&info, imp) { t.insert(&info) } } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 27aad8d7..cd01e740 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -91,8 +91,12 @@ func (r *router) mainLoop() { case p := <-r.send: r.sendPacket(p) case info := <-r.core.dht.peers: + now := time.Now() + oldInfo, isIn := r.core.dht.table[*info.getNodeID()] r.core.dht.insert(info) - info.throttle = 0 + if isIn && now.Sub(oldInfo.recv) < 45*time.Second { + info.recv = oldInfo.recv + } case <-r.reset: r.core.sessions.resetInits() r.core.dht.reset() From a008b42f99ad8b365841f305f60a3c749278e6cd Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 29 Oct 2018 22:24:18 -0500 Subject: [PATCH 19/20] cleanup and some bugfixes, cache important dht nodes until something gets added/removed --- src/yggdrasil/dht.go | 101 +++++++++++++++++----------------------- src/yggdrasil/search.go | 10 ++-- 2 files changed, 47 insertions(+), 64 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index a94de5e8..b8a303a8 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -16,7 +16,6 @@ package yggdrasil // TODO reoptimize search stuff (size, timeouts, etc) to play nicer with DHT churn import ( - "fmt" "sort" "time" ) @@ -64,9 +63,11 @@ type dhtRes struct { type dht struct { core *Core nodeID NodeID - table map[NodeID]*dhtInfo peers chan *dhtInfo // other goroutines put incoming dht updates here reqs map[boxPubKey]map[NodeID]time.Time + // These next two could be replaced by a single linked list or similar... + table map[NodeID]*dhtInfo + imp []*dhtInfo } // Initializes the DHT @@ -82,6 +83,7 @@ func (t *dht) init(c *Core) { func (t *dht) reset() { t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.table = make(map[NodeID]*dhtInfo) + t.imp = nil } // Does a DHT lookup and returns up to dht_lookup_size results @@ -127,6 +129,7 @@ func (t *dht) insert(info *dhtInfo) { info.throttle = oldInfo.throttle } } + t.imp = nil // It needs to update to get a pointer to the new info t.table[*info.getNodeID()] = info } @@ -180,7 +183,7 @@ func (t *dht) handleReq(req *dhtReq) { coords: req.Coords, } imp := t.getImportant() - if _, isIn := t.table[*info.getNodeID()]; !isIn || t.isImportant(&info, imp) { + if _, isIn := t.table[*info.getNodeID()]; !isIn && t.isImportant(&info, imp) { t.insert(&info) } } @@ -223,10 +226,6 @@ func (t *dht) handleRes(res *dhtRes) { if t.isImportant(&rinfo, imp) { t.insert(&rinfo) } - //t.insert(&rinfo) // Or at the end, after checking successor/predecessor? - if len(res.Infos) > dht_lookup_size { - //res.Infos = res.Infos[:dht_lookup_size] //FIXME debug - } for _, info := range res.Infos { if *info.getNodeID() == t.nodeID { continue @@ -284,21 +283,14 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) { } func (t *dht) doMaintenance() { - toPing := make(map[NodeID]*dhtInfo) now := time.Now() - imp := t.getImportant() - good := make(map[NodeID]*dhtInfo) - for _, info := range imp { - good[*info.getNodeID()] = info - } for infoID, info := range t.table { if now.Sub(info.recv) > time.Minute || info.pings > 3 { delete(t.table, infoID) - } else if t.isImportant(info, imp) { - toPing[infoID] = info + t.imp = nil } } - for _, info := range toPing { + for _, info := range t.getImportant() { if now.Sub(info.recv) > info.throttle { t.ping(info, info.getNodeID()) info.pings++ @@ -306,56 +298,49 @@ func (t *dht) doMaintenance() { if info.throttle > 30*time.Second { info.throttle = 30 * time.Second } - continue - fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords) } } - return // Skip printing debug info - var out []interface{} - out = append(out, "DEBUG important:") - out = append(out, t.nodeID[:8]) - for _, info := range imp { - out = append(out, info.getNodeID()[:8]) - } - fmt.Println(out...) } func (t *dht) getImportant() []*dhtInfo { - // Get a list of all known nodes - infos := make([]*dhtInfo, 0, len(t.table)) - for _, info := range t.table { - infos = append(infos, info) - } - // Sort them by increasing order in distance along the ring - sort.SliceStable(infos, func(i, j int) bool { - // Sort in order of predecessors (!), reverse from chord normal, because it plays nicer with zero bits for unknown parts of target addresses - return dht_ordered(infos[j].getNodeID(), infos[i].getNodeID(), &t.nodeID) - }) - // Keep the ones that are no further than the closest seen so far - minDist := ^uint64(0) - loc := t.core.switchTable.getLocator() - important := infos[:0] - for _, info := range infos { - dist := uint64(loc.dist(info.coords)) - if dist < minDist { - minDist = dist - important = append(important, info) + if t.imp == nil { + // Get a list of all known nodes + infos := make([]*dhtInfo, 0, len(t.table)) + for _, info := range t.table { + infos = append(infos, info) } - } - var temp []*dhtInfo - minDist = ^uint64(0) - for idx := len(infos) - 1; idx >= 0; idx-- { - info := infos[idx] - dist := uint64(loc.dist(info.coords)) - if dist < minDist { - minDist = dist - temp = append(temp, info) + // Sort them by increasing order in distance along the ring + sort.SliceStable(infos, func(i, j int) bool { + // Sort in order of predecessors (!), reverse from chord normal, because it plays nicer with zero bits for unknown parts of target addresses + return dht_ordered(infos[j].getNodeID(), infos[i].getNodeID(), &t.nodeID) + }) + // Keep the ones that are no further than the closest seen so far + minDist := ^uint64(0) + loc := t.core.switchTable.getLocator() + important := infos[:0] + for _, info := range infos { + dist := uint64(loc.dist(info.coords)) + if dist < minDist { + minDist = dist + important = append(important, info) + } } + var temp []*dhtInfo + minDist = ^uint64(0) + for idx := len(infos) - 1; idx >= 0; idx-- { + info := infos[idx] + dist := uint64(loc.dist(info.coords)) + if dist < minDist { + minDist = dist + temp = append(temp, info) + } + } + for idx := len(temp) - 1; idx >= 0; idx-- { + important = append(important, temp[idx]) + } + t.imp = important } - for idx := len(temp) - 1; idx >= 0; idx-- { - important = append(important, temp[idx]) - } - return important + return t.imp } func (t *dht) isImportant(ninfo *dhtInfo, important []*dhtInfo) bool { diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 9039444f..f22fbe2a 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -11,8 +11,10 @@ package yggdrasil // A new search packet is sent immediately after receiving a response // A new search packet is sent periodically, once per second, in case a packet was dropped (this slowly causes the search to become parallel if the search doesn't timeout but also doesn't finish within 1 second for whatever reason) +// TODO? +// Some kind of max search steps, in case the node is offline, so we don't crawl through too much of the network looking for a destination that isn't there? + import ( - "fmt" "sort" "time" ) @@ -74,9 +76,6 @@ func (s *searches) handleDHTRes(res *dhtRes) { sinfo, isIn := s.searches[res.Dest] if !isIn || s.checkDHTRes(sinfo, res) { // Either we don't recognize this search, or we just finished it - if isIn { - fmt.Println("DEBUG: search finished, length:", len(sinfo.visited)) - } return } else { // Add to the search and continue @@ -92,6 +91,7 @@ func (s *searches) handleDHTRes(res *dhtRes) { func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { // Add responses to toVisit if closer to dest than the res node from := dhtInfo{key: res.Key, coords: res.Coords} + sinfo.visited[*from.getNodeID()] = true for _, info := range res.Infos { if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { continue @@ -129,14 +129,12 @@ func (s *searches) doSearchStep(sinfo *searchInfo) { if len(sinfo.toVisit) == 0 { // Dead end, do cleanup delete(s.searches, sinfo.dest) - fmt.Println("DEBUG: search abandoned, length:", len(sinfo.visited)) return } else { // Send to the next search target var next *dhtInfo next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] s.core.dht.ping(next, &sinfo.dest) - sinfo.visited[*next.getNodeID()] = true } } From 15d5b3f82c21fa772c7fe7de44c8852eb685b7ad Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 9 Nov 2018 23:02:38 -0600 Subject: [PATCH 20/20] comments and minor cleanup --- src/yggdrasil/dht.go | 51 +++++++++++++++-------------------------- src/yggdrasil/search.go | 4 +--- 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index b8a303a8..9891ec90 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -1,19 +1,9 @@ package yggdrasil -// TODO signal to predecessor when we replace them? -// Sending a ping with an extra 0 at the end of our coords should be enough to reset our throttle in their table -// That should encorage them to ping us again sooner, and then we can reply with new info -// Maybe remember old predecessor and check this during maintenance? - -// TODO make sure that, if your peer is your successor or predecessor, you still bother to ping them and ask for better nodes -// Basically, don't automatically reset the dhtInfo.recv to time.Now() whenever updating them from the outside -// But *do* set it to something that won't instantly time them out or make them get pingspammed? -// Could set throttle to 0, but that's imperfect at best... pingspam - -// TODO? cache all nodes we ping (from e.g. searches), not just the important ones -// But only send maintenance pings to the important ones - -// TODO reoptimize search stuff (size, timeouts, etc) to play nicer with DHT churn +// A chord-like Distributed Hash Table (DHT). +// Used to look up coords given a NodeID and bitmask (taken from an IPv6 address). +// Keeps track of immediate successor, predecessor, and all peers. +// Also keeps track of other nodes if they're closer in tree space than all other known nodes encountered when heading in either direction to that point, under the hypothesis that, for the kinds of networks we care about, this should probabilistically include the node needed to keep lookups to near O(logn) steps. import ( "sort" @@ -70,7 +60,7 @@ type dht struct { imp []*dhtInfo } -// Initializes the DHT +// Initializes the DHT. func (t *dht) init(c *Core) { t.core = c t.nodeID = *t.core.GetNodeID() @@ -78,23 +68,19 @@ func (t *dht) init(c *Core) { t.reset() } -// Resets the DHT in response to coord changes -// This empties all info from the DHT and drops outstanding requests +// Resets the DHT in response to coord changes. +// This empties all info from the DHT and drops outstanding requests. func (t *dht) reset() { t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.table = make(map[NodeID]*dhtInfo) t.imp = nil } -// Does a DHT lookup and returns up to dht_lookup_size results +// Does a DHT lookup and returns up to dht_lookup_size results. func (t *dht) lookup(nodeID *NodeID, everything bool) []*dhtInfo { results := make([]*dhtInfo, 0, len(t.table)) - //imp := t.getImportant() for _, info := range t.table { results = append(results, info) - //if t.isImportant(info, imp) { - // results = append(results, info) - //} } sort.SliceStable(results, func(i, j int) bool { return dht_ordered(nodeID, results[i].getNodeID(), results[j].getNodeID()) @@ -105,7 +91,7 @@ func (t *dht) lookup(nodeID *NodeID, everything bool) []*dhtInfo { return results } -// Insert into table, preserving the time we last sent a packet if the node was already in the table, otherwise setting that time to now +// Insert into table, preserving the time we last sent a packet if the node was already in the table, otherwise setting that time to now. func (t *dht) insert(info *dhtInfo) { if *info.getNodeID() == t.nodeID { // This shouldn't happen, but don't add it if it does @@ -133,8 +119,7 @@ func (t *dht) insert(info *dhtInfo) { t.table[*info.getNodeID()] = info } -// Return true if first/second/third are (partially) ordered correctly -// FIXME? maybe total ordering makes more sense +// Return true if first/second/third are (partially) ordered correctly. func dht_ordered(first, second, third *NodeID) bool { lessOrEqual := func(first, second *NodeID) bool { for idx := 0; idx < NodeIDLen; idx++ { @@ -182,8 +167,7 @@ func (t *dht) handleReq(req *dhtReq) { key: req.Key, coords: req.Coords, } - imp := t.getImportant() - if _, isIn := t.table[*info.getNodeID()]; !isIn && t.isImportant(&info, imp) { + if _, isIn := t.table[*info.getNodeID()]; !isIn && t.isImportant(&info) { t.insert(&info) } } @@ -222,8 +206,7 @@ func (t *dht) handleRes(res *dhtRes) { key: res.Key, coords: res.Coords, } - imp := t.getImportant() - if t.isImportant(&rinfo, imp) { + if t.isImportant(&rinfo) { t.insert(&rinfo) } for _, info := range res.Infos { @@ -234,11 +217,10 @@ func (t *dht) handleRes(res *dhtRes) { // TODO? don't skip if coords are different? continue } - if t.isImportant(info, imp) { + if t.isImportant(info) { t.ping(info, nil) } } - // TODO add everyting else to a rumor mill for later use? (when/how?) } // Sends a lookup request to the specified node. @@ -267,6 +249,7 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { reqsToDest[req.Dest] = time.Now() } +// Sends a lookup to this info, looking for the target. func (t *dht) ping(info *dhtInfo, target *NodeID) { // Creates a req for the node at dhtInfo, asking them about the target (if one is given) or themself (if no target is given) if target == nil { @@ -282,6 +265,7 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) { t.sendReq(&req, info) } +// Periodic maintenance work to keep important DHT nodes alive. func (t *dht) doMaintenance() { now := time.Now() for infoID, info := range t.table { @@ -302,6 +286,7 @@ func (t *dht) doMaintenance() { } } +// Gets a list of important nodes, used by isImportant. func (t *dht) getImportant() []*dhtInfo { if t.imp == nil { // Get a list of all known nodes @@ -343,7 +328,9 @@ func (t *dht) getImportant() []*dhtInfo { return t.imp } -func (t *dht) isImportant(ninfo *dhtInfo, important []*dhtInfo) bool { +// Returns true if this is a node we need to keep track of for the DHT to work. +func (t *dht) isImportant(ninfo *dhtInfo) bool { + important := t.getImportant() // Check if ninfo is of equal or greater importance to what we already know loc := t.core.switchTable.getLocator() ndist := uint64(loc.dist(ninfo.coords)) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index f22fbe2a..21694907 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -113,13 +113,11 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { // Sort sort.SliceStable(sinfo.toVisit, func(i, j int) bool { // Should return true if i is closer to the destination than j - // FIXME for some reason it works better backwards, why?! - //return dht_ordered(sinfo.toVisit[j].getNodeID(), sinfo.toVisit[i].getNodeID(), &res.Dest) return dht_ordered(&res.Dest, sinfo.toVisit[i].getNodeID(), sinfo.toVisit[j].getNodeID()) }) // Truncate to some maximum size if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE { - sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE] //FIXME debug + sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE] } }