mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2024-11-27 12:05:23 +00:00
commit
35e7542889
@ -326,7 +326,6 @@ func (intf *link) handler() error {
|
|||||||
type linkInterface interface {
|
type linkInterface interface {
|
||||||
out([][]byte)
|
out([][]byte)
|
||||||
linkOut([]byte)
|
linkOut([]byte)
|
||||||
notifyQueued(uint64)
|
|
||||||
close()
|
close()
|
||||||
// These next ones are only used by the API
|
// These next ones are only used by the API
|
||||||
name() string
|
name() string
|
||||||
@ -355,15 +354,6 @@ func (intf *link) linkOut(bs []byte) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (intf *link) notifyQueued(seq uint64) {
|
|
||||||
// This is the part where we want non-nil 'from' fields
|
|
||||||
intf.Act(intf.peer, func() {
|
|
||||||
if intf.isSending {
|
|
||||||
intf.peer.dropFromQueue(intf, seq)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (intf *link) close() {
|
func (intf *link) close() {
|
||||||
intf.Act(nil, func() { intf.msgIO.close() })
|
intf.Act(nil, func() { intf.msgIO.close() })
|
||||||
}
|
}
|
||||||
@ -397,18 +387,14 @@ func (intf *link) notifySending(size int) {
|
|||||||
intf.Act(&intf.writer, func() {
|
intf.Act(&intf.writer, func() {
|
||||||
intf.isSending = true
|
intf.isSending = true
|
||||||
intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
|
intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
|
||||||
intf._cancelStallTimer()
|
if intf.keepAliveTimer != nil {
|
||||||
|
intf.keepAliveTimer.Stop()
|
||||||
|
intf.keepAliveTimer = nil
|
||||||
|
}
|
||||||
|
intf.peer.notifyBlocked(intf)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// we just sent something, so cancel any pending timer to send keep-alive traffic
|
|
||||||
func (intf *link) _cancelStallTimer() {
|
|
||||||
if intf.stallTimer != nil {
|
|
||||||
intf.stallTimer.Stop()
|
|
||||||
intf.stallTimer = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This gets called from a time.AfterFunc, and notifies the switch that we appear
|
// This gets called from a time.AfterFunc, and notifies the switch that we appear
|
||||||
// to have gotten blocked on a write, so the switch should start routing traffic
|
// to have gotten blocked on a write, so the switch should start routing traffic
|
||||||
// through other links, if alternatives exist
|
// through other links, if alternatives exist
|
||||||
@ -450,11 +436,13 @@ func (intf *link) _notifyIdle() {
|
|||||||
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
|
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
|
||||||
func (intf *link) notifyStalled() {
|
func (intf *link) notifyStalled() {
|
||||||
intf.Act(nil, func() { // Sent from a time.AfterFunc
|
intf.Act(nil, func() { // Sent from a time.AfterFunc
|
||||||
if intf.stallTimer != nil && !intf.blocked {
|
if intf.stallTimer != nil {
|
||||||
intf.stallTimer.Stop()
|
intf.stallTimer.Stop()
|
||||||
intf.stallTimer = nil
|
intf.stallTimer = nil
|
||||||
intf.blocked = true
|
if !intf.blocked {
|
||||||
intf.links.core.switchTable.blockPeer(intf, intf.peer.port)
|
intf.blocked = true
|
||||||
|
intf.links.core.switchTable.blockPeer(intf, intf.peer.port)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -489,9 +477,9 @@ func (intf *link) notifyRead(size int) {
|
|||||||
// We need to send keep-alive traffic now
|
// We need to send keep-alive traffic now
|
||||||
func (intf *link) notifyDoKeepAlive() {
|
func (intf *link) notifyDoKeepAlive() {
|
||||||
intf.Act(nil, func() { // Sent from a time.AfterFunc
|
intf.Act(nil, func() { // Sent from a time.AfterFunc
|
||||||
if intf.stallTimer != nil {
|
if intf.keepAliveTimer != nil {
|
||||||
intf.stallTimer.Stop()
|
intf.keepAliveTimer.Stop()
|
||||||
intf.stallTimer = nil
|
intf.keepAliveTimer = nil
|
||||||
intf.writer.sendFrom(nil, [][]byte{nil}) // Empty keep-alive traffic
|
intf.writer.sendFrom(nil, [][]byte{nil}) // Empty keep-alive traffic
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -266,23 +266,20 @@ func (p *peer) sendPacketFrom(from phony.Actor, packet []byte) {
|
|||||||
|
|
||||||
func (p *peer) _sendPacket(packet []byte) {
|
func (p *peer) _sendPacket(packet []byte) {
|
||||||
p.queue.push(packet)
|
p.queue.push(packet)
|
||||||
switch {
|
if p.idle {
|
||||||
case p.idle:
|
|
||||||
p.idle = false
|
p.idle = false
|
||||||
p._handleIdle()
|
p._handleIdle()
|
||||||
case p.drop:
|
} else if p.drop {
|
||||||
for p.queue.size > p.max {
|
for p.queue.size > p.max {
|
||||||
p.queue.drop()
|
p.queue.drop()
|
||||||
}
|
}
|
||||||
default:
|
|
||||||
p.intf.notifyQueued(p.seq)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) _handleIdle() {
|
func (p *peer) _handleIdle() {
|
||||||
var packets [][]byte
|
var packets [][]byte
|
||||||
var size uint64
|
var size uint64
|
||||||
for size < streamMsgSize {
|
for {
|
||||||
if packet, success := p.queue.pop(); success {
|
if packet, success := p.queue.pop(); success {
|
||||||
packets = append(packets, packet)
|
packets = append(packets, packet)
|
||||||
size += uint64(len(packet))
|
size += uint64(len(packet))
|
||||||
@ -297,16 +294,17 @@ func (p *peer) _handleIdle() {
|
|||||||
p.max = p.queue.size
|
p.max = p.queue.size
|
||||||
} else {
|
} else {
|
||||||
p.idle = true
|
p.idle = true
|
||||||
p.drop = false
|
|
||||||
}
|
}
|
||||||
|
p.drop = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) dropFromQueue(from phony.Actor, seq uint64) {
|
func (p *peer) notifyBlocked(from phony.Actor) {
|
||||||
p.Act(from, func() {
|
p.Act(from, func() {
|
||||||
|
seq := p.seq
|
||||||
p.Act(nil, func() {
|
p.Act(nil, func() {
|
||||||
if seq == p.seq {
|
if seq == p.seq {
|
||||||
p.drop = true
|
p.drop = true
|
||||||
p.max = p.queue.size + streamMsgSize
|
p.max = 2*p.queue.size + streamMsgSize
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -270,14 +270,14 @@ func (intf *routerInterface) out(bss [][]byte) {
|
|||||||
intf.router._handlePacket(bs)
|
intf.router._handlePacket(bs)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
//intf.router.peer.Act(nil, intf.router.peer._handleIdle)
|
// This should now immediately make the peer idle again
|
||||||
|
// So the self-peer shouldn't end up buffering anything
|
||||||
|
// We let backpressure act as a throttle instead
|
||||||
intf.router.peer._handleIdle()
|
intf.router.peer._handleIdle()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (intf *routerInterface) linkOut(_ []byte) {}
|
func (intf *routerInterface) linkOut(_ []byte) {}
|
||||||
|
|
||||||
func (intf *routerInterface) notifyQueued(seq uint64) {}
|
|
||||||
|
|
||||||
func (intf *routerInterface) close() {}
|
func (intf *routerInterface) close() {}
|
||||||
|
|
||||||
func (intf *routerInterface) name() string { return "(self)" }
|
func (intf *routerInterface) name() string { return "(self)" }
|
||||||
|
Loading…
Reference in New Issue
Block a user