Merge pull request #322 from Arceliar/link

Idle link logic changes
This commit is contained in:
Arceliar 2019-02-03 15:57:45 -06:00 committed by GitHub
commit a49a9bbf18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 83 additions and 44 deletions

View File

@ -3,6 +3,7 @@ package util
// These are misc. utility functions that didn't really fit anywhere else // These are misc. utility functions that didn't really fit anywhere else
import "runtime" import "runtime"
import "time"
// A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere. // A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere.
func Yield() { func Yield() {
@ -44,3 +45,14 @@ func PutBytes(bs []byte) {
default: default:
} }
} }
// This is a workaround to go's broken timer implementation
func TimerStop(t *time.Timer) bool {
if !t.Stop() {
select {
case <-t.C:
default:
}
}
return true
}

View File

@ -97,9 +97,7 @@ func (c *Core) DEBUG_getPeers() *peers {
} }
func (ps *peers) DEBUG_newPeer(box crypto.BoxPubKey, sig crypto.SigPubKey, link crypto.BoxSharedKey) *peer { func (ps *peers) DEBUG_newPeer(box crypto.BoxPubKey, sig crypto.SigPubKey, link crypto.BoxSharedKey) *peer {
//in <-chan []byte, return ps.newPeer(&box, &sig, &link, "(simulator)", nil)
//out chan<- []byte) *peer {
return ps.newPeer(&box, &sig, &link, "(simulator)") //, in, out)
} }
/* /*

View File

@ -20,6 +20,7 @@ type link struct {
mutex sync.RWMutex // protects interfaces below mutex sync.RWMutex // protects interfaces below
interfaces map[linkInfo]*linkInterface interfaces map[linkInfo]*linkInterface
awdl awdl // AWDL interface support awdl awdl // AWDL interface support
// TODO timeout (to remove from switch), read from config.ReadTimeout
} }
type linkInfo struct { type linkInfo struct {
@ -78,8 +79,6 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st
incoming: incoming, incoming: incoming,
force: force, force: force,
} }
//l.interfaces[intf.name] = &intf
//go intf.start()
return &intf, nil return &intf, nil
} }
@ -142,7 +141,7 @@ func (intf *linkInterface) handler() error {
intf.link.mutex.Unlock() intf.link.mutex.Unlock()
// Create peer // Create peer
shared := crypto.GetSharedKey(myLinkPriv, &meta.link) shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name, func() { intf.msgIO.close() })
if intf.peer == nil { if intf.peer == nil {
return errors.New("failed to create peer") return errors.New("failed to create peer")
} }
@ -161,52 +160,57 @@ func (intf *linkInterface) handler() error {
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
themAddrString := net.IP(themAddr[:]).String() themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
intf.peer.close = func() {
intf.msgIO.close()
intf.link.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
}
intf.link.core.log.Infof("Connected %s: %s, source %s", intf.link.core.log.Infof("Connected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
defer intf.link.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
// Start the link loop // Start the link loop
go intf.peer.linkLoop() go intf.peer.linkLoop()
// Start the writer // Start the writer
signalReady := make(chan struct{}, 1) signalReady := make(chan struct{}, 1)
signalSent := make(chan bool, 1)
sendAck := make(chan struct{}, 1)
go func() { go func() {
defer close(signalReady) defer close(signalReady)
defer close(signalSent)
interval := 4 * time.Second interval := 4 * time.Second
timer := time.NewTimer(interval) tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp
clearTimer := func() { defer util.TimerStop(tcpTimer)
if !timer.Stop() { send := func(bs []byte) {
select { intf.msgIO.writeMsg(bs)
case <-timer.C: select {
default: case signalSent <- len(bs) > 0:
} default:
} }
} }
defer clearTimer()
for { for {
// First try to send any link protocol traffic // First try to send any link protocol traffic
select { select {
case msg := <-intf.peer.linkOut: case msg := <-intf.peer.linkOut:
intf.msgIO.writeMsg(msg) send(msg)
continue continue
default: default:
} }
// No protocol traffic to send, so reset the timer // No protocol traffic to send, so reset the timer
clearTimer() util.TimerStop(tcpTimer)
timer.Reset(interval) tcpTimer.Reset(interval)
// Now block until something is ready or the timer triggers keepalive traffic // Now block until something is ready or the timer triggers keepalive traffic
select { select {
case <-timer.C: case <-tcpTimer.C:
intf.msgIO.writeMsg(nil) intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil)
case <-sendAck:
intf.link.core.log.Debugf("Sending ack to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil)
case msg := <-intf.peer.linkOut: case msg := <-intf.peer.linkOut:
intf.msgIO.writeMsg(msg) intf.msgIO.writeMsg(msg)
case msg, ok := <-out: case msg, ok := <-out:
if !ok { if !ok {
return return
} }
intf.msgIO.writeMsg(msg) send(msg)
util.PutBytes(msg) util.PutBytes(msg)
select { select {
case signalReady <- struct{}{}: case signalReady <- struct{}{}:
@ -217,27 +221,22 @@ func (intf *linkInterface) handler() error {
}() }()
//intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle //intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle
// Used to enable/disable activity in the switch // Used to enable/disable activity in the switch
signalAlive := make(chan struct{}, 1) signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive
defer close(signalAlive) defer close(signalAlive)
go func() { go func() {
var isAlive bool var isAlive bool
var isReady bool var isReady bool
interval := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed var sendTimerRunning bool
timer := time.NewTimer(interval) var recvTimerRunning bool
clearTimer := func() { recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed
if !timer.Stop() { sendTime := time.Second
select { sendTimer := time.NewTimer(sendTime)
case <-timer.C: defer util.TimerStop(sendTimer)
default: recvTimer := time.NewTimer(recvTime)
} defer util.TimerStop(recvTimer)
}
}
defer clearTimer()
for { for {
clearTimer()
timer.Reset(interval)
select { select {
case _, ok := <-signalAlive: case gotMsg, ok := <-signalAlive:
if !ok { if !ok {
return return
} }
@ -249,6 +248,27 @@ func (intf *linkInterface) handler() error {
intf.link.core.switchTable.idleIn <- intf.peer.port intf.link.core.switchTable.idleIn <- intf.peer.port
} }
} }
if gotMsg && !sendTimerRunning {
// We got a message
// Start a timer, if it expires then send a 0-sized ack to let them know we're alive
util.TimerStop(sendTimer)
sendTimer.Reset(sendTime)
sendTimerRunning = true
}
case sentMsg, ok := <-signalSent:
// Stop any running ack timer
if !ok {
return
}
util.TimerStop(sendTimer)
sendTimerRunning = false
if sentMsg && !recvTimerRunning {
// We sent a message
// Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem
util.TimerStop(recvTimer)
recvTimer.Reset(recvTime)
recvTimerRunning = true
}
case _, ok := <-signalReady: case _, ok := <-signalReady:
if !ok { if !ok {
return return
@ -260,7 +280,14 @@ func (intf *linkInterface) handler() error {
// Keep enabled in the switch // Keep enabled in the switch
intf.link.core.switchTable.idleIn <- intf.peer.port intf.link.core.switchTable.idleIn <- intf.peer.port
} }
case <-timer.C: case <-sendTimer.C:
// We haven't sent anything, so signal a send of a 0 packet to let them know we're alive
select {
case sendAck <- struct{}{}:
default:
}
case <-recvTimer.C:
// We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding
isAlive = false isAlive = false
} }
} }
@ -275,7 +302,7 @@ func (intf *linkInterface) handler() error {
return err return err
} }
select { select {
case signalAlive <- struct{}{}: case signalAlive <- len(msg) > 0:
default: default:
} }
} }

View File

@ -113,7 +113,7 @@ type peer struct {
} }
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string) *peer { func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string, closer func()) *peer {
now := time.Now() now := time.Now()
p := peer{box: *box, p := peer{box: *box,
sig: *sig, sig: *sig,
@ -123,6 +123,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare
firstSeen: now, firstSeen: now,
doSend: make(chan struct{}, 1), doSend: make(chan struct{}, 1),
dinfo: make(chan *dhtInfo, 1), dinfo: make(chan *dhtInfo, 1),
close: closer,
core: ps.core} core: ps.core}
ps.mutex.Lock() ps.mutex.Lock()
defer ps.mutex.Unlock() defer ps.mutex.Unlock()

View File

@ -67,7 +67,7 @@ func (r *router) init(core *Core) {
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
in := make(chan []byte, 32) // TODO something better than this... in := make(chan []byte, 32) // TODO something better than this...
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)") p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil)
p.out = func(packet []byte) { p.out = func(packet []byte) {
// This is to make very sure it never blocks // This is to make very sure it never blocks
select { select {

View File

@ -281,6 +281,7 @@ func (t *switchTable) cleanPeers() {
if now.Sub(peer.time) > switch_timeout+switch_throttle { if now.Sub(peer.time) > switch_timeout+switch_throttle {
// Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding. // Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding.
delete(t.data.peers, port) delete(t.data.peers, port)
go t.core.peers.removePeer(port) // TODO figure out if it's safe to do this without a goroutine, or make it safe
} }
} }
if _, isIn := t.data.peers[t.parent]; !isIn { if _, isIn := t.data.peers[t.parent]; !isIn {