Merge pull request #146 from Arceliar/backpressure

Local backpressure improvements
Neil Alexander 2018-06-28 23:04:03 +01:00 committed by GitHub
commit 1a0771b016
6 changed files with 283 additions and 111 deletions


@@ -51,12 +51,12 @@ ip netns exec node4 ip link set lo up
 ip netns exec node5 ip link set lo up
 ip netns exec node6 ip link set lo up
-ip netns exec node1 ./run --autoconf --pprof &> /dev/null &
-ip netns exec node2 ./run --autoconf --pprof &> /dev/null &
-ip netns exec node3 ./run --autoconf --pprof &> /dev/null &
-ip netns exec node4 ./run --autoconf --pprof &> /dev/null &
-ip netns exec node5 ./run --autoconf --pprof &> /dev/null &
-ip netns exec node6 ./run --autoconf --pprof &> /dev/null &
+ip netns exec node1 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
+ip netns exec node2 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
+ip netns exec node3 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
+ip netns exec node4 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
+ip netns exec node5 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
+ip netns exec node6 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
 echo "Started, to continue you should (possibly w/ sudo):"
 echo "kill" $(jobs -p)


@@ -101,6 +101,11 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
 		return err
 	}
+	if err := c.switchTable.start(); err != nil {
+		c.log.Println("Failed to start switch")
+		return err
+	}
 	if err := c.router.start(); err != nil {
 		c.log.Println("Failed to start router")
 		return err


@@ -49,6 +49,7 @@ func (c *Core) Init() {
 	bpub, bpriv := newBoxKeys()
 	spub, spriv := newSigKeys()
 	c.init(bpub, bpriv, spub, spriv)
+	c.switchTable.start()
 	c.router.start()
 }
@@ -140,7 +141,42 @@ func (l *switchLocator) DEBUG_getCoords() []byte {
 }
 func (c *Core) DEBUG_switchLookup(dest []byte) switchPort {
-	return c.switchTable.lookup(dest)
+	return c.switchTable.DEBUG_lookup(dest)
+}
+
+// This does the switch layer lookups that decide how to route traffic.
+// Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree.
+// Traffic must be routed to a node that is closer to the destination via the metric space distance.
+// In the event that two nodes are equally close, it gets routed to the one with the longest uptime (due to the order that things are iterated over).
+// The size of the outgoing packet queue is added to a node's tree distance when calculating the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself.
+// Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists.
+func (t *switchTable) DEBUG_lookup(dest []byte) switchPort {
+	table := t.getTable()
+	myDist := table.self.dist(dest)
+	if myDist == 0 {
+		return 0
+	}
+	// cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow
+	ports := t.core.peers.getPorts()
+	var best switchPort
+	bestCost := int64(^uint64(0) >> 1)
+	for _, info := range table.elems {
+		dist := info.locator.dist(dest)
+		if !(dist < myDist) {
+			continue
+		}
+		//p, isIn := ports[info.port]
+		_, isIn := ports[info.port]
+		if !isIn {
+			continue
+		}
+		cost := int64(dist) // + p.getQueueSize()
+		if cost < bestCost {
+			best = info.port
+			bestCost = cost
+		}
+	}
+	return best
 }
 /*
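The comments above describe the cost metric that this commit retires from the hot path: cost = (tree distance) + (queue size), taken over peers that are strictly closer to the destination than we are. A worked toy example, with all numbers assumed: if our own distance is 4 and we have peers at (dist 3, queue 10), (dist 3, queue 2), and (dist 5, queue 0), the third is filtered out for not being closer, and the second wins with cost 5 against cost 13.

    // Toy sketch of the documented cost rule (names and numbers assumed).
    func pickPort(myDist int64, dist, queue map[int]int64) int {
        best, bestCost := 0, int64(^uint64(0)>>1) // port 0 means "no viable next hop"
        for port, d := range dist {
            if d >= myDist {
                continue // must be strictly closer on the tree metric
            }
            if cost := d + queue[port]; cost < bestCost {
                best, bestCost = port, cost
            }
        }
        return best // pickPort(4, {1:3, 2:3, 3:5}, {1:10, 2:2, 3:0}) == 2
    }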
@@ -480,13 +516,17 @@ func DEBUG_simLinkPeers(p, q *peer) {
 		}
 	}()
 	p.out = func(bs []byte) {
+		p.core.switchTable.idleIn <- p.port
 		go q.handlePacket(bs)
 	}
 	q.out = func(bs []byte) {
+		q.core.switchTable.idleIn <- q.port
 		go p.handlePacket(bs)
 	}
 	go p.linkLoop()
 	go q.linkLoop()
+	p.core.switchTable.idleIn <- p.port
+	q.core.switchTable.idleIn <- q.port
 }
 func (c *Core) DEBUG_simFixMTU() {


@@ -74,9 +74,8 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) {
 	ps.ports.Store(ports)
 }
-// Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral), a handler for their outgoing traffic, and queue sizes for local backpressure.
+// Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral), and a handler for their outgoing traffic.
 type peer struct {
-	queueSize  int64  // used to track local backpressure
 	bytesSent  uint64 // To track bandwidth usage for getPeers
 	bytesRecvd uint64 // To track bandwidth usage for getPeers
 	// BUG: sync/atomic, 32 bit platforms need the above to be the first element
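The BUG comment above refers to a documented sync/atomic constraint: on 32-bit platforms, the 64-bit words passed to atomic operations must be 64-bit aligned, and the first word of an allocated struct is the placement the language guarantees. A minimal illustration of the rule (names assumed):

    package demo

    import "sync/atomic"

    // counters keeps its atomically-updated 64-bit fields first so they stay
    // 64-bit aligned even on 32-bit platforms, per the sync/atomic docs.
    type counters struct {
        bytesSent  uint64 // safe for atomic ops: first word of the struct
        bytesRecvd uint64
        label      string // non-atomic fields follow
    }

    func bump(c *counters, n uint64) uint64 {
        return atomic.AddUint64(&c.bytesSent, n)
    }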
@@ -94,16 +93,6 @@ type peer struct {
 	close func() // Called when a peer is removed, to close the underlying connection, or via admin api
 }
-// Size of the queue of packets to be sent to the node.
-func (p *peer) getQueueSize() int64 {
-	return atomic.LoadInt64(&p.queueSize)
-}
-
-// Used to increment or decrement the queue.
-func (p *peer) updateQueueSize(delta int64) {
-	atomic.AddInt64(&p.queueSize, delta)
-}
-
 // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
 func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKey) *peer {
 	now := time.Now()
@@ -229,19 +218,7 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
 		// Drop traffic until the peer manages to send us at least one good switchMsg
 		return
 	}
-	coords, coordLen := wire_decode_coords(packet[pTypeLen:])
-	if coordLen >= len(packet) {
-		return
-	} // No payload
-	toPort := p.core.switchTable.lookup(coords)
-	if toPort == p.port {
-		return
-	}
-	to := p.core.peers.getPorts()[toPort]
-	if to == nil {
-		return
-	}
-	to.sendPacket(packet)
+	p.core.switchTable.packetIn <- packet
 }
 // This just calls p.out(packet) for now.
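With this change the per-peer hot path stops doing lookups of its own: handleTraffic reduces to a channel send, and all routing and queueing state becomes private to the single switch worker, so it needs no locks. A minimal sketch of this fan-in shape, with simplified assumed types:

    // Many producer goroutines, one consumer that owns all mutable state.
    type demoSwitch struct{ packetIn chan []byte }

    func newDemoSwitch() *demoSwitch {
        s := &demoSwitch{packetIn: make(chan []byte, 1024)}
        go func() {
            for packet := range s.packetIn {
                _ = packet // route or queue the packet; no locks needed here
            }
        }()
        return s
    }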


@@ -12,7 +12,6 @@ package yggdrasil
 // A little annoying to do with constant changes from backpressure
 import (
-	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -139,7 +138,7 @@ type tableElem struct {
 // This is the subset of the information about all peers needed to make routing decisions, and it is stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go).
 type lookupTable struct {
 	self  switchLocator
-	elems []tableElem
+	elems map[switchPort]tableElem
 }
@@ -155,15 +154,17 @@ type switchData struct {
 // All the information stored by the switch.
 type switchTable struct {
 	core    *Core
 	key     sigPubKey           // Our own key
 	time    time.Time           // Time when locator.tstamp was last updated
 	parent  switchPort          // Port of whatever peer is our parent, or self if we're root
 	drop    map[sigPubKey]int64 // Tstamp associated with a dropped root
 	mutex   sync.RWMutex        // Lock for reads/writes of switchData
 	data    switchData
 	updater atomic.Value //*sync.Once
 	table   atomic.Value //lookupTable
+	packetIn chan []byte     // Incoming packets for the worker to handle
+	idleIn   chan switchPort // Incoming idle notifications from peer links
 }
 // Initializes the switchTable struct.
@@ -177,6 +178,8 @@ func (t *switchTable) init(core *Core, key sigPubKey) {
 	t.updater.Store(&sync.Once{})
 	t.table.Store(lookupTable{})
 	t.drop = make(map[sigPubKey]int64)
+	t.packetIn = make(chan []byte, 1024)
+	t.idleIn = make(chan switchPort, 1024)
 }
 // Safely gets a copy of this node's locator.
@@ -438,6 +441,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) {
 	return
 }
+
+////////////////////////////////////////////////////////////////////////////////
+// The rest of these are related to the switch worker
+
 // This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions.
 func (t *switchTable) updateTable() {
 	// WARNING this should only be called from within t.data.updater.Do()
@@ -452,7 +459,7 @@
 	defer t.mutex.RUnlock()
 	newTable := lookupTable{
 		self:  t.data.locator.clone(),
-		elems: make([]tableElem, 0, len(t.data.peers)),
+		elems: make(map[switchPort]tableElem, len(t.data.peers)),
 	}
 	for _, pinfo := range t.data.peers {
 		//if !pinfo.forward { continue }
@@ -461,48 +468,214 @@
 		}
 		loc := pinfo.locator.clone()
 		loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link
-		newTable.elems = append(newTable.elems, tableElem{
+		newTable.elems[pinfo.port] = tableElem{
 			locator: loc,
 			port:    pinfo.port,
-		})
+		}
 	}
-	sort.SliceStable(newTable.elems, func(i, j int) bool {
-		return t.data.peers[newTable.elems[i].port].firstSeen.Before(t.data.peers[newTable.elems[j].port].firstSeen)
-	})
 	t.table.Store(newTable)
 }
-
-// This does the switch layer lookups that decide how to route traffic.
-// Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree.
-// Traffic must be routed to a node that is closer to the destination via the metric space distance.
-// In the event that two nodes are equally close, it gets routed to the one with the longest uptime (due to the order that things are iterated over).
-// The size of the outgoing packet queue is added to a node's tree distance when calculating the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself.
-// Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists.
-func (t *switchTable) lookup(dest []byte) switchPort {
-	t.updater.Load().(*sync.Once).Do(t.updateTable)
-	table := t.table.Load().(lookupTable)
-	myDist := table.self.dist(dest)
-	if myDist == 0 {
-		return 0
-	}
-	// cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow
-	ports := t.core.peers.getPorts()
-	var best switchPort
-	bestCost := int64(^uint64(0) >> 1)
-	for _, info := range table.elems {
-		dist := info.locator.dist(dest)
-		if !(dist < myDist) {
-			continue
-		}
-		p, isIn := ports[info.port]
-		if !isIn {
-			continue
-		}
-		cost := int64(dist) + p.getQueueSize()
-		if cost < bestCost {
-			best = info.port
-			bestCost = cost
-		}
-	}
-	return best
-}
+// Returns a copy of the atomically-updated table used for switch lookups
+func (t *switchTable) getTable() lookupTable {
+	t.updater.Load().(*sync.Once).Do(t.updateTable)
+	return t.table.Load().(lookupTable)
+}
+
+// Starts the switch worker
+func (t *switchTable) start() error {
+	t.core.log.Println("Starting switch")
+	go t.doWorker()
+	return nil
+}
+
+// Check if a packet should go to the self node
+// This means there's no node closer to the destination than us
+// This is mainly used to identify packets addressed to us, or that hit a blackhole
+func (t *switchTable) selfIsClosest(dest []byte) bool {
+	table := t.getTable()
+	myDist := table.self.dist(dest)
+	if myDist == 0 {
+		// Skip the iteration step if it's impossible to be closer
+		return true
+	}
+	for _, info := range table.elems {
+		dist := info.locator.dist(dest)
+		if dist < myDist {
+			return false
+		}
+	}
+	return true
+}
+
+// Returns true if the peer is closer to the destination than ourself
+func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool {
+	table := t.getTable()
+	if info, isIn := table.elems[port]; isIn {
+		theirDist := info.locator.dist(dest)
+		myDist := table.self.dist(dest)
+		return theirDist < myDist
+	} else {
+		return false
+	}
+}
+
+// Get the coords of a packet without decoding
+func switch_getPacketCoords(packet []byte) []byte {
+	_, pTypeLen := wire_decode_uint64(packet)
+	coords, _ := wire_decode_coords(packet[pTypeLen:])
+	return coords
+}
+
+// Returns a unique string for each stream of traffic
+// Equal to type+coords+handle for traffic packets
+// Equal to type+coords+toKey+fromKey for protocol traffic packets
+func switch_getPacketStreamID(packet []byte) string {
+	pType, pTypeLen := wire_decode_uint64(packet)
+	_, coordLen := wire_decode_coords(packet[pTypeLen:])
+	end := pTypeLen + coordLen
+	switch {
+	case pType == wire_Traffic:
+		end += handleLen // handle
+	case pType == wire_ProtocolTraffic:
+		end += 2 * boxPubKeyLen
+	default:
+		end = 0
+	}
+	if end > len(packet) {
+		end = len(packet)
+	}
+	return string(packet[:end])
+}
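An aside on switch_getPacketStreamID: the stream ID is just the packet's leading header bytes (type, coords, and handle or keys) converted to a string. Byte slices cannot be Go map keys, but strings can, and the conversion copies the data, so the key stays valid after the packet buffer is recycled. A standalone sketch of the same keying trick, with assumed names:

    // bucketByHeader groups packets whose first headerLen bytes agree, the
    // same way the worker below groups packets by stream ID.
    func bucketByHeader(packets [][]byte, headerLen int) map[string][][]byte {
        buckets := make(map[string][][]byte)
        for _, p := range packets {
            n := headerLen
            if n > len(p) {
                n = len(p)
            }
            key := string(p[:n]) // copies the bytes; usable as a map key
            buckets[key] = append(buckets[key], p)
        }
        return buckets
    }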
+
+// Handle an incoming packet
+// Either send it to ourself, or to the first idle peer that's free
+// Returns true if the packet has been handled somehow, false if it should be queued
+func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool {
+	coords := switch_getPacketCoords(packet)
+	ports := t.core.peers.getPorts()
+	if t.selfIsClosest(coords) {
+		// TODO? call the router directly, and remove the whole concept of a self peer?
+		ports[0].sendPacket(packet)
+		return true
+	}
+	table := t.getTable()
+	myDist := table.self.dist(coords)
+	var best *peer
+	bestDist := myDist
+	for port := range idle {
+		if to := ports[port]; to != nil {
+			if info, isIn := table.elems[to.port]; isIn {
+				dist := info.locator.dist(coords)
+				if !(dist < bestDist) {
+					continue
+				}
+				best = to
+				bestDist = dist
+			}
+		}
+	}
+	if best != nil {
+		// Send to the best idle next hop
+		delete(idle, best.port)
+		best.sendPacket(packet)
+		return true
+	} else {
+		// Didn't find anyone idle to send it to
+		return false
+	}
+}
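Note how handleIn differs from the retired lookup: queue size no longer appears in the cost, because only idle links compete. The packet goes to the idle peer strictly closest to the destination, and bestDist starts at our own distance so nothing farther away than us can win. A worked sketch with assumed numbers:

    // pickIdle(4, map[int]uint64{1: 3, 2: 2, 5: 6}) == 2: ports 1 and 2
    // qualify as strictly closer than our distance of 4, and port 2 wins.
    func pickIdle(myDist uint64, idleDist map[int]uint64) int {
        best, bestDist := 0, myDist
        for port, d := range idleDist {
            if d < bestDist {
                best, bestDist = port, d
            }
        }
        return best // 0 means "nobody idle is closer; queue the packet"
    }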
+
+// Info about a buffered packet
+type switch_packetInfo struct {
+	bytes []byte
+	time  time.Time // Timestamp of when the packet arrived
+}
+
+// Used to keep track of buffered packets
+type switch_buffer struct {
+	packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large
+	count   uint64              // Total queue size, including dropped packets
+}
+
+func (b *switch_buffer) dropTimedOut() {
+	// TODO figure out what timeout makes sense
+	const timeout = 25 * time.Millisecond
+	now := time.Now()
+	for len(b.packets) > 0 && now.Sub(b.packets[0].time) > timeout {
+		util_putBytes(b.packets[0].bytes)
+		b.packets = b.packets[1:]
+	}
+}
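A subtlety in switch_buffer: dropTimedOut shrinks packets but leaves count alone, while sending (in handleIdle below) decrements both. count therefore tracks enqueued-minus-sent, still charging a stream for packets it dropped, which deprioritizes lossy or stalled streams. A small runnable illustration, with the types mirrored locally:

    package main

    import "fmt"

    type buffer struct {
        packets [][]byte
        count   uint64
    }

    func main() {
        var b buffer
        for i := 0; i < 3; i++ { // enqueue three packets
            b.packets = append(b.packets, []byte{byte(i)})
            b.count++
        }
        b.packets = b.packets[2:]            // two time out and are dropped
        fmt.Println(len(b.packets), b.count) // "1 3": the stream still looks busy
    }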
+
+// Handles incoming idle notifications
+// Loops over packets and sends the oldest one that's OK for this peer to send
+// Returns true if the peer is no longer idle, false if it should be added to the idle list
+func (t *switchTable) handleIdle(port switchPort, buffs map[string]switch_buffer) bool {
+	to := t.core.peers.getPorts()[port]
+	if to == nil {
+		return true
+	}
+	var best string
+	var bestSize uint64
+	for streamID, buf := range buffs {
+		// Filter over the streams that this node is closer to
+		// Keep the one with the smallest queue
+		buf.dropTimedOut()
+		if len(buf.packets) == 0 {
+			delete(buffs, streamID)
+			continue
+		}
+		buffs[streamID] = buf
+		packet := buf.packets[0]
+		coords := switch_getPacketCoords(packet.bytes)
+		if (bestSize == 0 || buf.count < bestSize) && t.portIsCloser(coords, port) {
+			best = streamID
+			bestSize = buf.count
+		}
+	}
+	if bestSize != 0 {
+		buf := buffs[best]
+		var packet switch_packetInfo
+		// TODO decide if this should be LIFO or FIFO
+		packet, buf.packets = buf.packets[0], buf.packets[1:]
+		buf.count--
+		if len(buf.packets) == 0 {
+			delete(buffs, best)
+		} else {
+			buffs[best] = buf
+		}
+		to.sendPacket(packet.bytes)
+		return true
+	} else {
+		return false
+	}
+}
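So for each idle notification the worker makes a two-part choice: filter to streams this peer actually moves toward their destination, then take the one with the smallest total count. With assumed data, streams A (count 7, closer), B (count 3, closer), and C (count 1, not closer) resolve to B:

    // Sketch of the selection rule above, order-independent over the map.
    func pickStream(counts map[string]uint64, closer map[string]bool) string {
        var best string
        var bestSize uint64
        for id, n := range counts {
            if closer[id] && (bestSize == 0 || n < bestSize) {
                best, bestSize = id, n
            }
        }
        return best // counts{A:7, B:3, C:1}, closer{A, B} -> "B"
    }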
+
+// The switch worker does routing lookups and sends packets to where they need to be
+func (t *switchTable) doWorker() {
+	buffs := make(map[string]switch_buffer) // Packets per PacketStreamID (string)
+	idle := make(map[switchPort]struct{})   // this is to deduplicate things
+	for {
+		select {
+		case packet := <-t.packetIn:
+			// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
+			if !t.handleIn(packet, idle) {
+				// There's nobody free to take it right now, so queue it for later
+				streamID := switch_getPacketStreamID(packet)
+				buf := buffs[streamID]
+				buf.dropTimedOut()
+				pinfo := switch_packetInfo{packet, time.Now()}
+				buf.packets = append(buf.packets, pinfo)
+				buf.count++
+				buffs[streamID] = buf
+			}
+		case port := <-t.idleIn:
+			// Try to find something to send to this peer
+			if !t.handleIdle(port, buffs) {
+				// Didn't find anything ready to send yet, so stay idle
+				idle[port] = struct{}{}
+			}
+		}
+	}
+}
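Taken together, doWorker is an idle-token scheduler: a link only ever receives a packet after announcing, via idleIn, that its previous write finished. A minimal self-contained loop in the same shape, with simplified assumed types:

    // schedule pushes work only to consumers that handed in an idle token.
    func schedule(packetIn <-chan []byte, idleIn <-chan int, send func(int, []byte)) {
        var queue [][]byte
        idle := make(map[int]struct{})
        for {
            select {
            case p := <-packetIn:
                sent := false
                for port := range idle { // any idle consumer will do here
                    delete(idle, port)
                    send(port, p)
                    sent = true
                    break
                }
                if !sent {
                    queue = append(queue, p) // nobody free; buffer for later
                }
            case port := <-idleIn:
                if len(queue) == 0 {
                    idle[port] = struct{}{} // nothing to send; bank the token
                    continue
                }
                var p []byte
                p, queue = queue[0], queue[1:]
                send(port, p)
            }
        }
    }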


@@ -242,19 +242,10 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 	in := func(bs []byte) {
 		p.handlePacket(bs)
 	}
-	out := make(chan []byte, 32) // TODO? what size makes sense
+	out := make(chan []byte, 1)
 	defer close(out)
 	go func() {
-		var shadow int64
-		var stack [][]byte
-		put := func(msg []byte) {
-			stack = append(stack, msg)
-			for len(stack) > 32 {
-				util_putBytes(stack[0])
-				stack = stack[1:]
-				shadow++
-			}
-		}
+		// This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic
 		send := func(msg []byte) {
 			msgLen := wire_encode_uint64(uint64(len(msg)))
 			buf := net.Buffers{tcp_msg[:], msgLen, msg}
@@ -266,10 +257,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 		timer := time.NewTimer(timerInterval)
 		defer timer.Stop()
 		for {
-			if shadow != 0 {
-				p.updateQueueSize(-shadow)
-				shadow = 0
+			select {
+			case msg := <-p.linkOut:
+				// Always send outgoing link traffic first, if needed
+				send(msg)
+				continue
+			default:
 			}
+			// Otherwise reset the timer and wait for something to do
 			timer.Stop()
 			select {
 			case <-timer.C:
@@ -285,34 +280,16 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
 				if !ok {
 					return
 				}
-				put(msg)
-			}
-			for len(stack) > 0 {
-				select {
-				case msg := <-p.linkOut:
-					send(msg)
-				case msg, ok := <-out:
-					if !ok {
-						return
-					}
-					put(msg)
-				default:
-					msg := stack[len(stack)-1]
-					stack = stack[:len(stack)-1]
-					send(msg)
-					p.updateQueueSize(-1)
-				}
+				send(msg) // Block until the socket write has finished
+				// Now inform the switch that we're ready for more traffic
+				p.core.switchTable.idleIn <- p.port
 			}
 		}
 	}()
+	p.core.switchTable.idleIn <- p.port // Start in the idle state
 	p.out = func(msg []byte) {
 		defer func() { recover() }()
-		select {
-		case out <- msg:
-			p.updateQueueSize(1)
-		default:
-			util_putBytes(msg)
-		}
+		out <- msg
 	}
 	p.close = func() { sock.Close() }
 	setNoDelay(sock, true)
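The TCP side completes the loop: out shrinks from a 32-slot channel plus a drop-oldest stack to a single-slot channel, so p.out now blocks instead of silently dropping, and each completed socket write posts the port back to idleIn. Backpressure is carried by the channel itself rather than by the removed queueSize counter. A sketch of the writer pattern, with simplified assumed types:

    // writer serializes socket writes and reports readiness after each one.
    func writer(port int, out <-chan []byte, write func([]byte), idleIn chan<- int) {
        idleIn <- port // start in the idle state
        for msg := range out {
            write(msg)     // block until the write finishes
            idleIn <- port // tell the switch this link can take more
        }
    }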