yet more debugging

Arceliar 2018-10-21 17:40:43 -05:00
parent f0bd40ff68
commit 5e3959f1d0
4 changed files with 72 additions and 73 deletions

View File

@@ -437,7 +437,7 @@ func main() {
 	pingNodes(kstore)
 	//pingBench(kstore) // Only after disabling debug output
 	//stressTest(kstore)
-	//time.Sleep(120*time.Second)
+	time.Sleep(120 * time.Second)
 	dumpDHTSize(kstore) // note that this uses racey functions to read things...
 	if false {
 		// This connects the sim to the local network

View File

@@ -11,7 +11,7 @@ import (
 	"time"
 )

-const dht_lookup_size = 16
+const dht_lookup_size = 4

 // dhtInfo represents everything we know about a node in the DHT.
 // This includes its key, a cache of it's NodeID, coords, and timing/ping related info for deciding who/when to ping nodes for maintenance.
@@ -52,23 +52,11 @@ type dhtRes struct {
 // The main DHT struct.
 type dht struct {
 	core *Core
 	nodeID NodeID
 	table map[NodeID]*dhtInfo
 	peers chan *dhtInfo // other goroutines put incoming dht updates here
 	reqs map[boxPubKey]map[NodeID]time.Time
-	targets [NodeIDLen*8 + 1]NodeID
-}
-
-func (nodeID NodeID) add(toAdd *NodeID) NodeID {
-	var accumulator uint16
-	for idx := len(nodeID) - 1; idx >= 0; idx-- {
-		accumulator += uint16(nodeID[idx])
-		accumulator += uint16(toAdd[idx])
-		nodeID[idx] = byte(accumulator)
-		accumulator >>= 8
-	}
-	return nodeID
 }

 // Initializes the DHT
@@ -76,19 +64,6 @@ func (t *dht) init(c *Core) {
 	t.core = c
 	t.nodeID = *t.core.GetNodeID()
 	t.peers = make(chan *dhtInfo, 1024)
-	getDist := func(bit int) *NodeID {
-		nBits := NodeIDLen * 8
-		theByte := (nBits - bit) / 8
-		theBitmask := uint8(0x80) >> uint8(nBits-bit)
-		var nid NodeID
-		//fmt.Println("DEBUG: bit", bit, "theByte", theByte)
-		nid[theByte] = theBitmask
-		return &nid
-	}
-	for idx := range t.targets {
-		t.targets[idx] = t.nodeID.add(getDist(idx + 1))
-	}
-	t.targets[len(t.targets)-1] = t.nodeID // Last one wraps around to self
 	t.reset()
 }
@@ -103,11 +78,15 @@ func (t *dht) reset() {
 // If allowWorse = true, begins with best know predecessor for ID and works backwards, even if these nodes are worse predecessors than we are, to be used when intializing searches
 // If allowWorse = false, begins with the best known successor for ID and works backwards (next is predecessor, etc, inclusive of the ID if it's a known node)
 func (t *dht) lookup(nodeID *NodeID, everything bool) []*dhtInfo {
-	var results []*dhtInfo
-	for infoID, info := range t.table {
-		if everything || t.isImportant(&infoID) {
-			results = append(results, info)
-		}
+	results := make([]*dhtInfo, 0, len(t.table))
+	for _, info := range t.table {
+		results = append(results, info)
+	}
+	sort.SliceStable(results, func(i, j int) bool {
+		return dht_ordered(results[j].getNodeID(), results[i].getNodeID(), nodeID)
+	})
+	if len(results) > dht_lookup_size {
+		//results = results[:dht_lookup_size] //FIXME debug
 	}
 	return results
 }
@@ -277,6 +256,7 @@ func (t *dht) handleRes(res *dhtRes) {
 	if len(res.Infos) > dht_lookup_size {
 		//res.Infos = res.Infos[:dht_lookup_size] //FIXME debug
 	}
+	imp := t.getImportant()
 	for _, info := range res.Infos {
 		if *info.getNodeID() == t.nodeID {
 			continue
@@ -285,7 +265,7 @@ func (t *dht) handleRes(res *dhtRes) {
 			// TODO? don't skip if coords are different?
 			continue
 		}
-		if t.isImportant(info.getNodeID()) {
+		if t.isImportant(info, imp) {
 			t.ping(info, nil)
 		}
 	}
@@ -336,30 +316,18 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) {
 func (t *dht) doMaintenance() {
 	toPing := make(map[NodeID]*dhtInfo)
 	now := time.Now()
+	imp := t.getImportant()
+	good := make(map[NodeID]*dhtInfo)
+	for _, info := range imp {
+		good[*info.getNodeID()] = info
+	}
 	for infoID, info := range t.table {
 		if now.Sub(info.recv) > time.Minute || info.pings > 3 {
 			delete(t.table, infoID)
-		} else if t.isImportant(info.getNodeID()) {
+		} else if t.isImportant(info, imp) {
 			toPing[infoID] = info
 		}
 	}
-	//////////////////////////////////////////////////////////////////////////////
-	t.core.switchTable.mutex.RLock()
-	parentPort := t.core.switchTable.parent
-	parentInfo := t.core.switchTable.data.peers[parentPort]
-	t.core.switchTable.mutex.RUnlock()
-	ports := t.core.peers.getPorts()
-	if parent, isIn := ports[parentPort]; isIn {
-		loc := parentInfo.locator.clone()
-		end := len(loc.coords)
-		if end > 0 {
-			end -= 1
-		}
-		loc.coords = loc.coords[:end]
-		pinfo := dhtInfo{key: parent.box, coords: loc.getCoords()}
-		t.insert(&pinfo)
-	}
-	//////////////////////////////////////////////////////////////////////////////
 	for _, info := range toPing {
 		if now.Sub(info.recv) > info.throttle {
 			t.ping(info, info.getNodeID())
@@ -368,28 +336,52 @@ func (t *dht) doMaintenance() {
 			if info.throttle > 30*time.Second {
 				info.throttle = 30 * time.Second
 			}
-			//continue
+			continue
 			fmt.Println("DEBUG self:", t.nodeID[:8], "throttle:", info.throttle, "nodeID:", info.getNodeID()[:8], "coords:", info.coords)
 		}
 	}
 }

-func (t *dht) isImportant(nodeID *NodeID) bool {
-	// TODO persistently store stuff about best nodes, so we don't need to keep doing this
-	// Ideally switch to a better data structure... linked list?
-	for _, target := range t.targets {
-		// Get the best known node for this target
-		var best *dhtInfo
-		for _, info := range t.table {
-			if best == nil || dht_ordered(best.getNodeID(), info.getNodeID(), &target) {
-				best = info
-			}
-		}
-		if best != nil && dht_ordered(best.getNodeID(), nodeID, &target) {
-			// This is an equal or better finger table entry than what we currently have
+func (t *dht) getImportant() []*dhtInfo {
+	// Get a list of all known nodes
+	infos := make([]*dhtInfo, 0, len(t.table))
+	for _, info := range t.table {
+		infos = append(infos, info)
+	}
+	// Sort them by increasing order in distance along the ring
+	sort.SliceStable(infos, func(i, j int) bool {
+		// Sort in order of successors
+		return dht_ordered(&t.nodeID, infos[i].getNodeID(), infos[j].getNodeID())
+	})
+	// Keep the ones that are no further than the closest seen so far
+	minDist := ^uint64(0)
+	loc := t.core.switchTable.getLocator()
+	important := infos[:0]
+	for _, info := range infos {
+		dist := uint64(loc.dist(info.coords))
+		if dist < minDist {
+			minDist = dist
+			important = append(important, info)
+		}
+	}
+	return important
+}
+
+func (t *dht) isImportant(ninfo *dhtInfo, important []*dhtInfo) bool {
+	// Check if ninfo is of equal or greater importance to what we already know
+	loc := t.core.switchTable.getLocator()
+	ndist := uint64(loc.dist(ninfo.coords))
+	minDist := ^uint64(0)
+	for _, info := range important {
+		dist := uint64(loc.dist(info.coords))
+		if dist < minDist {
+			minDist = dist
+		}
+		if dht_ordered(&t.nodeID, ninfo.getNodeID(), info.getNodeID()) && ndist <= minDist {
+			// This node is at least as close in both key space and tree space
 			return true
 		}
 	}
-	// We didn't find anything where this is better, so it must be worse
+	// We didn't find any important node that ninfo is better than
 	return false
 }
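
Aside (not part of the diff): the selection rule that getImportant and isImportant introduce above is easier to see with the DHT machinery stripped away. The toy sketch below uses plain ints in place of NodeIDs and tree coordinates: nodes are assumed to be pre-sorted in successor order around the keyspace ring (the job of the sort.SliceStable call), and a node is kept only if it is strictly closer in tree space than every node seen before it on the ring (the job of the loc.dist/minDist loop). The toyNode and keepImportant names are made up for illustration only.

package main

import "fmt"

// toyNode stands in for a dhtInfo: ringPos is its position in successor order
// around the keyspace ring, treeDist is its distance in tree space (the role
// played by loc.dist(info.coords) in the real code).
type toyNode struct {
	ringPos  int
	treeDist int
}

// keepImportant mirrors the filtering loop in getImportant: walk the nodes in
// ring (successor) order and keep one only if it is strictly closer in tree
// space than the closest node seen so far.
func keepImportant(sortedByRing []toyNode) []toyNode {
	minDist := int(^uint(0) >> 1) // max int
	var important []toyNode
	for _, n := range sortedByRing {
		if n.treeDist < minDist {
			minDist = n.treeDist
			important = append(important, n)
		}
	}
	return important
}

func main() {
	nodes := []toyNode{{1, 9}, {2, 4}, {3, 7}, {4, 2}, {5, 2}}
	fmt.Println(keepImportant(nodes))
	// Output: [{1 9} {2 4} {4 2}]
	// {3 7} and {5 2} are dropped: something earlier on the ring is already
	// at least as cheap to reach, so they are not worth keeping or pinging.
}

isImportant then asks the same question of a single candidate: would it have survived this filter, i.e. is it at least as close in tree space as the important nodes that precede it in key space.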

View File

@@ -183,7 +183,7 @@ func (p *peer) linkLoop() {
 			}
 			p.sendSwitchMsg()
 		case _ = <-tick.C:
-			break // FIXME disabled the below completely to test something
+			//break // FIXME disabled the below completely to test something
 			pdinfo := p.dinfo // FIXME this is a bad workarond NPE on the next line
 			if pdinfo != nil {
 				dinfo := *pdinfo

View File

@@ -12,6 +12,7 @@ package yggdrasil
 // A new search packet is sent periodically, once per second, in case a packet was dropped (this slowly causes the search to become parallel if the search doesn't timeout but also doesn't finish within 1 second for whatever reason)

 import (
+	"fmt"
 	"sort"
 	"time"
 )
@@ -73,6 +74,9 @@ func (s *searches) handleDHTRes(res *dhtRes) {
 	sinfo, isIn := s.searches[res.Dest]
 	if !isIn || s.checkDHTRes(sinfo, res) {
 		// Either we don't recognize this search, or we just finished it
+		if isIn {
+			fmt.Println("DEBUG: search finished, length:", len(sinfo.visited))
+		}
 		return
 	} else {
 		// Add to the search and continue
@@ -92,7 +96,7 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
 		if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] {
 			continue
 		}
-		if dht_ordered(from.getNodeID(), info.getNodeID(), &res.Dest) {
+		if true || dht_ordered(from.getNodeID(), info.getNodeID(), &res.Dest) {
 			sinfo.toVisit = append(sinfo.toVisit, info)
 		}
 	}
@@ -107,11 +111,13 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
 	}
 	// Sort
 	sort.SliceStable(sinfo.toVisit, func(i, j int) bool {
+		// Should return true if i is closer to the destination than j
+		// FIXME for some reason it works better backwards, why?!
 		return dht_ordered(sinfo.toVisit[j].getNodeID(), sinfo.toVisit[i].getNodeID(), &res.Dest)
 	})
 	// Truncate to some maximum size
 	if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE {
-		sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE]
+		//sinfo.toVisit = sinfo.toVisit[:search_MAX_SEARCH_SIZE]
 	}
 }
@@ -121,6 +127,7 @@ func (s *searches) doSearchStep(sinfo *searchInfo) {
 	if len(sinfo.toVisit) == 0 {
 		// Dead end, do cleanup
 		delete(s.searches, sinfo.dest)
+		fmt.Println("DEBUG: search abandoned, length:", len(sinfo.visited))
 		return
 	} else {
 		// Send to the next search target
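
Closing note (not from the repo): every sort in this commit, the lookup result ordering, getImportant's successor ordering, and the toVisit ordering above, goes through dht_ordered, which this diff never shows. The sketch below is one way such a ring predicate could work: dht_ordered(a, b, c) is true when b lies on the arc that runs from a forward (with wraparound) to c, treating NodeIDs as points on a ring. The NodeIDLen value and the tie-breaking behaviour of the real helper are assumptions here.

package main

import (
	"bytes"
	"fmt"
)

const NodeIDLen = 64 // assumed; stands in for the package constant

type NodeID [NodeIDLen]byte

// dht_ordered (sketch): true when first -> second -> third appear in order
// going around the keyspace ring, i.e. second lies on the arc from first to
// third, allowing for wraparound past zero. Ties are ignored for brevity.
func dht_ordered(first, second, third *NodeID) bool {
	less := func(a, b *NodeID) bool { return bytes.Compare(a[:], b[:]) < 0 }
	fs := less(first, second)
	st := less(second, third)
	tf := less(third, first)
	switch {
	case fs && st: // no wraparound on the arc
		return true
	case tf && fs: // arc wraps between second and third
		return true
	case st && tf: // arc wraps between first and second
		return true
	}
	return false
}

func main() {
	var a, b, c NodeID
	a[0], b[0], c[0] = 0x10, 0x80, 0xf0
	fmt.Println(dht_ordered(&a, &b, &c)) // true: b sits on the arc from a to c
	fmt.Println(dht_ordered(&c, &b, &a)) // false: from c the arc wraps past zero straight to a, skipping b
}

With that reading, the reversed arguments in the toVisit comparison (dht_ordered(toVisit[j], toVisit[i], &res.Dest)) put candidate i ahead of j exactly when i sits between j and the destination on the ring, which is why the closest predecessors of the target end up at the front despite the FIXME.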