Merge pull request #79 from Arceliar/dht

DHT and Switch updates
This commit is contained in:
Neil Alexander 2018-05-17 13:59:38 +01:00 committed by GitHub
commit edf8f2e239
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 38 deletions

View File

@ -6,6 +6,7 @@ import "os"
import "strings" import "strings"
import "strconv" import "strconv"
import "time" import "time"
import "log"
import "runtime/pprof" import "runtime/pprof"
import "flag" import "flag"
@ -418,12 +419,12 @@ func main() {
idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt") idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt")
//idxstore := loadGraph("skitter") //idxstore := loadGraph("skitter")
kstore := getKeyedStore(idxstore) kstore := getKeyedStore(idxstore)
/* //*
logger := log.New(os.Stderr, "", log.Flags())
for _, n := range kstore { for _, n := range kstore {
log := n.core.DEBUG_getLogger() n.core.DEBUG_setLogger(logger)
log.SetOutput(os.Stderr)
} }
*/ //*/
startNetwork(kstore) startNetwork(kstore)
//time.Sleep(10*time.Second) //time.Sleep(10*time.Second)
// Note that testPaths only works if pressure is turend off // Note that testPaths only works if pressure is turend off

View File

@ -25,9 +25,9 @@ import "time"
// Maximum size for buckets and lookups // Maximum size for buckets and lookups
// Exception for buckets if the next one is non-full // Exception for buckets if the next one is non-full
const dht_bucket_size = 2 // This should be at least 2
const dht_lookup_size = 2 // This should be at least 1, below 2 is impractical
const dht_bucket_number = 8 * NodeIDLen // This shouldn't be changed const dht_bucket_number = 8 * NodeIDLen // This shouldn't be changed
const dht_bucket_size = 2 // This should be at least 2
const dht_lookup_size = 16 // This should be at least 1, below 2 is impractical
type dhtInfo struct { type dhtInfo struct {
nodeID_hidden *NodeID nodeID_hidden *NodeID
@ -141,23 +141,16 @@ func (t *dht) handleRes(res *dhtRes) {
if res.dest == *rinfo.getNodeID() { if res.dest == *rinfo.getNodeID() {
return return
} // No infinite recursions } // No infinite recursions
// ping the nodes we were told about
if len(res.infos) > dht_lookup_size { if len(res.infos) > dht_lookup_size {
// Ignore any "extra" lookup results // Ignore any "extra" lookup results
res.infos = res.infos[:dht_lookup_size] res.infos = res.infos[:dht_lookup_size]
} }
for _, info := range res.infos { for _, info := range res.infos {
bidx, isOK := t.getBucketIndex(info.getNodeID()) if dht_firstCloserThanThird(info.getNodeID(), &res.dest, rinfo.getNodeID()) {
if !isOK {
continue
}
b := t.getBucket(bidx)
if b.contains(info) {
continue
} // wait for maintenance cycle to get them
t.addToMill(info, info.getNodeID()) t.addToMill(info, info.getNodeID())
} }
} }
}
func (t *dht) lookup(nodeID *NodeID) []*dhtInfo { func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
// FIXME this allocates a bunch, sorts, and keeps the part it likes // FIXME this allocates a bunch, sorts, and keeps the part it likes
@ -167,7 +160,7 @@ func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
if info == nil { if info == nil {
panic("Should never happen!") panic("Should never happen!")
} }
if true || dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) { if dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) {
res = append(res, info) res = append(res, info)
} }
} }
@ -241,14 +234,6 @@ func (t *dht) insert(info *dhtInfo, isPeer bool) {
return return
} }
b.other = append(b.other, info) b.other = append(b.other, info)
// Check if the next bucket is non-full and return early if it is
if bidx+1 == t.nBuckets() {
return
}
bnext := t.getBucket(bidx + 1)
if len(bnext.other) < dht_bucket_size {
return
}
// Shrink from the *front* to requied size // Shrink from the *front* to requied size
for len(b.other) > dht_bucket_size { for len(b.other) > dht_bucket_size {
b.other = b.other[1:] b.other = b.other[1:]
@ -467,10 +452,36 @@ func (t *dht) doMaintenance() {
} }
t.offset++ t.offset++
} }
if len(t.rumorMill) > 0 { for len(t.rumorMill) > 0 {
var rumor dht_rumor var rumor dht_rumor
rumor, t.rumorMill = t.rumorMill[0], t.rumorMill[1:] rumor, t.rumorMill = t.rumorMill[0], t.rumorMill[1:]
if rumor.target == rumor.info.getNodeID() {
// Note that the above is a pointer comparison, and target can be nil
// This is only for adding new nodes (learned from other lookups)
// It only makes sense to ping if the node isn't already in the table
bidx, isOK := t.getBucketIndex(rumor.info.getNodeID())
if !isOK {
continue
}
b := t.getBucket(bidx)
if b.contains(rumor.info) {
// Already know about this node
continue
}
// This is a good spot to check if a node is worth pinging
doPing := len(b.other) < dht_bucket_size
for _, info := range b.other {
if dht_firstCloserThanThird(rumor.info.getNodeID(), &t.nodeID, info.getNodeID()) {
// Add the node if they are closer to us than someone in the same bucket
doPing = true
}
}
if !doPing {
continue
}
}
t.ping(rumor.info, rumor.target) t.ping(rumor.info, rumor.target)
break
} }
} }
@ -488,8 +499,11 @@ func dht_firstCloserThanThird(first *NodeID,
return false return false
} }
func (t *dht) resetPeers() { func (t *dht) reset() {
// This is mostly so bootstrapping will reset to resend coords into the network
for _, b := range t.buckets_hidden { for _, b := range t.buckets_hidden {
b.peers = b.peers[:0] b.peers = b.peers[:0]
b.other = b.other[:0]
} }
t.offset = 0
} }

View File

@ -80,7 +80,7 @@ func (r *router) mainLoop() {
r.core.dht.insertIfNew(info, true) r.core.dht.insertIfNew(info, true)
case <-r.reset: case <-r.reset:
r.core.sessions.resetInits() r.core.sessions.resetInits()
r.core.dht.resetPeers() r.core.dht.reset()
case <-ticker.C: case <-ticker.C:
{ {
// Any periodic maintenance stuff goes here // Any periodic maintenance stuff goes here

View File

@ -128,7 +128,6 @@ type switchMessage struct {
type switchPort uint64 type switchPort uint64
type tableElem struct { type tableElem struct {
port switchPort port switchPort
firstSeen time.Time
locator switchLocator locator switchLocator
} }
@ -304,7 +303,6 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig
doUpdate := false doUpdate := false
if !equiv(&msg.locator, &oldSender.locator) { if !equiv(&msg.locator, &oldSender.locator) {
doUpdate = true doUpdate = true
sender.firstSeen = now
} }
t.data.peers[fromPort] = sender t.data.peers[fromPort] = sender
updateRoot := false updateRoot := false
@ -355,7 +353,7 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig
case t.core.router.reset <- struct{}{}: case t.core.router.reset <- struct{}{}:
default: default:
} }
//t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords) //t.core.log.Println("Switch update:", msg.locator.root, msg.locator.tstamp, msg.locator.coords)
//fmt.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords) //fmt.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
} }
if t.data.locator.tstamp != msg.locator.tstamp { if t.data.locator.tstamp != msg.locator.tstamp {
@ -396,9 +394,6 @@ func (t *switchTable) updateTable() {
loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link
newTable.elems = append(newTable.elems, tableElem{ newTable.elems = append(newTable.elems, tableElem{
locator: loc, locator: loc,
//degree: pinfo.degree,
firstSeen: pinfo.firstSeen,
//forward: pinfo.forward,
port: pinfo.port, port: pinfo.port,
}) })
} }