diff --git a/src/util/util.go b/src/util/util.go index 65e6d463..45be3b19 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -3,6 +3,7 @@ package util // These are misc. utility functions that didn't really fit anywhere else import "runtime" +import "time" // A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere. func Yield() { @@ -44,3 +45,14 @@ func PutBytes(bs []byte) { default: } } + +// This is a workaround to go's broken timer implementation +func TimerStop(t *time.Timer) bool { + if !t.Stop() { + select { + case <-t.C: + default: + } + } + return true +} diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 2d2155c9..a6c0ccc3 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -20,6 +20,7 @@ type link struct { mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface awdl awdl // AWDL interface support + // TODO timeout (to remove from switch), read from config.ReadTimeout } type linkInfo struct { @@ -78,8 +79,6 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st incoming: incoming, force: force, } - //l.interfaces[intf.name] = &intf - //go intf.start() return &intf, nil } @@ -172,41 +171,49 @@ func (intf *linkInterface) handler() error { go intf.peer.linkLoop() // Start the writer signalReady := make(chan struct{}, 1) + signalSent := make(chan struct{}, 1) + sendAck := make(chan struct{}, 1) go func() { defer close(signalReady) + defer close(signalSent) interval := 4 * time.Second - timer := time.NewTimer(interval) - clearTimer := func() { - if !timer.Stop() { - select { - case <-timer.C: - default: - } + tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp + defer util.TimerStop(tcpTimer) + send := func(bs []byte) { + intf.msgIO.writeMsg(bs) + select { + case signalSent <- struct{}{}: + default: } } - defer clearTimer() for { // First try to send any link protocol traffic select { case msg := <-intf.peer.linkOut: - intf.msgIO.writeMsg(msg) + send(msg) continue default: } // No protocol traffic to send, so reset the timer - clearTimer() - timer.Reset(interval) + util.TimerStop(tcpTimer) + tcpTimer.Reset(interval) // Now block until something is ready or the timer triggers keepalive traffic select { - case <-timer.C: - intf.msgIO.writeMsg(nil) + case <-tcpTimer.C: + intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + send(nil) + case <-sendAck: + intf.link.core.log.Debugf("Sending ack to %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + send(nil) case msg := <-intf.peer.linkOut: intf.msgIO.writeMsg(msg) case msg, ok := <-out: if !ok { return } - intf.msgIO.writeMsg(msg) + send(msg) util.PutBytes(msg) select { case signalReady <- struct{}{}: @@ -217,27 +224,23 @@ func (intf *linkInterface) handler() error { }() //intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle // Used to enable/disable activity in the switch - signalAlive := make(chan struct{}, 1) + signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive defer close(signalAlive) go func() { var isAlive bool var isReady bool - interval := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed - timer := time.NewTimer(interval) - clearTimer := func() { - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } - } - defer clearTimer() + var ackTimerRunning bool + timeout := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed + ackTime := time.Second + timer := time.NewTimer(timeout) + defer util.TimerStop(timer) + ackTimer := time.NewTimer(ackTime) + defer util.TimerStop(ackTimer) for { - clearTimer() - timer.Reset(interval) + util.TimerStop(timer) + timer.Reset(timeout) select { - case _, ok := <-signalAlive: + case gotMsg, ok := <-signalAlive: if !ok { return } @@ -249,6 +252,24 @@ func (intf *linkInterface) handler() error { intf.link.core.switchTable.idleIn <- intf.peer.port } } + if gotMsg && !ackTimerRunning { + util.TimerStop(ackTimer) + ackTimer.Reset(ackTime) + ackTimerRunning = true + } + case _, ok := <-signalSent: + // Stop any running ack timer + if !ok { + return + } + util.TimerStop(ackTimer) + ackTimerRunning = false + case <-ackTimer.C: + // We haven't sent anything in the past ackTimeout, so signal a send of a nil packet + select { + case sendAck <- struct{}{}: + default: + } case _, ok := <-signalReady: if !ok { return @@ -275,7 +296,7 @@ func (intf *linkInterface) handler() error { return err } select { - case signalAlive <- struct{}{}: + case signalAlive <- len(msg) > 0: default: } }