simplify how blocking is detected and packets are dequeued

This commit is contained in:
Arceliar 2020-05-27 18:53:14 -05:00
parent 38dcbb1e2f
commit 1df305d31c
3 changed files with 11 additions and 22 deletions

View File

@ -326,7 +326,6 @@ func (intf *link) handler() error {
type linkInterface interface {
out([][]byte)
linkOut([]byte)
notifyQueued(uint64)
close()
// These next ones are only used by the API
name() string
@ -355,15 +354,6 @@ func (intf *link) linkOut(bs []byte) {
})
}
func (intf *link) notifyQueued(seq uint64) {
// This is the part where we want non-nil 'from' fields
intf.Act(intf.peer, func() {
if intf.isSending {
intf.peer.dropFromQueue(intf, seq)
}
})
}
func (intf *link) close() {
intf.Act(nil, func() { intf.msgIO.close() })
}
@ -398,6 +388,7 @@ func (intf *link) notifySending(size int) {
intf.isSending = true
intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
intf._cancelStallTimer()
intf.peer.notifyBlocked(intf)
})
}

View File

@ -266,23 +266,20 @@ func (p *peer) sendPacketFrom(from phony.Actor, packet []byte) {
func (p *peer) _sendPacket(packet []byte) {
p.queue.push(packet)
switch {
case p.idle:
if p.idle {
p.idle = false
p._handleIdle()
case p.drop:
} else if p.drop {
for p.queue.size > p.max {
p.queue.drop()
}
default:
p.intf.notifyQueued(p.seq)
}
}
func (p *peer) _handleIdle() {
var packets [][]byte
var size uint64
for size < streamMsgSize {
for {
if packet, success := p.queue.pop(); success {
packets = append(packets, packet)
size += uint64(len(packet))
@ -297,16 +294,17 @@ func (p *peer) _handleIdle() {
p.max = p.queue.size
} else {
p.idle = true
p.drop = false
}
p.drop = false
}
func (p *peer) dropFromQueue(from phony.Actor, seq uint64) {
func (p *peer) notifyBlocked(from phony.Actor) {
p.Act(from, func() {
seq := p.seq
p.Act(nil, func() {
if seq == p.seq {
p.drop = true
p.max = p.queue.size + streamMsgSize
p.max = 2*p.queue.size + streamMsgSize
}
})
})

View File

@ -270,14 +270,14 @@ func (intf *routerInterface) out(bss [][]byte) {
intf.router._handlePacket(bs)
}
})
//intf.router.peer.Act(nil, intf.router.peer._handleIdle)
// This should now immediately make the peer idle again
// So the self-peer shouldn't end up buffering anything
// We let backpressure act as a throttle instead
intf.router.peer._handleIdle()
}
func (intf *routerInterface) linkOut(_ []byte) {}
func (intf *routerInterface) notifyQueued(seq uint64) {}
func (intf *routerInterface) close() {}
func (intf *routerInterface) name() string { return "(self)" }