Merge pull request #645 from neilalexander/nodeinfo

Actorise NodeInfo
This commit is contained in:
Neil Alexander 2020-01-05 23:04:26 +00:00 committed by GitHub
commit 8e74881c35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 57 deletions

View File

@ -390,7 +390,6 @@ func (c *Core) SetMaximumSessionMTU(mtu uint16) {
// necessary when, e.g. crawling the network. // necessary when, e.g. crawling the network.
func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool) (NodeInfoPayload, error) { func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool) (NodeInfoPayload, error) {
response := make(chan *NodeInfoPayload, 1) response := make(chan *NodeInfoPayload, 1)
sendNodeInfoRequest := func() {
c.router.nodeinfo.addCallback(key, func(nodeinfo *NodeInfoPayload) { c.router.nodeinfo.addCallback(key, func(nodeinfo *NodeInfoPayload) {
defer func() { recover() }() defer func() { recover() }()
select { select {
@ -399,8 +398,6 @@ func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool)
} }
}) })
c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false) c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false)
}
phony.Block(&c.router, sendNodeInfoRequest)
timer := time.AfterFunc(6*time.Second, func() { close(response) }) timer := time.AfterFunc(6*time.Second, func() { close(response) })
defer timer.Stop() defer timer.Stop()
for res := range response { for res := range response {

View File

@ -5,21 +5,19 @@ import (
"errors" "errors"
"runtime" "runtime"
"strings" "strings"
"sync"
"time" "time"
"github.com/Arceliar/phony"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/version" "github.com/yggdrasil-network/yggdrasil-go/src/version"
) )
type nodeinfo struct { type nodeinfo struct {
phony.Inbox
core *Core core *Core
myNodeInfo NodeInfoPayload myNodeInfo NodeInfoPayload
myNodeInfoMutex sync.RWMutex
callbacks map[crypto.BoxPubKey]nodeinfoCallback callbacks map[crypto.BoxPubKey]nodeinfoCallback
callbacksMutex sync.Mutex
cache map[crypto.BoxPubKey]nodeinfoCached cache map[crypto.BoxPubKey]nodeinfoCached
cacheMutex sync.RWMutex
} }
type nodeinfoCached struct { type nodeinfoCached struct {
@ -43,35 +41,43 @@ type nodeinfoReqRes struct {
// Initialises the nodeinfo cache/callback maps, and starts a goroutine to keep // Initialises the nodeinfo cache/callback maps, and starts a goroutine to keep
// the cache/callback maps clean of stale entries // the cache/callback maps clean of stale entries
func (m *nodeinfo) init(core *Core) { func (m *nodeinfo) init(core *Core) {
m.Act(m, func() {
m._init(core)
})
}
func (m *nodeinfo) _init(core *Core) {
m.core = core m.core = core
m.callbacks = make(map[crypto.BoxPubKey]nodeinfoCallback) m.callbacks = make(map[crypto.BoxPubKey]nodeinfoCallback)
m.cache = make(map[crypto.BoxPubKey]nodeinfoCached) m.cache = make(map[crypto.BoxPubKey]nodeinfoCached)
var f func() m._cleanup()
f = func() { }
m.callbacksMutex.Lock()
func (m *nodeinfo) _cleanup() {
for boxPubKey, callback := range m.callbacks { for boxPubKey, callback := range m.callbacks {
if time.Since(callback.created) > time.Minute { if time.Since(callback.created) > time.Minute {
delete(m.callbacks, boxPubKey) delete(m.callbacks, boxPubKey)
} }
} }
m.callbacksMutex.Unlock()
m.cacheMutex.Lock()
for boxPubKey, cache := range m.cache { for boxPubKey, cache := range m.cache {
if time.Since(cache.created) > time.Hour { if time.Since(cache.created) > time.Hour {
delete(m.cache, boxPubKey) delete(m.cache, boxPubKey)
} }
} }
m.cacheMutex.Unlock() time.AfterFunc(time.Second*30, func() {
time.AfterFunc(time.Second*30, f) m.Act(m, m._cleanup)
} })
go f()
} }
// Add a callback for a nodeinfo lookup // Add a callback for a nodeinfo lookup
func (m *nodeinfo) addCallback(sender crypto.BoxPubKey, call func(nodeinfo *NodeInfoPayload)) { func (m *nodeinfo) addCallback(sender crypto.BoxPubKey, call func(nodeinfo *NodeInfoPayload)) {
m.callbacksMutex.Lock() m.Act(m, func() {
defer m.callbacksMutex.Unlock() m._addCallback(sender, call)
})
}
func (m *nodeinfo) _addCallback(sender crypto.BoxPubKey, call func(nodeinfo *NodeInfoPayload)) {
m.callbacks[sender] = nodeinfoCallback{ m.callbacks[sender] = nodeinfoCallback{
created: time.Now(), created: time.Now(),
call: call, call: call,
@ -79,9 +85,7 @@ func (m *nodeinfo) addCallback(sender crypto.BoxPubKey, call func(nodeinfo *Node
} }
// Handles the callback, if there is one // Handles the callback, if there is one
func (m *nodeinfo) callback(sender crypto.BoxPubKey, nodeinfo NodeInfoPayload) { func (m *nodeinfo) _callback(sender crypto.BoxPubKey, nodeinfo NodeInfoPayload) {
m.callbacksMutex.Lock()
defer m.callbacksMutex.Unlock()
if callback, ok := m.callbacks[sender]; ok { if callback, ok := m.callbacks[sender]; ok {
callback.call(&nodeinfo) callback.call(&nodeinfo)
delete(m.callbacks, sender) delete(m.callbacks, sender)
@ -89,16 +93,26 @@ func (m *nodeinfo) callback(sender crypto.BoxPubKey, nodeinfo NodeInfoPayload) {
} }
// Get the current node's nodeinfo // Get the current node's nodeinfo
func (m *nodeinfo) getNodeInfo() NodeInfoPayload { func (m *nodeinfo) getNodeInfo() (p NodeInfoPayload) {
m.myNodeInfoMutex.RLock() phony.Block(m, func() {
defer m.myNodeInfoMutex.RUnlock() p = m._getNodeInfo()
})
return
}
func (m *nodeinfo) _getNodeInfo() NodeInfoPayload {
return m.myNodeInfo return m.myNodeInfo
} }
// Set the current node's nodeinfo // Set the current node's nodeinfo
func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) error { func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) (err error) {
m.myNodeInfoMutex.Lock() phony.Block(m, func() {
defer m.myNodeInfoMutex.Unlock() err = m._setNodeInfo(given, privacy)
})
return
}
func (m *nodeinfo) _setNodeInfo(given interface{}, privacy bool) error {
defaults := map[string]interface{}{ defaults := map[string]interface{}{
"buildname": version.BuildName(), "buildname": version.BuildName(),
"buildversion": version.BuildVersion(), "buildversion": version.BuildVersion(),
@ -134,9 +148,7 @@ func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) error {
} }
// Add nodeinfo into the cache for a node // Add nodeinfo into the cache for a node
func (m *nodeinfo) addCachedNodeInfo(key crypto.BoxPubKey, payload NodeInfoPayload) { func (m *nodeinfo) _addCachedNodeInfo(key crypto.BoxPubKey, payload NodeInfoPayload) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
m.cache[key] = nodeinfoCached{ m.cache[key] = nodeinfoCached{
created: time.Now(), created: time.Now(),
payload: payload, payload: payload,
@ -144,9 +156,7 @@ func (m *nodeinfo) addCachedNodeInfo(key crypto.BoxPubKey, payload NodeInfoPaylo
} }
// Get a nodeinfo entry from the cache // Get a nodeinfo entry from the cache
func (m *nodeinfo) getCachedNodeInfo(key crypto.BoxPubKey) (NodeInfoPayload, error) { func (m *nodeinfo) _getCachedNodeInfo(key crypto.BoxPubKey) (NodeInfoPayload, error) {
m.cacheMutex.RLock()
defer m.cacheMutex.RUnlock()
if nodeinfo, ok := m.cache[key]; ok { if nodeinfo, ok := m.cache[key]; ok {
return nodeinfo.payload, nil return nodeinfo.payload, nil
} }
@ -155,21 +165,33 @@ func (m *nodeinfo) getCachedNodeInfo(key crypto.BoxPubKey) (NodeInfoPayload, err
// Handles a nodeinfo request/response - called from the router // Handles a nodeinfo request/response - called from the router
func (m *nodeinfo) handleNodeInfo(nodeinfo *nodeinfoReqRes) { func (m *nodeinfo) handleNodeInfo(nodeinfo *nodeinfoReqRes) {
m.Act(m, func() {
m._handleNodeInfo(nodeinfo)
})
}
func (m *nodeinfo) _handleNodeInfo(nodeinfo *nodeinfoReqRes) {
if nodeinfo.IsResponse { if nodeinfo.IsResponse {
m.callback(nodeinfo.SendPermPub, nodeinfo.NodeInfo) m._callback(nodeinfo.SendPermPub, nodeinfo.NodeInfo)
m.addCachedNodeInfo(nodeinfo.SendPermPub, nodeinfo.NodeInfo) m._addCachedNodeInfo(nodeinfo.SendPermPub, nodeinfo.NodeInfo)
} else { } else {
m.sendNodeInfo(nodeinfo.SendPermPub, nodeinfo.SendCoords, true) m._sendNodeInfo(nodeinfo.SendPermPub, nodeinfo.SendCoords, true)
} }
} }
// Send nodeinfo request or response - called from the router // Send nodeinfo request or response - called from the router
func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) { func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) {
m.Act(m, func() {
m._sendNodeInfo(key, coords, isResponse)
})
}
func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) {
table := m.core.switchTable.table.Load().(lookupTable) table := m.core.switchTable.table.Load().(lookupTable)
nodeinfo := nodeinfoReqRes{ nodeinfo := nodeinfoReqRes{
SendCoords: table.self.getCoords(), SendCoords: table.self.getCoords(),
IsResponse: isResponse, IsResponse: isResponse,
NodeInfo: m.getNodeInfo(), NodeInfo: m._getNodeInfo(),
} }
bs := nodeinfo.encode() bs := nodeinfo.encode()
shared := m.core.router.sessions.getSharedKey(&m.core.boxPriv, &key) shared := m.core.router.sessions.getSharedKey(&m.core.boxPriv, &key)

View File

@ -78,6 +78,7 @@ func (r *router) init(core *Core) {
func (r *router) reconfigure() { func (r *router) reconfigure() {
// Reconfigure the router // Reconfigure the router
current := r.core.config.GetCurrent() current := r.core.config.GetCurrent()
r.core.log.Println("Reloading NodeInfo...")
if err := r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy); err != nil { if err := r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy); err != nil {
r.core.log.Errorln("Error reloading NodeInfo:", err) r.core.log.Errorln("Error reloading NodeInfo:", err)
} else { } else {