From 0021f3463ff62522523d9afb6d5d62d5236564d4 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 22 Jun 2018 20:39:57 -0500 Subject: [PATCH 01/13] slightly better way for the tcp sender goroutine(s) to block waiting for work --- src/yggdrasil/tcp.go | 45 ++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index db57b568..32cfbf37 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -242,7 +242,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { in := func(bs []byte) { p.handlePacket(bs) } - out := make(chan []byte, 32) // TODO? what size makes sense + out := make(chan []byte, 1) defer close(out) go func() { var shadow int64 @@ -255,13 +255,17 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { shadow++ } } - send := func(msg []byte) { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{tcp_msg[:], msgLen, msg} - buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) - util_putBytes(msg) - } + send := make(chan []byte) + defer close(send) + go func() { + for msg := range send { + msgLen := wire_encode_uint64(uint64(len(msg))) + buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf.WriteTo(sock) + atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) + util_putBytes(msg) + } + }() timerInterval := tcp_timeout * 2 / 3 timer := time.NewTimer(timerInterval) defer timer.Stop() @@ -278,9 +282,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer.Reset(timerInterval) select { case _ = <-timer.C: - send(nil) // TCP keep-alive traffic + send <- nil // TCP keep-alive traffic case msg := <-p.linkOut: - send(msg) + send <- msg case msg, ok := <-out: if !ok { return @@ -288,31 +292,32 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { put(msg) } for len(stack) > 0 { + // First make sure linkOut gets sent first, if it's non-empty select { case msg := <-p.linkOut: - send(msg) + send <- msg + default: + } + // Then block until we send or receive something + select { + case msg := <-p.linkOut: + send <- msg case msg, ok := <-out: if !ok { return } put(msg) - default: - msg := stack[len(stack)-1] + case send <- stack[len(stack)-1]: stack = stack[:len(stack)-1] - send(msg) p.updateQueueSize(-1) } } } }() p.out = func(msg []byte) { + p.updateQueueSize(1) defer func() { recover() }() - select { - case out <- msg: - p.updateQueueSize(1) - default: - util_putBytes(msg) - } + out <- msg } p.close = func() { sock.Close() } setNoDelay(sock, true) From cceecf4b1aba0799ee3d1453b047d73f802b937a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 22 Jun 2018 23:46:42 -0500 Subject: [PATCH 02/13] larger out queue size, make sure linkOut packets always get sent first --- src/yggdrasil/tcp.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 32cfbf37..fcefd42b 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -242,7 +242,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { in := func(bs []byte) { p.handlePacket(bs) } - out := make(chan []byte, 1) + out := make(chan []byte, 1024) // Should be effectively infinite, but gets fed into finite LIFO stack defer close(out) go func() { var shadow int64 @@ -296,6 +296,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { select { case msg := <-p.linkOut: send <- msg + continue default: } // Then block until we send or receive something From 2ae213c2557b6b53cea416a07fce137b0104b097 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 01:10:18 -0500 Subject: [PATCH 03/13] I'll try sorting, that's a good trick --- src/yggdrasil/tcp.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index fcefd42b..0df2d526 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -19,6 +19,7 @@ import ( "fmt" "math/rand" "net" + "sort" "sync" "sync/atomic" "time" @@ -249,6 +250,10 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { var stack [][]byte put := func(msg []byte) { stack = append(stack, msg) + sort.SliceStable(stack, func(i, j int) bool { + // Sort in reverse order, with smallest messages at the end + return len(stack[i]) >= len(stack[j]) + }) for len(stack) > 32 { util_putBytes(stack[0]) stack = stack[1:] From 988f4ad2654a824c87e07f8f0a10c8f19b1a708a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 19:08:32 -0500 Subject: [PATCH 04/13] add a dedicated switch worker and start using it for lookups --- src/yggdrasil/core.go | 5 ++++ src/yggdrasil/debug.go | 1 + src/yggdrasil/peer.go | 14 +---------- src/yggdrasil/switch.go | 55 ++++++++++++++++++++++++++++++++++------- 4 files changed, 53 insertions(+), 22 deletions(-) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index bd7fecec..6d130d81 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -101,6 +101,11 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } + if err := c.switchTable.start(); err != nil { + c.log.Println("Failed to start switch") + return err + } + if err := c.router.start(); err != nil { c.log.Println("Failed to start router") return err diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index f95bfa3d..c029ae9f 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -49,6 +49,7 @@ func (c *Core) Init() { bpub, bpriv := newBoxKeys() spub, spriv := newSigKeys() c.init(bpub, bpriv, spub, spriv) + c.switchTable.start() c.router.start() } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index ad99750a..d5bdc5a3 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -229,19 +229,7 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { // Drop traffic until the peer manages to send us at least one good switchMsg return } - coords, coordLen := wire_decode_coords(packet[pTypeLen:]) - if coordLen >= len(packet) { - return - } // No payload - toPort := p.core.switchTable.lookup(coords) - if toPort == p.port { - return - } - to := p.core.peers.getPorts()[toPort] - if to == nil { - return - } - to.sendPacket(packet) + p.core.switchTable.packetIn <- packet } // This just calls p.out(packet) for now. diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d3b28710..d2026aec 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -155,15 +155,16 @@ type switchData struct { // All the information stored by the switch. type switchTable struct { - core *Core - key sigPubKey // Our own key - time time.Time // Time when locator.tstamp was last updated - parent switchPort // Port of whatever peer is our parent, or self if we're root - drop map[sigPubKey]int64 // Tstamp associated with a dropped root - mutex sync.RWMutex // Lock for reads/writes of switchData - data switchData - updater atomic.Value //*sync.Once - table atomic.Value //lookupTable + core *Core + key sigPubKey // Our own key + time time.Time // Time when locator.tstamp was last updated + parent switchPort // Port of whatever peer is our parent, or self if we're root + drop map[sigPubKey]int64 // Tstamp associated with a dropped root + mutex sync.RWMutex // Lock for reads/writes of switchData + data switchData + updater atomic.Value //*sync.Once + table atomic.Value //lookupTable + packetIn chan []byte // Incoming packets for the worker to handle } // Initializes the switchTable struct. @@ -177,6 +178,7 @@ func (t *switchTable) init(core *Core, key sigPubKey) { t.updater.Store(&sync.Once{}) t.table.Store(lookupTable{}) t.drop = make(map[sigPubKey]int64) + t.packetIn = make(chan []byte, 1024) } // Safely gets a copy of this node's locator. @@ -438,6 +440,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) { return } +//////////////////////////////////////////////////////////////////////////////// + +// The rest of these are related to the switch worker + // This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions. func (t *switchTable) updateTable() { // WARNING this should only be called from within t.data.updater.Do() @@ -506,3 +512,34 @@ func (t *switchTable) lookup(dest []byte) switchPort { } return best } + +// Starts the switch worker +func (t *switchTable) start() error { + t.core.log.Println("Starting switch") + go t.doWorker() + return nil +} + +func (t *switchTable) handleIn(packet []byte) { + // Get the coords, skipping the first byte (the pType) + _, pTypeLen := wire_decode_uint64(packet) + coords, coordLen := wire_decode_coords(packet[pTypeLen:]) + if coordLen >= len(packet) { + util_putBytes(packet) + return + } // No payload + toPort := t.lookup(coords) + to := t.core.peers.getPorts()[toPort] + if to == nil { + util_putBytes(packet) + return + } + to.sendPacket(packet) +} + +// The switch worker does routing lookups and sends packets to where they need to be +func (t *switchTable) doWorker() { + for packet := range t.packetIn { + t.handleIn(packet) + } +} From 52a0027aea9932babec5beb873d061fbb6489174 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 20:59:26 -0500 Subject: [PATCH 05/13] switch refactoring, setup for a better approximation of local backpressure --- src/yggdrasil/switch.go | 119 ++++++++++++++++++++++++++++++++++------ 1 file changed, 101 insertions(+), 18 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d2026aec..f74e1c5e 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -12,7 +12,6 @@ package yggdrasil // A little annoying to do with constant changes from backpressure import ( - "sort" "sync" "sync/atomic" "time" @@ -139,7 +138,7 @@ type tableElem struct { // This is the subset of the information about all peers needed to make routing decisions, and it stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go). type lookupTable struct { self switchLocator - elems []tableElem + elems map[switchPort]tableElem } // This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically. @@ -162,9 +161,10 @@ type switchTable struct { drop map[sigPubKey]int64 // Tstamp associated with a dropped root mutex sync.RWMutex // Lock for reads/writes of switchData data switchData - updater atomic.Value //*sync.Once - table atomic.Value //lookupTable - packetIn chan []byte // Incoming packets for the worker to handle + updater atomic.Value //*sync.Once + table atomic.Value //lookupTable + packetIn chan []byte // Incoming packets for the worker to handle + idleIn chan switchPort // Incoming idle notifications from peer links } // Initializes the switchTable struct. @@ -179,6 +179,7 @@ func (t *switchTable) init(core *Core, key sigPubKey) { t.table.Store(lookupTable{}) t.drop = make(map[sigPubKey]int64) t.packetIn = make(chan []byte, 1024) + t.idleIn = make(chan switchPort, 1024) } // Safely gets a copy of this node's locator. @@ -458,7 +459,7 @@ func (t *switchTable) updateTable() { defer t.mutex.RUnlock() newTable := lookupTable{ self: t.data.locator.clone(), - elems: make([]tableElem, 0, len(t.data.peers)), + elems: make(map[switchPort]tableElem, len(t.data.peers)), } for _, pinfo := range t.data.peers { //if !pinfo.forward { continue } @@ -467,17 +468,20 @@ func (t *switchTable) updateTable() { } loc := pinfo.locator.clone() loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link - newTable.elems = append(newTable.elems, tableElem{ + newTable.elems[pinfo.port] = tableElem{ locator: loc, port: pinfo.port, - }) + } } - sort.SliceStable(newTable.elems, func(i, j int) bool { - return t.data.peers[newTable.elems[i].port].firstSeen.Before(t.data.peers[newTable.elems[j].port].firstSeen) - }) t.table.Store(newTable) } +// Returns a copy of the atomically-updated table used for switch lookups +func (t *switchTable) getTable() lookupTable { + t.updater.Load().(*sync.Once).Do(t.updateTable) + return t.table.Load().(lookupTable) +} + // This does the switch layer lookups that decide how to route traffic. // Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree. // Traffic must be routed to a node that is closer to the destination via the metric space distance. @@ -485,8 +489,7 @@ func (t *switchTable) updateTable() { // The size of the outgoing packet queue is added to a node's tree distance when the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself. // Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists. func (t *switchTable) lookup(dest []byte) switchPort { - t.updater.Load().(*sync.Once).Do(t.updateTable) - table := t.table.Load().(lookupTable) + table := t.getTable() myDist := table.self.dist(dest) if myDist == 0 { return 0 @@ -520,7 +523,7 @@ func (t *switchTable) start() error { return nil } -func (t *switchTable) handleIn(packet []byte) { +func (t *switchTable) handleIn_old(packet []byte) { // Get the coords, skipping the first byte (the pType) _, pTypeLen := wire_decode_uint64(packet) coords, coordLen := wire_decode_coords(packet[pTypeLen:]) @@ -537,9 +540,89 @@ func (t *switchTable) handleIn(packet []byte) { to.sendPacket(packet) } -// The switch worker does routing lookups and sends packets to where they need to be -func (t *switchTable) doWorker() { - for packet := range t.packetIn { - t.handleIn(packet) +// Check if a packet should go to the self node +// This means there's no node closer to the destination than us +// This is mainly used to identify packets addressed to us, or that hit a blackhole +func (t *switchTable) selfIsClosest(dest []byte) bool { + table := t.getTable() + myDist := table.self.dist(dest) + if myDist == 0 { + // Skip the iteration step if it's impossible to be closer + return true + } + for _, info := range table.elems { + dist := info.locator.dist(dest) + if dist < myDist { + return false + } + } + return true +} + +// Returns true if the peer is closer to the destination than ourself +func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { + table := t.getTable() + if info, isIn := table.elems[port]; isIn { + theirDist := info.locator.dist(dest) + myDist := table.self.dist(dest) + return theirDist < myDist + } else { + return false + } +} + +// Handle an incoming packet +// Either send it to ourself, or to the first idle peer that's free +func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { + // Get the coords, skipping the first byte (the pType) + _, pTypeLen := wire_decode_uint64(packet) + coords, coordLen := wire_decode_coords(packet[pTypeLen:]) + if coordLen >= len(packet) { + util_putBytes(packet) + return true + } // No payload + ports := t.core.peers.getPorts() + if t.selfIsClosest(coords) { + ports[0].sendPacket(packet) + return true + } + for port := range idle { + if to := ports[port]; to != nil { + if t.portIsCloser(coords, port) { + delete(idle, port) + to.sendPacket(packet) + return true + } + } + } + // Didn't find anyone idle to send it to + return false +} + +// The switch worker does routing lookups and sends packets to where they need to be +func (t *switchTable) doWorker() { + var packets [][]byte // Should really be a linked list + idle := make(map[switchPort]struct{}) // this is to deduplicate things + for { + select { + case packet := <-t.packetIn: + idle = make(map[switchPort]struct{}) + for port := range t.getTable().elems { + idle[port] = struct{}{} + } + // TODO correcty fill idle, so the above can be removed + if !t.handleIn(packet, idle) { + // There's nobody free to take it now, so queue it + packets = append(packets, packet) + for len(packets) > 32 { + util_putBytes(packets[0]) + packets = packets[1:] + } + } + case port := <-t.idleIn: + // TODO the part that loops over packets and finds something to send + // Didn't find anything to send, so add this port to the idle list + idle[port] = struct{}{} + } } } From 4b83efa218be2735f0a2e4d3090eeed4af4040b4 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 21:51:32 -0500 Subject: [PATCH 06/13] more backpressure work, still needs testing --- src/yggdrasil/debug.go | 4 +++ src/yggdrasil/peer.go | 13 +-------- src/yggdrasil/switch.go | 43 ++++++++++++++++++++++-------- src/yggdrasil/tcp.go | 58 +++++++++++------------------------------ 4 files changed, 52 insertions(+), 66 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index c029ae9f..5242a6e0 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -481,13 +481,17 @@ func DEBUG_simLinkPeers(p, q *peer) { } }() p.out = func(bs []byte) { + p.core.switchTable.idleIn <- p.port go q.handlePacket(bs) } q.out = func(bs []byte) { + q.core.switchTable.idleIn <- q.port go p.handlePacket(bs) } go p.linkLoop() go q.linkLoop() + p.core.switchTable.idleIn <- p.port + q.core.switchTable.idleIn <- q.port } func (c *Core) DEBUG_simFixMTU() { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index d5bdc5a3..f0dd3d0a 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -74,9 +74,8 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) { ps.ports.Store(ports) } -// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral), a handler for their outgoing traffic, and queue sizes for local backpressure. +// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic type peer struct { - queueSize int64 // used to track local backpressure bytesSent uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers // BUG: sync/atomic, 32 bit platforms need the above to be the first element @@ -94,16 +93,6 @@ type peer struct { close func() // Called when a peer is removed, to close the underlying connection, or via admin api } -// Size of the queue of packets to be sent to the node. -func (p *peer) getQueueSize() int64 { - return atomic.LoadInt64(&p.queueSize) -} - -// Used to increment or decrement the queue. -func (p *peer) updateQueueSize(delta int64) { - atomic.AddInt64(&p.queueSize, delta) -} - // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unocupied port number. func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKey) *peer { now := time.Now() diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index f74e1c5e..4c1f0f8d 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -503,11 +503,12 @@ func (t *switchTable) lookup(dest []byte) switchPort { if !(dist < myDist) { continue } - p, isIn := ports[info.port] + //p, isIn := ports[info.port] + _, isIn := ports[info.port] if !isIn { continue } - cost := int64(dist) + p.getQueueSize() + cost := int64(dist) // + p.getQueueSize() if cost < bestCost { best = info.port bestCost = cost @@ -573,6 +574,7 @@ func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { // Handle an incoming packet // Either send it to ourself, or to the first idle peer that's free +// Returns true if the packet has been handled somehow, false if it should be queued func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { // Get the coords, skipping the first byte (the pType) _, pTypeLen := wire_decode_uint64(packet) @@ -599,6 +601,27 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool return false } +// Handles incoming idle notifications +// Loops over packets and sends the newest one that's OK for this peer to send +// Returns true if the peer is no longer idle, false if it should be added to the idle list +func (t *switchTable) handleIdle(port switchPort, packets *[][]byte) bool { + to := t.core.peers.getPorts()[port] + if to == nil { + return true + } + for idx := len(*packets) - 1; idx >= 0; idx-- { + packet := (*packets)[idx] + _, pTypeLen := wire_decode_uint64(packet) + coords, _ := wire_decode_coords(packet[pTypeLen:]) + if t.portIsCloser(coords, port) { + to.sendPacket(packet) + *packets = append((*packets)[:idx], (*packets)[idx+1:]...) + return true + } + } + return false +} + // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { var packets [][]byte // Should really be a linked list @@ -606,13 +629,9 @@ func (t *switchTable) doWorker() { for { select { case packet := <-t.packetIn: - idle = make(map[switchPort]struct{}) - for port := range t.getTable().elems { - idle[port] = struct{}{} - } - // TODO correcty fill idle, so the above can be removed + // Try to send it somewhere (or drop it if it's corrupt or at a dead end) if !t.handleIn(packet, idle) { - // There's nobody free to take it now, so queue it + // There's nobody free to take it right now, so queue it for later packets = append(packets, packet) for len(packets) > 32 { util_putBytes(packets[0]) @@ -620,9 +639,11 @@ func (t *switchTable) doWorker() { } } case port := <-t.idleIn: - // TODO the part that loops over packets and finds something to send - // Didn't find anything to send, so add this port to the idle list - idle[port] = struct{}{} + // Try to find something to send to this peer + if !t.handleIdle(port, &packets) { + // Didn't find anything ready to send yet, so stay idle + idle[port] = struct{}{} + } } } } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 0df2d526..12147d4e 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -19,7 +19,6 @@ import ( "fmt" "math/rand" "net" - "sort" "sync" "sync/atomic" "time" @@ -243,26 +242,15 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { in := func(bs []byte) { p.handlePacket(bs) } - out := make(chan []byte, 1024) // Should be effectively infinite, but gets fed into finite LIFO stack + out := make(chan []byte, 1) defer close(out) go func() { - var shadow int64 - var stack [][]byte - put := func(msg []byte) { - stack = append(stack, msg) - sort.SliceStable(stack, func(i, j int) bool { - // Sort in reverse order, with smallest messages at the end - return len(stack[i]) >= len(stack[j]) - }) - for len(stack) > 32 { - util_putBytes(stack[0]) - stack = stack[1:] - shadow++ - } - } + // This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic send := make(chan []byte) defer close(send) go func() { + // This goroutine does the actual socket write operations + // The parent goroutine aggregates things for it and feeds them in for msg := range send { msgLen := wire_encode_uint64(uint64(len(msg))) buf := net.Buffers{tcp_msg[:], msgLen, msg} @@ -275,10 +263,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer := time.NewTimer(timerInterval) defer timer.Stop() for { - if shadow != 0 { - p.updateQueueSize(-shadow) - shadow = 0 + select { + case msg := <-p.linkOut: + // Always send outgoing link traffic first, if needed + send <- msg + continue + default: } + // Otherwise wait reset the timer and wait for something to do timer.Stop() select { case <-timer.C: @@ -294,34 +286,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { if !ok { return } - put(msg) - } - for len(stack) > 0 { - // First make sure linkOut gets sent first, if it's non-empty - select { - case msg := <-p.linkOut: - send <- msg - continue - default: - } - // Then block until we send or receive something - select { - case msg := <-p.linkOut: - send <- msg - case msg, ok := <-out: - if !ok { - return - } - put(msg) - case send <- stack[len(stack)-1]: - stack = stack[:len(stack)-1] - p.updateQueueSize(-1) - } + send <- msg // Block until the socket writer has the packet + // Now inform the switch that we're ready for more traffic + p.core.switchTable.idleIn <- p.port } } }() + p.core.switchTable.idleIn <- p.port // Start in the idle state p.out = func(msg []byte) { - p.updateQueueSize(1) defer func() { recover() }() out <- msg } From 0ad801bcfe50fbbd889692fe45ff9f887e5798c0 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 23:33:03 -0500 Subject: [PATCH 07/13] more work on backpressure, but still needs more testing --- src/yggdrasil/switch.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 4c1f0f8d..8fedebe1 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -588,17 +588,31 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool ports[0].sendPacket(packet) return true } + table := t.getTable() + myDist := table.self.dist(coords) + var best *peer + bestDist := myDist for port := range idle { if to := ports[port]; to != nil { - if t.portIsCloser(coords, port) { - delete(idle, port) - to.sendPacket(packet) - return true + if info, isIn := table.elems[to.port]; isIn { + dist := info.locator.dist(coords) + if !(dist < bestDist) { + continue + } + best = to + bestDist = dist } } } - // Didn't find anyone idle to send it to - return false + if best != nil { + // Send to the best idle next hop + delete(idle, best.port) + best.sendPacket(packet) + return true + } else { + // Didn't find anyone idle to send it to + return false + } } // Handles incoming idle notifications From 189628b381809f7b41d5eb5874c2e2383b22f31a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 23:55:27 -0500 Subject: [PATCH 08/13] cleanup --- src/yggdrasil/debug.go | 37 +++++++++++++++++++++++++++- src/yggdrasil/switch.go | 53 +---------------------------------------- 2 files changed, 37 insertions(+), 53 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 5242a6e0..564d59cb 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -141,7 +141,42 @@ func (l *switchLocator) DEBUG_getCoords() []byte { } func (c *Core) DEBUG_switchLookup(dest []byte) switchPort { - return c.switchTable.lookup(dest) + return c.switchTable.DEBUG_lookup(dest) +} + +// This does the switch layer lookups that decide how to route traffic. +// Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree. +// Traffic must be routed to a node that is closer to the destination via the metric space distance. +// In the event that two nodes are equally close, it gets routed to the one with the longest uptime (due to the order that things are iterated over). +// The size of the outgoing packet queue is added to a node's tree distance when the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself. +// Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists. +func (t *switchTable) DEBUG_lookup(dest []byte) switchPort { + table := t.getTable() + myDist := table.self.dist(dest) + if myDist == 0 { + return 0 + } + // cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow + ports := t.core.peers.getPorts() + var best switchPort + bestCost := int64(^uint64(0) >> 1) + for _, info := range table.elems { + dist := info.locator.dist(dest) + if !(dist < myDist) { + continue + } + //p, isIn := ports[info.port] + _, isIn := ports[info.port] + if !isIn { + continue + } + cost := int64(dist) // + p.getQueueSize() + if cost < bestCost { + best = info.port + bestCost = cost + } + } + return best } /* diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 8fedebe1..65edf62d 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -482,41 +482,6 @@ func (t *switchTable) getTable() lookupTable { return t.table.Load().(lookupTable) } -// This does the switch layer lookups that decide how to route traffic. -// Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree. -// Traffic must be routed to a node that is closer to the destination via the metric space distance. -// In the event that two nodes are equally close, it gets routed to the one with the longest uptime (due to the order that things are iterated over). -// The size of the outgoing packet queue is added to a node's tree distance when the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself. -// Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists. -func (t *switchTable) lookup(dest []byte) switchPort { - table := t.getTable() - myDist := table.self.dist(dest) - if myDist == 0 { - return 0 - } - // cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow - ports := t.core.peers.getPorts() - var best switchPort - bestCost := int64(^uint64(0) >> 1) - for _, info := range table.elems { - dist := info.locator.dist(dest) - if !(dist < myDist) { - continue - } - //p, isIn := ports[info.port] - _, isIn := ports[info.port] - if !isIn { - continue - } - cost := int64(dist) // + p.getQueueSize() - if cost < bestCost { - best = info.port - bestCost = cost - } - } - return best -} - // Starts the switch worker func (t *switchTable) start() error { t.core.log.Println("Starting switch") @@ -524,23 +489,6 @@ func (t *switchTable) start() error { return nil } -func (t *switchTable) handleIn_old(packet []byte) { - // Get the coords, skipping the first byte (the pType) - _, pTypeLen := wire_decode_uint64(packet) - coords, coordLen := wire_decode_coords(packet[pTypeLen:]) - if coordLen >= len(packet) { - util_putBytes(packet) - return - } // No payload - toPort := t.lookup(coords) - to := t.core.peers.getPorts()[toPort] - if to == nil { - util_putBytes(packet) - return - } - to.sendPacket(packet) -} - // Check if a packet should go to the self node // This means there's no node closer to the destination than us // This is mainly used to identify packets addressed to us, or that hit a blackhole @@ -585,6 +533,7 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool } // No payload ports := t.core.peers.getPorts() if t.selfIsClosest(coords) { + // TODO? call the router directly, and remove the whole concept of a self peer? ports[0].sendPacket(packet) return true } From 9c028e1d0d58bce43238ebddf34cf011c32de0d0 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Jun 2018 17:39:43 -0500 Subject: [PATCH 09/13] switch to a separate queue per stream of traffic, FIXME for some reason this makes distance calculations more expensive in handleIdle? --- misc/run-schannel-netns | 12 +++--- src/yggdrasil/switch.go | 83 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 79 insertions(+), 16 deletions(-) diff --git a/misc/run-schannel-netns b/misc/run-schannel-netns index 35c197c0..9723e73b 100755 --- a/misc/run-schannel-netns +++ b/misc/run-schannel-netns @@ -51,12 +51,12 @@ ip netns exec node4 ip link set lo up ip netns exec node5 ip link set lo up ip netns exec node6 ip link set lo up -ip netns exec node1 ./run --autoconf --pprof &> /dev/null & -ip netns exec node2 ./run --autoconf --pprof &> /dev/null & -ip netns exec node3 ./run --autoconf --pprof &> /dev/null & -ip netns exec node4 ./run --autoconf --pprof &> /dev/null & -ip netns exec node5 ./run --autoconf --pprof &> /dev/null & -ip netns exec node6 ./run --autoconf --pprof &> /dev/null & +ip netns exec node1 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & +ip netns exec node2 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & +ip netns exec node3 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & +ip netns exec node4 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & +ip netns exec node5 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & +ip netns exec node6 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & echo "Started, to continue you should (possibly w/ sudo):" echo "kill" $(jobs -p) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 65edf62d..4b3c2118 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -520,17 +520,39 @@ func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { } } +// Get the coords of a packet without decoding +func switch_getPacketCoords(packet []byte) []byte { + _, pTypeLen := wire_decode_uint64(packet) + coords, _ := wire_decode_coords(packet[pTypeLen:]) + return coords +} + +// Returns a unique string for each stream of traffic +// Equal to type+coords+handle for traffic packets +// Equal to type+coords+toKey+fromKey for protocol traffic packets +func switch_getPacketStreamID(packet []byte) string { + pType, pTypeLen := wire_decode_uint64(packet) + _, coordLen := wire_decode_coords(packet[pTypeLen:]) + end := pTypeLen + coordLen + switch { + case pType == wire_Traffic: + end += handleLen // handle + case pType == wire_ProtocolTraffic: + end += 2 * boxPubKeyLen + default: + end = 0 + } + if end > len(packet) { + end = len(packet) + } + return string(packet[:end]) +} + // Handle an incoming packet // Either send it to ourself, or to the first idle peer that's free // Returns true if the packet has been handled somehow, false if it should be queued func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { - // Get the coords, skipping the first byte (the pType) - _, pTypeLen := wire_decode_uint64(packet) - coords, coordLen := wire_decode_coords(packet[pTypeLen:]) - if coordLen >= len(packet) { - util_putBytes(packet) - return true - } // No payload + coords := switch_getPacketCoords(packet) ports := t.core.peers.getPorts() if t.selfIsClosest(coords) { // TODO? call the router directly, and remove the whole concept of a self peer? @@ -564,6 +586,10 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool } } +/* +FIXME for some reason the new version is a *lot* slower than this one was +It seems to be from the switchLocator.dist(coords) calls + // Handles incoming idle notifications // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list @@ -584,10 +610,45 @@ func (t *switchTable) handleIdle(port switchPort, packets *[][]byte) bool { } return false } +*/ + +// Handles incoming idle notifications +// Loops over packets and sends the newest one that's OK for this peer to send +// Returns true if the peer is no longer idle, false if it should be added to the idle list +func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bool { + to := t.core.peers.getPorts()[port] + if to == nil { + return true + } + var best string + var bestSize int + for streamID, packets := range stacks { + // Filter over the streams that this node is closer to + packet := packets[len(packets)-1] + if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(packet, port) { + best = streamID + bestSize = len(packets) + } + } + if bestSize != 0 { + packets := stacks[best] + var packet []byte + packet, packets = packets[len(packets)-1], packets[:len(packets)-1] + if len(packets) == 0 { + delete(stacks, best) + } else { + stacks[best] = packets + } + to.sendPacket(packet) + return true + } else { + return false + } +} // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { - var packets [][]byte // Should really be a linked list + stacks := make(map[string][][]byte) // Packets per PacketStreamID (string) idle := make(map[switchPort]struct{}) // this is to deduplicate things for { select { @@ -595,15 +656,17 @@ func (t *switchTable) doWorker() { // Try to send it somewhere (or drop it if it's corrupt or at a dead end) if !t.handleIn(packet, idle) { // There's nobody free to take it right now, so queue it for later - packets = append(packets, packet) + streamID := switch_getPacketStreamID(packet) + packets := append(stacks[streamID], packet) for len(packets) > 32 { util_putBytes(packets[0]) packets = packets[1:] } + stacks[streamID] = packets } case port := <-t.idleIn: // Try to find something to send to this peer - if !t.handleIdle(port, &packets) { + if !t.handleIdle(port, stacks) { // Didn't find anything ready to send yet, so stay idle idle[port] = struct{}{} } From 03949dcf3f4fd431d98bc619c20bfc84ec35c53e Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Jun 2018 18:05:00 -0500 Subject: [PATCH 10/13] fix my terrible bug, I have no idea why the old one even worked --- src/yggdrasil/switch.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 4b3c2118..e5db8ca9 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -624,8 +624,10 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo var bestSize int for streamID, packets := range stacks { // Filter over the streams that this node is closer to + // Keep the one with the smallest queue packet := packets[len(packets)-1] - if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(packet, port) { + coords := switch_getPacketCoords(packet) + if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(coords, port) { best = streamID bestSize = len(packets) } From 4ad24465573c2e0a61c5e912d705c1e067a75421 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Jun 2018 18:21:00 -0500 Subject: [PATCH 11/13] cleanup --- src/yggdrasil/switch.go | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index e5db8ca9..ae1c391d 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -586,32 +586,6 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool } } -/* -FIXME for some reason the new version is a *lot* slower than this one was -It seems to be from the switchLocator.dist(coords) calls - -// Handles incoming idle notifications -// Loops over packets and sends the newest one that's OK for this peer to send -// Returns true if the peer is no longer idle, false if it should be added to the idle list -func (t *switchTable) handleIdle(port switchPort, packets *[][]byte) bool { - to := t.core.peers.getPorts()[port] - if to == nil { - return true - } - for idx := len(*packets) - 1; idx >= 0; idx-- { - packet := (*packets)[idx] - _, pTypeLen := wire_decode_uint64(packet) - coords, _ := wire_decode_coords(packet[pTypeLen:]) - if t.portIsCloser(coords, port) { - to.sendPacket(packet) - *packets = append((*packets)[:idx], (*packets)[idx+1:]...) - return true - } - } - return false -} -*/ - // Handles incoming idle notifications // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list From 7695a3fcbfabc56d2b70d313086c956b75a0bf03 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Jun 2018 20:20:07 -0500 Subject: [PATCH 12/13] try using a simpler FIFO order for each backpressure buffer, since there are other mechanisms to penalize the flooding node, leads to better TCP throughput without affecting traffic between other nodes (does affect traffic in the same session, but there's hypothetically workarounds to that) --- src/yggdrasil/switch.go | 22 +++++++++++----------- src/yggdrasil/tcp.go | 28 +++++++++++----------------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ae1c391d..6c5fd21d 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -589,17 +589,17 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool // Handles incoming idle notifications // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list -func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bool { +func (t *switchTable) handleIdle(port switchPort, buffs map[string][][]byte) bool { to := t.core.peers.getPorts()[port] if to == nil { return true } var best string var bestSize int - for streamID, packets := range stacks { + for streamID, packets := range buffs { // Filter over the streams that this node is closer to // Keep the one with the smallest queue - packet := packets[len(packets)-1] + packet := packets[0] coords := switch_getPacketCoords(packet) if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(coords, port) { best = streamID @@ -607,13 +607,13 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo } } if bestSize != 0 { - packets := stacks[best] + packets := buffs[best] var packet []byte - packet, packets = packets[len(packets)-1], packets[:len(packets)-1] + packet, packets = packets[0], packets[1:] if len(packets) == 0 { - delete(stacks, best) + delete(buffs, best) } else { - stacks[best] = packets + buffs[best] = packets } to.sendPacket(packet) return true @@ -624,7 +624,7 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { - stacks := make(map[string][][]byte) // Packets per PacketStreamID (string) + buffs := make(map[string][][]byte) // Packets per PacketStreamID (string) idle := make(map[switchPort]struct{}) // this is to deduplicate things for { select { @@ -633,16 +633,16 @@ func (t *switchTable) doWorker() { if !t.handleIn(packet, idle) { // There's nobody free to take it right now, so queue it for later streamID := switch_getPacketStreamID(packet) - packets := append(stacks[streamID], packet) + packets := append(buffs[streamID], packet) for len(packets) > 32 { util_putBytes(packets[0]) packets = packets[1:] } - stacks[streamID] = packets + buffs[streamID] = packets } case port := <-t.idleIn: // Try to find something to send to this peer - if !t.handleIdle(port, stacks) { + if !t.handleIdle(port, buffs) { // Didn't find anything ready to send yet, so stay idle idle[port] = struct{}{} } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 12147d4e..80ae2847 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -246,19 +246,13 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { defer close(out) go func() { // This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic - send := make(chan []byte) - defer close(send) - go func() { - // This goroutine does the actual socket write operations - // The parent goroutine aggregates things for it and feeds them in - for msg := range send { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{tcp_msg[:], msgLen, msg} - buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) - util_putBytes(msg) - } - }() + send := func(msg []byte) { + msgLen := wire_encode_uint64(uint64(len(msg))) + buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf.WriteTo(sock) + atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) + util_putBytes(msg) + } timerInterval := tcp_timeout * 2 / 3 timer := time.NewTimer(timerInterval) defer timer.Stop() @@ -266,7 +260,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { select { case msg := <-p.linkOut: // Always send outgoing link traffic first, if needed - send <- msg + send(msg) continue default: } @@ -279,14 +273,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer.Reset(timerInterval) select { case _ = <-timer.C: - send <- nil // TCP keep-alive traffic + send(nil) // TCP keep-alive traffic case msg := <-p.linkOut: - send <- msg + send(msg) case msg, ok := <-out: if !ok { return } - send <- msg // Block until the socket writer has the packet + send(msg) // Block until the socket write has finished // Now inform the switch that we're ready for more traffic p.core.switchTable.idleIn <- p.port } From b63b534fa787a69779fdce1aa4628b6563382401 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 25 Jun 2018 18:12:18 -0500 Subject: [PATCH 13/13] drop packets that have been queued for longer than some timeout (currently 25ms) instead of using fixed length queues --- src/yggdrasil/switch.go | 72 +++++++++++++++++++++++++++++------------ 1 file changed, 51 insertions(+), 21 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 6c5fd21d..5b72620c 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -586,36 +586,66 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool } } +// Info about a buffered packet +type switch_packetInfo struct { + bytes []byte + time time.Time // Timestamp of when the packet arrived +} + +// Used to keep track of buffered packets +type switch_buffer struct { + packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large + count uint64 // Total queue size, including dropped packets +} + +func (b *switch_buffer) dropTimedOut() { + // TODO figure out what timeout makes sense + const timeout = 25 * time.Millisecond + now := time.Now() + for len(b.packets) > 0 && now.Sub(b.packets[0].time) > timeout { + util_putBytes(b.packets[0].bytes) + b.packets = b.packets[1:] + } +} + // Handles incoming idle notifications // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list -func (t *switchTable) handleIdle(port switchPort, buffs map[string][][]byte) bool { +func (t *switchTable) handleIdle(port switchPort, buffs map[string]switch_buffer) bool { to := t.core.peers.getPorts()[port] if to == nil { return true } var best string - var bestSize int - for streamID, packets := range buffs { + var bestSize uint64 + for streamID, buf := range buffs { // Filter over the streams that this node is closer to // Keep the one with the smallest queue - packet := packets[0] - coords := switch_getPacketCoords(packet) - if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(coords, port) { + buf.dropTimedOut() + if len(buf.packets) == 0 { + delete(buffs, streamID) + continue + } + buffs[streamID] = buf + packet := buf.packets[0] + coords := switch_getPacketCoords(packet.bytes) + if (bestSize == 0 || buf.count < bestSize) && t.portIsCloser(coords, port) { best = streamID - bestSize = len(packets) + bestSize = buf.count } } if bestSize != 0 { - packets := buffs[best] - var packet []byte - packet, packets = packets[0], packets[1:] - if len(packets) == 0 { + buf := buffs[best] + var packet switch_packetInfo + // TODO decide if this should be LIFO or FIFO + packet, buf.packets = buf.packets[0], buf.packets[1:] + buf.count-- + if len(buf.packets) == 0 { delete(buffs, best) } else { - buffs[best] = packets + buffs[best] = buf } - to.sendPacket(packet) + to.sendPacket(packet.bytes) return true } else { return false @@ -624,8 +654,8 @@ func (t *switchTable) handleIdle(port switchPort, buffs map[string][][]byte) boo // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { - buffs := make(map[string][][]byte) // Packets per PacketStreamID (string) - idle := make(map[switchPort]struct{}) // this is to deduplicate things + buffs := make(map[string]switch_buffer) // Packets per PacketStreamID (string) + idle := make(map[switchPort]struct{}) // this is to deduplicate things for { select { case packet := <-t.packetIn: @@ -633,12 +663,12 @@ func (t *switchTable) doWorker() { if !t.handleIn(packet, idle) { // There's nobody free to take it right now, so queue it for later streamID := switch_getPacketStreamID(packet) - packets := append(buffs[streamID], packet) - for len(packets) > 32 { - util_putBytes(packets[0]) - packets = packets[1:] - } - buffs[streamID] = packets + buf := buffs[streamID] + buf.dropTimedOut() + pinfo := switch_packetInfo{packet, time.Now()} + buf.packets = append(buf.packets, pinfo) + buf.count++ + buffs[streamID] = buf } case port := <-t.idleIn: // Try to find something to send to this peer