Merge pull request #18 from Arceliar/rumormill

Add a rumor mill to throttle dht maintenance traffic
This commit is contained in:
Arceliar 2018-02-17 17:25:51 -06:00 committed by GitHub
commit c0579024c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 39 deletions

View File

@ -26,6 +26,7 @@ func (n *Node) init(index int) {
n.core.Init() n.core.Init()
n.send = n.core.DEBUG_getSend() n.send = n.core.DEBUG_getSend()
n.recv = n.core.DEBUG_getRecv() n.recv = n.core.DEBUG_getRecv()
n.core.DEBUG_simFixMTU()
} }
func (n *Node) printTraffic() { func (n *Node) printTraffic() {
@ -424,7 +425,19 @@ func main() {
} }
*/ */
startNetwork(kstore) startNetwork(kstore)
if true { //time.Sleep(10*time.Second)
// Note that testPaths only works if pressure is turend off
// Otherwise congestion can lead to routing loops?
for finished := false; !finished; {
finished = testPaths(kstore)
}
pingNodes(kstore)
//pingBench(kstore) // Only after disabling debug output
//stressTest(kstore)
//time.Sleep(120*time.Second)
dumpDHTSize(kstore) // note that this uses racey functions to read things...
if false {
// This connects the sim to the local network
for _, node := range kstore { for _, node := range kstore {
node.startUDP("localhost:0") node.startUDP("localhost:0")
node.connectUDP("localhost:12345") node.connectUDP("localhost:12345")
@ -440,15 +453,4 @@ func main() {
var block chan struct{} var block chan struct{}
<-block <-block
} }
//time.Sleep(10*time.Second)
// Note that testPaths only works if pressure is turend off
// Otherwise congestion can lead to routing loops?
for finished := false; !finished; {
finished = testPaths(kstore)
}
pingNodes(kstore)
//pingBench(kstore) // Only after disabling debug output
//stressTest(kstore)
//time.Sleep(120*time.Second)
dumpDHTSize(kstore) // note that this uses racey functions to read things...
} }

View File

@ -369,3 +369,7 @@ func DEBUG_simLinkPeers(p, q *peer) {
go p.linkLoop(plinkIn) go p.linkLoop(plinkIn)
go q.linkLoop(qlinkIn) go q.linkLoop(qlinkIn)
} }
func (c *Core) DEBUG_simFixMTU() {
c.tun.mtu = 65535
}

View File

@ -62,6 +62,11 @@ type dhtRes struct {
infos []*dhtInfo // response infos []*dhtInfo // response
} }
type dht_rumor struct {
info *dhtInfo
target *NodeID
}
type dht struct { type dht struct {
core *Core core *Core
nodeID NodeID nodeID NodeID
@ -69,6 +74,7 @@ type dht struct {
peers chan *dhtInfo // other goroutines put incoming dht updates here peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[boxPubKey]map[NodeID]time.Time reqs map[boxPubKey]map[NodeID]time.Time
offset int offset int
rumorMill []dht_rumor
} }
func (t *dht) init(c *Core) { func (t *dht) init(c *Core) {
@ -143,7 +149,7 @@ func (t *dht) handleRes(res *dhtRes) {
if b.contains(info) { if b.contains(info) {
continue continue
} // wait for maintenance cycle to get them } // wait for maintenance cycle to get them
t.ping(info, info.getNodeID()) t.addToMill(info, info.getNodeID())
} }
} }
@ -379,6 +385,14 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) {
t.sendReq(&req, info) t.sendReq(&req, info)
} }
func (t *dht) addToMill(info *dhtInfo, target *NodeID) {
rumor := dht_rumor{
info: info,
target: target,
}
t.rumorMill = append(t.rumorMill, rumor)
}
func (t *dht) doMaintenance() { func (t *dht) doMaintenance() {
// First clean up reqs // First clean up reqs
for key, reqs := range t.reqs { for key, reqs := range t.reqs {
@ -391,36 +405,43 @@ func (t *dht) doMaintenance() {
delete(t.reqs, key) delete(t.reqs, key)
} }
} }
// Ping the least recently contacted node if len(t.rumorMill) == 0 {
// This is to make sure we eventually notice when someone times out // Ping the least recently contacted node
var oldest *dhtInfo // This is to make sure we eventually notice when someone times out
last := 0 var oldest *dhtInfo
for bidx := 0; bidx < t.nBuckets(); bidx++ { last := 0
b := t.getBucket(bidx) for bidx := 0; bidx < t.nBuckets(); bidx++ {
if !b.isEmpty() { b := t.getBucket(bidx)
last = bidx if !b.isEmpty() {
toPing := b.nextToPing() last = bidx
if toPing == nil { toPing := b.nextToPing()
continue if toPing == nil {
} // We've recently pinged everyone in b continue
if oldest == nil || toPing.recv.Before(oldest.recv) { } // We've recently pinged everyone in b
oldest = toPing if oldest == nil || toPing.recv.Before(oldest.recv) {
oldest = toPing
}
} }
} }
if oldest != nil {
t.addToMill(oldest, nil)
} // if the DHT isn't empty
// Refresh buckets
if t.offset > last {
t.offset = 0
}
target := t.getTarget(t.offset)
for _, info := range t.lookup(target) {
t.addToMill(info, target)
break
}
t.offset++
} }
if oldest != nil { if len(t.rumorMill) > 0 {
t.ping(oldest, nil) var rumor dht_rumor
} // if the DHT isn't empty rumor, t.rumorMill = t.rumorMill[0], t.rumorMill[1:]
// Refresh buckets t.ping(rumor.info, rumor.target)
if t.offset > last {
t.offset = 0
} }
target := t.getTarget(t.offset)
for _, info := range t.lookup(target) {
t.ping(info, target)
break
}
t.offset++
} }
func dht_firstCloserThanThird(first *NodeID, func dht_firstCloserThanThird(first *NodeID,