From da44ec282f84e0035701ee9e6aabbbb0ff1bc761 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 17 Feb 2018 17:10:08 -0600 Subject: [PATCH] Add a rumor mill to throttle dht maintenance traffic --- misc/sim/treesim.go | 26 ++++++++------- src/yggdrasil/debug.go | 4 +++ src/yggdrasil/dht.go | 75 +++++++++++++++++++++++++++--------------- 3 files changed, 66 insertions(+), 39 deletions(-) diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index afb681e7..9f1802b5 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -26,6 +26,7 @@ func (n *Node) init(index int) { n.core.Init() n.send = n.core.DEBUG_getSend() n.recv = n.core.DEBUG_getRecv() + n.core.DEBUG_simFixMTU() } func (n *Node) printTraffic() { @@ -424,7 +425,19 @@ func main() { } */ startNetwork(kstore) - if true { + //time.Sleep(10*time.Second) + // Note that testPaths only works if pressure is turend off + // Otherwise congestion can lead to routing loops? + for finished := false; !finished; { + finished = testPaths(kstore) + } + pingNodes(kstore) + //pingBench(kstore) // Only after disabling debug output + //stressTest(kstore) + //time.Sleep(120*time.Second) + dumpDHTSize(kstore) // note that this uses racey functions to read things... + if false { + // This connects the sim to the local network for _, node := range kstore { node.startUDP("localhost:0") node.connectUDP("localhost:12345") @@ -440,15 +453,4 @@ func main() { var block chan struct{} <-block } - //time.Sleep(10*time.Second) - // Note that testPaths only works if pressure is turend off - // Otherwise congestion can lead to routing loops? - for finished := false; !finished; { - finished = testPaths(kstore) - } - pingNodes(kstore) - //pingBench(kstore) // Only after disabling debug output - //stressTest(kstore) - //time.Sleep(120*time.Second) - dumpDHTSize(kstore) // note that this uses racey functions to read things... } diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 15ba491b..79fa0596 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -369,3 +369,7 @@ func DEBUG_simLinkPeers(p, q *peer) { go p.linkLoop(plinkIn) go q.linkLoop(qlinkIn) } + +func (c *Core) DEBUG_simFixMTU() { + c.tun.mtu = 65535 +} diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 1b8dcf71..6e859abb 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -62,6 +62,11 @@ type dhtRes struct { infos []*dhtInfo // response } +type dht_rumor struct { + info *dhtInfo + target *NodeID +} + type dht struct { core *Core nodeID NodeID @@ -69,6 +74,7 @@ type dht struct { peers chan *dhtInfo // other goroutines put incoming dht updates here reqs map[boxPubKey]map[NodeID]time.Time offset int + rumorMill []dht_rumor } func (t *dht) init(c *Core) { @@ -143,7 +149,7 @@ func (t *dht) handleRes(res *dhtRes) { if b.contains(info) { continue } // wait for maintenance cycle to get them - t.ping(info, info.getNodeID()) + t.addToMill(info, info.getNodeID()) } } @@ -379,6 +385,14 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) { t.sendReq(&req, info) } +func (t *dht) addToMill(info *dhtInfo, target *NodeID) { + rumor := dht_rumor{ + info: info, + target: target, + } + t.rumorMill = append(t.rumorMill, rumor) +} + func (t *dht) doMaintenance() { // First clean up reqs for key, reqs := range t.reqs { @@ -391,36 +405,43 @@ func (t *dht) doMaintenance() { delete(t.reqs, key) } } - // Ping the least recently contacted node - // This is to make sure we eventually notice when someone times out - var oldest *dhtInfo - last := 0 - for bidx := 0; bidx < t.nBuckets(); bidx++ { - b := t.getBucket(bidx) - if !b.isEmpty() { - last = bidx - toPing := b.nextToPing() - if toPing == nil { - continue - } // We've recently pinged everyone in b - if oldest == nil || toPing.recv.Before(oldest.recv) { - oldest = toPing + if len(t.rumorMill) == 0 { + // Ping the least recently contacted node + // This is to make sure we eventually notice when someone times out + var oldest *dhtInfo + last := 0 + for bidx := 0; bidx < t.nBuckets(); bidx++ { + b := t.getBucket(bidx) + if !b.isEmpty() { + last = bidx + toPing := b.nextToPing() + if toPing == nil { + continue + } // We've recently pinged everyone in b + if oldest == nil || toPing.recv.Before(oldest.recv) { + oldest = toPing + } } } + if oldest != nil { + t.addToMill(oldest, nil) + } // if the DHT isn't empty + // Refresh buckets + if t.offset > last { + t.offset = 0 + } + target := t.getTarget(t.offset) + for _, info := range t.lookup(target) { + t.addToMill(info, target) + break + } + t.offset++ } - if oldest != nil { - t.ping(oldest, nil) - } // if the DHT isn't empty - // Refresh buckets - if t.offset > last { - t.offset = 0 + if len(t.rumorMill) > 0 { + var rumor dht_rumor + rumor, t.rumorMill = t.rumorMill[0], t.rumorMill[1:] + t.ping(rumor.info, rumor.target) } - target := t.getTarget(t.offset) - for _, info := range t.lookup(target) { - t.ping(info, target) - break - } - t.offset++ } func dht_firstCloserThanThird(first *NodeID,