diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index e0620d10..0dd97ec5 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -326,7 +326,6 @@ func (intf *link) handler() error { type linkInterface interface { out([][]byte) linkOut([]byte) - notifyQueued(uint64) close() // These next ones are only used by the API name() string @@ -355,15 +354,6 @@ func (intf *link) linkOut(bs []byte) { }) } -func (intf *link) notifyQueued(seq uint64) { - // This is the part where we want non-nil 'from' fields - intf.Act(intf.peer, func() { - if intf.isSending { - intf.peer.dropFromQueue(intf, seq) - } - }) -} - func (intf *link) close() { intf.Act(nil, func() { intf.msgIO.close() }) } @@ -398,6 +388,7 @@ func (intf *link) notifySending(size int) { intf.isSending = true intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf._cancelStallTimer() + intf.peer.notifyBlocked(intf) }) } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 3976c8f6..f04ab280 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -266,23 +266,20 @@ func (p *peer) sendPacketFrom(from phony.Actor, packet []byte) { func (p *peer) _sendPacket(packet []byte) { p.queue.push(packet) - switch { - case p.idle: + if p.idle { p.idle = false p._handleIdle() - case p.drop: + } else if p.drop { for p.queue.size > p.max { p.queue.drop() } - default: - p.intf.notifyQueued(p.seq) } } func (p *peer) _handleIdle() { var packets [][]byte var size uint64 - for size < streamMsgSize { + for { if packet, success := p.queue.pop(); success { packets = append(packets, packet) size += uint64(len(packet)) @@ -297,16 +294,17 @@ func (p *peer) _handleIdle() { p.max = p.queue.size } else { p.idle = true - p.drop = false } + p.drop = false } -func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { +func (p *peer) notifyBlocked(from phony.Actor) { p.Act(from, func() { + seq := p.seq p.Act(nil, func() { if seq == p.seq { p.drop = true - p.max = p.queue.size + streamMsgSize + p.max = 2*p.queue.size + streamMsgSize } }) }) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 2ab38555..d387346e 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -270,14 +270,14 @@ func (intf *routerInterface) out(bss [][]byte) { intf.router._handlePacket(bs) } }) - //intf.router.peer.Act(nil, intf.router.peer._handleIdle) + // This should now immediately make the peer idle again + // So the self-peer shouldn't end up buffering anything + // We let backpressure act as a throttle instead intf.router.peer._handleIdle() } func (intf *routerInterface) linkOut(_ []byte) {} -func (intf *routerInterface) notifyQueued(seq uint64) {} - func (intf *routerInterface) close() {} func (intf *routerInterface) name() string { return "(self)" }