more backpressure work, still needs testing

This commit is contained in:
Arceliar 2018-06-23 21:51:32 -05:00
parent 52a0027aea
commit 4b83efa218
4 changed files with 52 additions and 66 deletions

View File

@ -481,13 +481,17 @@ func DEBUG_simLinkPeers(p, q *peer) {
}
}()
p.out = func(bs []byte) {
p.core.switchTable.idleIn <- p.port
go q.handlePacket(bs)
}
q.out = func(bs []byte) {
q.core.switchTable.idleIn <- q.port
go p.handlePacket(bs)
}
go p.linkLoop()
go q.linkLoop()
p.core.switchTable.idleIn <- p.port
q.core.switchTable.idleIn <- q.port
}
func (c *Core) DEBUG_simFixMTU() {

View File

@ -74,9 +74,8 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) {
ps.ports.Store(ports)
}
// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral), a handler for their outgoing traffic, and queue sizes for local backpressure.
// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
type peer struct {
queueSize int64 // used to track local backpressure
bytesSent uint64 // To track bandwidth usage for getPeers
bytesRecvd uint64 // To track bandwidth usage for getPeers
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
@ -94,16 +93,6 @@ type peer struct {
close func() // Called when a peer is removed, to close the underlying connection, or via admin api
}
// Size of the queue of packets to be sent to the node.
func (p *peer) getQueueSize() int64 {
return atomic.LoadInt64(&p.queueSize)
}
// Used to increment or decrement the queue.
func (p *peer) updateQueueSize(delta int64) {
atomic.AddInt64(&p.queueSize, delta)
}
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unocupied port number.
func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKey) *peer {
now := time.Now()

View File

@ -503,11 +503,12 @@ func (t *switchTable) lookup(dest []byte) switchPort {
if !(dist < myDist) {
continue
}
p, isIn := ports[info.port]
//p, isIn := ports[info.port]
_, isIn := ports[info.port]
if !isIn {
continue
}
cost := int64(dist) + p.getQueueSize()
cost := int64(dist) // + p.getQueueSize()
if cost < bestCost {
best = info.port
bestCost = cost
@ -573,6 +574,7 @@ func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool {
// Handle an incoming packet
// Either send it to ourself, or to the first idle peer that's free
// Returns true if the packet has been handled somehow, false if it should be queued
func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool {
// Get the coords, skipping the first byte (the pType)
_, pTypeLen := wire_decode_uint64(packet)
@ -599,6 +601,27 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool
return false
}
// Handles incoming idle notifications
// Loops over packets and sends the newest one that's OK for this peer to send
// Returns true if the peer is no longer idle, false if it should be added to the idle list
func (t *switchTable) handleIdle(port switchPort, packets *[][]byte) bool {
to := t.core.peers.getPorts()[port]
if to == nil {
return true
}
for idx := len(*packets) - 1; idx >= 0; idx-- {
packet := (*packets)[idx]
_, pTypeLen := wire_decode_uint64(packet)
coords, _ := wire_decode_coords(packet[pTypeLen:])
if t.portIsCloser(coords, port) {
to.sendPacket(packet)
*packets = append((*packets)[:idx], (*packets)[idx+1:]...)
return true
}
}
return false
}
// The switch worker does routing lookups and sends packets to where they need to be
func (t *switchTable) doWorker() {
var packets [][]byte // Should really be a linked list
@ -606,13 +629,9 @@ func (t *switchTable) doWorker() {
for {
select {
case packet := <-t.packetIn:
idle = make(map[switchPort]struct{})
for port := range t.getTable().elems {
idle[port] = struct{}{}
}
// TODO correcty fill idle, so the above can be removed
// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
if !t.handleIn(packet, idle) {
// There's nobody free to take it now, so queue it
// There's nobody free to take it right now, so queue it for later
packets = append(packets, packet)
for len(packets) > 32 {
util_putBytes(packets[0])
@ -620,9 +639,11 @@ func (t *switchTable) doWorker() {
}
}
case port := <-t.idleIn:
// TODO the part that loops over packets and finds something to send
// Didn't find anything to send, so add this port to the idle list
idle[port] = struct{}{}
// Try to find something to send to this peer
if !t.handleIdle(port, &packets) {
// Didn't find anything ready to send yet, so stay idle
idle[port] = struct{}{}
}
}
}
}

View File

@ -19,7 +19,6 @@ import (
"fmt"
"math/rand"
"net"
"sort"
"sync"
"sync/atomic"
"time"
@ -243,26 +242,15 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
in := func(bs []byte) {
p.handlePacket(bs)
}
out := make(chan []byte, 1024) // Should be effectively infinite, but gets fed into finite LIFO stack
out := make(chan []byte, 1)
defer close(out)
go func() {
var shadow int64
var stack [][]byte
put := func(msg []byte) {
stack = append(stack, msg)
sort.SliceStable(stack, func(i, j int) bool {
// Sort in reverse order, with smallest messages at the end
return len(stack[i]) >= len(stack[j])
})
for len(stack) > 32 {
util_putBytes(stack[0])
stack = stack[1:]
shadow++
}
}
// This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic
send := make(chan []byte)
defer close(send)
go func() {
// This goroutine does the actual socket write operations
// The parent goroutine aggregates things for it and feeds them in
for msg := range send {
msgLen := wire_encode_uint64(uint64(len(msg)))
buf := net.Buffers{tcp_msg[:], msgLen, msg}
@ -275,10 +263,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
timer := time.NewTimer(timerInterval)
defer timer.Stop()
for {
if shadow != 0 {
p.updateQueueSize(-shadow)
shadow = 0
select {
case msg := <-p.linkOut:
// Always send outgoing link traffic first, if needed
send <- msg
continue
default:
}
// Otherwise wait reset the timer and wait for something to do
timer.Stop()
select {
case <-timer.C:
@ -294,34 +286,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
if !ok {
return
}
put(msg)
}
for len(stack) > 0 {
// First make sure linkOut gets sent first, if it's non-empty
select {
case msg := <-p.linkOut:
send <- msg
continue
default:
}
// Then block until we send or receive something
select {
case msg := <-p.linkOut:
send <- msg
case msg, ok := <-out:
if !ok {
return
}
put(msg)
case send <- stack[len(stack)-1]:
stack = stack[:len(stack)-1]
p.updateQueueSize(-1)
}
send <- msg // Block until the socket writer has the packet
// Now inform the switch that we're ready for more traffic
p.core.switchTable.idleIn <- p.port
}
}
}()
p.core.switchTable.idleIn <- p.port // Start in the idle state
p.out = func(msg []byte) {
p.updateQueueSize(1)
defer func() { recover() }()
out <- msg
}