From 7954fa3c33cbb7632a2bd24caf414221925440ef Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 25 Nov 2018 17:08:45 -0600 Subject: [PATCH] store one callback instead of many, needed to prevent search failures if there are multiple outstanding packets --- src/yggdrasil/dht.go | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index e49e343a..6e08ad23 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -59,9 +59,9 @@ type dhtReqKey struct { type dht struct { core *Core nodeID NodeID - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests - callbacks map[dhtReqKey][]func(*dhtRes) // Search and admin lookup callbacks + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests + callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks // These next two could be replaced by a single linked list or similar... table map[NodeID]*dhtInfo imp []*dhtInfo @@ -72,7 +72,7 @@ func (t *dht) init(c *Core) { t.core = c t.nodeID = *t.core.GetNodeID() t.peers = make(chan *dhtInfo, 1024) - t.callbacks = make(map[dhtReqKey][]func(*dhtRes)) + t.callbacks = make(map[dhtReqKey]dht_callbackInfo) t.reset() } @@ -202,26 +202,25 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) { t.core.router.out(packet) } +type dht_callbackInfo struct { + f func(*dhtRes) + time time.Time +} + // Adds a callback and removes it after some timeout. func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) { - t.callbacks[*rq] = append(t.callbacks[*rq], callback) - go func() { - time.Sleep(6 * time.Second) - t.core.router.admin <- func() { - delete(t.callbacks, *rq) - } - }() + info := dht_callbackInfo{callback, time.Now().Add(6 * time.Second)} + t.callbacks[*rq] = info } // Reads a lookup response, checks that we had sent a matching request, and processes the response info. // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses func (t *dht) handleRes(res *dhtRes) { rq := dhtReqKey{res.Key, res.Dest} - for _, callback := range t.callbacks[rq] { - callback(res) + if callback, isIn := t.callbacks[rq]; isIn { + callback.f(res) + delete(t.callbacks, rq) } - delete(t.callbacks, rq) - t.core.searches.handleDHTRes(res) _, isIn := t.reqs[rq] if !isIn { return @@ -263,7 +262,7 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { } packet := p.encode() t.core.router.out(packet) - rq := dhtReqKey{req.Key, req.Dest} + rq := dhtReqKey{dest.key, req.Dest} t.reqs[rq] = time.Now() } @@ -293,9 +292,11 @@ func (t *dht) doMaintenance() { } } t.reqs = newReqs - newCallbacks := make(map[dhtReqKey][]func(*dhtRes), len(t.callbacks)) + newCallbacks := make(map[dhtReqKey]dht_callbackInfo, len(t.callbacks)) for key, callback := range t.callbacks { - newCallbacks[key] = callback + if now.Before(callback.time) { + newCallbacks[key] = callback + } } t.callbacks = newCallbacks for infoID, info := range t.table {