add and use a thread-safe way of reading router internal state, and include active sessions in the admin query

This commit is contained in:
Arceliar 2018-01-21 12:55:45 -06:00
parent 8973d3c86d
commit 625b97c511
5 changed files with 55 additions and 20 deletions

View File

@ -66,31 +66,50 @@ func (a *admin) handleRequest(conn net.Conn) {
peerID := address_addrForNodeID(getNodeID(&peerentry.box)) peerID := address_addrForNodeID(getNodeID(&peerentry.box))
addr := net.IP(peerID[:]).String() addr := net.IP(peerID[:]).String()
var index [mDepth]switchPort var index [mDepth]switchPort
copy(index[:mDepth], tableentry.locator.coords[:]) copy(index[:], tableentry.locator.coords)
m[index] = addr m[index] = addr
} }
} }
} }
// Look up everything we know from DHT getPorts := func(coords []byte) []switchPort {
for i := 0; i < a.core.dht.nBuckets(); i++ { var ports []switchPort
b := a.core.dht.getBucket(i)
for _, v := range b.infos {
var destPorts []switchPort
for offset := 0; ; { for offset := 0; ; {
coord, length := wire_decode_uint64(v.coords[offset:]) coord, length := wire_decode_uint64(coords[offset:])
if length == 0 { if length == 0 {
break break
} }
destPorts = append(destPorts, switchPort(coord)) ports = append(ports, switchPort(coord))
offset += length offset += length
} }
return ports
}
// Look up everything we know from DHT
getDHT := func() {
for i := 0; i < a.core.dht.nBuckets(); i++ {
b := a.core.dht.getBucket(i)
for _, v := range b.infos {
destPorts := getPorts(v.coords)
addr := net.IP(address_addrForNodeID(v.nodeID_hidden)[:]).String() addr := net.IP(address_addrForNodeID(v.nodeID_hidden)[:]).String()
var index [mDepth]switchPort var index [mDepth]switchPort
copy(index[:mDepth], destPorts[:]) copy(index[:], destPorts)
m[index] = addr m[index] = addr
} }
} }
}
a.core.router.doAdmin(getDHT)
// Look up everything we know from active sessions
getSessions := func() {
for _, sinfo := range a.core.sessions.sinfos {
destPorts := getPorts(sinfo.coords)
var index [mDepth]switchPort
copy(index[:], destPorts)
m[index] = net.IP(sinfo.theirAddr[:]).String()
}
}
a.core.router.doAdmin(getSessions)
// Now print it all out // Now print it all out
conn.Write([]byte(fmt.Sprintf("graph {\n"))) conn.Write([]byte(fmt.Sprintf("graph {\n")))

View File

@ -37,7 +37,7 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) {
} }
type peer struct { type peer struct {
// Rolling approximation of bandwidth, in bps, used by switch, updated by tcp // Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends
// use get/update methods only! (atomic accessors as float64) // use get/update methods only! (atomic accessors as float64)
bandwidth uint64 bandwidth uint64
// BUG: sync/atomic, 32 bit platforms need the above to be the first element // BUG: sync/atomic, 32 bit platforms need the above to be the first element

View File

@ -35,6 +35,7 @@ type router struct {
recv chan<- []byte // place where the tun pulls received packets from recv chan<- []byte // place where the tun pulls received packets from
send <-chan []byte // place where the tun puts outgoing packets send <-chan []byte // place where the tun puts outgoing packets
reset chan struct{} // signal that coords changed (re-init sessions/dht) reset chan struct{} // signal that coords changed (re-init sessions/dht)
admin chan func() // pass a lambda for the admin socket to query stuff
} }
func (r *router) init(core *Core) { func (r *router) init(core *Core) {
@ -57,6 +58,7 @@ func (r *router) init(core *Core) {
r.core.tun.recv = recv r.core.tun.recv = recv
r.core.tun.send = send r.core.tun.send = send
r.reset = make(chan struct{}, 1) r.reset = make(chan struct{}, 1)
r.admin = make(chan func())
go r.mainLoop() go r.mainLoop()
} }
@ -79,6 +81,8 @@ func (r *router) mainLoop() {
r.core.dht.doMaintenance() r.core.dht.doMaintenance()
util_getBytes() // To slowly drain things util_getBytes() // To slowly drain things
} }
case f := <-r.admin:
f()
} }
} }
} }
@ -302,3 +306,15 @@ func (r *router) handleSearchRes(bs []byte) {
} }
r.core.searches.handleSearchRes(&res) r.core.searches.handleSearchRes(&res)
} }
func (r *router) doAdmin(f func()) {
// Pass this a function that needs to be run by the router's main goroutine
// It will pass the function to the router and wait for the router to finish
done := make(chan struct{})
newF := func() {
f()
close(done)
}
r.admin <- newF
<-done
}

View File

@ -96,7 +96,7 @@ func generateConfig() *nodeConfig {
spub, spriv := core.DEBUG_newSigKeys() spub, spriv := core.DEBUG_newSigKeys()
cfg := nodeConfig{} cfg := nodeConfig{}
cfg.Listen = "[::]:0" cfg.Listen = "[::]:0"
cfg.AdminListen = "[::1]:9001" cfg.AdminListen = "localhost:9001"
cfg.BoxPub = hex.EncodeToString(bpub[:]) cfg.BoxPub = hex.EncodeToString(bpub[:])
cfg.BoxPriv = hex.EncodeToString(bpriv[:]) cfg.BoxPriv = hex.EncodeToString(bpriv[:])
cfg.SigPub = hex.EncodeToString(spub[:]) cfg.SigPub = hex.EncodeToString(spub[:])