more work-in-progress, debugging why things are dropping so often

This commit is contained in:
Arceliar 2020-05-16 18:56:04 -05:00
parent b17a035a05
commit 62b9fab5f8
3 changed files with 15 additions and 23 deletions

View File

@ -63,8 +63,7 @@ type linkInterface struct {
stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
isIdle bool // True if the peer actor knows the link is idle isIdle bool // True if the peer actor knows the link is idle
stalled bool // True if we haven't been receiving any response traffic blocked bool // True if we've blocked the peer in the switch
unstalled bool // False if an idle notification to the switch hasn't been sent because we stalled (or are first starting up)
} }
func (l *link) init(c *Core) error { func (l *link) init(c *Core) error {
@ -235,6 +234,7 @@ func (intf *linkInterface) handler() error {
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
// Start things // Start things
go intf.peer.start() go intf.peer.start()
intf.Act(nil, intf._notifyIdle)
intf.reader.Act(nil, intf.reader._read) intf.reader.Act(nil, intf.reader._read)
// Wait for the reader to finish // Wait for the reader to finish
// TODO find a way to do this without keeping live goroutines around // TODO find a way to do this without keeping live goroutines around
@ -344,8 +344,9 @@ func (intf *linkInterface) _cancelStallTimer() {
// through other links, if alternatives exist // through other links, if alternatives exist
func (intf *linkInterface) notifyBlockedSend() { func (intf *linkInterface) notifyBlockedSend() {
intf.Act(nil, func() { intf.Act(nil, func() {
if intf.sendTimer != nil { if intf.sendTimer != nil && !intf.blocked {
//As far as we know, we're still trying to send, and the timer fired. //As far as we know, we're still trying to send, and the timer fired.
intf.blocked = true
intf.link.core.switchTable.blockPeer(intf, intf.peer.port) intf.link.core.switchTable.blockPeer(intf, intf.peer.port)
} }
}) })
@ -365,25 +366,21 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
}) })
} }
// Notify the switch that we're ready for more traffic, assuming we're not in a stalled state // Notify the peer that we're ready for more traffic
func (intf *linkInterface) _notifyIdle() { func (intf *linkInterface) _notifyIdle() {
if !intf.isIdle { if !intf.isIdle {
if intf.stalled {
intf.unstalled = false
} else {
intf.isIdle = true intf.isIdle = true
intf.peer.Act(intf, intf.peer._handleIdle) intf.peer.Act(intf, intf.peer._handleIdle)
} }
}
} }
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
func (intf *linkInterface) notifyStalled() { func (intf *linkInterface) notifyStalled() {
intf.Act(nil, func() { // Sent from a time.AfterFunc intf.Act(nil, func() { // Sent from a time.AfterFunc
if intf.stallTimer != nil { if intf.stallTimer != nil && !intf.blocked {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
intf.stalled = true intf.blocked = true
intf.link.core.switchTable.blockPeer(intf, intf.peer.port) intf.link.core.switchTable.blockPeer(intf, intf.peer.port)
} }
}) })
@ -406,15 +403,13 @@ func (intf *linkInterface) notifyRead(size int) {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
} }
intf.stalled = false
if !intf.unstalled {
intf._notifyIdle()
intf.unstalled = true
}
if size > 0 && intf.stallTimer == nil { if size > 0 && intf.stallTimer == nil {
intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
} }
if intf.blocked {
intf.blocked = false
intf.link.core.switchTable.unblockPeer(intf, intf.peer.port) intf.link.core.switchTable.unblockPeer(intf, intf.peer.port)
}
}) })
} }

View File

@ -55,7 +55,7 @@ func (q *packetQueue) drop() bool {
} }
// Drop the oldest packet from the worst stream // Drop the oldest packet from the worst stream
packet := worstStream.infos[0].packet packet := worstStream.infos[0].packet
if q.size-uint64(len(packet)) < streamMsgSize { if false && q.size-uint64(len(packet)) < streamMsgSize {
// TODO something better // TODO something better
// We don't want to drop *all* packets, so lets save 1 batch worth... // We don't want to drop *all* packets, so lets save 1 batch worth...
return false return false

View File

@ -311,12 +311,9 @@ func (p *peer) dropFromQueue(from phony.Actor, seq uint64) {
switch { switch {
case seq != p.seq: case seq != p.seq:
case p.queue.drop(): case p.queue.drop():
p.core.log.Debugln("DEBUG dropped:", p.port, p.queue.size)
p.intf.notifyQueued(p.seq) p.intf.notifyQueued(p.seq)
} }
if seq != p.seq {
return
}
}) })
} }