Start factoring out the admin socket into a separate module (not all functions implemented yet)

This commit is contained in:
Neil Alexander 2019-05-19 17:27:48 +01:00
parent 7ca5a2533d
commit 8ef1978cb1
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
4 changed files with 123 additions and 286 deletions

View File

@ -18,6 +18,7 @@ import (
"github.com/kardianos/minwinsvc" "github.com/kardianos/minwinsvc"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
"github.com/yggdrasil-network/yggdrasil-go/src/admin"
"github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/multicast" "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
"github.com/yggdrasil-network/yggdrasil-go/src/tuntap" "github.com/yggdrasil-network/yggdrasil-go/src/tuntap"
@ -31,6 +32,7 @@ type node struct {
core Core core Core
tuntap tuntap.TunAdapter tuntap tuntap.TunAdapter
multicast multicast.Multicast multicast multicast.Multicast
admin admin.AdminSocket
} }
func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeConfig { func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeConfig {
@ -184,6 +186,11 @@ func main() {
logger.Errorln("An error occurred during startup") logger.Errorln("An error occurred during startup")
panic(err) panic(err)
} }
// Start the admin socket
n.admin.Init(&n.core, state, logger, nil)
if err := n.admin.Start(); err != nil {
logger.Errorln("An error occurred starting admin socket:", err)
}
// Start the multicast interface // Start the multicast interface
n.multicast.Init(&n.core, state, logger, nil) n.multicast.Init(&n.core, state, logger, nil)
if err := n.multicast.Start(); err != nil { if err := n.multicast.Start(); err != nil {

View File

@ -1,27 +1,27 @@
package yggdrasil package admin
import ( import (
"encoding/hex"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net" "net"
"net/url" "net/url"
"os" "os"
"sort"
"strconv"
"strings" "strings"
"sync/atomic"
"time" "time"
"github.com/gologme/log"
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
) )
// TODO: Add authentication // TODO: Add authentication
type admin struct { type AdminSocket struct {
core *Core core *yggdrasil.Core
log *log.Logger
reconfigure chan chan error reconfigure chan chan error
listenaddr string listenaddr string
listener net.Listener listener net.Listener
@ -46,27 +46,28 @@ type admin_pair struct {
type admin_nodeInfo []admin_pair type admin_nodeInfo []admin_pair
// addHandler is called for each admin function to add the handler and help documentation to the API. // addHandler is called for each admin function to add the handler and help documentation to the API.
func (a *admin) addHandler(name string, args []string, handler func(admin_info) (admin_info, error)) { func (a *AdminSocket) addHandler(name string, args []string, handler func(admin_info) (admin_info, error)) {
a.handlers = append(a.handlers, admin_handlerInfo{name, args, handler}) a.handlers = append(a.handlers, admin_handlerInfo{name, args, handler})
} }
// init runs the initial admin setup. // init runs the initial admin setup.
func (a *admin) init(c *Core) { func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.Logger, options interface{}) {
a.core = c a.core = c
a.log = log
a.reconfigure = make(chan chan error, 1) a.reconfigure = make(chan chan error, 1)
go func() { go func() {
for { for {
e := <-a.reconfigure e := <-a.reconfigure
current, previous := a.core.config.Get() current, previous := state.Get()
if current.AdminListen != previous.AdminListen { if current.AdminListen != previous.AdminListen {
a.listenaddr = current.AdminListen a.listenaddr = current.AdminListen
a.close() a.Stop()
a.start() a.Start()
} }
e <- nil e <- nil
} }
}() }()
current, _ := a.core.config.Get() current, _ := state.Get()
a.listenaddr = current.AdminListen a.listenaddr = current.AdminListen
a.addHandler("list", []string{}, func(in admin_info) (admin_info, error) { a.addHandler("list", []string{}, func(in admin_info) (admin_info, error) {
handlers := make(map[string]interface{}) handlers := make(map[string]interface{})
@ -75,64 +76,93 @@ func (a *admin) init(c *Core) {
} }
return admin_info{"list": handlers}, nil return admin_info{"list": handlers}, nil
}) })
a.addHandler("dot", []string{}, func(in admin_info) (admin_info, error) { /*a.addHandler("dot", []string{}, func(in admin_info) (admin_info, error) {
return admin_info{"dot": string(a.getResponse_dot())}, nil return admin_info{"dot": string(a.getResponse_dot())}, nil
}) })*/
a.addHandler("getSelf", []string{}, func(in admin_info) (admin_info, error) { a.addHandler("getSelf", []string{}, func(in admin_info) (admin_info, error) {
self := a.getData_getSelf().asMap() ip := c.Address().String()
ip := fmt.Sprint(self["ip"]) return admin_info{
delete(self, "ip") "self": admin_info{
return admin_info{"self": admin_info{ip: self}}, nil ip: admin_info{
"box_pub_key": c.BoxPubKey(),
"build_name": yggdrasil.BuildName(),
"build_version": yggdrasil.BuildVersion(),
"coords": fmt.Sprintf("%v", c.Coords()),
"subnet": c.Subnet().String(),
},
},
}, nil
}) })
a.addHandler("getPeers", []string{}, func(in admin_info) (admin_info, error) { a.addHandler("getPeers", []string{}, func(in admin_info) (admin_info, error) {
sort := "ip"
peers := make(admin_info) peers := make(admin_info)
for _, peerdata := range a.getData_getPeers() { for _, p := range a.core.GetPeers() {
p := peerdata.asMap() addr := *address.AddrForNodeID(crypto.GetNodeID(&p.PublicKey))
so := fmt.Sprint(p[sort]) so := net.IP(addr[:]).String()
peers[so] = p peers[so] = admin_info{
delete(peers[so].(map[string]interface{}), sort) "ip": so,
"port": p.Port,
"uptime": p.Uptime.Seconds(),
"bytes_sent": p.BytesSent,
"bytes_recvd": p.BytesRecvd,
"proto": p.Protocol,
"endpoint": p.Endpoint,
"box_pub_key": p.PublicKey,
}
} }
return admin_info{"peers": peers}, nil return admin_info{"peers": peers}, nil
}) })
a.addHandler("getSwitchPeers", []string{}, func(in admin_info) (admin_info, error) { a.addHandler("getSwitchPeers", []string{}, func(in admin_info) (admin_info, error) {
sort := "port"
switchpeers := make(admin_info) switchpeers := make(admin_info)
for _, s := range a.getData_getSwitchPeers() { for _, s := range a.core.GetSwitchPeers() {
p := s.asMap() addr := *address.AddrForNodeID(crypto.GetNodeID(&s.PublicKey))
so := fmt.Sprint(p[sort]) so := fmt.Sprint(s.Port)
switchpeers[so] = p switchpeers[so] = admin_info{
delete(switchpeers[so].(map[string]interface{}), sort) "ip": net.IP(addr[:]).String(),
"coords": fmt.Sprintf("%v", s.Coords),
"port": s.Port,
"bytes_sent": s.BytesSent,
"bytes_recvd": s.BytesRecvd,
"proto": s.Protocol,
"endpoint": s.Endpoint,
"box_pub_key": s.PublicKey,
}
} }
return admin_info{"switchpeers": switchpeers}, nil return admin_info{"switchpeers": switchpeers}, nil
}) })
a.addHandler("getSwitchQueues", []string{}, func(in admin_info) (admin_info, error) { /*a.addHandler("getSwitchQueues", []string{}, func(in admin_info) (admin_info, error) {
queues := a.getData_getSwitchQueues() queues := a.core.GetSwitchQueues()
return admin_info{"switchqueues": queues.asMap()}, nil return admin_info{"switchqueues": queues.asMap()}, nil
}) })*/
a.addHandler("getDHT", []string{}, func(in admin_info) (admin_info, error) { a.addHandler("getDHT", []string{}, func(in admin_info) (admin_info, error) {
sort := "ip"
dht := make(admin_info) dht := make(admin_info)
for _, d := range a.getData_getDHT() { for _, d := range a.core.GetDHT() {
p := d.asMap() addr := *address.AddrForNodeID(crypto.GetNodeID(&d.PublicKey))
so := fmt.Sprint(p[sort]) so := net.IP(addr[:]).String()
dht[so] = p dht[so] = admin_info{
delete(dht[so].(map[string]interface{}), sort) "coords": fmt.Sprintf("%v", d.Coords),
"last_seen": d.LastSeen.Seconds(),
"box_pub_key": d.PublicKey,
}
} }
return admin_info{"dht": dht}, nil return admin_info{"dht": dht}, nil
}) })
a.addHandler("getSessions", []string{}, func(in admin_info) (admin_info, error) { a.addHandler("getSessions", []string{}, func(in admin_info) (admin_info, error) {
sort := "ip"
sessions := make(admin_info) sessions := make(admin_info)
for _, s := range a.getData_getSessions() { for _, s := range a.core.GetSessions() {
p := s.asMap() addr := *address.AddrForNodeID(crypto.GetNodeID(&s.PublicKey))
so := fmt.Sprint(p[sort]) so := net.IP(addr[:]).String()
sessions[so] = p sessions[so] = admin_info{
delete(sessions[so].(map[string]interface{}), sort) "coords": fmt.Sprintf("%v", s.Coords),
"bytes_sent": s.BytesSent,
"bytes_recvd": s.BytesRecvd,
"mtu": s.MTU,
"was_mtu_fixed": s.WasMTUFixed,
"box_pub_key": s.PublicKey,
}
} }
return admin_info{"sessions": sessions}, nil return admin_info{"sessions": sessions}, nil
}) })
a.addHandler("addPeer", []string{"uri", "[interface]"}, func(in admin_info) (admin_info, error) { /*a.addHandler("addPeer", []string{"uri", "[interface]"}, func(in admin_info) (admin_info, error) {
// Set sane defaults // Set sane defaults
intf := "" intf := ""
// Has interface been specified? // Has interface been specified?
@ -168,7 +198,7 @@ func (a *admin) init(c *Core) {
}, errors.New("Failed to remove peer") }, errors.New("Failed to remove peer")
} }
}) })
/* a.addHandler("getTunTap", []string{}, func(in admin_info) (r admin_info, e error) { a.addHandler("getTunTap", []string{}, func(in admin_info) (r admin_info, e error) {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
r = admin_info{"none": admin_info{}} r = admin_info{"none": admin_info{}}
@ -215,7 +245,7 @@ func (a *admin) init(c *Core) {
intfs = append(intfs, v.Name) intfs = append(intfs, v.Name)
} }
return admin_info{"multicast_interfaces": intfs}, nil return admin_info{"multicast_interfaces": intfs}, nil
})*/ })
a.addHandler("getAllowedEncryptionPublicKeys", []string{}, func(in admin_info) (admin_info, error) { a.addHandler("getAllowedEncryptionPublicKeys", []string{}, func(in admin_info) (admin_info, error) {
return admin_info{"allowed_box_pubs": a.getAllowedEncryptionPublicKeys()}, nil return admin_info{"allowed_box_pubs": a.getAllowedEncryptionPublicKeys()}, nil
}) })
@ -249,7 +279,7 @@ func (a *admin) init(c *Core) {
}, errors.New("Failed to remove allowed key") }, errors.New("Failed to remove allowed key")
} }
}) })
/*a.addHandler("getTunnelRouting", []string{}, func(in admin_info) (admin_info, error) { a.addHandler("getTunnelRouting", []string{}, func(in admin_info) (admin_info, error) {
enabled := false enabled := false
a.core.router.doAdmin(func() { a.core.router.doAdmin(func() {
enabled = a.core.router.cryptokey.isEnabled() enabled = a.core.router.cryptokey.isEnabled()
@ -335,7 +365,7 @@ func (a *admin) init(c *Core) {
} else { } else {
return admin_info{"not_removed": []string{fmt.Sprintf("%s via %s", in["subnet"].(string), in["box_pub_key"].(string))}}, errors.New("Failed to remove route") return admin_info{"not_removed": []string{fmt.Sprintf("%s via %s", in["subnet"].(string), in["box_pub_key"].(string))}}, errors.New("Failed to remove route")
} }
})*/ })
a.addHandler("dhtPing", []string{"box_pub_key", "coords", "[target]"}, func(in admin_info) (admin_info, error) { a.addHandler("dhtPing", []string{"box_pub_key", "coords", "[target]"}, func(in admin_info) (admin_info, error) {
if in["target"] == nil { if in["target"] == nil {
in["target"] = "none" in["target"] = "none"
@ -390,11 +420,11 @@ func (a *admin) init(c *Core) {
} else { } else {
return admin_info{}, err return admin_info{}, err
} }
}) })*/
} }
// start runs the admin API socket to listen for / respond to admin API calls. // start runs the admin API socket to listen for / respond to admin API calls.
func (a *admin) start() error { func (a *AdminSocket) Start() error {
if a.listenaddr != "none" && a.listenaddr != "" { if a.listenaddr != "none" && a.listenaddr != "" {
go a.listen() go a.listen()
} }
@ -402,7 +432,7 @@ func (a *admin) start() error {
} }
// cleans up when stopping // cleans up when stopping
func (a *admin) close() error { func (a *AdminSocket) Stop() error {
if a.listener != nil { if a.listener != nil {
return a.listener.Close() return a.listener.Close()
} else { } else {
@ -411,21 +441,21 @@ func (a *admin) close() error {
} }
// listen is run by start and manages API connections. // listen is run by start and manages API connections.
func (a *admin) listen() { func (a *AdminSocket) listen() {
u, err := url.Parse(a.listenaddr) u, err := url.Parse(a.listenaddr)
if err == nil { if err == nil {
switch strings.ToLower(u.Scheme) { switch strings.ToLower(u.Scheme) {
case "unix": case "unix":
if _, err := os.Stat(a.listenaddr[7:]); err == nil { if _, err := os.Stat(a.listenaddr[7:]); err == nil {
a.core.log.Debugln("Admin socket", a.listenaddr[7:], "already exists, trying to clean up") a.log.Debugln("Admin socket", a.listenaddr[7:], "already exists, trying to clean up")
if _, err := net.DialTimeout("unix", a.listenaddr[7:], time.Second*2); err == nil || err.(net.Error).Timeout() { if _, err := net.DialTimeout("unix", a.listenaddr[7:], time.Second*2); err == nil || err.(net.Error).Timeout() {
a.core.log.Errorln("Admin socket", a.listenaddr[7:], "already exists and is in use by another process") a.log.Errorln("Admin socket", a.listenaddr[7:], "already exists and is in use by another process")
os.Exit(1) os.Exit(1)
} else { } else {
if err := os.Remove(a.listenaddr[7:]); err == nil { if err := os.Remove(a.listenaddr[7:]); err == nil {
a.core.log.Debugln(a.listenaddr[7:], "was cleaned up") a.log.Debugln(a.listenaddr[7:], "was cleaned up")
} else { } else {
a.core.log.Errorln(a.listenaddr[7:], "already exists and was not cleaned up:", err) a.log.Errorln(a.listenaddr[7:], "already exists and was not cleaned up:", err)
os.Exit(1) os.Exit(1)
} }
} }
@ -436,7 +466,7 @@ func (a *admin) listen() {
case "@": // maybe abstract namespace case "@": // maybe abstract namespace
default: default:
if err := os.Chmod(a.listenaddr[7:], 0660); err != nil { if err := os.Chmod(a.listenaddr[7:], 0660); err != nil {
a.core.log.Warnln("WARNING:", a.listenaddr[:7], "may have unsafe permissions!") a.log.Warnln("WARNING:", a.listenaddr[:7], "may have unsafe permissions!")
} }
} }
} }
@ -450,10 +480,10 @@ func (a *admin) listen() {
a.listener, err = net.Listen("tcp", a.listenaddr) a.listener, err = net.Listen("tcp", a.listenaddr)
} }
if err != nil { if err != nil {
a.core.log.Errorf("Admin socket failed to listen: %v", err) a.log.Errorf("Admin socket failed to listen: %v", err)
os.Exit(1) os.Exit(1)
} }
a.core.log.Infof("%s admin socket listening on %s", a.log.Infof("%s admin socket listening on %s",
strings.ToUpper(a.listener.Addr().Network()), strings.ToUpper(a.listener.Addr().Network()),
a.listener.Addr().String()) a.listener.Addr().String())
defer a.listener.Close() defer a.listener.Close()
@ -466,7 +496,7 @@ func (a *admin) listen() {
} }
// handleRequest calls the request handler for each request sent to the admin API. // handleRequest calls the request handler for each request sent to the admin API.
func (a *admin) handleRequest(conn net.Conn) { func (a *AdminSocket) handleRequest(conn net.Conn) {
decoder := json.NewDecoder(conn) decoder := json.NewDecoder(conn)
encoder := json.NewEncoder(conn) encoder := json.NewEncoder(conn)
encoder.SetIndent("", " ") encoder.SetIndent("", " ")
@ -480,9 +510,9 @@ func (a *admin) handleRequest(conn net.Conn) {
"status": "error", "status": "error",
"error": "Unrecoverable error, possibly as a result of invalid input types or malformed syntax", "error": "Unrecoverable error, possibly as a result of invalid input types or malformed syntax",
} }
a.core.log.Errorln("Admin socket error:", r) a.log.Errorln("Admin socket error:", r)
if err := encoder.Encode(&send); err != nil { if err := encoder.Encode(&send); err != nil {
a.core.log.Errorln("Admin socket JSON encode error:", err) a.log.Errorln("Admin socket JSON encode error:", err)
} }
conn.Close() conn.Close()
} }
@ -577,7 +607,7 @@ func (n *admin_nodeInfo) toString() string {
} }
// printInfos returns a newline separated list of strings from admin_nodeInfos, e.g. a printable string of info about all peers. // printInfos returns a newline separated list of strings from admin_nodeInfos, e.g. a printable string of info about all peers.
func (a *admin) printInfos(infos []admin_nodeInfo) string { func (a *AdminSocket) printInfos(infos []admin_nodeInfo) string {
var out []string var out []string
for _, info := range infos { for _, info := range infos {
out = append(out, info.toString()) out = append(out, info.toString())
@ -586,8 +616,9 @@ func (a *admin) printInfos(infos []admin_nodeInfo) string {
return strings.Join(out, "\n") return strings.Join(out, "\n")
} }
/*
// addPeer triggers a connection attempt to a node. // addPeer triggers a connection attempt to a node.
func (a *admin) addPeer(addr string, sintf string) error { func (a *AdminSocket) addPeer(addr string, sintf string) error {
err := a.core.link.call(addr, sintf) err := a.core.link.call(addr, sintf)
if err != nil { if err != nil {
return err return err
@ -596,220 +627,18 @@ func (a *admin) addPeer(addr string, sintf string) error {
} }
// removePeer disconnects an existing node (given by the node's port number). // removePeer disconnects an existing node (given by the node's port number).
func (a *admin) removePeer(p string) error { func (a *AdminSocket) removePeer(p string) error {
iport, err := strconv.Atoi(p) iport, err := strconv.Atoi(p)
if err != nil { if err != nil {
return err return err
} }
a.core.peers.removePeer(switchPort(iport)) a.core.RemovePeer(iport)
return nil
}
// startTunWithMTU creates the tun/tap device, sets its address, and sets the MTU to the provided value.
/*
func (a *admin) startTunWithMTU(ifname string, iftapmode bool, ifmtu int) error {
// Close the TUN first if open
_ = a.core.router.tun.close()
// Then reconfigure and start it
addr := a.core.router.addr
straddr := fmt.Sprintf("%s/%v", net.IP(addr[:]).String(), 8*len(address.GetPrefix())-1)
if ifname != "none" {
err := a.core.router.tun.setup(ifname, iftapmode, straddr, ifmtu)
if err != nil {
return err
}
// If we have open sessions then we need to notify them
// that our MTU has now changed
for _, sinfo := range a.core.sessions.sinfos {
if ifname == "none" {
sinfo.myMTU = 0
} else {
sinfo.myMTU = uint16(ifmtu)
}
a.core.sessions.sendPingPong(sinfo, false)
}
// Aaaaand... go!
go a.core.router.tun.read()
}
go a.core.router.tun.write()
return nil return nil
} }
*/ */
/*
// getData_getSelf returns the self node's info for admin responses.
func (a *admin) getData_getSelf() *admin_nodeInfo {
table := a.core.switchTable.table.Load().(lookupTable)
coords := table.self.getCoords()
nodeid := *crypto.GetNodeID(&a.core.boxPub)
self := admin_nodeInfo{
{"node_id", hex.EncodeToString(nodeid[:])},
{"box_pub_key", hex.EncodeToString(a.core.boxPub[:])},
{"ip", a.core.Address().String()},
{"subnet", a.core.Subnet().String()},
{"coords", fmt.Sprint(coords)},
}
if name := BuildName(); name != "unknown" {
self = append(self, admin_pair{"build_name", name})
}
if version := BuildVersion(); version != "unknown" {
self = append(self, admin_pair{"build_version", version})
}
return &self
}
// getData_getPeers returns info from Core.peers for an admin response.
func (a *admin) getData_getPeers() []admin_nodeInfo {
ports := a.core.peers.ports.Load().(map[switchPort]*peer)
var peerInfos []admin_nodeInfo
var ps []switchPort
for port := range ports {
ps = append(ps, port)
}
sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
for _, port := range ps {
p := ports[port]
addr := *address.AddrForNodeID(crypto.GetNodeID(&p.box))
info := admin_nodeInfo{
{"ip", net.IP(addr[:]).String()},
{"port", port},
{"uptime", int(time.Since(p.firstSeen).Seconds())},
{"bytes_sent", atomic.LoadUint64(&p.bytesSent)},
{"bytes_recvd", atomic.LoadUint64(&p.bytesRecvd)},
{"proto", p.intf.info.linkType},
{"endpoint", p.intf.name},
{"box_pub_key", hex.EncodeToString(p.box[:])},
}
peerInfos = append(peerInfos, info)
}
return peerInfos
}
// getData_getSwitchPeers returns info from Core.switchTable for an admin response.
func (a *admin) getData_getSwitchPeers() []admin_nodeInfo {
var peerInfos []admin_nodeInfo
table := a.core.switchTable.table.Load().(lookupTable)
peers := a.core.peers.ports.Load().(map[switchPort]*peer)
for _, elem := range table.elems {
peer, isIn := peers[elem.port]
if !isIn {
continue
}
addr := *address.AddrForNodeID(crypto.GetNodeID(&peer.box))
coords := elem.locator.getCoords()
info := admin_nodeInfo{
{"ip", net.IP(addr[:]).String()},
{"coords", fmt.Sprint(coords)},
{"port", elem.port},
{"bytes_sent", atomic.LoadUint64(&peer.bytesSent)},
{"bytes_recvd", atomic.LoadUint64(&peer.bytesRecvd)},
{"proto", peer.intf.info.linkType},
{"endpoint", peer.intf.info.remote},
{"box_pub_key", hex.EncodeToString(peer.box[:])},
}
peerInfos = append(peerInfos, info)
}
return peerInfos
}
// getData_getSwitchQueues returns info from Core.switchTable for an queue data.
func (a *admin) getData_getSwitchQueues() admin_nodeInfo {
var peerInfos admin_nodeInfo
switchTable := &a.core.switchTable
getSwitchQueues := func() {
queues := make([]map[string]interface{}, 0)
for k, v := range switchTable.queues.bufs {
nexthop := switchTable.bestPortForCoords([]byte(k))
queue := map[string]interface{}{
"queue_id": k,
"queue_size": v.size,
"queue_packets": len(v.packets),
"queue_port": nexthop,
}
queues = append(queues, queue)
}
peerInfos = admin_nodeInfo{
{"queues", queues},
{"queues_count", len(switchTable.queues.bufs)},
{"queues_size", switchTable.queues.size},
{"highest_queues_count", switchTable.queues.maxbufs},
{"highest_queues_size", switchTable.queues.maxsize},
{"maximum_queues_size", switchTable.queueTotalMaxSize},
}
}
a.core.switchTable.doAdmin(getSwitchQueues)
return peerInfos
}
// getData_getDHT returns info from Core.dht for an admin response.
func (a *admin) getData_getDHT() []admin_nodeInfo {
var infos []admin_nodeInfo
getDHT := func() {
now := time.Now()
var dhtInfos []*dhtInfo
for _, v := range a.core.dht.table {
dhtInfos = append(dhtInfos, v)
}
sort.SliceStable(dhtInfos, func(i, j int) bool {
return dht_ordered(&a.core.dht.nodeID, dhtInfos[i].getNodeID(), dhtInfos[j].getNodeID())
})
for _, v := range dhtInfos {
addr := *address.AddrForNodeID(v.getNodeID())
info := admin_nodeInfo{
{"ip", net.IP(addr[:]).String()},
{"coords", fmt.Sprint(v.coords)},
{"last_seen", int(now.Sub(v.recv).Seconds())},
{"box_pub_key", hex.EncodeToString(v.key[:])},
}
infos = append(infos, info)
}
}
a.core.router.doAdmin(getDHT)
return infos
}
// getData_getSessions returns info from Core.sessions for an admin response.
func (a *admin) getData_getSessions() []admin_nodeInfo {
var infos []admin_nodeInfo
getSessions := func() {
for _, sinfo := range a.core.sessions.sinfos {
// TODO? skipped known but timed out sessions?
info := admin_nodeInfo{
{"ip", net.IP(sinfo.theirAddr[:]).String()},
{"coords", fmt.Sprint(sinfo.coords)},
{"mtu", sinfo.getMTU()},
{"was_mtu_fixed", sinfo.wasMTUFixed},
{"bytes_sent", sinfo.bytesSent},
{"bytes_recvd", sinfo.bytesRecvd},
{"box_pub_key", hex.EncodeToString(sinfo.theirPermPub[:])},
}
infos = append(infos, info)
}
}
a.core.router.doAdmin(getSessions)
return infos
}
// getAllowedEncryptionPublicKeys returns the public keys permitted for incoming peer connections.
func (a *admin) getAllowedEncryptionPublicKeys() []string {
return a.core.peers.getAllowedEncryptionPublicKeys()
}
// addAllowedEncryptionPublicKey whitelists a key for incoming peer connections.
func (a *admin) addAllowedEncryptionPublicKey(bstr string) (err error) {
a.core.peers.addAllowedEncryptionPublicKey(bstr)
return nil
}
// removeAllowedEncryptionPublicKey removes a key from the whitelist for incoming peer connections.
// If none are set, an empty list permits all incoming connections.
func (a *admin) removeAllowedEncryptionPublicKey(bstr string) (err error) {
a.core.peers.removeAllowedEncryptionPublicKey(bstr)
return nil
}
// Send a DHT ping to the node with the provided key and coords, optionally looking up the specified target NodeID. // Send a DHT ping to the node with the provided key and coords, optionally looking up the specified target NodeID.
func (a *admin) admin_dhtPing(keyString, coordString, targetString string) (dhtRes, error) { func (a *AdminSocket) admin_dhtPing(keyString, coordString, targetString string) (dhtRes, error) {
var key crypto.BoxPubKey var key crypto.BoxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil { if keyBytes, err := hex.DecodeString(keyString); err != nil {
return dhtRes{}, err return dhtRes{}, err
@ -866,7 +695,7 @@ func (a *admin) admin_dhtPing(keyString, coordString, targetString string) (dhtR
return dhtRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString)) return dhtRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString))
} }
func (a *admin) admin_getNodeInfo(keyString, coordString string, nocache bool) (nodeinfoPayload, error) { func (a *AdminSocket) admin_getNodeInfo(keyString, coordString string, nocache bool) (nodeinfoPayload, error) {
var key crypto.BoxPubKey var key crypto.BoxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil { if keyBytes, err := hex.DecodeString(keyString); err != nil {
return nodeinfoPayload{}, err return nodeinfoPayload{}, err
@ -915,7 +744,7 @@ func (a *admin) admin_getNodeInfo(keyString, coordString string, nocache bool) (
// getResponse_dot returns a response for a graphviz dot formatted representation of the known parts of the network. // getResponse_dot returns a response for a graphviz dot formatted representation of the known parts of the network.
// This is color-coded and labeled, and includes the self node, switch peers, nodes known to the DHT, and nodes with open sessions. // This is color-coded and labeled, and includes the self node, switch peers, nodes known to the DHT, and nodes with open sessions.
// The graph is structured as a tree with directed links leading away from the root. // The graph is structured as a tree with directed links leading away from the root.
func (a *admin) getResponse_dot() []byte { func (a *AdminSocket) getResponse_dot() []byte {
self := a.getData_getSelf() self := a.getData_getSelf()
peers := a.getData_getSwitchPeers() peers := a.getData_getSwitchPeers()
dht := a.getData_getDHT() dht := a.getData_getDHT()
@ -1037,3 +866,4 @@ func (a *admin) getResponse_dot() []byte {
put("}\n") put("}\n")
return out return out
} }
*/

View File

@ -36,6 +36,7 @@ type SwitchPeer struct {
BytesRecvd uint64 BytesRecvd uint64
Port uint64 Port uint64
Protocol string Protocol string
Endpoint string
} }
// DHTEntry represents a single DHT entry that has been learned or cached from // DHTEntry represents a single DHT entry that has been learned or cached from
@ -127,6 +128,7 @@ func (c *Core) GetSwitchPeers() []SwitchPeer {
BytesRecvd: atomic.LoadUint64(&peer.bytesRecvd), BytesRecvd: atomic.LoadUint64(&peer.bytesRecvd),
Port: uint64(elem.port), Port: uint64(elem.port),
Protocol: peer.intf.info.linkType, Protocol: peer.intf.info.linkType,
Endpoint: peer.intf.info.remote,
} }
copy(info.PublicKey[:], peer.box[:]) copy(info.PublicKey[:], peer.box[:])
switchpeers = append(switchpeers, info) switchpeers = append(switchpeers, info)
@ -289,6 +291,12 @@ func (c *Core) BoxPubKey() string {
return hex.EncodeToString(c.boxPub[:]) return hex.EncodeToString(c.boxPub[:])
} }
// Coords returns the current coordinates of the node.
func (c *Core) Coords() []byte {
table := c.switchTable.table.Load().(lookupTable)
return table.self.getCoords()
}
// Address gets the IPv6 address of the Yggdrasil node. This is always a /128 // Address gets the IPv6 address of the Yggdrasil node. This is always a /128
// address. // address.
func (c *Core) Address() *net.IP { func (c *Core) Address() *net.IP {
@ -359,5 +367,6 @@ func (c *Core) CallPeer(addr string, sintf string) error {
// AddAllowedEncryptionPublicKey adds an allowed public key. This allow peerings // AddAllowedEncryptionPublicKey adds an allowed public key. This allow peerings
// to be restricted only to keys that you have selected. // to be restricted only to keys that you have selected.
func (c *Core) AddAllowedEncryptionPublicKey(boxStr string) error { func (c *Core) AddAllowedEncryptionPublicKey(boxStr string) error {
return c.admin.addAllowedEncryptionPublicKey(boxStr) //return c.admin.addAllowedEncryptionPublicKey(boxStr)
return nil
} }

View File

@ -30,7 +30,6 @@ type Core struct {
sessions sessions sessions sessions
router router router router
dht dht dht dht
admin admin
searches searches searches searches
link link link link
log *log.Logger log *log.Logger
@ -69,7 +68,6 @@ func (c *Core) init() error {
copy(c.sigPub[:], sigPubHex) copy(c.sigPub[:], sigPubHex)
copy(c.sigPriv[:], sigPrivHex) copy(c.sigPriv[:], sigPrivHex)
c.admin.init(c)
c.searches.init(c) c.searches.init(c)
c.dht.init(c) c.dht.init(c)
c.sessions.init(c) c.sessions.init(c)
@ -118,7 +116,6 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) {
errors := 0 errors := 0
components := []chan chan error{ components := []chan chan error{
c.admin.reconfigure,
c.searches.reconfigure, c.searches.reconfigure,
c.dht.reconfigure, c.dht.reconfigure,
c.sessions.reconfigure, c.sessions.reconfigure,
@ -189,11 +186,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState,
return nil, err return nil, err
} }
if err := c.admin.start(); err != nil {
c.log.Errorln("Failed to start admin socket")
return nil, err
}
go c.addPeerLoop() go c.addPeerLoop()
c.log.Infoln("Startup complete") c.log.Infoln("Startup complete")
@ -203,5 +195,4 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState,
// Stop shuts down the Yggdrasil node. // Stop shuts down the Yggdrasil node.
func (c *Core) Stop() { func (c *Core) Stop() {
c.log.Infoln("Stopping...") c.log.Infoln("Stopping...")
c.admin.close()
} }