more work in progress actorizing the remaining parts of the switch

This commit is contained in:
Arceliar 2020-03-29 19:01:50 -05:00
parent 15b850be6e
commit 9834f222db
9 changed files with 111 additions and 101 deletions

View File

@ -330,8 +330,11 @@ func (c *Core) EncryptionPublicKey() string {
// connected to any other nodes (effectively making you the root of a
// single-node network).
func (c *Core) Coords() []uint64 {
loc := c.switchTable.getLocator()
return wire_coordsBytestoUint64s(loc.getCoords())
var coords []byte
phony.Block(&c.router, func() {
coords = c.router.table.self.getCoords()
})
return wire_coordsBytestoUint64s(coords)
}
// Address gets the IPv6 address of the Yggdrasil node. This is always a /128

View File

@ -186,11 +186,9 @@ func dht_ordered(first, second, third *crypto.NodeID) bool {
// Update info about the node that sent the request.
func (t *dht) handleReq(req *dhtReq) {
// Send them what they asked for
loc := t.router.core.switchTable.getLocator()
coords := loc.getCoords()
res := dhtRes{
Key: t.router.core.boxPub,
Coords: coords,
Coords: t.router.table.self.getCoords(),
Dest: req.Dest,
Infos: t.lookup(&req.Dest, false),
}
@ -300,11 +298,9 @@ func (t *dht) ping(info *dhtInfo, target *crypto.NodeID) {
if target == nil {
target = &t.nodeID
}
loc := t.router.core.switchTable.getLocator()
coords := loc.getCoords()
req := dhtReq{
Key: t.router.core.boxPub,
Coords: coords,
Coords: t.router.table.self.getCoords(),
Dest: *target,
}
t.sendReq(&req, info)
@ -378,7 +374,7 @@ func (t *dht) getImportant() []*dhtInfo {
})
// Keep the ones that are no further than the closest seen so far
minDist := ^uint64(0)
loc := t.router.core.switchTable.getLocator()
loc := t.router.table.self
important := infos[:0]
for _, info := range infos {
dist := uint64(loc.dist(info.coords))
@ -416,7 +412,7 @@ func (t *dht) isImportant(ninfo *dhtInfo) bool {
}
important := t.getImportant()
// Check if ninfo is of equal or greater importance to what we already know
loc := t.router.core.switchTable.getLocator()
loc := t.router.table.self
ndist := uint64(loc.dist(ninfo.coords))
minDist := ^uint64(0)
for _, info := range important {

View File

@ -300,7 +300,7 @@ func (intf *linkInterface) notifyBlockedSend() {
intf.Act(nil, func() {
if intf.sendTimer != nil {
//As far as we know, we're still trying to send, and the timer fired.
intf.link.core.switchTable.blockPeer(intf.peer.port)
intf.link.core.switchTable.blockPeer(intf, intf.peer.port)
}
})
}
@ -340,7 +340,7 @@ func (intf *linkInterface) notifyStalled() {
intf.stallTimer.Stop()
intf.stallTimer = nil
intf.stalled = true
intf.link.core.switchTable.blockPeer(intf.peer.port)
intf.link.core.switchTable.blockPeer(intf, intf.peer.port)
}
})
}

View File

@ -18,6 +18,7 @@ type nodeinfo struct {
myNodeInfo NodeInfoPayload
callbacks map[crypto.BoxPubKey]nodeinfoCallback
cache map[crypto.BoxPubKey]nodeinfoCached
table *lookupTable
}
type nodeinfoCached struct {
@ -187,7 +188,7 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse
}
func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) {
loc := m.core.switchTable.getLocator()
loc := m.table.self
nodeinfo := nodeinfoReqRes{
SendCoords: loc.getCoords(),
IsResponse: isResponse,

View File

@ -162,7 +162,7 @@ func (ps *peers) _removePeer(p *peer) {
if q := ps.ports[p.port]; p.port == 0 || q != p {
return
} // Can't remove self peer or nonexistant peer
ps.core.switchTable.forgetPeer(p.port)
ps.core.switchTable.forgetPeer(ps, p.port)
oldPorts := ps.ports
newPorts := make(map[switchPort]*peer)
for k, v := range oldPorts {
@ -328,7 +328,7 @@ func (p *peer) _handleLinkTraffic(bs []byte) {
// Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them.
func (p *peer) _sendSwitchMsg() {
msg := p.core.switchTable.getMsg()
msg := p.table.getMsg()
if msg == nil {
return
}
@ -367,12 +367,16 @@ func (p *peer) _handleSwitchMsg(packet []byte) {
}
prevKey = hop.Next
}
p.core.switchTable.handleMsg(&msg, p.port)
if !p.core.switchTable.checkRoot(&msg) {
p.core.switchTable.Act(p, func() {
if !p.core.switchTable._checkRoot(&msg) {
// Bad switch message
p.Act(&p.core.switchTable, func() {
p.dinfo = nil
return
}
})
} else {
// handle the message
p.core.switchTable._handleMsg(&msg, p.port, false)
p.Act(&p.core.switchTable, func() {
// Pass a message to the dht informing it that this peer (still) exists
loc.coords = loc.coords[:len(loc.coords)-1]
p.dinfo = &dhtInfo{
@ -380,6 +384,9 @@ func (p *peer) _handleSwitchMsg(packet []byte) {
coords: loc.getCoords(),
}
p._updateDHT()
})
}
})
}
// This generates the bytes that we sign or check the signature of for a switchMsg.

View File

@ -46,6 +46,7 @@ type router struct {
nodeinfo nodeinfo
searches searches
sessions sessions
table *lookupTable // has a copy of our locator
}
// Initializes the router struct, which includes setting up channels to/from the adapter.
@ -77,6 +78,21 @@ func (r *router) init(core *Core) {
r.sessions.init(r)
}
func (r *router) updateTable(from phony.Actor, table *lookupTable) {
r.Act(from, func() {
r.table = table
r.nodeinfo.Act(r, func() {
r.nodeinfo.table = table
})
for _, ses := range r.sessions.sinfos {
sinfo := ses
sinfo.Act(r, func() {
sinfo.table = table
})
}
})
}
// Reconfigures the router and any child modules. This should only ever be run
// by the router actor.
func (r *router) reconfigure() {
@ -130,7 +146,7 @@ func (r *router) reset(from phony.Actor) {
func (r *router) doMaintenance() {
phony.Block(r, func() {
// Any periodic maintenance stuff goes here
r.core.switchTable.doMaintenance()
r.core.switchTable.doMaintenance(r)
r.dht.doMaintenance()
r.sessions.cleanup()
})

View File

@ -161,11 +161,10 @@ func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) {
// Initially start a search
func (sinfo *searchInfo) startSearch() {
loc := sinfo.searches.router.core.switchTable.getLocator()
var infos []*dhtInfo
infos = append(infos, &dhtInfo{
key: sinfo.searches.router.core.boxPub,
coords: loc.getCoords(),
coords: sinfo.searches.router.table.self.getCoords(),
})
// Start the search by asking ourself, useful if we're the destination
sinfo.continueSearch(infos)

View File

@ -52,6 +52,7 @@ type sessionInfo struct {
cancel util.Cancellation // Used to terminate workers
conn *Conn // The associated Conn object
callbacks []chan func() // Finished work from crypto workers
table *lookupTable // table.self is a locator where we get our coords
}
// Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU.
@ -217,6 +218,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.myHandle = *crypto.NewHandle()
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.table = ss.router.table
ss.sinfos[sinfo.myHandle] = &sinfo
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
return &sinfo
@ -266,8 +268,7 @@ func (ss *sessions) removeSession(sinfo *sessionInfo) {
// Returns a session ping appropriate for the given session info.
func (sinfo *sessionInfo) _getPing() sessionPing {
loc := sinfo.sessions.router.core.switchTable.getLocator()
coords := loc.getCoords()
coords := sinfo.table.self.getCoords()
ping := sessionPing{
SendPermPub: sinfo.sessions.router.core.boxPub,
Handle: sinfo.myHandle,

View File

@ -12,12 +12,9 @@ package yggdrasil
// A little annoying to do with constant changes from backpressure
import (
//"math/rand"
"sync"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
//"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony"
)
@ -149,6 +146,7 @@ type tableElem struct {
type lookupTable struct {
self switchLocator
elems map[switchPort]tableElem
_msg switchMsg
}
// This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically.
@ -168,7 +166,6 @@ type switchTable struct {
key crypto.SigPubKey // Our own key
time time.Time // Time when locator.tstamp was last updated
drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root
mutex sync.RWMutex // Lock for reads/writes of switchData
parent switchPort // Port of whatever peer is our parent, or self if we're root
data switchData //
phony.Inbox // Owns the below
@ -208,24 +205,17 @@ func (t *switchTable) reconfigure() {
t.core.peers.reconfigure()
}
// Safely gets a copy of this node's locator.
func (t *switchTable) getLocator() switchLocator {
t.mutex.RLock()
defer t.mutex.RUnlock()
return t.data.locator.clone()
}
// Regular maintenance to possibly timeout/reset the root and similar.
func (t *switchTable) doMaintenance() {
func (t *switchTable) doMaintenance(from phony.Actor) {
t.Act(from, func() {
// Periodic maintenance work to keep things internally consistent
t.mutex.Lock() // Write lock
defer t.mutex.Unlock() // Release lock when we're done
t.cleanRoot()
t.cleanDropped()
t._cleanRoot()
t._cleanDropped()
})
}
// Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out.
func (t *switchTable) cleanRoot() {
func (t *switchTable) _cleanRoot() {
// TODO rethink how this is done?...
// Get rid of the root if it looks like its timed out
now := time.Now()
@ -259,9 +249,8 @@ func (t *switchTable) cleanRoot() {
}
// Blocks and, if possible, unparents a peer
func (t *switchTable) blockPeer(port switchPort) {
t.mutex.Lock()
defer t.mutex.Unlock()
func (t *switchTable) blockPeer(from phony.Actor, port switchPort) {
t.Act(from, func() {
peer, isIn := t.data.peers[port]
if !isIn {
return
@ -276,17 +265,17 @@ func (t *switchTable) blockPeer(port switchPort) {
if info.port == port {
continue
}
t.unlockedHandleMsg(&info.msg, info.port, true)
t._handleMsg(&info.msg, info.port, true)
}
t.unlockedHandleMsg(&peer.msg, peer.port, true)
t._handleMsg(&peer.msg, peer.port, true)
})
}
// Removes a peer.
// Must be called by the router actor with a lambda that calls this.
// If the removed peer was this node's parent, it immediately tries to find a new parent.
func (t *switchTable) forgetPeer(port switchPort) {
t.mutex.Lock()
defer t.mutex.Unlock()
func (t *switchTable) forgetPeer(from phony.Actor, port switchPort) {
t.Act(from, func() {
delete(t.data.peers, port)
defer t._updateTable()
if port != t.parent {
@ -294,14 +283,15 @@ func (t *switchTable) forgetPeer(port switchPort) {
}
t.parent = 0
for _, info := range t.data.peers {
t.unlockedHandleMsg(&info.msg, info.port, true)
t._handleMsg(&info.msg, info.port, true)
}
})
}
// Dropped is a list of roots that are better than the current root, but stopped sending new timestamps.
// If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos.
// This function is called periodically to do that cleanup.
func (t *switchTable) cleanDropped() {
func (t *switchTable) _cleanDropped() {
// TODO? only call this after root changes, not periodically
for root := range t.drop {
if !firstIsBetter(&root, &t.data.locator.root) {
@ -327,9 +317,7 @@ type switchMsgHop struct {
}
// This returns a *switchMsg to a copy of this node's current switchMsg, which can safely have additional information appended to Hops and sent to a peer.
func (t *switchTable) getMsg() *switchMsg {
t.mutex.RLock()
defer t.mutex.RUnlock()
func (t *switchTable) _getMsg() *switchMsg {
if t.parent == 0 {
return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp}
} else if parent, isIn := t.data.peers[t.parent]; isIn {
@ -341,14 +329,18 @@ func (t *switchTable) getMsg() *switchMsg {
}
}
func (t *lookupTable) getMsg() *switchMsg {
msg := t._msg
msg.Hops = append([]switchMsgHop(nil), t._msg.Hops...)
return &msg
}
// This function checks that the root information in a switchMsg is OK.
// In particular, that the root is better, or else the same as the current root but with a good timestamp, and that this root+timestamp haven't been dropped due to timeout.
func (t *switchTable) checkRoot(msg *switchMsg) bool {
func (t *switchTable) _checkRoot(msg *switchMsg) bool {
// returns false if it's a dropped root, not a better root, or has an older timestamp
// returns true otherwise
// used elsewhere to keep inserting peers into the dht only if root info is OK
t.mutex.RLock()
defer t.mutex.RUnlock()
dropTstamp, isIn := t.drop[msg.Root]
switch {
case isIn && dropTstamp >= msg.TStamp:
@ -364,20 +356,13 @@ func (t *switchTable) checkRoot(msg *switchMsg) bool {
}
}
// This is a mutexed wrapper to unlockedHandleMsg, and is called by the peer structs in peers.go to pass a switchMsg for that peer into the switch.
func (t *switchTable) handleMsg(msg *switchMsg, fromPort switchPort) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.unlockedHandleMsg(msg, fromPort, false)
}
// This updates the switch with information about a peer.
// Then the tricky part, it decides if it should update our own locator as a result.
// That happens if this node is already our parent, or is advertising a better root, or is advertising a better path to the same root, etc...
// There are a lot of very delicate order sensitive checks here, so its' best to just read the code if you need to understand what it's doing.
// It's very important to not change the order of the statements in the case function unless you're absolutely sure that it's safe, including safe if used alongside nodes that used the previous order.
// Set the third arg to true if you're reprocessing an old message, e.g. to find a new parent after one disconnects, to avoid updating some timing related things.
func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, reprocessing bool) {
func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessing bool) {
// TODO directly use a switchMsg instead of switchMessage + sigs
now := time.Now()
// Set up the sender peerInfo
@ -500,10 +485,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
if peer.port == sender.port {
continue
}
t.unlockedHandleMsg(&peer.msg, peer.port, true)
t._handleMsg(&peer.msg, peer.port, true)
}
// Process the sender last, to avoid keeping them as a parent if at all possible.
t.unlockedHandleMsg(&sender.msg, sender.port, true)
t._handleMsg(&sender.msg, sender.port, true)
case now.Sub(t.time) < switch_throttle:
// We've already gotten an update from this root recently, so ignore this one to avoid flooding.
case sender.locator.tstamp > t.data.locator.tstamp:
@ -521,7 +506,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
}
t.data.locator = sender.locator
t.parent = sender.port
t.core.peers.sendSwitchMsgs(t)
defer t.core.peers.sendSwitchMsgs(t)
}
if true || doUpdate {
defer t._updateTable()
@ -560,7 +545,9 @@ func (t *switchTable) _updateTable() {
time: pinfo.time,
}
}
t.core.peers.updateTables(nil, &newTable) // TODO not be from nil
newTable._msg = *t._getMsg()
t.core.peers.updateTables(t, &newTable)
t.core.router.updateTable(t, &newTable)
}
// Starts the switch worker