diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index a190679f..c5654132 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -23,11 +23,14 @@ type tcpInterface struct { serv *net.TCPListener mutex sync.Mutex // Protecting the below calls map[string]struct{} + conns map[tcpInfo](chan struct{}) } -type tcpKeys struct { - box boxPubKey - sig sigPubKey +type tcpInfo struct { + box boxPubKey + sig sigPubKey + localAddr string // net.IPAddr.String(), not TCPAddr, don't care about port + remoteAddr string } func (iface *tcpInterface) init(core *Core, addr string) { @@ -41,6 +44,7 @@ func (iface *tcpInterface) init(core *Core, addr string) { panic(err) } iface.calls = make(map[string]struct{}) + iface.conns = make(map[tcpInfo](chan struct{})) go iface.listener() } @@ -102,8 +106,8 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { if n < len(keys) { /*panic("Partial key packet?") ;*/ return } - ks := tcpKeys{} - if !tcp_chop_keys(&ks.box, &ks.sig, &keys) { /*panic("Invalid key packet?") ;*/ + info := tcpInfo{} + if !tcp_chop_keys(&info.box, &info.sig, &keys) { /*panic("Invalid key packet?") ;*/ return } // Quit the parent call if this is a connection to ourself @@ -115,16 +119,40 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { } return true } - if equiv(ks.box[:], iface.core.boxPub[:]) { + if equiv(info.box[:], iface.core.boxPub[:]) { return } // testing - if equiv(ks.sig[:], iface.core.sigPub[:]) { + if equiv(info.sig[:], iface.core.sigPub[:]) { return } + // Check if we already have a connection to this node, close and block if yes + local := sock.LocalAddr().(*net.TCPAddr) + laddr := net.IPAddr{ + IP: local.IP, + Zone: local.Zone, + } + info.localAddr = laddr.String() + remote := sock.RemoteAddr().(*net.TCPAddr) + raddr := net.IPAddr{ + IP: remote.IP, + Zone: remote.Zone, + } + info.remoteAddr = raddr.String() + iface.mutex.Lock() + if blockChan, isIn := iface.conns[info]; isIn { + iface.mutex.Unlock() + sock.Close() + <-blockChan + return + } + blockChan := make(chan struct{}) + iface.conns[info] = blockChan + iface.mutex.Unlock() + defer close(blockChan) // Note that multiple connections to the same node are allowed // E.g. over different interfaces linkIn := make(chan []byte, 1) - p := iface.core.peers.newPeer(&ks.box, &ks.sig) //, in, out) + p := iface.core.peers.newPeer(&info.box, &info.sig) //, in, out) in := func(bs []byte) { p.handlePacket(bs, linkIn) } @@ -197,8 +225,8 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { p.core.peers.mutex.Unlock() close(linkIn) }() - them := sock.RemoteAddr() - themNodeID := getNodeID(&ks.box) + them := sock.RemoteAddr().(*net.TCPAddr) + themNodeID := getNodeID(&info.box) themAddr := address_addrForNodeID(themNodeID) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them)