From 9835c638180dc7a493c10ecb0a5e4772bacc145c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 23 Aug 2019 20:26:15 -0500 Subject: [PATCH] refactor things the router owns (dht, sessions, searches) into that struct, to make the ownership more explicit --- src/yggdrasil/api.go | 26 +++++++++++++------------- src/yggdrasil/conn.go | 8 ++++---- src/yggdrasil/core.go | 15 ++++++--------- src/yggdrasil/dht.go | 4 ++-- src/yggdrasil/listener.go | 4 ++-- src/yggdrasil/nodeinfo.go | 2 +- src/yggdrasil/router.go | 27 +++++++++++++++------------ src/yggdrasil/search.go | 20 ++++++++++---------- src/yggdrasil/session.go | 12 ++++++------ 9 files changed, 59 insertions(+), 59 deletions(-) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 785a5f72..f50c8ceb 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -156,11 +156,11 @@ func (c *Core) GetDHT() []DHTEntry { getDHT := func() { now := time.Now() var dhtentry []*dhtInfo - for _, v := range c.dht.table { + for _, v := range c.router.dht.table { dhtentry = append(dhtentry, v) } sort.SliceStable(dhtentry, func(i, j int) bool { - return dht_ordered(&c.dht.nodeID, dhtentry[i].getNodeID(), dhtentry[j].getNodeID()) + return dht_ordered(&c.router.dht.nodeID, dhtentry[i].getNodeID(), dhtentry[j].getNodeID()) }) for _, v := range dhtentry { info := DHTEntry{ @@ -208,7 +208,7 @@ func (c *Core) GetSwitchQueues() SwitchQueues { func (c *Core) GetSessions() []Session { var sessions []Session getSessions := func() { - for _, sinfo := range c.sessions.sinfos { + for _, sinfo := range c.router.sessions.sinfos { var session Session workerFunc := func() { session = Session{ @@ -243,17 +243,17 @@ func (c *Core) GetSessions() []Session { // ConnListen returns a listener for Yggdrasil session connections. func (c *Core) ConnListen() (*Listener, error) { - c.sessions.listenerMutex.Lock() - defer c.sessions.listenerMutex.Unlock() - if c.sessions.listener != nil { + c.router.sessions.listenerMutex.Lock() + defer c.router.sessions.listenerMutex.Unlock() + if c.router.sessions.listener != nil { return nil, errors.New("a listener already exists") } - c.sessions.listener = &Listener{ + c.router.sessions.listener = &Listener{ core: c, conn: make(chan *Conn), close: make(chan interface{}), } - return c.sessions.listener, nil + return c.router.sessions.listener, nil } // ConnDialer returns a dialer for Yggdrasil session connections. @@ -356,10 +356,10 @@ func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool) // received an incoming session request. The function should return true to // allow the session or false to reject it. func (c *Core) SetSessionGatekeeper(f func(pubkey *crypto.BoxPubKey, initiator bool) bool) { - c.sessions.isAllowedMutex.Lock() - defer c.sessions.isAllowedMutex.Unlock() + c.router.sessions.isAllowedMutex.Lock() + defer c.router.sessions.isAllowedMutex.Unlock() - c.sessions.isAllowedHandler = f + c.router.sessions.isAllowedHandler = f } // SetLogger sets the output logger of the Yggdrasil node after startup. This @@ -445,10 +445,10 @@ func (c *Core) DHTPing(key crypto.BoxPubKey, coords []uint64, target *crypto.Nod } rq := dhtReqKey{info.key, *target} sendPing := func() { - c.dht.addCallback(&rq, func(res *dhtRes) { + c.router.dht.addCallback(&rq, func(res *dhtRes) { resCh <- res }) - c.dht.ping(&info, &rq.dest) + c.router.dht.ping(&info, &rq.dest) } c.router.doAdmin(sendPing) // TODO: do something better than the below... diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 51dd9509..b4ce71f9 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -84,7 +84,7 @@ func (c *Conn) String() string { func (c *Conn) search() error { var sinfo *searchInfo var isIn bool - c.core.router.doAdmin(func() { sinfo, isIn = c.core.searches.searches[*c.nodeID] }) + c.core.router.doAdmin(func() { sinfo, isIn = c.core.router.searches.searches[*c.nodeID] }) if !isIn { done := make(chan struct{}, 1) var sess *sessionInfo @@ -99,7 +99,7 @@ func (c *Conn) search() error { } } c.core.router.doAdmin(func() { - sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) + sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) sinfo.continueSearch() }) <-done @@ -124,11 +124,11 @@ func (c *Conn) search() error { func (c *Conn) doSearch() { routerWork := func() { // Check to see if there is a search already matching the destination - sinfo, isIn := c.core.searches.searches[*c.nodeID] + sinfo, isIn := c.core.router.searches.searches[*c.nodeID] if !isIn { // Nothing was found, so create a new search searchCompleted := func(sinfo *sessionInfo, e error) {} - sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) + sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) // Start the search sinfo.continueSearch() diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 0921ab9f..870597a9 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -26,10 +26,7 @@ type Core struct { sigPriv crypto.SigPrivKey switchTable switchTable peers peers - sessions sessions router router - dht dht - searches searches link link log *log.Logger } @@ -76,9 +73,9 @@ func (c *Core) init() error { c.log.Warnln("SigningPublicKey in config is incorrect, should be", sp) } - c.searches.init(c) - c.dht.init(c) - c.sessions.init(c) + c.router.searches.init(c) + c.router.dht.init(c) + c.router.sessions.init(c) c.peers.init(c) c.router.init(c) c.switchTable.init(c) // TODO move before peers? before router? @@ -124,9 +121,9 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) { errors := 0 components := []chan chan error{ - c.searches.reconfigure, - c.dht.reconfigure, - c.sessions.reconfigure, + c.router.searches.reconfigure, + c.router.dht.reconfigure, + c.router.sessions.reconfigure, c.peers.reconfigure, c.router.reconfigure, c.switchTable.reconfigure, diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index d35d3aaf..d0d98558 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -221,7 +221,7 @@ func (t *dht) handleReq(req *dhtReq) { func (t *dht) sendRes(res *dhtRes, req *dhtReq) { // Send a reply for a dhtReq bs := res.encode() - shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.Key) + shared := t.core.router.sessions.getSharedKey(&t.core.boxPriv, &req.Key) payload, nonce := crypto.BoxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ Coords: req.Coords, @@ -285,7 +285,7 @@ func (t *dht) handleRes(res *dhtRes) { func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { // Send a dhtReq to the node in dhtInfo bs := req.encode() - shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &dest.key) + shared := t.core.router.sessions.getSharedKey(&t.core.boxPriv, &dest.key) payload, nonce := crypto.BoxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ Coords: dest.coords, diff --git a/src/yggdrasil/listener.go b/src/yggdrasil/listener.go index 62225412..fec543f4 100644 --- a/src/yggdrasil/listener.go +++ b/src/yggdrasil/listener.go @@ -31,8 +31,8 @@ func (l *Listener) Close() (err error) { recover() err = errors.New("already closed") }() - if l.core.sessions.listener == l { - l.core.sessions.listener = nil + if l.core.router.sessions.listener == l { + l.core.router.sessions.listener = nil } close(l.close) close(l.conn) diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go index 73d4e115..50f5bf9c 100644 --- a/src/yggdrasil/nodeinfo.go +++ b/src/yggdrasil/nodeinfo.go @@ -172,7 +172,7 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse NodeInfo: m.getNodeInfo(), } bs := nodeinfo.encode() - shared := m.core.sessions.getSharedKey(&m.core.boxPriv, &key) + shared := m.core.router.sessions.getSharedKey(&m.core.boxPriv, &key) payload, nonce := crypto.BoxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ Coords: coords, diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 95521841..ed919983 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -43,15 +43,18 @@ type router struct { addr address.Address subnet address.Subnet out func([]byte) // packets we're sending to the network, link to peer's "in" + dht dht nodeinfo nodeinfo + searches searches + sessions sessions } // Initializes the router struct, which includes setting up channels to/from the adapter. func (r *router) init(core *Core) { r.core = core r.reconfigure = make(chan chan error, 1) - r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) - r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) + r.addr = *address.AddrForNodeID(&r.dht.nodeID) + r.subnet = *address.SubnetForNodeID(&r.dht.nodeID) self := linkInterface{ name: "(self)", info: linkInfo{ @@ -91,15 +94,15 @@ func (r *router) handlePackets(from phony.IActor, packets [][]byte) { // Insert a peer info into the dht, TODO? make the dht a separate actor func (r *router) insertPeer(from phony.IActor, info *dhtInfo) { r.EnqueueFrom(from, func() { - r.core.dht.insertPeer(info) + r.dht.insertPeer(info) }) } // Reset sessions and DHT after the switch sees our coords change func (r *router) reset(from phony.IActor) { r.EnqueueFrom(from, func() { - r.core.sessions.reset(r) - r.core.dht.reset() + r.sessions.reset() + r.dht.reset() }) } @@ -114,8 +117,8 @@ func (r *router) _mainLoop() { <-r.SyncExec(func() { // Any periodic maintenance stuff goes here r.core.switchTable.doMaintenance() - r.core.dht.doMaintenance() - r.core.sessions.cleanup() + r.dht.doMaintenance() + r.sessions.cleanup() }) case e := <-r.reconfigure: <-r.SyncExec(func() { @@ -149,7 +152,7 @@ func (r *router) _handleTraffic(packet []byte) { if !p.decode(packet) { return } - sinfo, isIn := r.core.sessions.getSessionForHandle(&p.Handle) + sinfo, isIn := r.sessions.getSessionForHandle(&p.Handle) if !isIn { util.PutBytes(p.Payload) return @@ -172,7 +175,7 @@ func (r *router) _handleProto(packet []byte) { var sharedKey *crypto.BoxSharedKey if p.ToKey == r.core.boxPub { // Try to open using our permanent key - sharedKey = r.core.sessions.getSharedKey(&r.core.boxPriv, &p.FromKey) + sharedKey = r.sessions.getSharedKey(&r.core.boxPriv, &p.FromKey) } else { return } @@ -212,7 +215,7 @@ func (r *router) _handlePing(bs []byte, fromKey *crypto.BoxPubKey) { return } ping.SendPermPub = *fromKey - r.core.sessions.handlePing(&ping) + r.sessions.handlePing(&ping) } // Handles session pongs (which are really pings with an extra flag to prevent acknowledgement). @@ -227,7 +230,7 @@ func (r *router) _handleDHTReq(bs []byte, fromKey *crypto.BoxPubKey) { return } req.Key = *fromKey - r.core.dht.handleReq(&req) + r.dht.handleReq(&req) } // Decodes dht responses and passes them to dht.handleRes to update the DHT table and further pass them to the search code (if applicable). @@ -237,7 +240,7 @@ func (r *router) _handleDHTRes(bs []byte, fromKey *crypto.BoxPubKey) { return } res.Key = *fromKey - r.core.dht.handleRes(&res) + r.dht.handleRes(&res) } // Decodes nodeinfo request diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 56adda80..5751043f 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -100,7 +100,7 @@ func (sinfo *searchInfo) addToSearch(res *dhtRes) { from := dhtInfo{key: res.Key, coords: res.Coords} sinfo.visited[*from.getNodeID()] = true for _, info := range res.Infos { - if *info.getNodeID() == sinfo.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { + if *info.getNodeID() == sinfo.core.router.dht.nodeID || sinfo.visited[*info.getNodeID()] { continue } if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) { @@ -134,7 +134,7 @@ func (sinfo *searchInfo) doSearchStep() { if len(sinfo.toVisit) == 0 { if time.Since(sinfo.time) > search_RETRY_TIME { // Dead end and no response in too long, do cleanup - delete(sinfo.core.searches.searches, sinfo.dest) + delete(sinfo.core.router.searches.searches, sinfo.dest) sinfo.callback(nil, errors.New("search reached dead end")) } return @@ -143,8 +143,8 @@ func (sinfo *searchInfo) doSearchStep() { var next *dhtInfo next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] rq := dhtReqKey{next.key, sinfo.dest} - sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes) - sinfo.core.dht.ping(next, &sinfo.dest) + sinfo.core.router.dht.addCallback(&rq, sinfo.handleDHTRes) + sinfo.core.router.dht.ping(next, &sinfo.dest) sinfo.time = time.Now() } @@ -157,7 +157,7 @@ func (sinfo *searchInfo) continueSearch() { // Any that die aren't restarted, but a new one will start later retryLater := func() { // FIXME this keeps the search alive forever if not for the searches map, fix that - newSearchInfo := sinfo.core.searches.searches[sinfo.dest] + newSearchInfo := sinfo.core.router.searches.searches[sinfo.dest] if newSearchInfo != sinfo { return } @@ -196,17 +196,17 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { return false } // They match, so create a session and send a sessionRequest - sess, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key) + sess, isIn := sinfo.core.router.sessions.getByTheirPerm(&res.Key) if !isIn { - sess = sinfo.core.sessions.createSession(&res.Key) + sess = sinfo.core.router.sessions.createSession(&res.Key) if sess == nil { // nil if the DHT search finished but the session wasn't allowed sinfo.callback(nil, errors.New("session not allowed")) // Cleanup - delete(sinfo.core.searches.searches, res.Dest) + delete(sinfo.core.router.searches.searches, res.Dest) return true } - _, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key) + _, isIn := sinfo.core.router.sessions.getByTheirPerm(&res.Key) if !isIn { panic("This should never happen") } @@ -216,6 +216,6 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { sess.ping(&sinfo.core.router) sinfo.callback(sess, nil) // Cleanup - delete(sinfo.core.searches.searches, res.Dest) + delete(sinfo.core.router.searches.searches, res.Dest) return true } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index f28e014a..02d4dc82 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -261,7 +261,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { // Run cleanup when the session is canceled <-sinfo.cancel.Finished() sinfo.core.router.doAdmin(func() { - sinfo.core.sessions.removeSession(&sinfo) + sinfo.core.router.sessions.removeSession(&sinfo) }) }() go sinfo.startWorkers() @@ -298,9 +298,9 @@ func (ss *sessions) cleanup() { // Closes a session, removing it from sessions maps. func (ss *sessions) removeSession(sinfo *sessionInfo) { - if s := sinfo.core.sessions.sinfos[sinfo.myHandle]; s == sinfo { - delete(sinfo.core.sessions.sinfos, sinfo.myHandle) - delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub) + if s := sinfo.core.router.sessions.sinfos[sinfo.myHandle]; s == sinfo { + delete(sinfo.core.router.sessions.sinfos, sinfo.myHandle) + delete(sinfo.core.router.sessions.byTheirPerm, sinfo.theirPermPub) } } @@ -466,9 +466,9 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) { // Resets all sessions to an uninitialized state. // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. -func (ss *sessions) reset(from phony.IActor) { +func (ss *sessions) reset() { for _, sinfo := range ss.sinfos { - sinfo.EnqueueFrom(from, func() { + sinfo.EnqueueFrom(&ss.core.router, func() { sinfo.reset = true }) }