send an ack if we receive a packet and don't have any return traffic, keeping a legacy 4-second keep-alive in case there's no traffic at all to send (to be removed later, after nodes have upgraded), ideally we should either remove ReadTimeout or use it for the switch idle timeout instead

This commit is contained in:
Arceliar 2019-02-02 22:18:55 -06:00
parent 6d83d970bb
commit b44a0f29f3
2 changed files with 65 additions and 32 deletions

View File

@ -3,6 +3,7 @@ package util
// These are misc. utility functions that didn't really fit anywhere else
import "runtime"
import "time"
// A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere.
func Yield() {
@ -44,3 +45,14 @@ func PutBytes(bs []byte) {
default:
}
}
// This is a workaround to go's broken timer implementation
func TimerStop(t *time.Timer) bool {
if !t.Stop() {
select {
case <-t.C:
default:
}
}
return true
}

View File

@ -20,6 +20,7 @@ type link struct {
mutex sync.RWMutex // protects interfaces below
interfaces map[linkInfo]*linkInterface
awdl awdl // AWDL interface support
// TODO timeout (to remove from switch), read from config.ReadTimeout
}
type linkInfo struct {
@ -78,8 +79,6 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st
incoming: incoming,
force: force,
}
//l.interfaces[intf.name] = &intf
//go intf.start()
return &intf, nil
}
@ -172,41 +171,49 @@ func (intf *linkInterface) handler() error {
go intf.peer.linkLoop()
// Start the writer
signalReady := make(chan struct{}, 1)
signalSent := make(chan struct{}, 1)
sendAck := make(chan struct{}, 1)
go func() {
defer close(signalReady)
defer close(signalSent)
interval := 4 * time.Second
timer := time.NewTimer(interval)
clearTimer := func() {
if !timer.Stop() {
tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp
defer util.TimerStop(tcpTimer)
send := func(bs []byte) {
intf.msgIO.writeMsg(bs)
select {
case <-timer.C:
case signalSent <- struct{}{}:
default:
}
}
}
defer clearTimer()
for {
// First try to send any link protocol traffic
select {
case msg := <-intf.peer.linkOut:
intf.msgIO.writeMsg(msg)
send(msg)
continue
default:
}
// No protocol traffic to send, so reset the timer
clearTimer()
timer.Reset(interval)
util.TimerStop(tcpTimer)
tcpTimer.Reset(interval)
// Now block until something is ready or the timer triggers keepalive traffic
select {
case <-timer.C:
intf.msgIO.writeMsg(nil)
case <-tcpTimer.C:
intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil)
case <-sendAck:
intf.link.core.log.Debugf("Sending ack to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil)
case msg := <-intf.peer.linkOut:
intf.msgIO.writeMsg(msg)
case msg, ok := <-out:
if !ok {
return
}
intf.msgIO.writeMsg(msg)
send(msg)
util.PutBytes(msg)
select {
case signalReady <- struct{}{}:
@ -217,27 +224,23 @@ func (intf *linkInterface) handler() error {
}()
//intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle
// Used to enable/disable activity in the switch
signalAlive := make(chan struct{}, 1)
signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive
defer close(signalAlive)
go func() {
var isAlive bool
var isReady bool
interval := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed
timer := time.NewTimer(interval)
clearTimer := func() {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
}
defer clearTimer()
var ackTimerRunning bool
timeout := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed
ackTime := time.Second
timer := time.NewTimer(timeout)
defer util.TimerStop(timer)
ackTimer := time.NewTimer(ackTime)
defer util.TimerStop(ackTimer)
for {
clearTimer()
timer.Reset(interval)
util.TimerStop(timer)
timer.Reset(timeout)
select {
case _, ok := <-signalAlive:
case gotMsg, ok := <-signalAlive:
if !ok {
return
}
@ -249,6 +252,24 @@ func (intf *linkInterface) handler() error {
intf.link.core.switchTable.idleIn <- intf.peer.port
}
}
if gotMsg && !ackTimerRunning {
util.TimerStop(ackTimer)
ackTimer.Reset(ackTime)
ackTimerRunning = true
}
case _, ok := <-signalSent:
// Stop any running ack timer
if !ok {
return
}
util.TimerStop(ackTimer)
ackTimerRunning = false
case <-ackTimer.C:
// We haven't sent anything in the past ackTimeout, so signal a send of a nil packet
select {
case sendAck <- struct{}{}:
default:
}
case _, ok := <-signalReady:
if !ok {
return
@ -275,7 +296,7 @@ func (intf *linkInterface) handler() error {
return err
}
select {
case signalAlive <- struct{}{}:
case signalAlive <- len(msg) > 0:
default:
}
}