From abd8b69979eb846dc12b6b7bcea51aebdf8065f4 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 14 Dec 2018 18:15:35 -0600 Subject: [PATCH 1/3] send a switch message immediately when peering, and use OS-level TCP keep-alive (shouldn't matter right now, since we have application-level keep-alive that preempts it, but important later) --- src/yggdrasil/peer.go | 9 ++++----- src/yggdrasil/tcp.go | 24 +++++++++++++++--------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 67aa805a..e092513b 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -177,6 +177,7 @@ func (p *peer) doSendSwitchMsgs() { func (p *peer) linkLoop() { tick := time.NewTicker(time.Second) defer tick.Stop() + p.doSendSwitchMsgs() for { select { case _, ok := <-p.doSend: @@ -185,11 +186,9 @@ func (p *peer) linkLoop() { } p.sendSwitchMsg() case _ = <-tick.C: - //break // FIXME disabled the below completely to test something - pdinfo := p.dinfo // FIXME this is a bad workarond NPE on the next line - if pdinfo != nil { - dinfo := *pdinfo - p.core.dht.peers <- &dinfo + dinfo := p.dinfo // FIXME? are pointer reads *always* atomic? + if dinfo != nil { + p.core.dht.peers <- dinfo } } } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 5ca66304..5d4bfc71 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -31,14 +31,6 @@ const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense const default_tcp_timeout = 6 * time.Second const tcp_ping_interval = (default_tcp_timeout * 2 / 3) -// Wrapper function for non tcp/ip connections. -func setNoDelay(c net.Conn, delay bool) { - tcp, ok := c.(*net.TCPConn) - if ok { - tcp.SetNoDelay(delay) - } -} - // The TCP listener and information about active TCP connections, to avoid duplication. type tcpInterface struct { core *Core @@ -58,6 +50,20 @@ type tcpInfo struct { remoteAddr string } +// Wrapper function to set additional options for specific connection types. +func (iface *tcpInterface) setExtraOptions(c net.Conn) { + switch sock := c.(type) { + case *net.TCPConn: + sock.SetNoDelay(true) + sock.SetKeepAlive(true) + sock.SetKeepAlivePeriod(iface.tcp_timeout) + panic("DEBUG testing") + // TODO something for socks5 + default: + iface.core.log.Println("Unrecognized connection type: %v", sock) + } +} + // Returns the address of the listener. func (iface *tcpInterface) getAddr() *net.TCPAddr { return iface.serv.Addr().(*net.TCPAddr) @@ -205,6 +211,7 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { // It defers a bunch of cleanup stuff to tear down all of these things when the reader exists (e.g. due to a closed connection or a timeout). func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { defer sock.Close() + iface.setExtraOptions(sock) // Get our keys myLinkPub, myLinkPriv := newBoxKeys() // ephemeral link keys meta := version_getBaseMetadata() @@ -342,7 +349,6 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { out <- msg } p.close = func() { sock.Close() } - setNoDelay(sock, true) go p.linkLoop() defer func() { // Put all of our cleanup here... From 570e85c2972b375ae85196b39af5b15d2ff40da2 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 14 Dec 2018 21:12:25 -0600 Subject: [PATCH 2/3] remove debug code --- src/yggdrasil/tcp.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 5d4bfc71..a7941662 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -57,10 +57,8 @@ func (iface *tcpInterface) setExtraOptions(c net.Conn) { sock.SetNoDelay(true) sock.SetKeepAlive(true) sock.SetKeepAlivePeriod(iface.tcp_timeout) - panic("DEBUG testing") // TODO something for socks5 default: - iface.core.log.Println("Unrecognized connection type: %v", sock) } } From 4875ab89545c3fb1ef20a07f24930b5083410baa Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 14 Dec 2018 21:44:31 -0600 Subject: [PATCH 3/3] peer thread safey for dhtInfo updates --- src/yggdrasil/peer.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index e092513b..e4382a83 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -90,7 +90,7 @@ type peer struct { firstSeen time.Time // To track uptime for getPeers linkOut (chan []byte) // used for protocol traffic (to bypass queues) doSend (chan struct{}) // tell the linkLoop to send a switchMsg - dinfo *dhtInfo // used to keep the DHT working + dinfo (chan *dhtInfo) // used to keep the DHT working out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes close func() // Called when a peer is removed, to close the underlying connection, or via admin api } @@ -105,6 +105,7 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKe endpoint: endpoint, firstSeen: now, doSend: make(chan struct{}, 1), + dinfo: make(chan *dhtInfo, 1), core: ps.core} ps.mutex.Lock() defer ps.mutex.Unlock() @@ -178,6 +179,7 @@ func (p *peer) linkLoop() { tick := time.NewTicker(time.Second) defer tick.Stop() p.doSendSwitchMsgs() + var dinfo *dhtInfo for { select { case _, ok := <-p.doSend: @@ -185,8 +187,8 @@ func (p *peer) linkLoop() { return } p.sendSwitchMsg() + case dinfo = <-p.dinfo: case _ = <-tick.C: - dinfo := p.dinfo // FIXME? are pointer reads *always* atomic? if dinfo != nil { p.core.dht.peers <- dinfo } @@ -218,8 +220,9 @@ func (p *peer) handlePacket(packet []byte) { // Called to handle traffic or protocolTraffic packets. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. func (p *peer) handleTraffic(packet []byte, pTypeLen int) { - if p.port != 0 && p.dinfo == nil { - // Drop traffic until the peer manages to send us at least one good switchMsg + table := p.core.switchTable.getTable() + if _, isIn := table.elems[p.port]; !isIn && p.port != 0 { + // Drop traffic if the peer isn't in the switch return } p.core.switchTable.packetIn <- packet @@ -323,9 +326,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { p.core.switchTable.handleMsg(&msg, p.port) if !p.core.switchTable.checkRoot(&msg) { // Bad switch message - // Stop forwarding traffic from it - // Stop refreshing it in the DHT - p.dinfo = nil + p.dinfo <- nil return } // Pass a mesage to the dht informing it that this peer (still) exists @@ -334,8 +335,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { key: p.box, coords: loc.getCoords(), } - //p.core.dht.peers <- &dinfo - p.dinfo = &dinfo + p.dinfo <- &dinfo } // This generates the bytes that we sign or check the signature of for a switchMsg.