refactor things the router owns (dht, sessions, searches) into that struct, to make the ownership more explicit

This commit is contained in:
Arceliar 2019-08-23 20:26:15 -05:00
parent bbcbbaf3b1
commit 9835c63818
9 changed files with 59 additions and 59 deletions

View File

@ -156,11 +156,11 @@ func (c *Core) GetDHT() []DHTEntry {
getDHT := func() {
now := time.Now()
var dhtentry []*dhtInfo
for _, v := range c.dht.table {
for _, v := range c.router.dht.table {
dhtentry = append(dhtentry, v)
}
sort.SliceStable(dhtentry, func(i, j int) bool {
return dht_ordered(&c.dht.nodeID, dhtentry[i].getNodeID(), dhtentry[j].getNodeID())
return dht_ordered(&c.router.dht.nodeID, dhtentry[i].getNodeID(), dhtentry[j].getNodeID())
})
for _, v := range dhtentry {
info := DHTEntry{
@ -208,7 +208,7 @@ func (c *Core) GetSwitchQueues() SwitchQueues {
func (c *Core) GetSessions() []Session {
var sessions []Session
getSessions := func() {
for _, sinfo := range c.sessions.sinfos {
for _, sinfo := range c.router.sessions.sinfos {
var session Session
workerFunc := func() {
session = Session{
@ -243,17 +243,17 @@ func (c *Core) GetSessions() []Session {
// ConnListen returns a listener for Yggdrasil session connections.
func (c *Core) ConnListen() (*Listener, error) {
c.sessions.listenerMutex.Lock()
defer c.sessions.listenerMutex.Unlock()
if c.sessions.listener != nil {
c.router.sessions.listenerMutex.Lock()
defer c.router.sessions.listenerMutex.Unlock()
if c.router.sessions.listener != nil {
return nil, errors.New("a listener already exists")
}
c.sessions.listener = &Listener{
c.router.sessions.listener = &Listener{
core: c,
conn: make(chan *Conn),
close: make(chan interface{}),
}
return c.sessions.listener, nil
return c.router.sessions.listener, nil
}
// ConnDialer returns a dialer for Yggdrasil session connections.
@ -356,10 +356,10 @@ func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool)
// received an incoming session request. The function should return true to
// allow the session or false to reject it.
func (c *Core) SetSessionGatekeeper(f func(pubkey *crypto.BoxPubKey, initiator bool) bool) {
c.sessions.isAllowedMutex.Lock()
defer c.sessions.isAllowedMutex.Unlock()
c.router.sessions.isAllowedMutex.Lock()
defer c.router.sessions.isAllowedMutex.Unlock()
c.sessions.isAllowedHandler = f
c.router.sessions.isAllowedHandler = f
}
// SetLogger sets the output logger of the Yggdrasil node after startup. This
@ -445,10 +445,10 @@ func (c *Core) DHTPing(key crypto.BoxPubKey, coords []uint64, target *crypto.Nod
}
rq := dhtReqKey{info.key, *target}
sendPing := func() {
c.dht.addCallback(&rq, func(res *dhtRes) {
c.router.dht.addCallback(&rq, func(res *dhtRes) {
resCh <- res
})
c.dht.ping(&info, &rq.dest)
c.router.dht.ping(&info, &rq.dest)
}
c.router.doAdmin(sendPing)
// TODO: do something better than the below...

View File

@ -84,7 +84,7 @@ func (c *Conn) String() string {
func (c *Conn) search() error {
var sinfo *searchInfo
var isIn bool
c.core.router.doAdmin(func() { sinfo, isIn = c.core.searches.searches[*c.nodeID] })
c.core.router.doAdmin(func() { sinfo, isIn = c.core.router.searches.searches[*c.nodeID] })
if !isIn {
done := make(chan struct{}, 1)
var sess *sessionInfo
@ -99,7 +99,7 @@ func (c *Conn) search() error {
}
}
c.core.router.doAdmin(func() {
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
sinfo.continueSearch()
})
<-done
@ -124,11 +124,11 @@ func (c *Conn) search() error {
func (c *Conn) doSearch() {
routerWork := func() {
// Check to see if there is a search already matching the destination
sinfo, isIn := c.core.searches.searches[*c.nodeID]
sinfo, isIn := c.core.router.searches.searches[*c.nodeID]
if !isIn {
// Nothing was found, so create a new search
searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
// Start the search
sinfo.continueSearch()

View File

@ -26,10 +26,7 @@ type Core struct {
sigPriv crypto.SigPrivKey
switchTable switchTable
peers peers
sessions sessions
router router
dht dht
searches searches
link link
log *log.Logger
}
@ -76,9 +73,9 @@ func (c *Core) init() error {
c.log.Warnln("SigningPublicKey in config is incorrect, should be", sp)
}
c.searches.init(c)
c.dht.init(c)
c.sessions.init(c)
c.router.searches.init(c)
c.router.dht.init(c)
c.router.sessions.init(c)
c.peers.init(c)
c.router.init(c)
c.switchTable.init(c) // TODO move before peers? before router?
@ -124,9 +121,9 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) {
errors := 0
components := []chan chan error{
c.searches.reconfigure,
c.dht.reconfigure,
c.sessions.reconfigure,
c.router.searches.reconfigure,
c.router.dht.reconfigure,
c.router.sessions.reconfigure,
c.peers.reconfigure,
c.router.reconfigure,
c.switchTable.reconfigure,

View File

@ -221,7 +221,7 @@ func (t *dht) handleReq(req *dhtReq) {
func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
// Send a reply for a dhtReq
bs := res.encode()
shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.Key)
shared := t.core.router.sessions.getSharedKey(&t.core.boxPriv, &req.Key)
payload, nonce := crypto.BoxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
Coords: req.Coords,
@ -285,7 +285,7 @@ func (t *dht) handleRes(res *dhtRes) {
func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
// Send a dhtReq to the node in dhtInfo
bs := req.encode()
shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &dest.key)
shared := t.core.router.sessions.getSharedKey(&t.core.boxPriv, &dest.key)
payload, nonce := crypto.BoxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
Coords: dest.coords,

View File

@ -31,8 +31,8 @@ func (l *Listener) Close() (err error) {
recover()
err = errors.New("already closed")
}()
if l.core.sessions.listener == l {
l.core.sessions.listener = nil
if l.core.router.sessions.listener == l {
l.core.router.sessions.listener = nil
}
close(l.close)
close(l.conn)

View File

@ -172,7 +172,7 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse
NodeInfo: m.getNodeInfo(),
}
bs := nodeinfo.encode()
shared := m.core.sessions.getSharedKey(&m.core.boxPriv, &key)
shared := m.core.router.sessions.getSharedKey(&m.core.boxPriv, &key)
payload, nonce := crypto.BoxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
Coords: coords,

View File

@ -43,15 +43,18 @@ type router struct {
addr address.Address
subnet address.Subnet
out func([]byte) // packets we're sending to the network, link to peer's "in"
dht dht
nodeinfo nodeinfo
searches searches
sessions sessions
}
// Initializes the router struct, which includes setting up channels to/from the adapter.
func (r *router) init(core *Core) {
r.core = core
r.reconfigure = make(chan chan error, 1)
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
r.addr = *address.AddrForNodeID(&r.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.dht.nodeID)
self := linkInterface{
name: "(self)",
info: linkInfo{
@ -91,15 +94,15 @@ func (r *router) handlePackets(from phony.IActor, packets [][]byte) {
// Insert a peer info into the dht, TODO? make the dht a separate actor
func (r *router) insertPeer(from phony.IActor, info *dhtInfo) {
r.EnqueueFrom(from, func() {
r.core.dht.insertPeer(info)
r.dht.insertPeer(info)
})
}
// Reset sessions and DHT after the switch sees our coords change
func (r *router) reset(from phony.IActor) {
r.EnqueueFrom(from, func() {
r.core.sessions.reset(r)
r.core.dht.reset()
r.sessions.reset()
r.dht.reset()
})
}
@ -114,8 +117,8 @@ func (r *router) _mainLoop() {
<-r.SyncExec(func() {
// Any periodic maintenance stuff goes here
r.core.switchTable.doMaintenance()
r.core.dht.doMaintenance()
r.core.sessions.cleanup()
r.dht.doMaintenance()
r.sessions.cleanup()
})
case e := <-r.reconfigure:
<-r.SyncExec(func() {
@ -149,7 +152,7 @@ func (r *router) _handleTraffic(packet []byte) {
if !p.decode(packet) {
return
}
sinfo, isIn := r.core.sessions.getSessionForHandle(&p.Handle)
sinfo, isIn := r.sessions.getSessionForHandle(&p.Handle)
if !isIn {
util.PutBytes(p.Payload)
return
@ -172,7 +175,7 @@ func (r *router) _handleProto(packet []byte) {
var sharedKey *crypto.BoxSharedKey
if p.ToKey == r.core.boxPub {
// Try to open using our permanent key
sharedKey = r.core.sessions.getSharedKey(&r.core.boxPriv, &p.FromKey)
sharedKey = r.sessions.getSharedKey(&r.core.boxPriv, &p.FromKey)
} else {
return
}
@ -212,7 +215,7 @@ func (r *router) _handlePing(bs []byte, fromKey *crypto.BoxPubKey) {
return
}
ping.SendPermPub = *fromKey
r.core.sessions.handlePing(&ping)
r.sessions.handlePing(&ping)
}
// Handles session pongs (which are really pings with an extra flag to prevent acknowledgement).
@ -227,7 +230,7 @@ func (r *router) _handleDHTReq(bs []byte, fromKey *crypto.BoxPubKey) {
return
}
req.Key = *fromKey
r.core.dht.handleReq(&req)
r.dht.handleReq(&req)
}
// Decodes dht responses and passes them to dht.handleRes to update the DHT table and further pass them to the search code (if applicable).
@ -237,7 +240,7 @@ func (r *router) _handleDHTRes(bs []byte, fromKey *crypto.BoxPubKey) {
return
}
res.Key = *fromKey
r.core.dht.handleRes(&res)
r.dht.handleRes(&res)
}
// Decodes nodeinfo request

View File

@ -100,7 +100,7 @@ func (sinfo *searchInfo) addToSearch(res *dhtRes) {
from := dhtInfo{key: res.Key, coords: res.Coords}
sinfo.visited[*from.getNodeID()] = true
for _, info := range res.Infos {
if *info.getNodeID() == sinfo.core.dht.nodeID || sinfo.visited[*info.getNodeID()] {
if *info.getNodeID() == sinfo.core.router.dht.nodeID || sinfo.visited[*info.getNodeID()] {
continue
}
if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) {
@ -134,7 +134,7 @@ func (sinfo *searchInfo) doSearchStep() {
if len(sinfo.toVisit) == 0 {
if time.Since(sinfo.time) > search_RETRY_TIME {
// Dead end and no response in too long, do cleanup
delete(sinfo.core.searches.searches, sinfo.dest)
delete(sinfo.core.router.searches.searches, sinfo.dest)
sinfo.callback(nil, errors.New("search reached dead end"))
}
return
@ -143,8 +143,8 @@ func (sinfo *searchInfo) doSearchStep() {
var next *dhtInfo
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
rq := dhtReqKey{next.key, sinfo.dest}
sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes)
sinfo.core.dht.ping(next, &sinfo.dest)
sinfo.core.router.dht.addCallback(&rq, sinfo.handleDHTRes)
sinfo.core.router.dht.ping(next, &sinfo.dest)
sinfo.time = time.Now()
}
@ -157,7 +157,7 @@ func (sinfo *searchInfo) continueSearch() {
// Any that die aren't restarted, but a new one will start later
retryLater := func() {
// FIXME this keeps the search alive forever if not for the searches map, fix that
newSearchInfo := sinfo.core.searches.searches[sinfo.dest]
newSearchInfo := sinfo.core.router.searches.searches[sinfo.dest]
if newSearchInfo != sinfo {
return
}
@ -196,17 +196,17 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool {
return false
}
// They match, so create a session and send a sessionRequest
sess, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key)
sess, isIn := sinfo.core.router.sessions.getByTheirPerm(&res.Key)
if !isIn {
sess = sinfo.core.sessions.createSession(&res.Key)
sess = sinfo.core.router.sessions.createSession(&res.Key)
if sess == nil {
// nil if the DHT search finished but the session wasn't allowed
sinfo.callback(nil, errors.New("session not allowed"))
// Cleanup
delete(sinfo.core.searches.searches, res.Dest)
delete(sinfo.core.router.searches.searches, res.Dest)
return true
}
_, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key)
_, isIn := sinfo.core.router.sessions.getByTheirPerm(&res.Key)
if !isIn {
panic("This should never happen")
}
@ -216,6 +216,6 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool {
sess.ping(&sinfo.core.router)
sinfo.callback(sess, nil)
// Cleanup
delete(sinfo.core.searches.searches, res.Dest)
delete(sinfo.core.router.searches.searches, res.Dest)
return true
}

View File

@ -261,7 +261,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
// Run cleanup when the session is canceled
<-sinfo.cancel.Finished()
sinfo.core.router.doAdmin(func() {
sinfo.core.sessions.removeSession(&sinfo)
sinfo.core.router.sessions.removeSession(&sinfo)
})
}()
go sinfo.startWorkers()
@ -298,9 +298,9 @@ func (ss *sessions) cleanup() {
// Closes a session, removing it from sessions maps.
func (ss *sessions) removeSession(sinfo *sessionInfo) {
if s := sinfo.core.sessions.sinfos[sinfo.myHandle]; s == sinfo {
delete(sinfo.core.sessions.sinfos, sinfo.myHandle)
delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub)
if s := sinfo.core.router.sessions.sinfos[sinfo.myHandle]; s == sinfo {
delete(sinfo.core.router.sessions.sinfos, sinfo.myHandle)
delete(sinfo.core.router.sessions.byTheirPerm, sinfo.theirPermPub)
}
}
@ -466,9 +466,9 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) {
// Resets all sessions to an uninitialized state.
// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
func (ss *sessions) reset(from phony.IActor) {
func (ss *sessions) reset() {
for _, sinfo := range ss.sinfos {
sinfo.EnqueueFrom(from, func() {
sinfo.EnqueueFrom(&ss.core.router, func() {
sinfo.reset = true
})
}