diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index b5bf926e..c5102f04 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -52,11 +52,23 @@ type dhtRes struct { // The main DHT struct. type dht struct { - core *Core - nodeID NodeID - table map[NodeID]*dhtInfo - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[boxPubKey]map[NodeID]time.Time + core *Core + nodeID NodeID + table map[NodeID]*dhtInfo + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[boxPubKey]map[NodeID]time.Time + targets [NodeIDLen*8 + 1]NodeID +} + +func (nodeID NodeID) add(toAdd *NodeID) NodeID { + var accumulator uint16 + for idx := len(nodeID) - 1; idx >= 0; idx-- { + accumulator += uint16(nodeID[idx]) + accumulator += uint16(toAdd[idx]) + nodeID[idx] = byte(accumulator) + accumulator >>= 8 + } + return nodeID } // Initializes the DHT @@ -64,32 +76,27 @@ func (t *dht) init(c *Core) { t.core = c t.nodeID = *t.core.GetNodeID() t.peers = make(chan *dhtInfo, 1024) + getDist := func(bit int) *NodeID { + nBits := NodeIDLen * 8 + theByte := (nBits - bit) / 8 + theBitmask := uint8(0x80) >> uint8(nBits-bit) + var nid NodeID + //fmt.Println("DEBUG: bit", bit, "theByte", theByte) + nid[theByte] = theBitmask + return &nid + } + for idx := range t.targets { + t.targets[idx] = t.nodeID.add(getDist(idx + 1)) + } + t.targets[len(t.targets)-1] = t.nodeID // Last one wraps around to self t.reset() } // Resets the DHT in response to coord changes // This empties all info from the DHT and drops outstanding requests -// It sends a ping to the old successor and predecessor, in case they're still around func (t *dht) reset() { - var successor *dhtInfo - var predecessor *dhtInfo - for infoID, info := range t.table { - // Get current successor and predecessor - if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { - successor = info - } - if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { - predecessor = info - } - } t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.table = make(map[NodeID]*dhtInfo) - if successor != nil { - t.ping(successor, nil) - } - if predecessor != nil { - t.ping(predecessor, nil) - } } // Does a DHT lookup and returns up to dht_lookup_size results @@ -197,24 +204,7 @@ func (t *dht) handleReq(req *dhtReq) { coords: req.Coords, } // For bootstrapping to work, we need to add these nodes to the table - //t.insert(&info) - // FIXME? DEBUG testing what happens if we only add better predecessors/successors - var successor *dhtInfo - var predecessor *dhtInfo - for infoID, v := range t.table { - // Get current successor and predecessor - if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { - successor = v - } - if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { - predecessor = v - } - } - if successor != nil && dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) { - t.insert(&info) - } else if predecessor != nil && dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) { - t.insert(&info) - } + t.insert(&info) } // Sends a lookup response to the specified node. @@ -274,17 +264,6 @@ func (t *dht) handleRes(res *dhtRes) { coords: res.Coords, } t.insert(&rinfo) // Or at the end, after checking successor/predecessor? - var successor *dhtInfo - var predecessor *dhtInfo - for infoID, info := range t.table { - // Get current successor and predecessor - if successor != nil && dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { - successor = info - } - if predecessor != nil && dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { - predecessor = info - } - } if len(res.Infos) > dht_lookup_size { res.Infos = res.Infos[:dht_lookup_size] } @@ -296,11 +275,7 @@ func (t *dht) handleRes(res *dhtRes) { // TODO? don't skip if coords are different? continue } - // Send a request to all better successors or predecessors - // We could try sending to only the best, but then packet loss matters more - if successor == nil || dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) { - t.ping(info, nil) - } else if predecessor == nil || dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) { + if t.isImportant(info.getNodeID()) { t.ping(info, nil) } } @@ -349,18 +324,13 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) { } func (t *dht) doMaintenance() { - // Ping successor, asking for their predecessor, and clean up old/expired info - var successor *dhtInfo - var predecessor *dhtInfo toPing := make(map[NodeID]*dhtInfo) now := time.Now() for infoID, info := range t.table { if now.Sub(info.recv) > time.Minute || info.pings > 3 { delete(t.table, infoID) - } else if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) { - successor = info - } else if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) { - predecessor = info + } else if t.isImportant(info.getNodeID()) { + toPing[infoID] = info } } ////////////////////////////////////////////////////////////////////////////// @@ -380,38 +350,36 @@ func (t *dht) doMaintenance() { t.insert(&pinfo) } ////////////////////////////////////////////////////////////////////////////// - if successor != nil { - toPing[*successor.getNodeID()] = successor - } - if predecessor != nil { - toPing[*predecessor.getNodeID()] = predecessor - } for _, info := range toPing { if now.Sub(info.recv) > info.throttle { - t.ping(info, nil) + t.ping(info, info.getNodeID()) info.pings++ info.throttle += time.Second if info.throttle > 30*time.Second { info.throttle = 30 * time.Second } - //fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords) + //continue + fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords) } } - return - ////////////////////////////////////////////////////////////////////////////// - if successor != nil && - now.Sub(successor.recv) > successor.throttle { - t.ping(successor, nil) - successor.pings++ - successor.throttle += time.Second - //return - fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", successor.throttle, "nodeID:", successor.getNodeID()[:8], "coords:", successor.coords) - //for infoID := range t.table { - // fmt.Println("DEBUG other info:", infoID[:8], "ordered", dht_ordered(&t.nodeID, &infoID, successor.getNodeID()), "swapped:", dht_ordered(&t.nodeID, successor.getNodeID(), &infoID)) - //} - if successor.throttle > 30*time.Second { - successor.throttle = 30 * time.Second - } - //fmt.Println("Table size:", len(t.table)) - } +} + +func (t *dht) isImportant(nodeID *NodeID) bool { + // TODO persistently store stuff about best nodes, so we don't need to keep doing this + // Ideally switch to a better data structure... linked list? + for _, target := range t.targets { + // Get the best known node for this target + var best *dhtInfo + for _, info := range t.table { + if best == nil || dht_ordered(best.getNodeID(), info.getNodeID(), &target) { + best = info + } + } + if best != nil && dht_ordered(best.getNodeID(), nodeID, &target) { + // This is an equal or better finger table entry than what we currently have + return true + } + } + // We didn't find anything where this is better, so it must be worse + return false }