From 5dfa01a0e81f4a982306b9506484b059f204126c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 21 Jun 2018 20:31:30 -0500 Subject: [PATCH 01/21] periodically clean up timed-out sessions and old signatures, instead of trying to do it when creating new sessions or adding new signatures --- src/yggdrasil/router.go | 2 ++ src/yggdrasil/session.go | 24 ++++++++++++++++-------- src/yggdrasil/signature.go | 26 +++++++++++++++----------- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 5a6eb455..d2a8c43b 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -101,6 +101,8 @@ func (r *router) mainLoop() { // Any periodic maintenance stuff goes here r.core.switchTable.doMaintenance() r.core.dht.doMaintenance() + r.core.sessions.cleanup() + r.core.sigs.cleanup() util_getBytes() // To slowly drain things } case f := <-r.admin: diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index b36349ac..8631ff22 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -89,7 +89,8 @@ func (s *sessionInfo) timedout() bool { // Sessions are indexed by handle. // Additionally, stores maps of address/subnet onto keys, and keys onto handles. type sessions struct { - core *Core + core *Core + lastCleanup time.Time // Maps known permanent keys to their shared key, used by DHT a lot permShared map[boxPubKey]*boxSharedKey // Maps (secret) handle onto session info @@ -111,6 +112,7 @@ func (ss *sessions) init(core *Core) { ss.byTheirPerm = make(map[boxPubKey]*handle) ss.addrToPerm = make(map[address]*boxPubKey) ss.subnetToPerm = make(map[subnet]*boxPubKey) + ss.lastCleanup = time.Now() } // Gets the session corresponding to a given handle. @@ -202,13 +204,6 @@ func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo { sinfo.send = make(chan []byte, 32) sinfo.recv = make(chan *wire_trafficPacket, 32) go sinfo.doWorker() - // Do some cleanup - // Time thresholds almost certainly could use some adjusting - for _, s := range ss.sinfos { - if s.timedout() { - s.close() - } - } ss.sinfos[sinfo.myHandle] = &sinfo ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle @@ -217,6 +212,19 @@ func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo { return &sinfo } +func (ss *sessions) cleanup() { + // Time thresholds almost certainly could use some adjusting + if time.Since(ss.lastCleanup) < time.Minute { + return + } + for _, s := range ss.sinfos { + if s.timedout() { + s.close() + } + } + ss.lastCleanup = time.Now() +} + // Closes a session, removing it from sessions maps and killing the worker goroutine. 
func (sinfo *sessionInfo) close() { delete(sinfo.core.sessions.sinfos, sinfo.myHandle) diff --git a/src/yggdrasil/signature.go b/src/yggdrasil/signature.go index 374183a0..203c9adc 100644 --- a/src/yggdrasil/signature.go +++ b/src/yggdrasil/signature.go @@ -71,16 +71,20 @@ func (m *sigManager) isChecked(key *sigPubKey, sig *sigBytes, bs []byte) bool { func (m *sigManager) putChecked(key *sigPubKey, newsig *sigBytes, bs []byte) { m.mutex.Lock() defer m.mutex.Unlock() - now := time.Now() - if time.Since(m.lastCleaned) > 60*time.Second { - // Since we have the write lock anyway, do some cleanup - for s, k := range m.checked { - if time.Since(k.time) > 60*time.Second { - delete(m.checked, s) - } - } - m.lastCleaned = now - } - k := knownSig{key: *key, sig: *newsig, bs: bs, time: now} + k := knownSig{key: *key, sig: *newsig, bs: bs, time: time.Now()} m.checked[*newsig] = k } + +func (m *sigManager) cleanup() { + m.mutex.Lock() + defer m.mutex.Unlock() + if time.Since(m.lastCleaned) < time.Minute { + return + } + for s, k := range m.checked { + if time.Since(k.time) > time.Minute { + delete(m.checked, s) + } + } + m.lastCleaned = time.Now() +} From f68f779bee033fe732252230851ee0cb150de908 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 22 Jun 2018 22:26:17 +0100 Subject: [PATCH 02/21] Change box_pub_key to key in admin API --- src/yggdrasil/admin.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 0a9194ef..bd5eb0c8 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -189,34 +189,34 @@ func (a *admin) init(c *Core, listenaddr string) { a.addHandler("getAllowedEncryptionPublicKeys", []string{}, func(in admin_info) (admin_info, error) { return admin_info{"allowed_box_pubs": a.getAllowedEncryptionPublicKeys()}, nil }) - a.addHandler("addAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in admin_info) (admin_info, error) { - if a.addAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil { + a.addHandler("addAllowedEncryptionPublicKey", []string{"key"}, func(in admin_info) (admin_info, error) { + if a.addAllowedEncryptionPublicKey(in["key"].(string)) == nil { return admin_info{ "added": []string{ - in["box_pub_key"].(string), + in["key"].(string), }, }, nil } else { return admin_info{ "not_added": []string{ - in["box_pub_key"].(string), + in["key"].(string), }, - }, errors.New("Failed to add allowed box pub key") + }, errors.New("Failed to add allowed key") } }) - a.addHandler("removeAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in admin_info) (admin_info, error) { - if a.removeAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil { + a.addHandler("removeAllowedEncryptionPublicKey", []string{"key"}, func(in admin_info) (admin_info, error) { + if a.removeAllowedEncryptionPublicKey(in["key"].(string)) == nil { return admin_info{ "removed": []string{ - in["box_pub_key"].(string), + in["key"].(string), }, }, nil } else { return admin_info{ "not_removed": []string{ - in["box_pub_key"].(string), + in["key"].(string), }, - }, errors.New("Failed to remove allowed box pub key") + }, errors.New("Failed to remove allowed key") } }) } From 0021f3463ff62522523d9afb6d5d62d5236564d4 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 22 Jun 2018 20:39:57 -0500 Subject: [PATCH 03/21] slightly better way for the tcp sender goroutine(s) to block waiting for work --- src/yggdrasil/tcp.go | 45 ++++++++++++++++++++++++-------------------- 1 file changed, 25 
insertions(+), 20 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index db57b568..32cfbf37 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -242,7 +242,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { in := func(bs []byte) { p.handlePacket(bs) } - out := make(chan []byte, 32) // TODO? what size makes sense + out := make(chan []byte, 1) defer close(out) go func() { var shadow int64 @@ -255,13 +255,17 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { shadow++ } } - send := func(msg []byte) { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{tcp_msg[:], msgLen, msg} - buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) - util_putBytes(msg) - } + send := make(chan []byte) + defer close(send) + go func() { + for msg := range send { + msgLen := wire_encode_uint64(uint64(len(msg))) + buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf.WriteTo(sock) + atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) + util_putBytes(msg) + } + }() timerInterval := tcp_timeout * 2 / 3 timer := time.NewTimer(timerInterval) defer timer.Stop() @@ -278,9 +282,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer.Reset(timerInterval) select { case _ = <-timer.C: - send(nil) // TCP keep-alive traffic + send <- nil // TCP keep-alive traffic case msg := <-p.linkOut: - send(msg) + send <- msg case msg, ok := <-out: if !ok { return @@ -288,31 +292,32 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { put(msg) } for len(stack) > 0 { + // First make sure linkOut gets sent first, if it's non-empty select { case msg := <-p.linkOut: - send(msg) + send <- msg + default: + } + // Then block until we send or receive something + select { + case msg := <-p.linkOut: + send <- msg case msg, ok := <-out: if !ok { return } put(msg) - default: - msg := stack[len(stack)-1] + case send <- stack[len(stack)-1]: stack = stack[:len(stack)-1] - send(msg) p.updateQueueSize(-1) } } } }() p.out = func(msg []byte) { + p.updateQueueSize(1) defer func() { recover() }() - select { - case out <- msg: - p.updateQueueSize(1) - default: - util_putBytes(msg) - } + out <- msg } p.close = func() { sock.Close() } setNoDelay(sock, true) From cceecf4b1aba0799ee3d1453b047d73f802b937a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 22 Jun 2018 23:46:42 -0500 Subject: [PATCH 04/21] larger out queue size, make sure linkOut packets always get sent first --- src/yggdrasil/tcp.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 32cfbf37..fcefd42b 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -242,7 +242,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { in := func(bs []byte) { p.handlePacket(bs) } - out := make(chan []byte, 1) + out := make(chan []byte, 1024) // Should be effectively infinite, but gets fed into finite LIFO stack defer close(out) go func() { var shadow int64 @@ -296,6 +296,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { select { case msg := <-p.linkOut: send <- msg + continue default: } // Then block until we send or receive something From 2ae213c2557b6b53cea416a07fce137b0104b097 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 01:10:18 -0500 Subject: [PATCH 05/21] I'll try sorting, that's a good trick --- src/yggdrasil/tcp.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/yggdrasil/tcp.go 
b/src/yggdrasil/tcp.go index fcefd42b..0df2d526 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -19,6 +19,7 @@ import ( "fmt" "math/rand" "net" + "sort" "sync" "sync/atomic" "time" @@ -249,6 +250,10 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { var stack [][]byte put := func(msg []byte) { stack = append(stack, msg) + sort.SliceStable(stack, func(i, j int) bool { + // Sort in reverse order, with smallest messages at the end + return len(stack[i]) >= len(stack[j]) + }) for len(stack) > 32 { util_putBytes(stack[0]) stack = stack[1:] From 988f4ad2654a824c87e07f8f0a10c8f19b1a708a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 19:08:32 -0500 Subject: [PATCH 06/21] add a dedicated switch worker and start using it for lookups --- src/yggdrasil/core.go | 5 ++++ src/yggdrasil/debug.go | 1 + src/yggdrasil/peer.go | 14 +---------- src/yggdrasil/switch.go | 55 ++++++++++++++++++++++++++++++++++------- 4 files changed, 53 insertions(+), 22 deletions(-) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index bd7fecec..6d130d81 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -101,6 +101,11 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } + if err := c.switchTable.start(); err != nil { + c.log.Println("Failed to start switch") + return err + } + if err := c.router.start(); err != nil { c.log.Println("Failed to start router") return err diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index f95bfa3d..c029ae9f 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -49,6 +49,7 @@ func (c *Core) Init() { bpub, bpriv := newBoxKeys() spub, spriv := newSigKeys() c.init(bpub, bpriv, spub, spriv) + c.switchTable.start() c.router.start() } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index ad99750a..d5bdc5a3 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -229,19 +229,7 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { // Drop traffic until the peer manages to send us at least one good switchMsg return } - coords, coordLen := wire_decode_coords(packet[pTypeLen:]) - if coordLen >= len(packet) { - return - } // No payload - toPort := p.core.switchTable.lookup(coords) - if toPort == p.port { - return - } - to := p.core.peers.getPorts()[toPort] - if to == nil { - return - } - to.sendPacket(packet) + p.core.switchTable.packetIn <- packet } // This just calls p.out(packet) for now. diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d3b28710..d2026aec 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -155,15 +155,16 @@ type switchData struct { // All the information stored by the switch. 
type switchTable struct { - core *Core - key sigPubKey // Our own key - time time.Time // Time when locator.tstamp was last updated - parent switchPort // Port of whatever peer is our parent, or self if we're root - drop map[sigPubKey]int64 // Tstamp associated with a dropped root - mutex sync.RWMutex // Lock for reads/writes of switchData - data switchData - updater atomic.Value //*sync.Once - table atomic.Value //lookupTable + core *Core + key sigPubKey // Our own key + time time.Time // Time when locator.tstamp was last updated + parent switchPort // Port of whatever peer is our parent, or self if we're root + drop map[sigPubKey]int64 // Tstamp associated with a dropped root + mutex sync.RWMutex // Lock for reads/writes of switchData + data switchData + updater atomic.Value //*sync.Once + table atomic.Value //lookupTable + packetIn chan []byte // Incoming packets for the worker to handle } // Initializes the switchTable struct. @@ -177,6 +178,7 @@ func (t *switchTable) init(core *Core, key sigPubKey) { t.updater.Store(&sync.Once{}) t.table.Store(lookupTable{}) t.drop = make(map[sigPubKey]int64) + t.packetIn = make(chan []byte, 1024) } // Safely gets a copy of this node's locator. @@ -438,6 +440,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) { return } +//////////////////////////////////////////////////////////////////////////////// + +// The rest of these are related to the switch worker + // This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions. func (t *switchTable) updateTable() { // WARNING this should only be called from within t.data.updater.Do() @@ -506,3 +512,34 @@ func (t *switchTable) lookup(dest []byte) switchPort { } return best } + +// Starts the switch worker +func (t *switchTable) start() error { + t.core.log.Println("Starting switch") + go t.doWorker() + return nil +} + +func (t *switchTable) handleIn(packet []byte) { + // Get the coords, skipping the first byte (the pType) + _, pTypeLen := wire_decode_uint64(packet) + coords, coordLen := wire_decode_coords(packet[pTypeLen:]) + if coordLen >= len(packet) { + util_putBytes(packet) + return + } // No payload + toPort := t.lookup(coords) + to := t.core.peers.getPorts()[toPort] + if to == nil { + util_putBytes(packet) + return + } + to.sendPacket(packet) +} + +// The switch worker does routing lookups and sends packets to where they need to be +func (t *switchTable) doWorker() { + for packet := range t.packetIn { + t.handleIn(packet) + } +} From 52a0027aea9932babec5beb873d061fbb6489174 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 20:59:26 -0500 Subject: [PATCH 07/21] switch refactoring, setup for a better approximation of local backpressure --- src/yggdrasil/switch.go | 119 ++++++++++++++++++++++++++++++++++------ 1 file changed, 101 insertions(+), 18 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d2026aec..f74e1c5e 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -12,7 +12,6 @@ package yggdrasil // A little annoying to do with constant changes from backpressure import ( - "sort" "sync" "sync/atomic" "time" @@ -139,7 +138,7 @@ type tableElem struct { // This is the subset of the information about all peers needed to make routing decisions, and it stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go). 
type lookupTable struct { self switchLocator - elems []tableElem + elems map[switchPort]tableElem } // This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically. @@ -162,9 +161,10 @@ type switchTable struct { drop map[sigPubKey]int64 // Tstamp associated with a dropped root mutex sync.RWMutex // Lock for reads/writes of switchData data switchData - updater atomic.Value //*sync.Once - table atomic.Value //lookupTable - packetIn chan []byte // Incoming packets for the worker to handle + updater atomic.Value //*sync.Once + table atomic.Value //lookupTable + packetIn chan []byte // Incoming packets for the worker to handle + idleIn chan switchPort // Incoming idle notifications from peer links } // Initializes the switchTable struct. @@ -179,6 +179,7 @@ func (t *switchTable) init(core *Core, key sigPubKey) { t.table.Store(lookupTable{}) t.drop = make(map[sigPubKey]int64) t.packetIn = make(chan []byte, 1024) + t.idleIn = make(chan switchPort, 1024) } // Safely gets a copy of this node's locator. @@ -458,7 +459,7 @@ func (t *switchTable) updateTable() { defer t.mutex.RUnlock() newTable := lookupTable{ self: t.data.locator.clone(), - elems: make([]tableElem, 0, len(t.data.peers)), + elems: make(map[switchPort]tableElem, len(t.data.peers)), } for _, pinfo := range t.data.peers { //if !pinfo.forward { continue } @@ -467,17 +468,20 @@ func (t *switchTable) updateTable() { } loc := pinfo.locator.clone() loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link - newTable.elems = append(newTable.elems, tableElem{ + newTable.elems[pinfo.port] = tableElem{ locator: loc, port: pinfo.port, - }) + } } - sort.SliceStable(newTable.elems, func(i, j int) bool { - return t.data.peers[newTable.elems[i].port].firstSeen.Before(t.data.peers[newTable.elems[j].port].firstSeen) - }) t.table.Store(newTable) } +// Returns a copy of the atomically-updated table used for switch lookups +func (t *switchTable) getTable() lookupTable { + t.updater.Load().(*sync.Once).Do(t.updateTable) + return t.table.Load().(lookupTable) +} + // This does the switch layer lookups that decide how to route traffic. // Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree. // Traffic must be routed to a node that is closer to the destination via the metric space distance. @@ -485,8 +489,7 @@ func (t *switchTable) updateTable() { // The size of the outgoing packet queue is added to a node's tree distance when the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself. // Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists. 
func (t *switchTable) lookup(dest []byte) switchPort { - t.updater.Load().(*sync.Once).Do(t.updateTable) - table := t.table.Load().(lookupTable) + table := t.getTable() myDist := table.self.dist(dest) if myDist == 0 { return 0 @@ -520,7 +523,7 @@ func (t *switchTable) start() error { return nil } -func (t *switchTable) handleIn(packet []byte) { +func (t *switchTable) handleIn_old(packet []byte) { // Get the coords, skipping the first byte (the pType) _, pTypeLen := wire_decode_uint64(packet) coords, coordLen := wire_decode_coords(packet[pTypeLen:]) @@ -537,9 +540,89 @@ func (t *switchTable) handleIn(packet []byte) { to.sendPacket(packet) } -// The switch worker does routing lookups and sends packets to where they need to be -func (t *switchTable) doWorker() { - for packet := range t.packetIn { - t.handleIn(packet) +// Check if a packet should go to the self node +// This means there's no node closer to the destination than us +// This is mainly used to identify packets addressed to us, or that hit a blackhole +func (t *switchTable) selfIsClosest(dest []byte) bool { + table := t.getTable() + myDist := table.self.dist(dest) + if myDist == 0 { + // Skip the iteration step if it's impossible to be closer + return true + } + for _, info := range table.elems { + dist := info.locator.dist(dest) + if dist < myDist { + return false + } + } + return true +} + +// Returns true if the peer is closer to the destination than ourself +func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { + table := t.getTable() + if info, isIn := table.elems[port]; isIn { + theirDist := info.locator.dist(dest) + myDist := table.self.dist(dest) + return theirDist < myDist + } else { + return false + } +} + +// Handle an incoming packet +// Either send it to ourself, or to the first idle peer that's free +func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { + // Get the coords, skipping the first byte (the pType) + _, pTypeLen := wire_decode_uint64(packet) + coords, coordLen := wire_decode_coords(packet[pTypeLen:]) + if coordLen >= len(packet) { + util_putBytes(packet) + return true + } // No payload + ports := t.core.peers.getPorts() + if t.selfIsClosest(coords) { + ports[0].sendPacket(packet) + return true + } + for port := range idle { + if to := ports[port]; to != nil { + if t.portIsCloser(coords, port) { + delete(idle, port) + to.sendPacket(packet) + return true + } + } + } + // Didn't find anyone idle to send it to + return false +} + +// The switch worker does routing lookups and sends packets to where they need to be +func (t *switchTable) doWorker() { + var packets [][]byte // Should really be a linked list + idle := make(map[switchPort]struct{}) // this is to deduplicate things + for { + select { + case packet := <-t.packetIn: + idle = make(map[switchPort]struct{}) + for port := range t.getTable().elems { + idle[port] = struct{}{} + } + // TODO correcty fill idle, so the above can be removed + if !t.handleIn(packet, idle) { + // There's nobody free to take it now, so queue it + packets = append(packets, packet) + for len(packets) > 32 { + util_putBytes(packets[0]) + packets = packets[1:] + } + } + case port := <-t.idleIn: + // TODO the part that loops over packets and finds something to send + // Didn't find anything to send, so add this port to the idle list + idle[port] = struct{}{} + } } } From 4b83efa218be2735f0a2e4d3090eeed4af4040b4 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 21:51:32 -0500 Subject: [PATCH 08/21] more backpressure work, still 
needs testing --- src/yggdrasil/debug.go | 4 +++ src/yggdrasil/peer.go | 13 +-------- src/yggdrasil/switch.go | 43 ++++++++++++++++++++++-------- src/yggdrasil/tcp.go | 58 +++++++++++------------------------------ 4 files changed, 52 insertions(+), 66 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index c029ae9f..5242a6e0 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -481,13 +481,17 @@ func DEBUG_simLinkPeers(p, q *peer) { } }() p.out = func(bs []byte) { + p.core.switchTable.idleIn <- p.port go q.handlePacket(bs) } q.out = func(bs []byte) { + q.core.switchTable.idleIn <- q.port go p.handlePacket(bs) } go p.linkLoop() go q.linkLoop() + p.core.switchTable.idleIn <- p.port + q.core.switchTable.idleIn <- q.port } func (c *Core) DEBUG_simFixMTU() { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index d5bdc5a3..f0dd3d0a 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -74,9 +74,8 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) { ps.ports.Store(ports) } -// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral), a handler for their outgoing traffic, and queue sizes for local backpressure. +// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic type peer struct { - queueSize int64 // used to track local backpressure bytesSent uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers // BUG: sync/atomic, 32 bit platforms need the above to be the first element @@ -94,16 +93,6 @@ type peer struct { close func() // Called when a peer is removed, to close the underlying connection, or via admin api } -// Size of the queue of packets to be sent to the node. -func (p *peer) getQueueSize() int64 { - return atomic.LoadInt64(&p.queueSize) -} - -// Used to increment or decrement the queue. -func (p *peer) updateQueueSize(delta int64) { - atomic.AddInt64(&p.queueSize, delta) -} - // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unocupied port number. 
func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKey) *peer { now := time.Now() diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index f74e1c5e..4c1f0f8d 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -503,11 +503,12 @@ func (t *switchTable) lookup(dest []byte) switchPort { if !(dist < myDist) { continue } - p, isIn := ports[info.port] + //p, isIn := ports[info.port] + _, isIn := ports[info.port] if !isIn { continue } - cost := int64(dist) + p.getQueueSize() + cost := int64(dist) // + p.getQueueSize() if cost < bestCost { best = info.port bestCost = cost @@ -573,6 +574,7 @@ func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { // Handle an incoming packet // Either send it to ourself, or to the first idle peer that's free +// Returns true if the packet has been handled somehow, false if it should be queued func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { // Get the coords, skipping the first byte (the pType) _, pTypeLen := wire_decode_uint64(packet) @@ -599,6 +601,27 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool return false } +// Handles incoming idle notifications +// Loops over packets and sends the newest one that's OK for this peer to send +// Returns true if the peer is no longer idle, false if it should be added to the idle list +func (t *switchTable) handleIdle(port switchPort, packets *[][]byte) bool { + to := t.core.peers.getPorts()[port] + if to == nil { + return true + } + for idx := len(*packets) - 1; idx >= 0; idx-- { + packet := (*packets)[idx] + _, pTypeLen := wire_decode_uint64(packet) + coords, _ := wire_decode_coords(packet[pTypeLen:]) + if t.portIsCloser(coords, port) { + to.sendPacket(packet) + *packets = append((*packets)[:idx], (*packets)[idx+1:]...) 
+ return true + } + } + return false +} + // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { var packets [][]byte // Should really be a linked list @@ -606,13 +629,9 @@ func (t *switchTable) doWorker() { for { select { case packet := <-t.packetIn: - idle = make(map[switchPort]struct{}) - for port := range t.getTable().elems { - idle[port] = struct{}{} - } - // TODO correcty fill idle, so the above can be removed + // Try to send it somewhere (or drop it if it's corrupt or at a dead end) if !t.handleIn(packet, idle) { - // There's nobody free to take it now, so queue it + // There's nobody free to take it right now, so queue it for later packets = append(packets, packet) for len(packets) > 32 { util_putBytes(packets[0]) @@ -620,9 +639,11 @@ func (t *switchTable) doWorker() { } } case port := <-t.idleIn: - // TODO the part that loops over packets and finds something to send - // Didn't find anything to send, so add this port to the idle list - idle[port] = struct{}{} + // Try to find something to send to this peer + if !t.handleIdle(port, &packets) { + // Didn't find anything ready to send yet, so stay idle + idle[port] = struct{}{} + } } } } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 0df2d526..12147d4e 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -19,7 +19,6 @@ import ( "fmt" "math/rand" "net" - "sort" "sync" "sync/atomic" "time" @@ -243,26 +242,15 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { in := func(bs []byte) { p.handlePacket(bs) } - out := make(chan []byte, 1024) // Should be effectively infinite, but gets fed into finite LIFO stack + out := make(chan []byte, 1) defer close(out) go func() { - var shadow int64 - var stack [][]byte - put := func(msg []byte) { - stack = append(stack, msg) - sort.SliceStable(stack, func(i, j int) bool { - // Sort in reverse order, with smallest messages at the end - return len(stack[i]) >= len(stack[j]) - }) - for len(stack) > 32 { - util_putBytes(stack[0]) - stack = stack[1:] - shadow++ - } - } + // This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic send := make(chan []byte) defer close(send) go func() { + // This goroutine does the actual socket write operations + // The parent goroutine aggregates things for it and feeds them in for msg := range send { msgLen := wire_encode_uint64(uint64(len(msg))) buf := net.Buffers{tcp_msg[:], msgLen, msg} @@ -275,10 +263,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer := time.NewTimer(timerInterval) defer timer.Stop() for { - if shadow != 0 { - p.updateQueueSize(-shadow) - shadow = 0 + select { + case msg := <-p.linkOut: + // Always send outgoing link traffic first, if needed + send <- msg + continue + default: } + // Otherwise wait reset the timer and wait for something to do timer.Stop() select { case <-timer.C: @@ -294,34 +286,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { if !ok { return } - put(msg) - } - for len(stack) > 0 { - // First make sure linkOut gets sent first, if it's non-empty - select { - case msg := <-p.linkOut: - send <- msg - continue - default: - } - // Then block until we send or receive something - select { - case msg := <-p.linkOut: - send <- msg - case msg, ok := <-out: - if !ok { - return - } - put(msg) - case send <- stack[len(stack)-1]: - stack = stack[:len(stack)-1] - p.updateQueueSize(-1) - } + send <- msg // Block until the socket writer has the 
packet + // Now inform the switch that we're ready for more traffic + p.core.switchTable.idleIn <- p.port } } }() + p.core.switchTable.idleIn <- p.port // Start in the idle state p.out = func(msg []byte) { - p.updateQueueSize(1) defer func() { recover() }() out <- msg } From 0ad801bcfe50fbbd889692fe45ff9f887e5798c0 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 23:33:03 -0500 Subject: [PATCH 09/21] more work on backpressure, but still needs more testing --- src/yggdrasil/switch.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 4c1f0f8d..8fedebe1 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -588,17 +588,31 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool ports[0].sendPacket(packet) return true } + table := t.getTable() + myDist := table.self.dist(coords) + var best *peer + bestDist := myDist for port := range idle { if to := ports[port]; to != nil { - if t.portIsCloser(coords, port) { - delete(idle, port) - to.sendPacket(packet) - return true + if info, isIn := table.elems[to.port]; isIn { + dist := info.locator.dist(coords) + if !(dist < bestDist) { + continue + } + best = to + bestDist = dist } } } - // Didn't find anyone idle to send it to - return false + if best != nil { + // Send to the best idle next hop + delete(idle, best.port) + best.sendPacket(packet) + return true + } else { + // Didn't find anyone idle to send it to + return false + } } // Handles incoming idle notifications From 189628b381809f7b41d5eb5874c2e2383b22f31a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 23 Jun 2018 23:55:27 -0500 Subject: [PATCH 10/21] cleanup --- src/yggdrasil/debug.go | 37 +++++++++++++++++++++++++++- src/yggdrasil/switch.go | 53 +---------------------------------------- 2 files changed, 37 insertions(+), 53 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 5242a6e0..564d59cb 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -141,7 +141,42 @@ func (l *switchLocator) DEBUG_getCoords() []byte { } func (c *Core) DEBUG_switchLookup(dest []byte) switchPort { - return c.switchTable.lookup(dest) + return c.switchTable.DEBUG_lookup(dest) +} + +// This does the switch layer lookups that decide how to route traffic. +// Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree. +// Traffic must be routed to a node that is closer to the destination via the metric space distance. +// In the event that two nodes are equally close, it gets routed to the one with the longest uptime (due to the order that things are iterated over). +// The size of the outgoing packet queue is added to a node's tree distance when the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself. +// Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists. 
+func (t *switchTable) DEBUG_lookup(dest []byte) switchPort { + table := t.getTable() + myDist := table.self.dist(dest) + if myDist == 0 { + return 0 + } + // cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow + ports := t.core.peers.getPorts() + var best switchPort + bestCost := int64(^uint64(0) >> 1) + for _, info := range table.elems { + dist := info.locator.dist(dest) + if !(dist < myDist) { + continue + } + //p, isIn := ports[info.port] + _, isIn := ports[info.port] + if !isIn { + continue + } + cost := int64(dist) // + p.getQueueSize() + if cost < bestCost { + best = info.port + bestCost = cost + } + } + return best } /* diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 8fedebe1..65edf62d 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -482,41 +482,6 @@ func (t *switchTable) getTable() lookupTable { return t.table.Load().(lookupTable) } -// This does the switch layer lookups that decide how to route traffic. -// Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree. -// Traffic must be routed to a node that is closer to the destination via the metric space distance. -// In the event that two nodes are equally close, it gets routed to the one with the longest uptime (due to the order that things are iterated over). -// The size of the outgoing packet queue is added to a node's tree distance when the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself. -// Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists. 
-func (t *switchTable) lookup(dest []byte) switchPort { - table := t.getTable() - myDist := table.self.dist(dest) - if myDist == 0 { - return 0 - } - // cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow - ports := t.core.peers.getPorts() - var best switchPort - bestCost := int64(^uint64(0) >> 1) - for _, info := range table.elems { - dist := info.locator.dist(dest) - if !(dist < myDist) { - continue - } - //p, isIn := ports[info.port] - _, isIn := ports[info.port] - if !isIn { - continue - } - cost := int64(dist) // + p.getQueueSize() - if cost < bestCost { - best = info.port - bestCost = cost - } - } - return best -} - // Starts the switch worker func (t *switchTable) start() error { t.core.log.Println("Starting switch") @@ -524,23 +489,6 @@ func (t *switchTable) start() error { return nil } -func (t *switchTable) handleIn_old(packet []byte) { - // Get the coords, skipping the first byte (the pType) - _, pTypeLen := wire_decode_uint64(packet) - coords, coordLen := wire_decode_coords(packet[pTypeLen:]) - if coordLen >= len(packet) { - util_putBytes(packet) - return - } // No payload - toPort := t.lookup(coords) - to := t.core.peers.getPorts()[toPort] - if to == nil { - util_putBytes(packet) - return - } - to.sendPacket(packet) -} - // Check if a packet should go to the self node // This means there's no node closer to the destination than us // This is mainly used to identify packets addressed to us, or that hit a blackhole @@ -585,6 +533,7 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool } // No payload ports := t.core.peers.getPorts() if t.selfIsClosest(coords) { + // TODO? call the router directly, and remove the whole concept of a self peer? ports[0].sendPacket(packet) return true } From 9c028e1d0d58bce43238ebddf34cf011c32de0d0 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Jun 2018 17:39:43 -0500 Subject: [PATCH 11/21] switch to a separate queue per stream of traffic, FIXME for some reason this makes distance calculations more expensive in handleIdle? 
--- misc/run-schannel-netns | 12 +++--- src/yggdrasil/switch.go | 83 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 79 insertions(+), 16 deletions(-) diff --git a/misc/run-schannel-netns b/misc/run-schannel-netns index 35c197c0..9723e73b 100755 --- a/misc/run-schannel-netns +++ b/misc/run-schannel-netns @@ -51,12 +51,12 @@ ip netns exec node4 ip link set lo up ip netns exec node5 ip link set lo up ip netns exec node6 ip link set lo up -ip netns exec node1 ./run --autoconf --pprof &> /dev/null & -ip netns exec node2 ./run --autoconf --pprof &> /dev/null & -ip netns exec node3 ./run --autoconf --pprof &> /dev/null & -ip netns exec node4 ./run --autoconf --pprof &> /dev/null & -ip netns exec node5 ./run --autoconf --pprof &> /dev/null & -ip netns exec node6 ./run --autoconf --pprof &> /dev/null & +ip netns exec node1 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & +ip netns exec node2 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & +ip netns exec node3 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & +ip netns exec node4 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & +ip netns exec node5 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & +ip netns exec node6 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null & echo "Started, to continue you should (possibly w/ sudo):" echo "kill" $(jobs -p) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 65edf62d..4b3c2118 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -520,17 +520,39 @@ func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { } } +// Get the coords of a packet without decoding +func switch_getPacketCoords(packet []byte) []byte { + _, pTypeLen := wire_decode_uint64(packet) + coords, _ := wire_decode_coords(packet[pTypeLen:]) + return coords +} + +// Returns a unique string for each stream of traffic +// Equal to type+coords+handle for traffic packets +// Equal to type+coords+toKey+fromKey for protocol traffic packets +func switch_getPacketStreamID(packet []byte) string { + pType, pTypeLen := wire_decode_uint64(packet) + _, coordLen := wire_decode_coords(packet[pTypeLen:]) + end := pTypeLen + coordLen + switch { + case pType == wire_Traffic: + end += handleLen // handle + case pType == wire_ProtocolTraffic: + end += 2 * boxPubKeyLen + default: + end = 0 + } + if end > len(packet) { + end = len(packet) + } + return string(packet[:end]) +} + // Handle an incoming packet // Either send it to ourself, or to the first idle peer that's free // Returns true if the packet has been handled somehow, false if it should be queued func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { - // Get the coords, skipping the first byte (the pType) - _, pTypeLen := wire_decode_uint64(packet) - coords, coordLen := wire_decode_coords(packet[pTypeLen:]) - if coordLen >= len(packet) { - util_putBytes(packet) - return true - } // No payload + coords := switch_getPacketCoords(packet) ports := t.core.peers.getPorts() if t.selfIsClosest(coords) { // TODO? call the router directly, and remove the whole concept of a self peer? 
@@ -564,6 +586,10 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool } } +/* +FIXME for some reason the new version is a *lot* slower than this one was +It seems to be from the switchLocator.dist(coords) calls + // Handles incoming idle notifications // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list @@ -584,10 +610,45 @@ func (t *switchTable) handleIdle(port switchPort, packets *[][]byte) bool { } return false } +*/ + +// Handles incoming idle notifications +// Loops over packets and sends the newest one that's OK for this peer to send +// Returns true if the peer is no longer idle, false if it should be added to the idle list +func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bool { + to := t.core.peers.getPorts()[port] + if to == nil { + return true + } + var best string + var bestSize int + for streamID, packets := range stacks { + // Filter over the streams that this node is closer to + packet := packets[len(packets)-1] + if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(packet, port) { + best = streamID + bestSize = len(packets) + } + } + if bestSize != 0 { + packets := stacks[best] + var packet []byte + packet, packets = packets[len(packets)-1], packets[:len(packets)-1] + if len(packets) == 0 { + delete(stacks, best) + } else { + stacks[best] = packets + } + to.sendPacket(packet) + return true + } else { + return false + } +} // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { - var packets [][]byte // Should really be a linked list + stacks := make(map[string][][]byte) // Packets per PacketStreamID (string) idle := make(map[switchPort]struct{}) // this is to deduplicate things for { select { @@ -595,15 +656,17 @@ func (t *switchTable) doWorker() { // Try to send it somewhere (or drop it if it's corrupt or at a dead end) if !t.handleIn(packet, idle) { // There's nobody free to take it right now, so queue it for later - packets = append(packets, packet) + streamID := switch_getPacketStreamID(packet) + packets := append(stacks[streamID], packet) for len(packets) > 32 { util_putBytes(packets[0]) packets = packets[1:] } + stacks[streamID] = packets } case port := <-t.idleIn: // Try to find something to send to this peer - if !t.handleIdle(port, &packets) { + if !t.handleIdle(port, stacks) { // Didn't find anything ready to send yet, so stay idle idle[port] = struct{}{} } From 03949dcf3f4fd431d98bc619c20bfc84ec35c53e Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Jun 2018 18:05:00 -0500 Subject: [PATCH 12/21] fix my terrible bug, I have no idea why the old one even worked --- src/yggdrasil/switch.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 4b3c2118..e5db8ca9 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -624,8 +624,10 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo var bestSize int for streamID, packets := range stacks { // Filter over the streams that this node is closer to + // Keep the one with the smallest queue packet := packets[len(packets)-1] - if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(packet, port) { + coords := switch_getPacketCoords(packet) + if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(coords, port) { best = streamID bestSize = 
len(packets) } From 4ad24465573c2e0a61c5e912d705c1e067a75421 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Jun 2018 18:21:00 -0500 Subject: [PATCH 13/21] cleanup --- src/yggdrasil/switch.go | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index e5db8ca9..ae1c391d 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -586,32 +586,6 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool } } -/* -FIXME for some reason the new version is a *lot* slower than this one was -It seems to be from the switchLocator.dist(coords) calls - -// Handles incoming idle notifications -// Loops over packets and sends the newest one that's OK for this peer to send -// Returns true if the peer is no longer idle, false if it should be added to the idle list -func (t *switchTable) handleIdle(port switchPort, packets *[][]byte) bool { - to := t.core.peers.getPorts()[port] - if to == nil { - return true - } - for idx := len(*packets) - 1; idx >= 0; idx-- { - packet := (*packets)[idx] - _, pTypeLen := wire_decode_uint64(packet) - coords, _ := wire_decode_coords(packet[pTypeLen:]) - if t.portIsCloser(coords, port) { - to.sendPacket(packet) - *packets = append((*packets)[:idx], (*packets)[idx+1:]...) - return true - } - } - return false -} -*/ - // Handles incoming idle notifications // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list From 7695a3fcbfabc56d2b70d313086c956b75a0bf03 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Jun 2018 20:20:07 -0500 Subject: [PATCH 14/21] try using a simpler FIFO order for each backpressure buffer, since there are other mechanisms to penalize the flooding node, leads to better TCP throughput without affecting traffic between other nodes (does affect traffic in the same session, but there's hypothetically workarounds to that) --- src/yggdrasil/switch.go | 22 +++++++++++----------- src/yggdrasil/tcp.go | 28 +++++++++++----------------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ae1c391d..6c5fd21d 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -589,17 +589,17 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool // Handles incoming idle notifications // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list -func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bool { +func (t *switchTable) handleIdle(port switchPort, buffs map[string][][]byte) bool { to := t.core.peers.getPorts()[port] if to == nil { return true } var best string var bestSize int - for streamID, packets := range stacks { + for streamID, packets := range buffs { // Filter over the streams that this node is closer to // Keep the one with the smallest queue - packet := packets[len(packets)-1] + packet := packets[0] coords := switch_getPacketCoords(packet) if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(coords, port) { best = streamID @@ -607,13 +607,13 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo } } if bestSize != 0 { - packets := stacks[best] + packets := buffs[best] var packet []byte - packet, packets = packets[len(packets)-1], packets[:len(packets)-1] 
+ packet, packets = packets[0], packets[1:] if len(packets) == 0 { - delete(stacks, best) + delete(buffs, best) } else { - stacks[best] = packets + buffs[best] = packets } to.sendPacket(packet) return true @@ -624,7 +624,7 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { - stacks := make(map[string][][]byte) // Packets per PacketStreamID (string) + buffs := make(map[string][][]byte) // Packets per PacketStreamID (string) idle := make(map[switchPort]struct{}) // this is to deduplicate things for { select { @@ -633,16 +633,16 @@ func (t *switchTable) doWorker() { if !t.handleIn(packet, idle) { // There's nobody free to take it right now, so queue it for later streamID := switch_getPacketStreamID(packet) - packets := append(stacks[streamID], packet) + packets := append(buffs[streamID], packet) for len(packets) > 32 { util_putBytes(packets[0]) packets = packets[1:] } - stacks[streamID] = packets + buffs[streamID] = packets } case port := <-t.idleIn: // Try to find something to send to this peer - if !t.handleIdle(port, stacks) { + if !t.handleIdle(port, buffs) { // Didn't find anything ready to send yet, so stay idle idle[port] = struct{}{} } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 12147d4e..80ae2847 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -246,19 +246,13 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { defer close(out) go func() { // This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic - send := make(chan []byte) - defer close(send) - go func() { - // This goroutine does the actual socket write operations - // The parent goroutine aggregates things for it and feeds them in - for msg := range send { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{tcp_msg[:], msgLen, msg} - buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) - util_putBytes(msg) - } - }() + send := func(msg []byte) { + msgLen := wire_encode_uint64(uint64(len(msg))) + buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf.WriteTo(sock) + atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) + util_putBytes(msg) + } timerInterval := tcp_timeout * 2 / 3 timer := time.NewTimer(timerInterval) defer timer.Stop() @@ -266,7 +260,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { select { case msg := <-p.linkOut: // Always send outgoing link traffic first, if needed - send <- msg + send(msg) continue default: } @@ -279,14 +273,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer.Reset(timerInterval) select { case _ = <-timer.C: - send <- nil // TCP keep-alive traffic + send(nil) // TCP keep-alive traffic case msg := <-p.linkOut: - send <- msg + send(msg) case msg, ok := <-out: if !ok { return } - send <- msg // Block until the socket writer has the packet + send(msg) // Block until the socket write has finished // Now inform the switch that we're ready for more traffic p.core.switchTable.idleIn <- p.port } From 11acb0129d238597bdb2608241b44be3f76536ce Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 25 Jun 2018 14:17:07 +0100 Subject: [PATCH 15/21] Use alien to generate RPMs --- .circleci/config.yml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 1f774364..f6f96479 
100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -11,6 +11,7 @@ jobs:
     steps:
       - checkout
+
       - run:
           name: Create artifact upload directory and set variables
           command: |
@@ -19,13 +20,19 @@ jobs:
             echo 'export CIVERSION=$(sh contrib/semver/version.sh | cut -c 2-)' >> $BASH_ENV

       - run:
-          name: Build for Linux (including Debian packages)
+          name: Install alien
+          command: |
+            sudo apt-get install -y alien
+
+      - run:
+          name: Build for Linux (including Debian packages and RPMs)
           command: |
             PKGARCH=amd64 sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-amd64 && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-amd64;
             PKGARCH=i386 sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-i386 && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-i386;
             PKGARCH=mipsel sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-mipsel && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-mipsel;
             PKGARCH=mips sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-mips && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-mips;
             PKGARCH=armhf sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-armh && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-armhf;
+            alien --to-rpm yggdrasil*.deb --scripts && mv *.rpm /tmp/upload/;
             mv *.deb /tmp/upload/

       - run:

From a7d1f2127114522ee8854f677e88bbf2a11716ef Mon Sep 17 00:00:00 2001
From: Neil Alexander
Date: Mon, 25 Jun 2018 14:19:22 +0100
Subject: [PATCH 16/21] Run alien as root so package permissions are right

---
 .circleci/config.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index f6f96479..d7a305fe 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -32,7 +32,7 @@ jobs:
             PKGARCH=mipsel sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-mipsel && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-mipsel;
             PKGARCH=mips sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-mips && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-mips;
             PKGARCH=armhf sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-armh && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-armhf;
-            alien --to-rpm yggdrasil*.deb --scripts && mv *.rpm /tmp/upload/;
+            sudo alien --to-rpm yggdrasil*.deb --scripts && mv *.rpm /tmp/upload/;
             mv *.deb /tmp/upload/

       - run:

From 3d0b39f05a705b32b2a60da9382fa5b833aa172b Mon Sep 17 00:00:00 2001
From: Neil Alexander
Date: Mon, 25 Jun 2018 14:21:31 +0100
Subject: [PATCH 17/21] Keep version number

---
 .circleci/config.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index d7a305fe..3dbe743d 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -32,7 +32,7 @@ jobs:
             PKGARCH=mipsel sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-mipsel && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-mipsel;
             PKGARCH=mips sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-mips && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-mips;
             PKGARCH=armhf sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-armh && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-armhf;
-            sudo alien --to-rpm yggdrasil*.deb --scripts && mv *.rpm /tmp/upload/;
+            sudo alien --to-rpm yggdrasil*.deb --scripts --keep-version && mv *.rpm /tmp/upload/;
             mv *.deb /tmp/upload/

       - run:

From b63b534fa787a69779fdce1aa4628b6563382401 Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Mon, 25 Jun 2018 18:12:18 -0500
Subject: [PATCH 18/21] drop packets that have been queued for longer than some timeout (currently 25ms) instead of using fixed length queues

---
 src/yggdrasil/switch.go | 72 +++++++++++++++++++++++++++++------------
 1 file changed, 51 insertions(+), 21 deletions(-)

diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go
index 6c5fd21d..5b72620c 100644
--- a/src/yggdrasil/switch.go
+++ b/src/yggdrasil/switch.go
@@ -586,36 +586,66 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool
 	}
 }

+// Info about a buffered packet
+type switch_packetInfo struct {
+	bytes []byte
+	time  time.Time // Timestamp of when the packet arrived
+}
+
+// Used to keep track of buffered packets
+type switch_buffer struct {
+	packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large
+	count   uint64              // Total queue size, including dropped packets
+}
+
+func (b *switch_buffer) dropTimedOut() {
+	// TODO figure out what timeout makes sense
+	const timeout = 25 * time.Millisecond
+	now := time.Now()
+	for len(b.packets) > 0 && now.Sub(b.packets[0].time) > timeout {
+		util_putBytes(b.packets[0].bytes)
+		b.packets = b.packets[1:]
+	}
+}
+
 // Handles incoming idle notifications
 // Loops over packets and sends the newest one that's OK for this peer to send
 // Returns true if the peer is no longer idle, false if it should be added to the idle list
-func (t *switchTable) handleIdle(port switchPort, buffs map[string][][]byte) bool {
+func (t *switchTable) handleIdle(port switchPort, buffs map[string]switch_buffer) bool {
 	to := t.core.peers.getPorts()[port]
 	if to == nil {
 		return true
 	}
 	var best string
-	var bestSize int
-	for streamID, packets := range buffs {
+	var bestSize uint64
+	for streamID, buf := range buffs {
 		// Filter over the streams that this node is closer to
 		// Keep the one with the smallest queue
-		packet := packets[0]
-		coords := switch_getPacketCoords(packet)
-		if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(coords, port) {
+		buf.dropTimedOut()
+		if len(buf.packets) == 0 {
+			delete(buffs, streamID)
+			continue
+		}
+		buffs[streamID] = buf
+		packet := buf.packets[0]
+		coords := switch_getPacketCoords(packet.bytes)
+		if (bestSize == 0 || buf.count < bestSize) && t.portIsCloser(coords, port) {
 			best = streamID
-			bestSize = len(packets)
+			bestSize = buf.count
 		}
 	}
 	if bestSize != 0 {
-		packets := buffs[best]
-		var packet []byte
-		packet, packets = packets[0], packets[1:]
-		if len(packets) == 0 {
+		buf := buffs[best]
+		var packet switch_packetInfo
+		// TODO decide if this should be LIFO or FIFO
+		packet, buf.packets = buf.packets[0], buf.packets[1:]
+		buf.count--
+		if len(buf.packets) == 0 {
 			delete(buffs, best)
 		} else {
-			buffs[best] = packets
+			buffs[best] = buf
 		}
-		to.sendPacket(packet)
+		to.sendPacket(packet.bytes)
 		return true
 	} else {
 		return false
@@ -624,8 +654,8 @@ func (t *switchTable) handleIdle(port switchPort, buffs map[string][][]byte) boo

 // The switch worker does routing lookups and sends packets to where they need to be
 func (t *switchTable) doWorker() {
-	buffs := make(map[string][][]byte) // Packets per PacketStreamID (string)
-	idle := make(map[switchPort]struct{}) // this is to deduplicate things
+	buffs := make(map[string]switch_buffer) // Packets per PacketStreamID (string)
+	idle := make(map[switchPort]struct{})   // this is to deduplicate things
 	for {
 		select {
 		case packet := <-t.packetIn:
@@ -633,12 +663,12 @@ func (t *switchTable) doWorker() {
 			// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
 			if !t.handleIn(packet, idle) {
 				// There's nobody free to take it right now, so queue it for later
 				streamID := switch_getPacketStreamID(packet)
-				packets := append(buffs[streamID], packet)
-				for len(packets) > 32 {
-					util_putBytes(packets[0])
-					packets = packets[1:]
-				}
-				buffs[streamID] = packets
+				buf := buffs[streamID]
+				buf.dropTimedOut()
+				pinfo := switch_packetInfo{packet, time.Now()}
+				buf.packets = append(buf.packets, pinfo)
+				buf.count++
+				buffs[streamID] = buf
 			}
 		case port := <-t.idleIn:
 			// Try to find something to send to this peer

From dd6ca6e4b6d3e9a994c1709ad6295c561e6f4114 Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Thu, 28 Jun 2018 18:47:10 -0500
Subject: [PATCH 19/21] Add changelog

---
 CHANGELOG.md | 87 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 87 insertions(+)
 create mode 100644 CHANGELOG.md

diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 00000000..a5615c50
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,87 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
+and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
+
+
+
+## [Unreleased]
+### Added
+- Begin keeping changelog (incomplete and possibly inaccurate information before this point).
+- Build RPMs in CircleCI using alien.
+
+### Changed
+- Local backpressure improvements.
+- Change `box_pub_key` to `key` in admin API.
+- Session cleanup.
+
+## [0.2.2] - 2018-06-21
+### Added
+- Add yggdrasilconf for testing with vyatta-yggdrasil.
+- Add a randomized retry delay after TCP disconnects, to prevent synchronization livelocks.
+
+### Changed
+- Update build script ot strip by default, allow debug `-d` and UPX `-u` flags.
+- Start pprof in debug builds based on an environment variable (e.g. `PPROFLISTEN=localhost:6060`), instead of a flag.
+
+### Fixed
+- Fix typo in big-endian BOM.
+
+## [0.2.1] - 2018-06-15
+### Changed
+- The address range was moved from `fd00::/8` to `200::/7`.
+
+### Fixed
+- UTF-16 conversion for configuration files.
+- Fixes to the Debian package control file.
+- Fixes to the launchd service for macOS.
+- Fixes to the DHT and switch.
+
+## [0.2.0] - 2018-06-13
+### Added
+- Exchange version information during connection setup, to prevent connections with incompatible versions.
+
+### Changed
+- Wire format changes (backwards incompatible).
+- Less maintenance traffic per peer.
+- Exponential back-off for DHT maintenance traffic (less maintenance traffic for known good peers).
+- Iterative DHT (added some time between v0.1.0 and here).
+- Use local queue sizes for a sort of local-only backpressure routing, instead of the removed bandwidth estimates, when deciding where to send a packet.
+
+### Removed
+- UDP peering, this may be added again if/when a better implementation appears.
+- Per peer bandwidth estimation.
+
+## [0.1.0] - 2018-02-01
+### Added
+- Adopt semantic versioning.
+
+### Changed
+- Wire format changes (backwards incompatible).
+- Many other undocumented changes leading up to this release and before the next one.
+
+## [0.0.1] - 2017-12-28
+### Added
+- First commit.
+- Initial public release.
+

From 1fced2bdf01b5af0e19ee0b4886a5fb05a831add Mon Sep 17 00:00:00 2001
From: Neil Alexander
Date: Fri, 29 Jun 2018 08:35:39 +0100
Subject: [PATCH 20/21] Update changelog

---
 CHANGELOG.md | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a5615c50..5bb20764 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -28,31 +28,32 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 ## [Unreleased]
 ### Added
 - Begin keeping changelog (incomplete and possibly inaccurate information before this point).
-- Build RPMs in CircleCI using alien.
+- Build RPMs in CircleCI using alien. This provides package support for Fedora, Red Hat Enterprise Linux, CentOS and other RPM-based distributions.

 ### Changed
 - Local backpressure improvements.
-- Change `box_pub_key` to `key` in admin API.
+- Change `box_pub_key` to `key` in admin API for simplicity.
 - Session cleanup.

 ## [0.2.2] - 2018-06-21
 ### Added
-- Add yggdrasilconf for testing with vyatta-yggdrasil.
+- Add `yggdrasilconf` utility for testing with the `vyatta-yggdrasil` package.
 - Add a randomized retry delay after TCP disconnects, to prevent synchronization livelocks.

 ### Changed
-- Update build script ot strip by default, allow debug `-d` and UPX `-u` flags.
+- Update build script to strip by default, which significantly reduces the size of the binary.
+- Add debug `-d` and UPX `-u` flags to the `build` script.
 - Start pprof in debug builds based on an environment variable (e.g. `PPROFLISTEN=localhost:6060`), instead of a flag.

 ### Fixed
-- Fix typo in big-endian BOM.
+- Fix typo in big-endian BOM so that both little-endian and big-endian UTF-16 files are detected correctly.

 ## [0.2.1] - 2018-06-15
 ### Changed
-- The address range was moved from `fd00::/8` to `200::/7`.
+- The address range was moved from `fd00::/8` to `200::/7`. This range was chosen as it is marked as deprecated. The change prevents overlap with other ULA privately assigned ranges.

 ### Fixed
-- UTF-16 conversion for configuration files.
+- UTF-16 detection conversion for configuration files, which can particularly be a problem on Windows 10 if a configuration file is generated from within PowerShell.
 - Fixes to the Debian package control file.
 - Fixes to the launchd service for macOS.
 - Fixes to the DHT and switch.
@@ -70,7 +71,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 ### Removed
 - UDP peering, this may be added again if/when a better implementation appears.
-- Per peer bandwidth estimation.
+- Per peer bandwidth estimation, as this has been replaced with an early local backpressure implementation.

 ## [0.1.0] - 2018-02-01
 ### Added
@@ -84,4 +85,3 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 ### Added
 - First commit.
 - Initial public release.
-

From af99cebf118d7fc8a34b7eabc15b037408416a75 Mon Sep 17 00:00:00 2001
From: Neil Alexander
Date: Fri, 29 Jun 2018 23:20:58 +0100
Subject: [PATCH 21/21] Update changelog version

---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5bb20764..8d8b7d48 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -25,7 +25,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 - in case of vulnerabilities. -->

-## [Unreleased]
+## [0.2.3] - 2018-06-29
 ### Added
 - Begin keeping changelog (incomplete and possibly inaccurate information before this point).
 - Build RPMs in CircleCI using alien. This provides package support for Fedora, Red Hat Enterprise Linux, CentOS and other RPM-based distributions.
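
A minimal, standalone sketch of the queueing change in PATCH 18/21 may help when reading that diff: packets are dropped once they have waited longer than a fixed timeout (25ms in the patch), instead of capping each stream's queue at a fixed length. The names below (`packetQueue`, `queuedPacket`, the `main` harness) are illustrative assumptions and do not appear in the Yggdrasil source or in this patch series; only the 25ms threshold and the oldest-first drop loop mirror the patch.

```go
package main

import (
	"fmt"
	"time"
)

// queuedPacket mirrors the idea behind switch_packetInfo in the patch:
// a payload plus the time it entered the queue.
type queuedPacket struct {
	bytes  []byte
	queued time.Time
}

// packetQueue is an illustrative stand-in for switch_buffer: a FIFO whose
// entries are discarded once they have waited longer than a fixed timeout,
// rather than capping the queue at a fixed length.
type packetQueue struct {
	packets []queuedPacket
}

// dropTimedOut removes queued packets older than timeout, oldest first.
func (q *packetQueue) dropTimedOut(timeout time.Duration) {
	now := time.Now()
	for len(q.packets) > 0 && now.Sub(q.packets[0].queued) > timeout {
		q.packets = q.packets[1:]
	}
}

func main() {
	const timeout = 25 * time.Millisecond // same figure the patch starts with
	var q packetQueue
	q.packets = append(q.packets, queuedPacket{[]byte("stale"), time.Now()})
	time.Sleep(2 * timeout) // let the first packet exceed the timeout
	q.packets = append(q.packets, queuedPacket{[]byte("fresh"), time.Now()})
	q.dropTimedOut(timeout)
	fmt.Println(len(q.packets)) // 1: only the fresh packet remains
}
```

Dropping by age rather than by count means a stream that stalls for longer than the timeout sheds its whole backlog while short bursts pass through untouched, which appears to be the trade-off the patch is after.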