Convert nodeinfo to actor

This commit is contained in:
Neil Alexander 2020-01-05 22:15:52 +00:00
parent 4b16c325a3
commit 9304873047
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
3 changed files with 77 additions and 57 deletions

View File

@ -390,7 +390,6 @@ func (c *Core) SetMaximumSessionMTU(mtu uint16) {
// necessary when, e.g. crawling the network.
func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool) (NodeInfoPayload, error) {
response := make(chan *NodeInfoPayload, 1)
sendNodeInfoRequest := func() {
c.router.nodeinfo.addCallback(key, func(nodeinfo *NodeInfoPayload) {
defer func() { recover() }()
select {
@ -399,8 +398,6 @@ func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool)
}
})
c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false)
}
phony.Block(&c.router, sendNodeInfoRequest)
timer := time.AfterFunc(6*time.Second, func() { close(response) })
defer timer.Stop()
for res := range response {

View File

@ -5,21 +5,19 @@ import (
"errors"
"runtime"
"strings"
"sync"
"time"
"github.com/Arceliar/phony"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
)
type nodeinfo struct {
phony.Inbox
core *Core
myNodeInfo NodeInfoPayload
myNodeInfoMutex sync.RWMutex
callbacks map[crypto.BoxPubKey]nodeinfoCallback
callbacksMutex sync.Mutex
cache map[crypto.BoxPubKey]nodeinfoCached
cacheMutex sync.RWMutex
}
type nodeinfoCached struct {
@ -43,35 +41,43 @@ type nodeinfoReqRes struct {
// Initialises the nodeinfo cache/callback maps, and starts a goroutine to keep
// the cache/callback maps clean of stale entries
func (m *nodeinfo) init(core *Core) {
m.Act(m, func() {
m._init(core)
})
}
func (m *nodeinfo) _init(core *Core) {
m.core = core
m.callbacks = make(map[crypto.BoxPubKey]nodeinfoCallback)
m.cache = make(map[crypto.BoxPubKey]nodeinfoCached)
var f func()
f = func() {
m.callbacksMutex.Lock()
m._cleanup()
}
func (m *nodeinfo) _cleanup() {
for boxPubKey, callback := range m.callbacks {
if time.Since(callback.created) > time.Minute {
delete(m.callbacks, boxPubKey)
}
}
m.callbacksMutex.Unlock()
m.cacheMutex.Lock()
for boxPubKey, cache := range m.cache {
if time.Since(cache.created) > time.Hour {
delete(m.cache, boxPubKey)
}
}
m.cacheMutex.Unlock()
time.AfterFunc(time.Second*30, f)
}
go f()
time.AfterFunc(time.Second*30, func() {
m.Act(m, m._cleanup)
})
}
// Add a callback for a nodeinfo lookup
func (m *nodeinfo) addCallback(sender crypto.BoxPubKey, call func(nodeinfo *NodeInfoPayload)) {
m.callbacksMutex.Lock()
defer m.callbacksMutex.Unlock()
m.Act(m, func() {
m._addCallback(sender, call)
})
}
func (m *nodeinfo) _addCallback(sender crypto.BoxPubKey, call func(nodeinfo *NodeInfoPayload)) {
m.callbacks[sender] = nodeinfoCallback{
created: time.Now(),
call: call,
@ -79,9 +85,7 @@ func (m *nodeinfo) addCallback(sender crypto.BoxPubKey, call func(nodeinfo *Node
}
// Handles the callback, if there is one
func (m *nodeinfo) callback(sender crypto.BoxPubKey, nodeinfo NodeInfoPayload) {
m.callbacksMutex.Lock()
defer m.callbacksMutex.Unlock()
func (m *nodeinfo) _callback(sender crypto.BoxPubKey, nodeinfo NodeInfoPayload) {
if callback, ok := m.callbacks[sender]; ok {
callback.call(&nodeinfo)
delete(m.callbacks, sender)
@ -89,16 +93,26 @@ func (m *nodeinfo) callback(sender crypto.BoxPubKey, nodeinfo NodeInfoPayload) {
}
// Get the current node's nodeinfo
func (m *nodeinfo) getNodeInfo() NodeInfoPayload {
m.myNodeInfoMutex.RLock()
defer m.myNodeInfoMutex.RUnlock()
func (m *nodeinfo) getNodeInfo() (p NodeInfoPayload) {
phony.Block(m, func() {
p = m._getNodeInfo()
})
return
}
func (m *nodeinfo) _getNodeInfo() NodeInfoPayload {
return m.myNodeInfo
}
// Set the current node's nodeinfo
func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) error {
m.myNodeInfoMutex.Lock()
defer m.myNodeInfoMutex.Unlock()
func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) (err error) {
phony.Block(m, func() {
err = m._setNodeInfo(given, privacy)
})
return
}
func (m *nodeinfo) _setNodeInfo(given interface{}, privacy bool) error {
defaults := map[string]interface{}{
"buildname": version.BuildName(),
"buildversion": version.BuildVersion(),
@ -134,9 +148,7 @@ func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) error {
}
// Add nodeinfo into the cache for a node
func (m *nodeinfo) addCachedNodeInfo(key crypto.BoxPubKey, payload NodeInfoPayload) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
func (m *nodeinfo) _addCachedNodeInfo(key crypto.BoxPubKey, payload NodeInfoPayload) {
m.cache[key] = nodeinfoCached{
created: time.Now(),
payload: payload,
@ -144,9 +156,7 @@ func (m *nodeinfo) addCachedNodeInfo(key crypto.BoxPubKey, payload NodeInfoPaylo
}
// Get a nodeinfo entry from the cache
func (m *nodeinfo) getCachedNodeInfo(key crypto.BoxPubKey) (NodeInfoPayload, error) {
m.cacheMutex.RLock()
defer m.cacheMutex.RUnlock()
func (m *nodeinfo) _getCachedNodeInfo(key crypto.BoxPubKey) (NodeInfoPayload, error) {
if nodeinfo, ok := m.cache[key]; ok {
return nodeinfo.payload, nil
}
@ -155,21 +165,33 @@ func (m *nodeinfo) getCachedNodeInfo(key crypto.BoxPubKey) (NodeInfoPayload, err
// Handles a nodeinfo request/response - called from the router
func (m *nodeinfo) handleNodeInfo(nodeinfo *nodeinfoReqRes) {
m.Act(m, func() {
m._handleNodeInfo(nodeinfo)
})
}
func (m *nodeinfo) _handleNodeInfo(nodeinfo *nodeinfoReqRes) {
if nodeinfo.IsResponse {
m.callback(nodeinfo.SendPermPub, nodeinfo.NodeInfo)
m.addCachedNodeInfo(nodeinfo.SendPermPub, nodeinfo.NodeInfo)
m._callback(nodeinfo.SendPermPub, nodeinfo.NodeInfo)
m._addCachedNodeInfo(nodeinfo.SendPermPub, nodeinfo.NodeInfo)
} else {
m.sendNodeInfo(nodeinfo.SendPermPub, nodeinfo.SendCoords, true)
m._sendNodeInfo(nodeinfo.SendPermPub, nodeinfo.SendCoords, true)
}
}
// Send nodeinfo request or response - called from the router
func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) {
m.Act(m, func() {
m._sendNodeInfo(key, coords, isResponse)
})
}
func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) {
table := m.core.switchTable.table.Load().(lookupTable)
nodeinfo := nodeinfoReqRes{
SendCoords: table.self.getCoords(),
IsResponse: isResponse,
NodeInfo: m.getNodeInfo(),
NodeInfo: m._getNodeInfo(),
}
bs := nodeinfo.encode()
shared := m.core.router.sessions.getSharedKey(&m.core.boxPriv, &key)

View File

@ -78,6 +78,7 @@ func (r *router) init(core *Core) {
func (r *router) reconfigure() {
// Reconfigure the router
current := r.core.config.GetCurrent()
r.core.log.Println("Reloading NodeInfo...")
if err := r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy); err != nil {
r.core.log.Errorln("Error reloading NodeInfo:", err)
} else {