keep track of all keys we're supposed to care about in the dht, don't give special treatment to successors/predecessors

This commit is contained in:
Arceliar 2018-10-21 14:57:04 -05:00
parent efe6cec11a
commit bcbd24120d

View File

@ -52,11 +52,23 @@ type dhtRes struct {
// The main DHT struct.
type dht struct {
core *Core
nodeID NodeID
table map[NodeID]*dhtInfo
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[boxPubKey]map[NodeID]time.Time
core *Core
nodeID NodeID
table map[NodeID]*dhtInfo
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[boxPubKey]map[NodeID]time.Time
targets [NodeIDLen*8 + 1]NodeID
}
func (nodeID NodeID) add(toAdd *NodeID) NodeID {
var accumulator uint16
for idx := len(nodeID) - 1; idx >= 0; idx-- {
accumulator += uint16(nodeID[idx])
accumulator += uint16(toAdd[idx])
nodeID[idx] = byte(accumulator)
accumulator >>= 8
}
return nodeID
}
// Initializes the DHT
@ -64,32 +76,27 @@ func (t *dht) init(c *Core) {
t.core = c
t.nodeID = *t.core.GetNodeID()
t.peers = make(chan *dhtInfo, 1024)
getDist := func(bit int) *NodeID {
nBits := NodeIDLen * 8
theByte := (nBits - bit) / 8
theBitmask := uint8(0x80) >> uint8(nBits-bit)
var nid NodeID
//fmt.Println("DEBUG: bit", bit, "theByte", theByte)
nid[theByte] = theBitmask
return &nid
}
for idx := range t.targets {
t.targets[idx] = t.nodeID.add(getDist(idx + 1))
}
t.targets[len(t.targets)-1] = t.nodeID // Last one wraps around to self
t.reset()
}
// Resets the DHT in response to coord changes
// This empties all info from the DHT and drops outstanding requests
// It sends a ping to the old successor and predecessor, in case they're still around
func (t *dht) reset() {
var successor *dhtInfo
var predecessor *dhtInfo
for infoID, info := range t.table {
// Get current successor and predecessor
if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) {
successor = info
}
if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) {
predecessor = info
}
}
t.reqs = make(map[boxPubKey]map[NodeID]time.Time)
t.table = make(map[NodeID]*dhtInfo)
if successor != nil {
t.ping(successor, nil)
}
if predecessor != nil {
t.ping(predecessor, nil)
}
}
// Does a DHT lookup and returns up to dht_lookup_size results
@ -197,24 +204,7 @@ func (t *dht) handleReq(req *dhtReq) {
coords: req.Coords,
}
// For bootstrapping to work, we need to add these nodes to the table
//t.insert(&info)
// FIXME? DEBUG testing what happens if we only add better predecessors/successors
var successor *dhtInfo
var predecessor *dhtInfo
for infoID, v := range t.table {
// Get current successor and predecessor
if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) {
successor = v
}
if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) {
predecessor = v
}
}
if successor != nil && dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) {
t.insert(&info)
} else if predecessor != nil && dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) {
t.insert(&info)
}
t.insert(&info)
}
// Sends a lookup response to the specified node.
@ -274,17 +264,6 @@ func (t *dht) handleRes(res *dhtRes) {
coords: res.Coords,
}
t.insert(&rinfo) // Or at the end, after checking successor/predecessor?
var successor *dhtInfo
var predecessor *dhtInfo
for infoID, info := range t.table {
// Get current successor and predecessor
if successor != nil && dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) {
successor = info
}
if predecessor != nil && dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) {
predecessor = info
}
}
if len(res.Infos) > dht_lookup_size {
res.Infos = res.Infos[:dht_lookup_size]
}
@ -296,11 +275,7 @@ func (t *dht) handleRes(res *dhtRes) {
// TODO? don't skip if coords are different?
continue
}
// Send a request to all better successors or predecessors
// We could try sending to only the best, but then packet loss matters more
if successor == nil || dht_ordered(&t.nodeID, info.getNodeID(), successor.getNodeID()) {
t.ping(info, nil)
} else if predecessor == nil || dht_ordered(predecessor.getNodeID(), info.getNodeID(), &t.nodeID) {
if t.isImportant(info.getNodeID()) {
t.ping(info, nil)
}
}
@ -349,18 +324,13 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) {
}
func (t *dht) doMaintenance() {
// Ping successor, asking for their predecessor, and clean up old/expired info
var successor *dhtInfo
var predecessor *dhtInfo
toPing := make(map[NodeID]*dhtInfo)
now := time.Now()
for infoID, info := range t.table {
if now.Sub(info.recv) > time.Minute || info.pings > 3 {
delete(t.table, infoID)
} else if successor == nil || dht_ordered(&t.nodeID, &infoID, successor.getNodeID()) {
successor = info
} else if predecessor == nil || dht_ordered(predecessor.getNodeID(), &infoID, &t.nodeID) {
predecessor = info
} else if t.isImportant(info.getNodeID()) {
toPing[infoID] = info
}
}
//////////////////////////////////////////////////////////////////////////////
@ -380,38 +350,36 @@ func (t *dht) doMaintenance() {
t.insert(&pinfo)
}
//////////////////////////////////////////////////////////////////////////////
if successor != nil {
toPing[*successor.getNodeID()] = successor
}
if predecessor != nil {
toPing[*predecessor.getNodeID()] = predecessor
}
for _, info := range toPing {
if now.Sub(info.recv) > info.throttle {
t.ping(info, nil)
t.ping(info, info.getNodeID())
info.pings++
info.throttle += time.Second
if info.throttle > 30*time.Second {
info.throttle = 30 * time.Second
}
//fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords)
//continue
fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords)
}
}
return
//////////////////////////////////////////////////////////////////////////////
if successor != nil &&
now.Sub(successor.recv) > successor.throttle {
t.ping(successor, nil)
successor.pings++
successor.throttle += time.Second
//return
fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", successor.throttle, "nodeID:", successor.getNodeID()[:8], "coords:", successor.coords)
//for infoID := range t.table {
// fmt.Println("DEBUG other info:", infoID[:8], "ordered", dht_ordered(&t.nodeID, &infoID, successor.getNodeID()), "swapped:", dht_ordered(&t.nodeID, successor.getNodeID(), &infoID))
//}
if successor.throttle > 30*time.Second {
successor.throttle = 30 * time.Second
}
//fmt.Println("Table size:", len(t.table))
}
}
func (t *dht) isImportant(nodeID *NodeID) bool {
// TODO persistently store stuff about best nodes, so we don't need to keep doing this
// Ideally switch to a better data structure... linked list?
for _, target := range t.targets {
// Get the best known node for this target
var best *dhtInfo
for _, info := range t.table {
if best == nil || dht_ordered(best.getNodeID(), info.getNodeID(), &target) {
best = info
}
}
if best != nil && dht_ordered(best.getNodeID(), nodeID, &target) {
// This is an equal or better finger table entry than what we currently have
return true
}
}
// We didn't find anything where this is better, so it must be worse
return false
}