From b44a0f29f3ed3ed31323027eef864acf20240259 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 2 Feb 2019 22:18:55 -0600 Subject: [PATCH 1/4] send an ack if we receive a packet and don't have any return traffic, keeping a legacy 4-second keep-alive in case there's no traffic at all to send (to be removed later, after nodes have upgraded), ideally we should either remove ReadTimeout or use it for the switch idle timeout instead --- src/util/util.go | 12 ++++++ src/yggdrasil/link.go | 85 +++++++++++++++++++++++++++---------------- 2 files changed, 65 insertions(+), 32 deletions(-) diff --git a/src/util/util.go b/src/util/util.go index 65e6d463..45be3b19 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -3,6 +3,7 @@ package util // These are misc. utility functions that didn't really fit anywhere else import "runtime" +import "time" // A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere. func Yield() { @@ -44,3 +45,14 @@ func PutBytes(bs []byte) { default: } } + +// This is a workaround to go's broken timer implementation +func TimerStop(t *time.Timer) bool { + if !t.Stop() { + select { + case <-t.C: + default: + } + } + return true +} diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 2d2155c9..a6c0ccc3 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -20,6 +20,7 @@ type link struct { mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface awdl awdl // AWDL interface support + // TODO timeout (to remove from switch), read from config.ReadTimeout } type linkInfo struct { @@ -78,8 +79,6 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st incoming: incoming, force: force, } - //l.interfaces[intf.name] = &intf - //go intf.start() return &intf, nil } @@ -172,41 +171,49 @@ func (intf *linkInterface) handler() error { go intf.peer.linkLoop() // Start the writer signalReady := make(chan struct{}, 1) + signalSent := make(chan struct{}, 1) + sendAck := make(chan struct{}, 1) go func() { defer close(signalReady) + defer close(signalSent) interval := 4 * time.Second - timer := time.NewTimer(interval) - clearTimer := func() { - if !timer.Stop() { - select { - case <-timer.C: - default: - } + tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp + defer util.TimerStop(tcpTimer) + send := func(bs []byte) { + intf.msgIO.writeMsg(bs) + select { + case signalSent <- struct{}{}: + default: } } - defer clearTimer() for { // First try to send any link protocol traffic select { case msg := <-intf.peer.linkOut: - intf.msgIO.writeMsg(msg) + send(msg) continue default: } // No protocol traffic to send, so reset the timer - clearTimer() - timer.Reset(interval) + util.TimerStop(tcpTimer) + tcpTimer.Reset(interval) // Now block until something is ready or the timer triggers keepalive traffic select { - case <-timer.C: - intf.msgIO.writeMsg(nil) + case <-tcpTimer.C: + intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + send(nil) + case <-sendAck: + intf.link.core.log.Debugf("Sending ack to %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + send(nil) case msg := <-intf.peer.linkOut: intf.msgIO.writeMsg(msg) case msg, ok := <-out: if !ok { return } - intf.msgIO.writeMsg(msg) + send(msg) util.PutBytes(msg) select { case signalReady <- struct{}{}: @@ -217,27 +224,23 @@ func (intf *linkInterface) handler() error { }() //intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle // Used to enable/disable activity in the switch - signalAlive := make(chan struct{}, 1) + signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive defer close(signalAlive) go func() { var isAlive bool var isReady bool - interval := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed - timer := time.NewTimer(interval) - clearTimer := func() { - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } - } - defer clearTimer() + var ackTimerRunning bool + timeout := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed + ackTime := time.Second + timer := time.NewTimer(timeout) + defer util.TimerStop(timer) + ackTimer := time.NewTimer(ackTime) + defer util.TimerStop(ackTimer) for { - clearTimer() - timer.Reset(interval) + util.TimerStop(timer) + timer.Reset(timeout) select { - case _, ok := <-signalAlive: + case gotMsg, ok := <-signalAlive: if !ok { return } @@ -249,6 +252,24 @@ func (intf *linkInterface) handler() error { intf.link.core.switchTable.idleIn <- intf.peer.port } } + if gotMsg && !ackTimerRunning { + util.TimerStop(ackTimer) + ackTimer.Reset(ackTime) + ackTimerRunning = true + } + case _, ok := <-signalSent: + // Stop any running ack timer + if !ok { + return + } + util.TimerStop(ackTimer) + ackTimerRunning = false + case <-ackTimer.C: + // We haven't sent anything in the past ackTimeout, so signal a send of a nil packet + select { + case sendAck <- struct{}{}: + default: + } case _, ok := <-signalReady: if !ok { return @@ -275,7 +296,7 @@ func (intf *linkInterface) handler() error { return err } select { - case signalAlive <- struct{}{}: + case signalAlive <- len(msg) > 0: default: } } From ebbe5f67ad5a988e435aaf3d7df8390a7eab8da5 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 2 Feb 2019 22:41:51 -0600 Subject: [PATCH 2/4] don't time out a link unless we were expecting an ack and didn't get one --- src/yggdrasil/link.go | 57 +++++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index a6c0ccc3..6704c918 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -171,7 +171,7 @@ func (intf *linkInterface) handler() error { go intf.peer.linkLoop() // Start the writer signalReady := make(chan struct{}, 1) - signalSent := make(chan struct{}, 1) + signalSent := make(chan bool, 1) sendAck := make(chan struct{}, 1) go func() { defer close(signalReady) @@ -182,7 +182,7 @@ func (intf *linkInterface) handler() error { send := func(bs []byte) { intf.msgIO.writeMsg(bs) select { - case signalSent <- struct{}{}: + case signalSent <- len(bs) > 0: default: } } @@ -229,16 +229,15 @@ func (intf *linkInterface) handler() error { go func() { var isAlive bool var isReady bool - var ackTimerRunning bool - timeout := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed - ackTime := time.Second - timer := time.NewTimer(timeout) - defer util.TimerStop(timer) - ackTimer := time.NewTimer(ackTime) - defer util.TimerStop(ackTimer) + var sendTimerRunning bool + var recvTimerRunning bool + recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed + sendTime := time.Second + sendTimer := time.NewTimer(sendTime) + defer util.TimerStop(sendTimer) + recvTimer := time.NewTimer(recvTime) + defer util.TimerStop(recvTimer) for { - util.TimerStop(timer) - timer.Reset(timeout) select { case gotMsg, ok := <-signalAlive: if !ok { @@ -252,23 +251,26 @@ func (intf *linkInterface) handler() error { intf.link.core.switchTable.idleIn <- intf.peer.port } } - if gotMsg && !ackTimerRunning { - util.TimerStop(ackTimer) - ackTimer.Reset(ackTime) - ackTimerRunning = true + if gotMsg && !sendTimerRunning { + // We got a message + // Start a timer, if it expires then send a 0-sized ack to let them know we're alive + util.TimerStop(sendTimer) + sendTimer.Reset(sendTime) + sendTimerRunning = true } - case _, ok := <-signalSent: + case sentMsg, ok := <-signalSent: // Stop any running ack timer if !ok { return } - util.TimerStop(ackTimer) - ackTimerRunning = false - case <-ackTimer.C: - // We haven't sent anything in the past ackTimeout, so signal a send of a nil packet - select { - case sendAck <- struct{}{}: - default: + util.TimerStop(sendTimer) + sendTimerRunning = false + if sentMsg && !recvTimerRunning { + // We sent a message + // Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem + util.TimerStop(recvTimer) + recvTimer.Reset(recvTime) + recvTimerRunning = true } case _, ok := <-signalReady: if !ok { @@ -281,7 +283,14 @@ func (intf *linkInterface) handler() error { // Keep enabled in the switch intf.link.core.switchTable.idleIn <- intf.peer.port } - case <-timer.C: + case <-sendTimer.C: + // We haven't sent anything, so signal a send of a 0 packet to let them know we're alive + select { + case sendAck <- struct{}{}: + default: + } + case <-recvTimer.C: + // We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding isAlive = false } } From 5ddf84f32991ae8bda3361cbbbea7c46294e97d3 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 3 Feb 2019 15:22:14 -0600 Subject: [PATCH 3/4] remove peers completely after a long switch timeout, this could use some improvement later --- src/yggdrasil/switch.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index f2adf3fb..db39d010 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -281,6 +281,7 @@ func (t *switchTable) cleanPeers() { if now.Sub(peer.time) > switch_timeout+switch_throttle { // Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding. delete(t.data.peers, port) + go t.core.peers.removePeer(port) // TODO figure out if it's safe to do this without a goroutine, or make it safe } } if _, isIn := t.data.peers[t.parent]; !isIn { From 2f8dd5dde01c75ff76e01157fb77ddeec1e03053 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 3 Feb 2019 15:50:25 -0600 Subject: [PATCH 4/4] remove race condition in setting peer.close by requiring it as an argument to newPeer --- src/yggdrasil/debug.go | 4 +--- src/yggdrasil/link.go | 9 +++------ src/yggdrasil/peer.go | 3 ++- src/yggdrasil/router.go | 2 +- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index f7319e46..94faba40 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -97,9 +97,7 @@ func (c *Core) DEBUG_getPeers() *peers { } func (ps *peers) DEBUG_newPeer(box crypto.BoxPubKey, sig crypto.SigPubKey, link crypto.BoxSharedKey) *peer { - //in <-chan []byte, - //out chan<- []byte) *peer { - return ps.newPeer(&box, &sig, &link, "(simulator)") //, in, out) + return ps.newPeer(&box, &sig, &link, "(simulator)", nil) } /* diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 6704c918..be98dd92 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -141,7 +141,7 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) + intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name, func() { intf.msgIO.close() }) if intf.peer == nil { return errors.New("failed to create peer") } @@ -160,13 +160,10 @@ func (intf *linkInterface) handler() error { themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) - intf.peer.close = func() { - intf.msgIO.close() - intf.link.core.log.Infof("Disconnected %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - } intf.link.core.log.Infof("Connected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) + defer intf.link.core.log.Infof("Disconnected %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) // Start the link loop go intf.peer.linkLoop() // Start the writer diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index ad54fbc3..237d6f61 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -113,7 +113,7 @@ type peer struct { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string) *peer { +func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string, closer func()) *peer { now := time.Now() p := peer{box: *box, sig: *sig, @@ -123,6 +123,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare firstSeen: now, doSend: make(chan struct{}, 1), dinfo: make(chan *dhtInfo, 1), + close: closer, core: ps.core} ps.mutex.Lock() defer ps.mutex.Unlock() diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index d5059369..99e69828 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -67,7 +67,7 @@ func (r *router) init(core *Core) { r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) in := make(chan []byte, 32) // TODO something better than this... - p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)") + p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil) p.out = func(packet []byte) { // This is to make very sure it never blocks select {