Merge pull request #55 from Arceliar/dht

DHT peers/other partitioning
This commit is contained in:
Neil Alexander 2018-03-10 21:32:30 +00:00 committed by GitHub
commit 0dc1dd6292
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 79 additions and 34 deletions

View File

@ -290,7 +290,16 @@ func (a *admin) getData_getDHT() []admin_nodeInfo {
getDHT := func() {
for i := 0; i < a.core.dht.nBuckets(); i++ {
b := a.core.dht.getBucket(i)
for _, v := range b.infos {
for _, v := range b.other {
addr := *address_addrForNodeID(v.getNodeID())
info := admin_nodeInfo{
{"IP", net.IP(addr[:]).String()},
{"coords", fmt.Sprint(v.coords)},
{"bucket", fmt.Sprint(i)},
}
infos = append(infos, info)
}
for _, v := range b.peers {
addr := *address_addrForNodeID(v.getNodeID())
info := admin_nodeInfo{
{"IP", net.IP(addr[:]).String()},

View File

@ -154,7 +154,8 @@ func (c *Core) DEBUG_getDHTSize() int {
total := 0
for bidx := 0; bidx < c.dht.nBuckets(); bidx++ {
b := c.dht.getBucket(bidx)
total += len(b.infos)
total += len(b.peers)
total += len(b.other)
}
return total
}

View File

@ -46,7 +46,8 @@ func (info *dhtInfo) getNodeID() *NodeID {
}
type bucket struct {
infos []*dhtInfo
peers []*dhtInfo
other []*dhtInfo
}
type dhtReq struct {
@ -100,7 +101,7 @@ func (t *dht) handleReq(req *dhtReq) {
key: req.key,
coords: req.coords,
}
t.insertIfNew(&info) // This seems DoSable (we just trust their coords...)
t.insertIfNew(&info, false) // This seems DoSable (we just trust their coords...)
//if req.dest != t.nodeID { t.ping(&info, info.getNodeID()) } // Or spam...
}
@ -125,13 +126,18 @@ func (t *dht) handleRes(res *dhtRes) {
return
}
b := t.getBucket(bidx)
for _, oldinfo := range b.infos {
for _, oldinfo := range b.peers {
if oldinfo.key == rinfo.key {
rinfo.send = oldinfo.send
}
}
for _, oldinfo := range b.other {
if oldinfo.key == rinfo.key {
rinfo.send = oldinfo.send
}
}
// Insert into table
t.insert(&rinfo)
t.insert(&rinfo, false)
if res.dest == *rinfo.getNodeID() {
return
} // No infinite recursions
@ -170,7 +176,8 @@ func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
var res []*dhtInfo
for bidx := 0; bidx < t.nBuckets(); bidx++ {
b := t.getBucket(bidx)
res = addInfos(res, b.infos)
res = addInfos(res, b.peers)
res = addInfos(res, b.other)
}
doSort := func(infos []*dhtInfo) {
less := func(i, j int) bool {
@ -195,7 +202,7 @@ func (t *dht) nBuckets() int {
return len(t.buckets_hidden)
}
func (t *dht) insertIfNew(info *dhtInfo) {
func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) {
//fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords)
// Insert a peer if and only if the bucket doesn't already contain it
nodeID := info.getNodeID()
@ -210,11 +217,11 @@ func (t *dht) insertIfNew(info *dhtInfo) {
// (Is there another "natural" choice that bootstraps faster?)
info.send = time.Now()
info.recv = info.send
t.insert(info)
t.insert(info, isPeer)
}
}
func (t *dht) insert(info *dhtInfo) {
func (t *dht) insert(info *dhtInfo, isPeer bool) {
//fmt.Println("DEBUG: dht insert:", info.getNodeID(), info.coords)
// First update the time on this info
info.recv = time.Now()
@ -228,18 +235,23 @@ func (t *dht) insert(info *dhtInfo) {
// First drop any existing entry from the bucket
b.drop(&info.key)
// Now add to the *end* of the bucket
b.infos = append(b.infos, info)
if isPeer {
// TODO make sure we don't duplicate peers in b.other too
b.peers = append(b.peers, info)
return
}
b.other = append(b.other, info)
// Check if the next bucket is non-full and return early if it is
if bidx+1 == t.nBuckets() {
return
}
bnext := t.getBucket(bidx + 1)
if len(bnext.infos) < dht_bucket_size {
if len(bnext.other) < dht_bucket_size {
return
}
// Shrink from the *front* to requied size
for len(b.infos) > dht_bucket_size {
b.infos = b.infos[1:]
for len(b.other) > dht_bucket_size {
b.other = b.other[1:]
}
}
@ -256,23 +268,34 @@ func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) {
func (b *bucket) contains(ninfo *dhtInfo) bool {
// Compares if key and coords match
for _, info := range b.infos {
if info == nil {
panic("Should never happen")
}
if info.key == ninfo.key {
if len(info.coords) != len(ninfo.coords) {
return false
var found bool
check := func(infos []*dhtInfo) {
for _, info := range infos {
if info == nil {
panic("Should never happen")
}
if info.key != info.key {
continue
}
if len(info.coords) != len(ninfo.coords) {
continue
}
match := true
for idx := 0; idx < len(info.coords); idx++ {
if info.coords[idx] != ninfo.coords[idx] {
return false
match = false
break
}
}
return true
if match {
found = true
break
}
}
}
return false
check(b.peers)
check(b.other)
return found
}
func (b *bucket) drop(key *boxPubKey) {
@ -286,7 +309,8 @@ func (b *bucket) drop(key *boxPubKey) {
}
return cleaned
}
b.infos = clean(b.infos)
b.peers = clean(b.peers)
b.other = clean(b.other)
}
func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
@ -333,7 +357,7 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
}
func (b *bucket) isEmpty() bool {
return len(b.infos) == 0
return len(b.peers)+len(b.other) == 0
}
func (b *bucket) nextToPing() *dhtInfo {
@ -343,14 +367,18 @@ func (b *bucket) nextToPing() *dhtInfo {
// Gives them time to respond
// And time between traffic loss from short term congestion in the network
var toPing *dhtInfo
for _, next := range b.infos {
if time.Since(next.send) < 6*time.Second {
continue
}
if toPing == nil || next.recv.Before(toPing.recv) {
toPing = next
update := func(infos []*dhtInfo) {
for _, next := range infos {
if time.Since(next.send) < 6*time.Second {
continue
}
if toPing == nil || next.recv.Before(toPing.recv) {
toPing = next
}
}
}
update(b.peers)
update(b.other)
return toPing
}
@ -457,3 +485,9 @@ func dht_firstCloserThanThird(first *NodeID,
}
return false
}
func (t *dht) resetPeers() {
for _, b := range t.buckets_hidden {
b.peers = b.peers[:0]
}
}

View File

@ -77,9 +77,10 @@ func (r *router) mainLoop() {
case p := <-r.send:
r.sendPacket(p)
case info := <-r.core.dht.peers:
r.core.dht.insert(info) //r.core.dht.insertIfNew(info)
r.core.dht.insertIfNew(info, true)
case <-r.reset:
r.core.sessions.resetInits()
r.core.dht.resetPeers()
case <-ticker.C:
{
// Any periodic maintenance stuff goes here

View File

@ -113,7 +113,7 @@ func generateConfig(isAutoconf bool) *nodeConfig {
cfg.Listen = "[::]:0"
} else {
r1 := rand.New(rand.NewSource(time.Now().UnixNano()))
cfg.Listen = fmt.Sprintf("[::]:%d", r1.Intn(65534 - 32768) + 32768)
cfg.Listen = fmt.Sprintf("[::]:%d", r1.Intn(65534-32768)+32768)
}
cfg.AdminListen = "[::1]:9001"
cfg.BoxPub = hex.EncodeToString(bpub[:])