Try to tidy up a bit, move checks for if we are already calling/connected

Something I noticed when working on reconfigure support for the "Listen"
option is that we have some rather huge weaknesses in our multicasting
design. Right now if we change our Listen address, it's not really
possible for remote nodes to know whether they are still connected to
us, so they start connecting in response to our changed beacons. They
can't know that they already know about us until *after* the handshake
but this registers in the local client log as repeated Connect/Disconnects
even though the existing peerings never actually drop.
This commit is contained in:
Neil Alexander 2018-12-30 21:11:16 +00:00
parent 80c9a1bc12
commit cd86c33850
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944

View File

@ -53,8 +53,8 @@ type tcpInterface struct {
type tcpInfo struct { type tcpInfo struct {
box crypto.BoxPubKey box crypto.BoxPubKey
sig crypto.SigPubKey sig crypto.SigPubKey
localAddr string
remoteAddr string remoteAddr string
remotePort string
} }
// Wrapper function to set additional options for specific connection types. // Wrapper function to set additional options for specific connection types.
@ -150,6 +150,22 @@ func (iface *tcpInterface) listener() {
} }
} }
// Checks if we already have a connection to this node
func (iface *tcpInterface) isAlreadyConnected(info tcpInfo) bool {
iface.mutex.Lock()
defer iface.mutex.Unlock()
_, isIn := iface.conns[info]
return isIn
}
// Checks if we already are calling this address
func (iface *tcpInterface) isAlreadyCalling(saddr string) bool {
iface.mutex.Lock()
defer iface.mutex.Unlock()
_, isIn := iface.calls[saddr]
return isIn
}
// Checks if a connection already exists. // Checks if a connection already exists.
// If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address. // If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address.
// If the dial is successful, it launches the handler. // If the dial is successful, it launches the handler.
@ -161,25 +177,18 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) {
if sintf != "" { if sintf != "" {
callname = fmt.Sprintf("%s/%s", saddr, sintf) callname = fmt.Sprintf("%s/%s", saddr, sintf)
} }
quit := false if iface.isAlreadyCalling(saddr) {
iface.mutex.Lock()
if _, isIn := iface.calls[callname]; isIn {
quit = true
} else {
iface.calls[callname] = struct{}{}
defer func() {
// Block new calls for a little while, to mitigate livelock scenarios
time.Sleep(default_tcp_timeout)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
iface.mutex.Lock()
delete(iface.calls, callname)
iface.mutex.Unlock()
}()
}
iface.mutex.Unlock()
if quit {
return return
} }
iface.calls[callname] = struct{}{}
defer func() {
// Block new calls for a little while, to mitigate livelock scenarios
time.Sleep(default_tcp_timeout)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
iface.mutex.Lock()
delete(iface.calls, callname)
iface.mutex.Unlock()
}()
var conn net.Conn var conn net.Conn
var err error var err error
if socksaddr != nil { if socksaddr != nil {
@ -284,9 +293,19 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
// TODO? Block forever to prevent future connection attempts? suppress future messages about the same node? // TODO? Block forever to prevent future connection attempts? suppress future messages about the same node?
return return
} }
remoteAddr, _, e1 := net.SplitHostPort(sock.RemoteAddr().String())
localAddr, _, e2 := net.SplitHostPort(sock.LocalAddr().String())
if e1 != nil || e2 != nil {
return
}
info := tcpInfo{ // used as a map key, so don't include ephemeral link key info := tcpInfo{ // used as a map key, so don't include ephemeral link key
box: meta.box, box: meta.box,
sig: meta.sig, sig: meta.sig,
localAddr: localAddr,
remoteAddr: remoteAddr,
}
if iface.isAlreadyConnected(info) {
return
} }
// Quit the parent call if this is a connection to ourself // Quit the parent call if this is a connection to ourself
equiv := func(k1, k2 []byte) bool { equiv := func(k1, k2 []byte) bool {
@ -297,14 +316,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
} }
return true return true
} }
if equiv(info.box[:], iface.core.boxPub[:]) { if equiv(meta.box[:], iface.core.boxPub[:]) {
return return
} }
if equiv(info.sig[:], iface.core.sigPub[:]) { if equiv(meta.sig[:], iface.core.sigPub[:]) {
return return
} }
// Check if we're authorized to connect to this key / IP // Check if we're authorized to connect to this key / IP
if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&info.box) { if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
// Allow unauthorized peers if they're link-local // Allow unauthorized peers if they're link-local
raddrStr, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) raddrStr, _, _ := net.SplitHostPort(sock.RemoteAddr().String())
raddr := net.ParseIP(raddrStr) raddr := net.ParseIP(raddrStr)
@ -313,14 +332,13 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
} }
} }
// Check if we already have a connection to this node, close and block if yes // Check if we already have a connection to this node, close and block if yes
info.remoteAddr, info.remotePort, _ = net.SplitHostPort(sock.RemoteAddr().String())
iface.mutex.Lock() iface.mutex.Lock()
if blockChan, isIn := iface.conns[info]; isIn { /*if blockChan, isIn := iface.conns[info]; isIn {
iface.mutex.Unlock() iface.mutex.Unlock()
sock.Close() sock.Close()
<-blockChan <-blockChan
return return
} }*/
blockChan := make(chan struct{}) blockChan := make(chan struct{})
iface.conns[info] = blockChan iface.conns[info] = blockChan
iface.mutex.Unlock() iface.mutex.Unlock()
@ -332,7 +350,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
}() }()
// Note that multiple connections to the same node are allowed // Note that multiple connections to the same node are allowed
// E.g. over different interfaces // E.g. over different interfaces
p := iface.core.peers.newPeer(&info.box, &info.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) p := iface.core.peers.newPeer(&meta.box, &meta.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String())
p.linkOut = make(chan []byte, 1) p.linkOut = make(chan []byte, 1)
in := func(bs []byte) { in := func(bs []byte) {
p.handlePacket(bs) p.handlePacket(bs)
@ -394,7 +412,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
}() }()
us, _, _ := net.SplitHostPort(sock.LocalAddr().String()) us, _, _ := net.SplitHostPort(sock.LocalAddr().String())
them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) them, _, _ := net.SplitHostPort(sock.RemoteAddr().String())
themNodeID := crypto.GetNodeID(&info.box) themNodeID := crypto.GetNodeID(&meta.box)
themAddr := address.AddrForNodeID(themNodeID) themAddr := address.AddrForNodeID(themNodeID)
themAddrString := net.IP(themAddr[:]).String() themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, them) themString := fmt.Sprintf("%s@%s", themAddrString, them)