possibly fix deadlock from race in peer linkloop goroutine, add some related debug code to the admin

This commit is contained in:
Arceliar 2018-02-07 17:48:30 -06:00
parent 63aadf6e88
commit e9adf327b0
4 changed files with 47 additions and 39 deletions

View File

@ -41,11 +41,14 @@ func (a *admin) init(c *Core, listenaddr string) {
*out = a.getResponse_dot() *out = a.getResponse_dot()
}) })
a.addHandler("getSelf", nil, func(out *[]byte, _ ...string) { a.addHandler("getSelf", nil, func(out *[]byte, _ ...string) {
*out = []byte(a.printInfos(a.getData_getPeers())) *out = []byte(a.printInfos([]admin_nodeInfo{*a.getData_getSelf()}))
}) })
a.addHandler("getPeers", nil, func(out *[]byte, _ ...string) { a.addHandler("getPeers", nil, func(out *[]byte, _ ...string) {
*out = []byte(a.printInfos(a.getData_getPeers())) *out = []byte(a.printInfos(a.getData_getPeers()))
}) })
a.addHandler("getSwitchPeers", nil, func(out *[]byte, _ ...string) {
*out = []byte(a.printInfos(a.getData_getSwitchPeers()))
})
a.addHandler("getDHT", nil, func(out *[]byte, _ ...string) { a.addHandler("getDHT", nil, func(out *[]byte, _ ...string) {
*out = []byte(a.printInfos(a.getData_getDHT())) *out = []byte(a.printInfos(a.getData_getDHT()))
}) })
@ -152,6 +155,26 @@ func (a *admin) getData_getSelf() *admin_nodeInfo {
} }
func (a *admin) getData_getPeers() []admin_nodeInfo { func (a *admin) getData_getPeers() []admin_nodeInfo {
ports := a.core.peers.ports.Load().(map[switchPort]*peer)
var peerInfos []admin_nodeInfo
var ps []switchPort
for port := range ports {
ps = append(ps, port)
}
sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
for _, port := range ps {
p := ports[port]
addr := *address_addrForNodeID(getNodeID(&p.box))
info := admin_nodeInfo{
{"IP", net.IP(addr[:]).String()},
{"port", fmt.Sprint(port)},
}
peerInfos = append(peerInfos, info)
}
return peerInfos
}
func (a *admin) getData_getSwitchPeers() []admin_nodeInfo {
var peerInfos []admin_nodeInfo var peerInfos []admin_nodeInfo
table := a.core.switchTable.table.Load().(lookupTable) table := a.core.switchTable.table.Load().(lookupTable)
peers := a.core.peers.ports.Load().(map[switchPort]*peer) peers := a.core.peers.ports.Load().(map[switchPort]*peer)
@ -211,7 +234,7 @@ func (a *admin) getData_getSessions() []admin_nodeInfo {
func (a *admin) getResponse_dot() []byte { func (a *admin) getResponse_dot() []byte {
self := a.getData_getSelf().asMap() self := a.getData_getSelf().asMap()
myAddr := self["IP"] myAddr := self["IP"]
peers := a.getData_getPeers() peers := a.getData_getSwitchPeers()
dht := a.getData_getDHT() dht := a.getData_getDHT()
sessions := a.getData_getSessions() sessions := a.getData_getSessions()
// Map of coords onto IP // Map of coords onto IP

View File

@ -41,17 +41,15 @@ type router struct {
func (r *router) init(core *Core) { func (r *router) init(core *Core) {
r.core = core r.core = core
r.addr = *address_addrForNodeID(&r.core.dht.nodeID) r.addr = *address_addrForNodeID(&r.core.dht.nodeID)
in := make(chan []byte, 32) // TODO something better than this... in := make(chan []byte, 32) // TODO something better than this...
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub) //, out, in) p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub) //, out, in)
p.out = func(packet []byte) { p.out = func(packet []byte) {
// This is to make very sure it never blocks // This is to make very sure it never blocks
for { select {
select { case in <- packet:
case in <- packet: return
return default:
default: util_putBytes(packet)
util_putBytes(<-in)
}
} }
} }
r.in = in r.in = in
@ -147,13 +145,10 @@ func (r *router) sendPacket(bs []byte) {
fallthrough fallthrough
//default: go func() { sinfo.send<-bs }() //default: go func() { sinfo.send<-bs }()
default: default:
for { select {
select { case sinfo.send <- bs:
case sinfo.send <- bs: default:
return util_putBytes(bs)
default:
util_putBytes(<-sinfo.send)
}
} }
} }
} }
@ -191,7 +186,6 @@ func (r *router) handleIn(packet []byte) {
case wire_ProtocolTraffic: case wire_ProtocolTraffic:
r.handleProto(packet) r.handleProto(packet)
default: /*panic("Should not happen in testing") ;*/ default: /*panic("Should not happen in testing") ;*/
return
} }
} }
@ -206,13 +200,10 @@ func (r *router) handleTraffic(packet []byte) {
return return
} }
//go func () { sinfo.recv<-&p }() //go func () { sinfo.recv<-&p }()
for { select {
select { case sinfo.recv <- &p:
case sinfo.recv <- &p: default:
return util_putBytes(p.payload)
default:
util_putBytes((<-sinfo.recv).payload)
}
} }
} }

View File

@ -176,13 +176,10 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) {
}() }()
p.out = func(msg []byte) { p.out = func(msg []byte) {
defer func() { recover() }() defer func() { recover() }()
for { select {
select { case out <- msg:
case out <- msg: default:
return util_putBytes(msg)
default:
util_putBytes(<-out)
}
} }
} }
sock.SetNoDelay(true) sock.SetNoDelay(true)

View File

@ -224,13 +224,10 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
} }
conn.peer.out = func(msg []byte) { conn.peer.out = func(msg []byte) {
defer func() { recover() }() defer func() { recover() }()
for { select {
select { case conn.out <- msg:
case conn.out <- msg: default:
return util_putBytes(msg)
default:
util_putBytes(<-conn.out)
}
} }
} }
go func() { go func() {