From 62b9fab5f822f018940062ca8914852f41a8ce0b Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 16 May 2020 18:56:04 -0500 Subject: [PATCH] more work-in-progress, debugging why things are dropping so often --- src/yggdrasil/link.go | 31 +++++++++++++------------------ src/yggdrasil/packetqueue.go | 2 +- src/yggdrasil/peer.go | 5 +---- 3 files changed, 15 insertions(+), 23 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 3b3cfdb6..80989507 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -63,8 +63,7 @@ type linkInterface struct { stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen closeTimer *time.Timer // Fires when the link has been idle so long we need to close it isIdle bool // True if the peer actor knows the link is idle - stalled bool // True if we haven't been receiving any response traffic - unstalled bool // False if an idle notification to the switch hasn't been sent because we stalled (or are first starting up) + blocked bool // True if we've blocked the peer in the switch } func (l *link) init(c *Core) error { @@ -235,6 +234,7 @@ func (intf *linkInterface) handler() error { strings.ToUpper(intf.info.linkType), themString, intf.info.local) // Start things go intf.peer.start() + intf.Act(nil, intf._notifyIdle) intf.reader.Act(nil, intf.reader._read) // Wait for the reader to finish // TODO find a way to do this without keeping live goroutines around @@ -344,8 +344,9 @@ func (intf *linkInterface) _cancelStallTimer() { // through other links, if alternatives exist func (intf *linkInterface) notifyBlockedSend() { intf.Act(nil, func() { - if intf.sendTimer != nil { + if intf.sendTimer != nil && !intf.blocked { //As far as we know, we're still trying to send, and the timer fired. + intf.blocked = true intf.link.core.switchTable.blockPeer(intf, intf.peer.port) } }) @@ -365,25 +366,21 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { }) } -// Notify the switch that we're ready for more traffic, assuming we're not in a stalled state +// Notify the peer that we're ready for more traffic func (intf *linkInterface) _notifyIdle() { if !intf.isIdle { - if intf.stalled { - intf.unstalled = false - } else { - intf.isIdle = true - intf.peer.Act(intf, intf.peer._handleIdle) - } + intf.isIdle = true + intf.peer.Act(intf, intf.peer._handleIdle) } } // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds func (intf *linkInterface) notifyStalled() { intf.Act(nil, func() { // Sent from a time.AfterFunc - if intf.stallTimer != nil { + if intf.stallTimer != nil && !intf.blocked { intf.stallTimer.Stop() intf.stallTimer = nil - intf.stalled = true + intf.blocked = true intf.link.core.switchTable.blockPeer(intf, intf.peer.port) } }) @@ -406,15 +403,13 @@ func (intf *linkInterface) notifyRead(size int) { intf.stallTimer.Stop() intf.stallTimer = nil } - intf.stalled = false - if !intf.unstalled { - intf._notifyIdle() - intf.unstalled = true - } if size > 0 && intf.stallTimer == nil { intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) } - intf.link.core.switchTable.unblockPeer(intf, intf.peer.port) + if intf.blocked { + intf.blocked = false + intf.link.core.switchTable.unblockPeer(intf, intf.peer.port) + } }) } diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index e37d5bb3..caabe671 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -55,7 +55,7 @@ func (q *packetQueue) drop() bool { } // Drop the oldest packet from the worst stream packet := worstStream.infos[0].packet - if q.size-uint64(len(packet)) < streamMsgSize { + if false && q.size-uint64(len(packet)) < streamMsgSize { // TODO something better // We don't want to drop *all* packets, so lets save 1 batch worth... return false diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 0c195c6d..f88eb8bf 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -311,12 +311,9 @@ func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { switch { case seq != p.seq: case p.queue.drop(): + p.core.log.Debugln("DEBUG dropped:", p.port, p.queue.size) p.intf.notifyQueued(p.seq) } - if seq != p.seq { - return - } - }) }