From f308e81bf3e906e3c02d43a12b953269a5f82f09 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Tue, 18 Feb 2020 20:13:39 -0600 Subject: [PATCH 1/9] in the switch, keep a separate set of queues per peer instead of a global queue --- src/yggdrasil/api.go | 17 ++--- src/yggdrasil/link.go | 19 ------ src/yggdrasil/switch.go | 140 +++++++++++++++++++++------------------- 3 files changed, 82 insertions(+), 94 deletions(-) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 82d0aa93..4a6ae417 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -207,15 +207,16 @@ func (c *Core) GetSwitchQueues() SwitchQueues { HighestSize: switchTable.queues.maxsize, MaximumSize: switchTable.queues.totalMaxSize, } - for k, v := range switchTable.queues.bufs { - nexthop := switchTable.bestPortForCoords([]byte(k)) - queue := SwitchQueue{ - ID: k, - Size: v.size, - Packets: uint64(len(v.packets)), - Port: uint64(nexthop), + for port, pbuf := range switchTable.queues.bufs { + for k, v := range pbuf { + queue := SwitchQueue{ + ID: k, + Size: v.size, + Packets: uint64(len(v.packets)), + Port: uint64(port), + } + switchqueues.Queues = append(switchqueues.Queues, queue) } - switchqueues.Queues = append(switchqueues.Queues, queue) } } phony.Block(&c.switchTable, getSwitchQueues) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 157ea525..fb40fc08 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -282,13 +282,6 @@ func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { }) } -// called by an AfterFunc if we seem to be blocked in a send syscall for a long time -func (intf *linkInterface) _notifySyscall() { - intf.link.core.switchTable.Act(intf, func() { - intf.link.core.switchTable._sendingIn(intf.peer.port) - }) -} - // we just sent something, so cancel any pending timer to send keep-alive traffic func (intf *linkInterface) _cancelStallTimer() { if intf.stallTimer != nil { @@ -402,19 +395,7 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool size += len(bs) } w.intf.notifySending(size, isLinkTraffic) - // start a timer that will fire if we get stuck in writeMsgs for an oddly long time - var once sync.Once - timer := time.AfterFunc(time.Millisecond, func() { - // 1 ms is kind of arbitrary - // the rationale is that this should be very long compared to a syscall - // but it's still short compared to end-to-end latency or human perception - once.Do(func() { - w.intf.Act(nil, w.intf._notifySyscall) - }) - }) w.intf.msgIO.writeMsgs(bss) - // Make sure we either stop the timer from doing anything or wait until it's done - once.Do(func() { timer.Stop() }) w.intf.notifySent(size, isLinkTraffic) // Cleanup for _, bs := range bss { diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 653b12f1..899d143d 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -177,7 +177,6 @@ type switchTable struct { phony.Inbox // Owns the below queues switch_buffers // Queues - not atomic so ONLY use through the actor idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor - sending map[switchPort]struct{} // peers known to be blocked in a send (somehow) } // Minimum allowed total size of switch queues. 
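// [Illustrative sketch, not part of the patch.] This commit replaces the single
// stream-keyed queue map with a nested map keyed first by the next-hop switchPort
// and then by stream ID, which is why GetSwitchQueues above now iterates two levels
// deep. The helper below is a minimal, assumed example of walking that layout;
// switchPort and switch_buffer are the types from this diff, while countQueues
// itself is hypothetical and not part of the codebase.

func countQueues(bufs map[switchPort]map[string]switch_buffer) (queues int, bytes uint64) {
	for _, pbufs := range bufs { // one inner map per next-hop port
		for _, buf := range pbufs { // one buffer per stream ID
			queues++
			bytes += buf.size
		}
	}
	return queues, bytes
}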
@@ -202,9 +201,8 @@ func (t *switchTable) init(core *Core) { t.queues.totalMaxSize = SwitchQueueTotalMinSize } core.config.Mutex.RUnlock() - t.queues.bufs = make(map[string]switch_buffer) + t.queues.bufs = make(map[switchPort]map[string]switch_buffer) t.idle = make(map[switchPort]struct{}) - t.sending = make(map[switchPort]struct{}) }) } @@ -666,27 +664,17 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort { // Handle an incoming packet // Either send it to ourself, or to the first idle peer that's free // Returns true if the packet has been handled somehow, false if it should be queued -func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sending map[switchPort]struct{}) bool { +func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) (bool, switchPort) { coords := switch_getPacketCoords(packet) closer := t.getCloser(coords) - if len(closer) == 0 { - // TODO? call the router directly, and remove the whole concept of a self peer? - self := t.core.peers.getPorts()[0] - self.sendPacketsFrom(t, [][]byte{packet}) - return true - } var best *closerInfo ports := t.core.peers.getPorts() for _, cinfo := range closer { to := ports[cinfo.elem.port] - //_, isIdle := idle[cinfo.elem.port] - _, isSending := sending[cinfo.elem.port] var update bool switch { case to == nil: // no port was found, ignore it - case isSending: - // the port is busy, ignore it case best == nil: // this is the first idle port we've found, so select it until we find a // better candidate port to use instead @@ -715,15 +703,20 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sen best = &b } } - if best != nil { - if _, isIdle := idle[best.elem.port]; isIdle { - delete(idle, best.elem.port) - ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) - return true - } + if best == nil { + // No closer peers + // TODO? call the router directly, and remove the whole concept of a self peer? 
+ self := t.core.peers.getPorts()[0] + self.sendPacketsFrom(t, [][]byte{packet}) + return true, 0 } - // Didn't find anyone idle to send it to - return false + if _, isIdle := idle[best.elem.port]; isIdle { + delete(idle, best.elem.port) + ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) + return true, best.elem.port + } + // Best node isn't idle, so return port and let the packet be buffered + return false, best.elem.port } // Info about a buffered packet @@ -740,24 +733,29 @@ type switch_buffer struct { type switch_buffers struct { totalMaxSize uint64 - bufs map[string]switch_buffer // Buffers indexed by StreamID - size uint64 // Total size of all buffers, in bytes + bufs map[switchPort]map[string]switch_buffer // Buffers indexed by port and StreamID + size uint64 // Total size of all buffers, in bytes maxbufs int maxsize uint64 closer []closerInfo // Scratch space } func (b *switch_buffers) _cleanup(t *switchTable) { - for streamID, buf := range b.bufs { - // Remove queues for which we have no next hop - packet := buf.packets[0] - coords := switch_getPacketCoords(packet.bytes) - if len(t.getCloser(coords)) == 0 { - for _, packet := range buf.packets { - util.PutBytes(packet.bytes) + for port, pbufs := range b.bufs { + for streamID, buf := range pbufs { + // Remove queues for which we have no next hop + packet := buf.packets[0] + coords := switch_getPacketCoords(packet.bytes) + if len(t.getCloser(coords)) == 0 { + for _, packet := range buf.packets { + util.PutBytes(packet.bytes) + } + b.size -= buf.size + delete(pbufs, streamID) } - b.size -= buf.size - delete(b.bufs, streamID) + } + if len(pbufs) == 0 { + delete(b.bufs, port) } } @@ -765,23 +763,28 @@ func (b *switch_buffers) _cleanup(t *switchTable) { // Drop a random queue target := rand.Uint64() % b.size var size uint64 // running total - for streamID, buf := range b.bufs { - size += buf.size - if size < target { - continue + for port, pbufs := range b.bufs { + for streamID, buf := range pbufs { + size += buf.size + if size < target { + continue + } + var packet switch_packetInfo + packet, buf.packets = buf.packets[0], buf.packets[1:] + buf.size -= uint64(len(packet.bytes)) + b.size -= uint64(len(packet.bytes)) + util.PutBytes(packet.bytes) + if len(buf.packets) == 0 { + delete(pbufs, streamID) + if len(pbufs) == 0 { + delete(b.bufs, port) + } + } else { + // Need to update the map, since buf was retrieved by value + pbufs[streamID] = buf + } + break } - var packet switch_packetInfo - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - b.size -= uint64(len(packet.bytes)) - util.PutBytes(packet.bytes) - if len(buf.packets) == 0 { - delete(b.bufs, streamID) - } else { - // Need to update the map, since buf was retrieved by value - b.bufs[streamID] = buf - } - break } } } @@ -799,32 +802,35 @@ func (t *switchTable) _handleIdle(port switchPort) bool { var psize int t.queues._cleanup(t) now := time.Now() + pbufs := t.queues.bufs[port] for psize < 65535 { var best string var bestPriority float64 - for streamID, buf := range t.queues.bufs { + for streamID, buf := range pbufs { // Filter over the streams that this node is closer to // Keep the one with the smallest queue packet := buf.packets[0] - coords := switch_getPacketCoords(packet.bytes) priority := float64(now.Sub(packet.time)) / float64(buf.size) - if priority >= bestPriority && t.portIsCloser(coords, port) { + if priority >= bestPriority { best = streamID bestPriority = priority } } if best != "" { - buf := t.queues.bufs[best] + 
buf := pbufs[best] var packet switch_packetInfo // TODO decide if this should be LIFO or FIFO packet, buf.packets = buf.packets[0], buf.packets[1:] buf.size -= uint64(len(packet.bytes)) t.queues.size -= uint64(len(packet.bytes)) if len(buf.packets) == 0 { - delete(t.queues.bufs, best) + delete(pbufs, best) + if len(pbufs) == 0 { + delete(t.queues.bufs, port) + } } else { // Need to update the map, since buf was retrieved by value - t.queues.bufs[best] = buf + pbufs[best] = buf } packets = append(packets, packet.bytes) psize += len(packet.bytes) @@ -848,11 +854,14 @@ func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) { func (t *switchTable) _packetIn(bytes []byte) { // Try to send it somewhere (or drop it if it's corrupt or at a dead end) - if !t._handleIn(bytes, t.idle, t.sending) { + if sent, best := t._handleIn(bytes, t.idle); !sent { // There's nobody free to take it right now, so queue it for later packet := switch_packetInfo{bytes, time.Now()} streamID := switch_getPacketStreamID(packet.bytes) - buf, bufExists := t.queues.bufs[streamID] + if _, isIn := t.queues.bufs[best]; !isIn { + t.queues.bufs[best] = make(map[string]switch_buffer) + } + buf, bufExists := t.queues.bufs[best][streamID] buf.packets = append(buf.packets, packet) buf.size += uint64(len(packet.bytes)) t.queues.size += uint64(len(packet.bytes)) @@ -860,13 +869,17 @@ func (t *switchTable) _packetIn(bytes []byte) { if t.queues.size > t.queues.maxsize { t.queues.maxsize = t.queues.size } - t.queues.bufs[streamID] = buf + t.queues.bufs[best][streamID] = buf if !bufExists { // Keep a track of the max total queue count. Only recalculate this // when the queue is new because otherwise repeating len(dict) might // cause unnecessary processing overhead - if len(t.queues.bufs) > t.queues.maxbufs { - t.queues.maxbufs = len(t.queues.bufs) + var count int + for _, pbufs := range t.queues.bufs { + count += len(pbufs) + } + if count > t.queues.maxbufs { + t.queues.maxbufs = count } } t.queues._cleanup(t) @@ -875,15 +888,8 @@ func (t *switchTable) _packetIn(bytes []byte) { func (t *switchTable) _idleIn(port switchPort) { // Try to find something to send to this peer - delete(t.sending, port) if !t._handleIdle(port) { // Didn't find anything ready to send yet, so stay idle t.idle[port] = struct{}{} } } - -func (t *switchTable) _sendingIn(port switchPort) { - if _, isIn := t.idle[port]; !isIn { - t.sending[port] = struct{}{} - } -} From 48098799958d5450c894821a1d768f475cb7c4d7 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Tue, 10 Mar 2020 01:03:07 -0500 Subject: [PATCH 2/9] refactor switch code so calling lookupTable.lookup does most of the important work --- src/yggdrasil/switch.go | 109 +++++++++++++++++----------------------- 1 file changed, 47 insertions(+), 62 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d4dd045a..ce5e3db6 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -596,14 +596,14 @@ func (t *switchTable) getCloser(dest []byte) []closerInfo { // Skip the iteration step if it's impossible to be closer return nil } - t.queues.closer = t.queues.closer[:0] + var closer []closerInfo for _, info := range table.elems { dist := info.locator.dist(dest) if dist < myDist { - t.queues.closer = append(t.queues.closer, closerInfo{info, dist}) + closer = append(closer, closerInfo{info, dist}) } } - return t.queues.closer + return closer } // Returns true if the peer is closer to the destination than ourself @@ -645,20 +645,41 @@ func switch_getFlowLabelFromCoords(in 
[]byte) []byte { return []byte{} } -// Find the best port for a given set of coords -func (t *switchTable) bestPortForCoords(coords []byte) switchPort { - table := t.getTable() - var best switchPort - bestDist := table.self.dist(coords) - for to, elem := range table.elems { - dist := elem.locator.dist(coords) - if !(dist < bestDist) { +// Find the best port to forward to for a given set of coords +func (t *lookupTable) lookup(coords []byte) switchPort { + var bestPort switchPort + myDist := t.self.dist(coords) + bestDist := myDist + var bestElem tableElem + for _, info := range t.elems { + dist := info.locator.dist(coords) + if dist >= myDist { continue } - best = to - bestDist = dist + var update bool + switch { + case dist < bestDist: + // Closer to destination + update = true + case dist > bestDist: + // Further from destination + case info.locator.tstamp > bestElem.locator.tstamp: + // Newer root update + update = true + case info.locator.tstamp < bestElem.locator.tstamp: + // Older root update + case info.time.Before(bestElem.time): + // Received root update via this peer sooner + update = true + default: + } + if update { + bestPort = info.port + bestDist = dist + bestElem = info + } } - return best + return bestPort } // Handle an incoming packet @@ -666,57 +687,22 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort { // Returns true if the packet has been handled somehow, false if it should be queued func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) (bool, switchPort) { coords := switch_getPacketCoords(packet) - closer := t.getCloser(coords) - var best *closerInfo + table := t.getTable() + port := table.lookup(coords) ports := t.core.peers.getPorts() - for _, cinfo := range closer { - to := ports[cinfo.elem.port] - var update bool - switch { - case to == nil: - // no port was found, ignore it - case best == nil: - // this is the first idle port we've found, so select it until we find a - // better candidate port to use instead - update = true - case cinfo.dist < best.dist: - // the port takes a shorter path/is more direct than our current - // candidate, so select that instead - update = true - case cinfo.dist > best.dist: - // the port takes a longer path/is less direct than our current candidate, - // ignore it - case cinfo.elem.locator.tstamp > best.elem.locator.tstamp: - // has a newer tstamp from the root, so presumably a better path - update = true - case cinfo.elem.locator.tstamp < best.elem.locator.tstamp: - // has a n older tstamp, so presumably a worse path - case cinfo.elem.time.Before(best.elem.time): - // same tstamp, but got it earlier, so presumably a better path - //t.core.log.Println("DEBUG new best:", best.elem.time, cinfo.elem.time) - update = true - default: - // the search for a port has finished - } - if update { - b := cinfo // because cinfo gets mutated by the iteration - best = &b - } - } - if best == nil { - // No closer peers - // TODO? call the router directly, and remove the whole concept of a self peer? 
- self := t.core.peers.getPorts()[0] - self.sendPacketsFrom(t, [][]byte{packet}) + peer := ports[port] + if peer == nil { + // FIXME hack, if the peer disappeared durring a race then don't buffer return true, 0 } - if _, isIdle := idle[best.elem.port]; isIdle { - delete(idle, best.elem.port) - ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) - return true, best.elem.port + if _, isIdle := idle[port]; isIdle || port == 0 { + // Either no closer peers, or the closest peer is idle + delete(idle, port) + peer.sendPacketsFrom(t, [][]byte{packet}) + return true, port } - // Best node isn't idle, so return port and let the packet be buffered - return false, best.elem.port + // There's a closer peer, but it's not idle, so buffer it + return false, port } // Info about a buffered packet @@ -737,7 +723,6 @@ type switch_buffers struct { size uint64 // Total size of all buffers, in bytes maxbufs int maxsize uint64 - closer []closerInfo // Scratch space } func (b *switch_buffers) _cleanup(t *switchTable) { From e926a3be6d2b9d475dd6b7a5677ec4442033ff74 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 29 Mar 2020 00:23:38 -0500 Subject: [PATCH 3/9] work in progress actorizing core.peers and replacing switch worker with per-peer switch-generated lookupTable --- contrib/ansible/genkeys.go | 4 +- src/yggdrasil/api.go | 23 ++-- src/yggdrasil/link.go | 7 +- src/yggdrasil/nodeinfo.go | 4 +- src/yggdrasil/peer.go | 119 ++++++++++-------- src/yggdrasil/router.go | 6 +- src/yggdrasil/switch.go | 244 +++++++++++++++++++------------------ 7 files changed, 221 insertions(+), 186 deletions(-) diff --git a/contrib/ansible/genkeys.go b/contrib/ansible/genkeys.go index 1d7c222d..681431b5 100644 --- a/contrib/ansible/genkeys.go +++ b/contrib/ansible/genkeys.go @@ -12,9 +12,9 @@ import ( "net" "os" + "github.com/cheggaaa/pb/v3" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/cheggaaa/pb/v3" ) var numHosts = flag.Int("hosts", 1, "number of host vars to generate") @@ -30,7 +30,7 @@ type keySet struct { func main() { flag.Parse() - bar := pb.StartNew(*keyTries * 2 + *numHosts) + bar := pb.StartNew(*keyTries*2 + *numHosts) if *numHosts > *keyTries { println("Can't generate less keys than hosts.") diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 4a6ae417..15e2acd6 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -110,7 +110,8 @@ type Session struct { // there is exactly one entry then this node is not connected to any other nodes // and is therefore isolated. func (c *Core) GetPeers() []Peer { - ports := c.peers.ports.Load().(map[switchPort]*peer) + var ports map[switchPort]*peer + phony.Block(&c.peers, func() { ports = c.peers.ports }) var peers []Peer var ps []switchPort for port := range ports { @@ -143,10 +144,14 @@ func (c *Core) GetPeers() []Peer { // isolated or not connected to any peers. func (c *Core) GetSwitchPeers() []SwitchPeer { var switchpeers []SwitchPeer - table := c.switchTable.table.Load().(lookupTable) - peers := c.peers.ports.Load().(map[switchPort]*peer) + var table *lookupTable + var ports map[switchPort]*peer + phony.Block(&c.peers, func() { + table = c.peers.table + ports = c.peers.ports + }) for _, elem := range table.elems { - peer, isIn := peers[elem.port] + peer, isIn := ports[elem.port] if !isIn { continue } @@ -325,8 +330,8 @@ func (c *Core) EncryptionPublicKey() string { // connected to any other nodes (effectively making you the root of a // single-node network). 
func (c *Core) Coords() []uint64 { - table := c.switchTable.table.Load().(lookupTable) - return wire_coordsBytestoUint64s(table.self.getCoords()) + loc := c.switchTable.getLocator() + return wire_coordsBytestoUint64s(loc.getCoords()) } // Address gets the IPv6 address of the Yggdrasil node. This is always a /128 @@ -490,7 +495,11 @@ func (c *Core) CallPeer(addr string, sintf string) error { // DisconnectPeer disconnects a peer once. This should be specified as a port // number. func (c *Core) DisconnectPeer(port uint64) error { - c.peers.removePeer(switchPort(port)) + c.peers.Act(nil, func() { + if p, isIn := c.peers.ports[switchPort(port)]; isIn { + p.Act(&c.peers, p._removeSelf) + } + }) return nil } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index fb40fc08..fa6563f1 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -217,13 +217,16 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }) + phony.Block(&intf.link.core.peers, func() { + // FIXME don't use phony.Block, it's bad practice, even if it's safe here + intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }) + }) if intf.peer == nil { return errors.New("failed to create peer") } defer func() { // More cleanup can go here - intf.link.core.peers.removePeer(intf.peer.port) + intf.peer.Act(nil, intf.peer._removeSelf) }() intf.peer.out = func(msgs [][]byte) { intf.writer.sendFrom(intf.peer, msgs, false) diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go index fc6250d6..745756fe 100644 --- a/src/yggdrasil/nodeinfo.go +++ b/src/yggdrasil/nodeinfo.go @@ -187,9 +187,9 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse } func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) { - table := m.core.switchTable.table.Load().(lookupTable) + loc := m.core.switchTable.getLocator() nodeinfo := nodeinfoReqRes{ - SendCoords: table.self.getCoords(), + SendCoords: loc.getCoords(), IsResponse: isResponse, NodeInfo: m._getNodeInfo(), } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 4206857e..7fa2b317 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -6,8 +6,6 @@ package yggdrasil import ( "encoding/hex" - "sync" - "sync/atomic" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" @@ -21,17 +19,17 @@ import ( // In most cases, this involves passing the packet to the handler for outgoing traffic to another peer. // In other cases, its link protocol traffic is used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. type peers struct { + phony.Inbox core *Core - mutex sync.Mutex // Synchronize writes to atomic - ports atomic.Value //map[switchPort]*peer, use CoW semantics + ports map[switchPort]*peer // use CoW semantics, share updated version with each peer + table *lookupTable // Sent from switch, share updated version with each peer } // Initializes the peers struct. 
func (ps *peers) init(c *Core) { - ps.mutex.Lock() - defer ps.mutex.Unlock() - ps.putPorts(make(map[switchPort]*peer)) ps.core = c + ps.ports = make(map[switchPort]*peer) + ps.table = new(lookupTable) } func (ps *peers) reconfigure() { @@ -80,16 +78,6 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string { return ps.core.config.Current.AllowedEncryptionPublicKeys } -// Atomically gets a map[switchPort]*peer of known peers. -func (ps *peers) getPorts() map[switchPort]*peer { - return ps.ports.Load().(map[switchPort]*peer) -} - -// Stores a map[switchPort]*peer (note that you should take a mutex before store operations to avoid conflicts with other nodes attempting to read/change/store at the same time). -func (ps *peers) putPorts(ports map[switchPort]*peer) { - ps.ports.Store(ports) -} - // Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic type peer struct { phony.Inbox @@ -110,10 +98,31 @@ type peer struct { // The below aren't actually useful internally, they're just gathered for getPeers statistics bytesSent uint64 bytesRecvd uint64 + ports map[switchPort]*peer + table *lookupTable +} + +func (ps *peers) updateTables(from phony.Actor, table *lookupTable) { + ps.Act(from, func() { + ps.table = table + ps._updatePeers() + }) +} + +func (ps *peers) _updatePeers() { + ports := ps.ports + table := ps.table + for _, peer := range ps.ports { + p := peer // peer is mutated during iteration + p.Act(ps, func() { + p.ports = ports + p.table = table + }) + } } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer { +func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer { now := time.Now() p := peer{box: *box, sig: *sig, @@ -125,9 +134,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare core: ps.core, intf: intf, } - ps.mutex.Lock() - defer ps.mutex.Unlock() - oldPorts := ps.getPorts() + oldPorts := ps.ports newPorts := make(map[switchPort]*peer) for k, v := range oldPorts { newPorts[k] = v @@ -139,46 +146,49 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare break } } - ps.putPorts(newPorts) + ps.ports = newPorts + ps._updatePeers() return &p } -// Removes a peer for a given port, if one exists. -func (ps *peers) removePeer(port switchPort) { - if port == 0 { - return - } // Can't remove self peer - phony.Block(&ps.core.router, func() { - ps.core.switchTable.forgetPeer(port) +func (p *peer) _removeSelf() { + p.core.peers.Act(p, func() { + p.core.peers._removePeer(p) }) - ps.mutex.Lock() - oldPorts := ps.getPorts() - p, isIn := oldPorts[port] +} + +// Removes a peer for a given port, if one exists. 
+func (ps *peers) _removePeer(p *peer) { + if q := ps.ports[p.port]; p.port == 0 || q != p { + return + } // Can't remove self peer or nonexistant peer + ps.core.switchTable.forgetPeer(p.port) + oldPorts := ps.ports newPorts := make(map[switchPort]*peer) for k, v := range oldPorts { newPorts[k] = v } - delete(newPorts, port) - ps.putPorts(newPorts) - ps.mutex.Unlock() - if isIn { - if p.close != nil { - p.close() - } - close(p.done) + delete(newPorts, p.port) + if p.close != nil { + p.close() } + close(p.done) + ps.ports = newPorts + ps._updatePeers() } // If called, sends a notification to each peer that they should send a new switch message. // Mainly called by the switch after an update. func (ps *peers) sendSwitchMsgs(from phony.Actor) { - ports := ps.getPorts() - for _, p := range ports { - if p.port == 0 { - continue + ps.Act(from, func() { + for _, peer := range ps.ports { + p := peer + if p.port == 0 { + continue + } + p.Act(ps, p._sendSwitchMsg) } - p.Act(from, p._sendSwitchMsg) - } + }) } // This must be launched in a separate goroutine by whatever sets up the peer struct. @@ -236,12 +246,16 @@ func (p *peer) _handlePacket(packet []byte) { // Called to handle traffic or protocolTraffic packets. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. func (p *peer) _handleTraffic(packet []byte) { - table := p.core.switchTable.getTable() - if _, isIn := table.elems[p.port]; !isIn && p.port != 0 { + if _, isIn := p.table.elems[p.port]; !isIn && p.port != 0 { // Drop traffic if the peer isn't in the switch return } - p.core.switchTable.packetInFrom(p, packet) + coords := switch_getPacketCoords(packet) + next := p.table.lookup(coords) + if nPeer, isIn := p.ports[next]; isIn { + nPeer.sendPacketsFrom(p, [][]byte{packet}) + } + //p.core.switchTable.packetInFrom(p, packet) } func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { @@ -259,6 +273,7 @@ func (p *peer) _sendPackets(packets [][]byte) { size += len(packet) } p.bytesSent += uint64(size) + // FIXME need to manage queues here or else things can block! p.out(packets) } @@ -335,7 +350,8 @@ func (p *peer) _handleSwitchMsg(packet []byte) { return } if len(msg.Hops) < 1 { - p.core.peers.removePeer(p.port) + p._removeSelf() + return } var loc switchLocator prevKey := msg.Root @@ -346,7 +362,8 @@ func (p *peer) _handleSwitchMsg(packet []byte) { loc.coords = append(loc.coords, hop.Port) bs := getBytesForSig(&hop.Next, &sigMsg) if !crypto.Verify(&prevKey, bs, &hop.Sig) { - p.core.peers.removePeer(p.port) + p._removeSelf() + return } prevKey = hop.Next } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index b08a12d3..ac4d655d 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -61,7 +61,11 @@ func (r *router) init(core *Core) { linkType: "self", }, } - p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) + var p *peer + phony.Block(&r.core.peers, func() { + // FIXME don't block here! 
+ p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) + }) p.out = func(packets [][]byte) { r.handlePackets(p, packets) } r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ce5e3db6..33f2a1bd 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -12,13 +12,12 @@ package yggdrasil // A little annoying to do with constant changes from backpressure import ( - "math/rand" + //"math/rand" "sync" - "sync/atomic" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" + //"github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/Arceliar/phony" ) @@ -172,8 +171,6 @@ type switchTable struct { mutex sync.RWMutex // Lock for reads/writes of switchData parent switchPort // Port of whatever peer is our parent, or self if we're root data switchData // - updater atomic.Value // *sync.Once - table atomic.Value // lookupTable phony.Inbox // Owns the below queues switch_buffers // Queues - not atomic so ONLY use through the actor idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor @@ -190,8 +187,6 @@ func (t *switchTable) init(core *Core) { locator := switchLocator{root: t.key, tstamp: now.Unix()} peers := make(map[switchPort]peerInfo) t.data = switchData{locator: locator, peers: peers} - t.updater.Store(&sync.Once{}) - t.table.Store(lookupTable{}) t.drop = make(map[crypto.SigPubKey]int64) phony.Block(t, func() { core.config.Mutex.RLock() @@ -204,6 +199,7 @@ func (t *switchTable) init(core *Core) { t.queues.bufs = make(map[switchPort]map[string]switch_buffer) t.idle = make(map[switchPort]struct{}) }) + t.updateTable() } func (t *switchTable) reconfigure() { @@ -254,7 +250,7 @@ func (t *switchTable) cleanRoot() { t.time = now if t.data.locator.root != t.key { t.data.seq++ - t.updater.Store(&sync.Once{}) + t.updateTable() t.core.router.reset(nil) } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} @@ -292,7 +288,7 @@ func (t *switchTable) forgetPeer(port switchPort) { t.mutex.Lock() defer t.mutex.Unlock() delete(t.data.peers, port) - t.updater.Store(&sync.Once{}) + defer t.updateTable() if port != t.parent { return } @@ -528,7 +524,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep t.core.peers.sendSwitchMsgs(t) } if true || doUpdate { - t.updater.Store(&sync.Once{}) + t.updateTable() } return } @@ -566,13 +562,7 @@ func (t *switchTable) updateTable() { time: pinfo.time, } } - t.table.Store(newTable) -} - -// Returns a copy of the atomically-updated table used for switch lookups -func (t *switchTable) getTable() lookupTable { - t.updater.Load().(*sync.Once).Do(t.updateTable) - return t.table.Load().(lookupTable) + t.core.peers.updateTables(nil, &newTable) // TODO not be from nil } // Starts the switch worker @@ -589,6 +579,7 @@ type closerInfo struct { // Return a map of ports onto distance, keeping only ports closer to the destination than this node // If the map is empty (or nil), then no peer is closer +/* func (t *switchTable) getCloser(dest []byte) []closerInfo { table := t.getTable() myDist := table.self.dist(dest) @@ -605,8 +596,10 @@ func (t *switchTable) getCloser(dest []byte) []closerInfo { } return closer } +*/ // Returns true if the peer is closer to the destination than ourself +/* func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { table := t.getTable() if info, isIn := 
table.elems[port]; isIn { @@ -617,6 +610,7 @@ func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { return false } } +*/ // Get the coords of a packet without decoding func switch_getPacketCoords(packet []byte) []byte { @@ -686,23 +680,26 @@ func (t *lookupTable) lookup(coords []byte) switchPort { // Either send it to ourself, or to the first idle peer that's free // Returns true if the packet has been handled somehow, false if it should be queued func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) (bool, switchPort) { - coords := switch_getPacketCoords(packet) - table := t.getTable() - port := table.lookup(coords) - ports := t.core.peers.getPorts() - peer := ports[port] - if peer == nil { - // FIXME hack, if the peer disappeared durring a race then don't buffer - return true, 0 - } - if _, isIdle := idle[port]; isIdle || port == 0 { - // Either no closer peers, or the closest peer is idle - delete(idle, port) - peer.sendPacketsFrom(t, [][]byte{packet}) - return true, port - } - // There's a closer peer, but it's not idle, so buffer it - return false, port + /* + coords := switch_getPacketCoords(packet) + table := t.getTable() + port := table.lookup(coords) + ports := t.core.peers.getPorts() + peer := ports[port] + if peer == nil { + // FIXME hack, if the peer disappeared durring a race then don't buffer + return true, 0 + } + if _, isIdle := idle[port]; isIdle || port == 0 { + // Either no closer peers, or the closest peer is idle + delete(idle, port) + peer.sendPacketsFrom(t, [][]byte{packet}) + return true, port + } + // There's a closer peer, but it's not idle, so buffer it + return false, port + */ + return true, 0 } // Info about a buffered packet @@ -726,52 +723,54 @@ type switch_buffers struct { } func (b *switch_buffers) _cleanup(t *switchTable) { - for port, pbufs := range b.bufs { - for streamID, buf := range pbufs { - // Remove queues for which we have no next hop - packet := buf.packets[0] - coords := switch_getPacketCoords(packet.bytes) - if len(t.getCloser(coords)) == 0 { - for _, packet := range buf.packets { - util.PutBytes(packet.bytes) - } - b.size -= buf.size - delete(pbufs, streamID) - } - } - if len(pbufs) == 0 { - delete(b.bufs, port) - } - } - - for b.size > b.totalMaxSize { - // Drop a random queue - target := rand.Uint64() % b.size - var size uint64 // running total + /* for port, pbufs := range b.bufs { for streamID, buf := range pbufs { - size += buf.size - if size < target { - continue - } - var packet switch_packetInfo - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - b.size -= uint64(len(packet.bytes)) - util.PutBytes(packet.bytes) - if len(buf.packets) == 0 { - delete(pbufs, streamID) - if len(pbufs) == 0 { - delete(b.bufs, port) + // Remove queues for which we have no next hop + packet := buf.packets[0] + coords := switch_getPacketCoords(packet.bytes) + if len(t.getCloser(coords)) == 0 { + for _, packet := range buf.packets { + util.PutBytes(packet.bytes) } - } else { - // Need to update the map, since buf was retrieved by value - pbufs[streamID] = buf + b.size -= buf.size + delete(pbufs, streamID) } - break + } + if len(pbufs) == 0 { + delete(b.bufs, port) } } - } + + for b.size > b.totalMaxSize { + // Drop a random queue + target := rand.Uint64() % b.size + var size uint64 // running total + for port, pbufs := range b.bufs { + for streamID, buf := range pbufs { + size += buf.size + if size < target { + continue + } + var packet switch_packetInfo + packet, 
buf.packets = buf.packets[0], buf.packets[1:] + buf.size -= uint64(len(packet.bytes)) + b.size -= uint64(len(packet.bytes)) + util.PutBytes(packet.bytes) + if len(buf.packets) == 0 { + delete(pbufs, streamID) + if len(pbufs) == 0 { + delete(b.bufs, port) + } + } else { + // Need to update the map, since buf was retrieved by value + pbufs[streamID] = buf + } + break + } + } + } + */ } // Handles incoming idle notifications @@ -779,57 +778,60 @@ func (b *switch_buffers) _cleanup(t *switchTable) { // Returns true if the peer is no longer idle, false if it should be added to the idle list func (t *switchTable) _handleIdle(port switchPort) bool { // TODO? only send packets for which this is the best next hop that isn't currently blocked sending - to := t.core.peers.getPorts()[port] - if to == nil { - return true - } - var packets [][]byte - var psize int - t.queues._cleanup(t) - now := time.Now() - pbufs := t.queues.bufs[port] - for psize < 65535 { - var best *string - var bestPriority float64 - for streamID, buf := range pbufs { - // Filter over the streams that this node is closer to - // Keep the one with the smallest queue - packet := buf.packets[0] - priority := float64(now.Sub(packet.time)) / float64(buf.size) - if priority >= bestPriority { - b := streamID // copy since streamID is mutated in the loop - best = &b - bestPriority = priority - } + /* + to := t.core.peers.getPorts()[port] + if to == nil { + return true } - if best != nil { - buf := pbufs[*best] - var packet switch_packetInfo - // TODO decide if this should be LIFO or FIFO - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - t.queues.size -= uint64(len(packet.bytes)) - if len(buf.packets) == 0 { - delete(pbufs, *best) - if len(pbufs) == 0 { - delete(t.queues.bufs, port) + var packets [][]byte + var psize int + t.queues._cleanup(t) + now := time.Now() + pbufs := t.queues.bufs[port] + for psize < 65535 { + var best *string + var bestPriority float64 + for streamID, buf := range pbufs { + // Filter over the streams that this node is closer to + // Keep the one with the smallest queue + packet := buf.packets[0] + priority := float64(now.Sub(packet.time)) / float64(buf.size) + if priority >= bestPriority { + b := streamID // copy since streamID is mutated in the loop + best = &b + bestPriority = priority } - } else { - // Need to update the map, since buf was retrieved by value - pbufs[*best] = buf - } - packets = append(packets, packet.bytes) - psize += len(packet.bytes) - } else { - // Finished finding packets - break + if best != nil { + buf := pbufs[*best] + var packet switch_packetInfo + // TODO decide if this should be LIFO or FIFO + packet, buf.packets = buf.packets[0], buf.packets[1:] + buf.size -= uint64(len(packet.bytes)) + t.queues.size -= uint64(len(packet.bytes)) + if len(buf.packets) == 0 { + delete(pbufs, *best) + if len(pbufs) == 0 { + delete(t.queues.bufs, port) + } + } else { + // Need to update the map, since buf was retrieved by value + pbufs[*best] = buf + + } + packets = append(packets, packet.bytes) + psize += len(packet.bytes) + } else { + // Finished finding packets + break + } } - } - if len(packets) > 0 { - to.sendPacketsFrom(t, packets) - return true - } + if len(packets) > 0 { + to.sendPacketsFrom(t, packets) + return true + } + return false + */ return false } From d47797088f52ebfb32ee292b8f6563634863b7d0 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 29 Mar 2020 00:48:41 -0500 Subject: [PATCH 4/9] fix shutdown deadlock --- src/yggdrasil/core.go | 2 ++ 
src/yggdrasil/switch.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 98a5c6e1..dcb5bc7a 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -195,8 +195,10 @@ func (c *Core) _stop() { c.addPeerTimer.Stop() } c.link.stop() + /* FIXME this deadlocks, need a waitgroup or something to coordinate shutdown for _, peer := range c.GetPeers() { c.DisconnectPeer(peer.Port) } + */ c.log.Infoln("Stopped") } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 33f2a1bd..7ccb6c94 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -250,7 +250,7 @@ func (t *switchTable) cleanRoot() { t.time = now if t.data.locator.root != t.key { t.data.seq++ - t.updateTable() + defer t.updateTable() t.core.router.reset(nil) } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} @@ -524,7 +524,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep t.core.peers.sendSwitchMsgs(t) } if true || doUpdate { - t.updateTable() + defer t.updateTable() } return } From 15b850be6e6e1bd02753edbcd0155ac08928149d Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 29 Mar 2020 01:38:32 -0500 Subject: [PATCH 5/9] fix deadlock when running updateTable in the switch --- src/yggdrasil/switch.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 7ccb6c94..ab2e1194 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -199,7 +199,7 @@ func (t *switchTable) init(core *Core) { t.queues.bufs = make(map[switchPort]map[string]switch_buffer) t.idle = make(map[switchPort]struct{}) }) - t.updateTable() + t._updateTable() } func (t *switchTable) reconfigure() { @@ -250,7 +250,7 @@ func (t *switchTable) cleanRoot() { t.time = now if t.data.locator.root != t.key { t.data.seq++ - defer t.updateTable() + defer t._updateTable() t.core.router.reset(nil) } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} @@ -288,7 +288,7 @@ func (t *switchTable) forgetPeer(port switchPort) { t.mutex.Lock() defer t.mutex.Unlock() delete(t.data.peers, port) - defer t.updateTable() + defer t._updateTable() if port != t.parent { return } @@ -524,7 +524,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep t.core.peers.sendSwitchMsgs(t) } if true || doUpdate { - defer t.updateTable() + defer t._updateTable() } return } @@ -534,7 +534,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep // The rest of these are related to the switch worker // This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions. 
-func (t *switchTable) updateTable() { +func (t *switchTable) _updateTable() { // WARNING this should only be called from within t.data.updater.Do() // It relies on the sync.Once for synchronization with messages and lookups // TODO use a pre-computed faster lookup table @@ -543,8 +543,6 @@ func (t *switchTable) updateTable() { // Each struct has stores the best port to forward to, and a next coord map // Move to struct, then iterate over coord maps until you dead end // The last port before the dead end should be the closest - t.mutex.RLock() - defer t.mutex.RUnlock() newTable := lookupTable{ self: t.data.locator.clone(), elems: make(map[switchPort]tableElem, len(t.data.peers)), From 9834f222db65efab838a1a2403b8e039109742f2 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 29 Mar 2020 19:01:50 -0500 Subject: [PATCH 6/9] more work in progress actorizing the remaining parts of the switch --- src/yggdrasil/api.go | 7 ++- src/yggdrasil/dht.go | 12 ++-- src/yggdrasil/link.go | 4 +- src/yggdrasil/nodeinfo.go | 3 +- src/yggdrasil/peer.go | 37 +++++++----- src/yggdrasil/router.go | 18 +++++- src/yggdrasil/search.go | 3 +- src/yggdrasil/session.go | 5 +- src/yggdrasil/switch.go | 123 +++++++++++++++++--------------------- 9 files changed, 111 insertions(+), 101 deletions(-) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 15e2acd6..a722dc52 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -330,8 +330,11 @@ func (c *Core) EncryptionPublicKey() string { // connected to any other nodes (effectively making you the root of a // single-node network). func (c *Core) Coords() []uint64 { - loc := c.switchTable.getLocator() - return wire_coordsBytestoUint64s(loc.getCoords()) + var coords []byte + phony.Block(&c.router, func() { + coords = c.router.table.self.getCoords() + }) + return wire_coordsBytestoUint64s(coords) } // Address gets the IPv6 address of the Yggdrasil node. This is always a /128 diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 8efc549f..56d03ed1 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -186,11 +186,9 @@ func dht_ordered(first, second, third *crypto.NodeID) bool { // Update info about the node that sent the request. 
func (t *dht) handleReq(req *dhtReq) { // Send them what they asked for - loc := t.router.core.switchTable.getLocator() - coords := loc.getCoords() res := dhtRes{ Key: t.router.core.boxPub, - Coords: coords, + Coords: t.router.table.self.getCoords(), Dest: req.Dest, Infos: t.lookup(&req.Dest, false), } @@ -300,11 +298,9 @@ func (t *dht) ping(info *dhtInfo, target *crypto.NodeID) { if target == nil { target = &t.nodeID } - loc := t.router.core.switchTable.getLocator() - coords := loc.getCoords() req := dhtReq{ Key: t.router.core.boxPub, - Coords: coords, + Coords: t.router.table.self.getCoords(), Dest: *target, } t.sendReq(&req, info) @@ -378,7 +374,7 @@ func (t *dht) getImportant() []*dhtInfo { }) // Keep the ones that are no further than the closest seen so far minDist := ^uint64(0) - loc := t.router.core.switchTable.getLocator() + loc := t.router.table.self important := infos[:0] for _, info := range infos { dist := uint64(loc.dist(info.coords)) @@ -416,7 +412,7 @@ func (t *dht) isImportant(ninfo *dhtInfo) bool { } important := t.getImportant() // Check if ninfo is of equal or greater importance to what we already know - loc := t.router.core.switchTable.getLocator() + loc := t.router.table.self ndist := uint64(loc.dist(ninfo.coords)) minDist := ^uint64(0) for _, info := range important { diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index fa6563f1..978e8eab 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -300,7 +300,7 @@ func (intf *linkInterface) notifyBlockedSend() { intf.Act(nil, func() { if intf.sendTimer != nil { //As far as we know, we're still trying to send, and the timer fired. - intf.link.core.switchTable.blockPeer(intf.peer.port) + intf.link.core.switchTable.blockPeer(intf, intf.peer.port) } }) } @@ -340,7 +340,7 @@ func (intf *linkInterface) notifyStalled() { intf.stallTimer.Stop() intf.stallTimer = nil intf.stalled = true - intf.link.core.switchTable.blockPeer(intf.peer.port) + intf.link.core.switchTable.blockPeer(intf, intf.peer.port) } }) } diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go index 745756fe..b179d20b 100644 --- a/src/yggdrasil/nodeinfo.go +++ b/src/yggdrasil/nodeinfo.go @@ -18,6 +18,7 @@ type nodeinfo struct { myNodeInfo NodeInfoPayload callbacks map[crypto.BoxPubKey]nodeinfoCallback cache map[crypto.BoxPubKey]nodeinfoCached + table *lookupTable } type nodeinfoCached struct { @@ -187,7 +188,7 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse } func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) { - loc := m.core.switchTable.getLocator() + loc := m.table.self nodeinfo := nodeinfoReqRes{ SendCoords: loc.getCoords(), IsResponse: isResponse, diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 7fa2b317..9acb9321 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -162,7 +162,7 @@ func (ps *peers) _removePeer(p *peer) { if q := ps.ports[p.port]; p.port == 0 || q != p { return } // Can't remove self peer or nonexistant peer - ps.core.switchTable.forgetPeer(p.port) + ps.core.switchTable.forgetPeer(ps, p.port) oldPorts := ps.ports newPorts := make(map[switchPort]*peer) for k, v := range oldPorts { @@ -328,7 +328,7 @@ func (p *peer) _handleLinkTraffic(bs []byte) { // Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them. 
func (p *peer) _sendSwitchMsg() { - msg := p.core.switchTable.getMsg() + msg := p.table.getMsg() if msg == nil { return } @@ -367,19 +367,26 @@ func (p *peer) _handleSwitchMsg(packet []byte) { } prevKey = hop.Next } - p.core.switchTable.handleMsg(&msg, p.port) - if !p.core.switchTable.checkRoot(&msg) { - // Bad switch message - p.dinfo = nil - return - } - // Pass a message to the dht informing it that this peer (still) exists - loc.coords = loc.coords[:len(loc.coords)-1] - p.dinfo = &dhtInfo{ - key: p.box, - coords: loc.getCoords(), - } - p._updateDHT() + p.core.switchTable.Act(p, func() { + if !p.core.switchTable._checkRoot(&msg) { + // Bad switch message + p.Act(&p.core.switchTable, func() { + p.dinfo = nil + }) + } else { + // handle the message + p.core.switchTable._handleMsg(&msg, p.port, false) + p.Act(&p.core.switchTable, func() { + // Pass a message to the dht informing it that this peer (still) exists + loc.coords = loc.coords[:len(loc.coords)-1] + p.dinfo = &dhtInfo{ + key: p.box, + coords: loc.getCoords(), + } + p._updateDHT() + }) + } + }) } // This generates the bytes that we sign or check the signature of for a switchMsg. diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index ac4d655d..40b8303f 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -46,6 +46,7 @@ type router struct { nodeinfo nodeinfo searches searches sessions sessions + table *lookupTable // has a copy of our locator } // Initializes the router struct, which includes setting up channels to/from the adapter. @@ -77,6 +78,21 @@ func (r *router) init(core *Core) { r.sessions.init(r) } +func (r *router) updateTable(from phony.Actor, table *lookupTable) { + r.Act(from, func() { + r.table = table + r.nodeinfo.Act(r, func() { + r.nodeinfo.table = table + }) + for _, ses := range r.sessions.sinfos { + sinfo := ses + sinfo.Act(r, func() { + sinfo.table = table + }) + } + }) +} + // Reconfigures the router and any child modules. This should only ever be run // by the router actor. 
func (r *router) reconfigure() { @@ -130,7 +146,7 @@ func (r *router) reset(from phony.Actor) { func (r *router) doMaintenance() { phony.Block(r, func() { // Any periodic maintenance stuff goes here - r.core.switchTable.doMaintenance() + r.core.switchTable.doMaintenance(r) r.dht.doMaintenance() r.sessions.cleanup() }) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 91f0490c..febde3d8 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -161,11 +161,10 @@ func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) { // Initially start a search func (sinfo *searchInfo) startSearch() { - loc := sinfo.searches.router.core.switchTable.getLocator() var infos []*dhtInfo infos = append(infos, &dhtInfo{ key: sinfo.searches.router.core.boxPub, - coords: loc.getCoords(), + coords: sinfo.searches.router.table.self.getCoords(), }) // Start the search by asking ourself, useful if we're the destination sinfo.continueSearch(infos) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index eaa67fd0..01c2cdfb 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -52,6 +52,7 @@ type sessionInfo struct { cancel util.Cancellation // Used to terminate workers conn *Conn // The associated Conn object callbacks []chan func() // Finished work from crypto workers + table *lookupTable // table.self is a locator where we get our coords } // Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. @@ -217,6 +218,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.myHandle = *crypto.NewHandle() sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) + sinfo.table = ss.router.table ss.sinfos[sinfo.myHandle] = &sinfo ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle return &sinfo @@ -266,8 +268,7 @@ func (ss *sessions) removeSession(sinfo *sessionInfo) { // Returns a session ping appropriate for the given session info. func (sinfo *sessionInfo) _getPing() sessionPing { - loc := sinfo.sessions.router.core.switchTable.getLocator() - coords := loc.getCoords() + coords := sinfo.table.self.getCoords() ping := sessionPing{ SendPermPub: sinfo.sessions.router.core.boxPub, Handle: sinfo.myHandle, diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ab2e1194..2661b460 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -12,12 +12,9 @@ package yggdrasil // A little annoying to do with constant changes from backpressure import ( - //"math/rand" - "sync" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - //"github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/Arceliar/phony" ) @@ -149,6 +146,7 @@ type tableElem struct { type lookupTable struct { self switchLocator elems map[switchPort]tableElem + _msg switchMsg } // This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically. 
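// [Illustrative sketch, not part of the patch.] From this point on, the series moves
// peer and switch state behind actors from the phony library: each owning struct
// embeds phony.Inbox, state is only touched inside closures passed to Act, and
// synchronous callers wrap reads in phony.Block. The example below is a minimal,
// hypothetical use of that pattern; counter, Add and Get are invented names, while
// Inbox, Act and Block are the same phony calls already used in these diffs.

import "github.com/Arceliar/phony"

type counter struct {
	phony.Inbox // serializes all access to n
	n int
}

// Add asynchronously sends an increment to the counter actor.
func (c *counter) Add(from phony.Actor, x int) {
	c.Act(from, func() { c.n += x })
}

// Get blocks the caller until the counter actor reports its current value.
func (c *counter) Get() int {
	var n int
	phony.Block(c, func() { n = c.n })
	return n
}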
@@ -168,7 +166,6 @@ type switchTable struct { key crypto.SigPubKey // Our own key time time.Time // Time when locator.tstamp was last updated drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root - mutex sync.RWMutex // Lock for reads/writes of switchData parent switchPort // Port of whatever peer is our parent, or self if we're root data switchData // phony.Inbox // Owns the below @@ -208,24 +205,17 @@ func (t *switchTable) reconfigure() { t.core.peers.reconfigure() } -// Safely gets a copy of this node's locator. -func (t *switchTable) getLocator() switchLocator { - t.mutex.RLock() - defer t.mutex.RUnlock() - return t.data.locator.clone() -} - // Regular maintenance to possibly timeout/reset the root and similar. -func (t *switchTable) doMaintenance() { - // Periodic maintenance work to keep things internally consistent - t.mutex.Lock() // Write lock - defer t.mutex.Unlock() // Release lock when we're done - t.cleanRoot() - t.cleanDropped() +func (t *switchTable) doMaintenance(from phony.Actor) { + t.Act(from, func() { + // Periodic maintenance work to keep things internally consistent + t._cleanRoot() + t._cleanDropped() + }) } // Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out. -func (t *switchTable) cleanRoot() { +func (t *switchTable) _cleanRoot() { // TODO rethink how this is done?... // Get rid of the root if it looks like its timed out now := time.Now() @@ -259,49 +249,49 @@ func (t *switchTable) cleanRoot() { } // Blocks and, if possible, unparents a peer -func (t *switchTable) blockPeer(port switchPort) { - t.mutex.Lock() - defer t.mutex.Unlock() - peer, isIn := t.data.peers[port] - if !isIn { - return - } - peer.blocked = true - t.data.peers[port] = peer - if port != t.parent { - return - } - t.parent = 0 - for _, info := range t.data.peers { - if info.port == port { - continue +func (t *switchTable) blockPeer(from phony.Actor, port switchPort) { + t.Act(from, func() { + peer, isIn := t.data.peers[port] + if !isIn { + return } - t.unlockedHandleMsg(&info.msg, info.port, true) - } - t.unlockedHandleMsg(&peer.msg, peer.port, true) + peer.blocked = true + t.data.peers[port] = peer + if port != t.parent { + return + } + t.parent = 0 + for _, info := range t.data.peers { + if info.port == port { + continue + } + t._handleMsg(&info.msg, info.port, true) + } + t._handleMsg(&peer.msg, peer.port, true) + }) } // Removes a peer. // Must be called by the router actor with a lambda that calls this. // If the removed peer was this node's parent, it immediately tries to find a new parent. -func (t *switchTable) forgetPeer(port switchPort) { - t.mutex.Lock() - defer t.mutex.Unlock() - delete(t.data.peers, port) - defer t._updateTable() - if port != t.parent { - return - } - t.parent = 0 - for _, info := range t.data.peers { - t.unlockedHandleMsg(&info.msg, info.port, true) - } +func (t *switchTable) forgetPeer(from phony.Actor, port switchPort) { + t.Act(from, func() { + delete(t.data.peers, port) + defer t._updateTable() + if port != t.parent { + return + } + t.parent = 0 + for _, info := range t.data.peers { + t._handleMsg(&info.msg, info.port, true) + } + }) } // Dropped is a list of roots that are better than the current root, but stopped sending new timestamps. // If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos. // This function is called periodically to do that cleanup. 
-func (t *switchTable) cleanDropped() { +func (t *switchTable) _cleanDropped() { // TODO? only call this after root changes, not periodically for root := range t.drop { if !firstIsBetter(&root, &t.data.locator.root) { @@ -327,9 +317,7 @@ type switchMsgHop struct { } // This returns a *switchMsg to a copy of this node's current switchMsg, which can safely have additional information appended to Hops and sent to a peer. -func (t *switchTable) getMsg() *switchMsg { - t.mutex.RLock() - defer t.mutex.RUnlock() +func (t *switchTable) _getMsg() *switchMsg { if t.parent == 0 { return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp} } else if parent, isIn := t.data.peers[t.parent]; isIn { @@ -341,14 +329,18 @@ func (t *switchTable) getMsg() *switchMsg { } } +func (t *lookupTable) getMsg() *switchMsg { + msg := t._msg + msg.Hops = append([]switchMsgHop(nil), t._msg.Hops...) + return &msg +} + // This function checks that the root information in a switchMsg is OK. // In particular, that the root is better, or else the same as the current root but with a good timestamp, and that this root+timestamp haven't been dropped due to timeout. -func (t *switchTable) checkRoot(msg *switchMsg) bool { +func (t *switchTable) _checkRoot(msg *switchMsg) bool { // returns false if it's a dropped root, not a better root, or has an older timestamp // returns true otherwise // used elsewhere to keep inserting peers into the dht only if root info is OK - t.mutex.RLock() - defer t.mutex.RUnlock() dropTstamp, isIn := t.drop[msg.Root] switch { case isIn && dropTstamp >= msg.TStamp: @@ -364,20 +356,13 @@ func (t *switchTable) checkRoot(msg *switchMsg) bool { } } -// This is a mutexed wrapper to unlockedHandleMsg, and is called by the peer structs in peers.go to pass a switchMsg for that peer into the switch. -func (t *switchTable) handleMsg(msg *switchMsg, fromPort switchPort) { - t.mutex.Lock() - defer t.mutex.Unlock() - t.unlockedHandleMsg(msg, fromPort, false) -} - // This updates the switch with information about a peer. // Then the tricky part, it decides if it should update our own locator as a result. // That happens if this node is already our parent, or is advertising a better root, or is advertising a better path to the same root, etc... // There are a lot of very delicate order sensitive checks here, so its' best to just read the code if you need to understand what it's doing. // It's very important to not change the order of the statements in the case function unless you're absolutely sure that it's safe, including safe if used alongside nodes that used the previous order. // Set the third arg to true if you're reprocessing an old message, e.g. to find a new parent after one disconnects, to avoid updating some timing related things. -func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, reprocessing bool) { +func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessing bool) { // TODO directly use a switchMsg instead of switchMessage + sigs now := time.Now() // Set up the sender peerInfo @@ -500,10 +485,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep if peer.port == sender.port { continue } - t.unlockedHandleMsg(&peer.msg, peer.port, true) + t._handleMsg(&peer.msg, peer.port, true) } // Process the sender last, to avoid keeping them as a parent if at all possible. 
- t.unlockedHandleMsg(&sender.msg, sender.port, true) + t._handleMsg(&sender.msg, sender.port, true) case now.Sub(t.time) < switch_throttle: // We've already gotten an update from this root recently, so ignore this one to avoid flooding. case sender.locator.tstamp > t.data.locator.tstamp: @@ -521,7 +506,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep } t.data.locator = sender.locator t.parent = sender.port - t.core.peers.sendSwitchMsgs(t) + defer t.core.peers.sendSwitchMsgs(t) } if true || doUpdate { defer t._updateTable() @@ -560,7 +545,9 @@ func (t *switchTable) _updateTable() { time: pinfo.time, } } - t.core.peers.updateTables(nil, &newTable) // TODO not be from nil + newTable._msg = *t._getMsg() + t.core.peers.updateTables(t, &newTable) + t.core.router.updateTable(t, &newTable) } // Starts the switch worker From 945930aa2ccbc327ae6bef0ec8db36b65a398a17 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 3 Apr 2020 00:32:26 -0500 Subject: [PATCH 7/9] WIP have peer actors queue packets, temporarily a single simple FIFO queue with head drop --- src/yggdrasil/api.go | 29 ---- src/yggdrasil/link.go | 18 +-- src/yggdrasil/packetqueue.go | 39 +++++ src/yggdrasil/peer.go | 43 ++++-- src/yggdrasil/router.go | 9 +- src/yggdrasil/switch.go | 291 +---------------------------------- 6 files changed, 91 insertions(+), 338 deletions(-) create mode 100644 src/yggdrasil/packetqueue.go diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index a722dc52..31ece6b8 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -199,35 +199,6 @@ func (c *Core) GetDHT() []DHTEntry { return dhtentries } -// GetSwitchQueues returns information about the switch queues that are -// currently in effect. These values can change within an instant. -func (c *Core) GetSwitchQueues() SwitchQueues { - var switchqueues SwitchQueues - switchTable := &c.switchTable - getSwitchQueues := func() { - switchqueues = SwitchQueues{ - Count: uint64(len(switchTable.queues.bufs)), - Size: switchTable.queues.size, - HighestCount: uint64(switchTable.queues.maxbufs), - HighestSize: switchTable.queues.maxsize, - MaximumSize: switchTable.queues.totalMaxSize, - } - for port, pbuf := range switchTable.queues.bufs { - for k, v := range pbuf { - queue := SwitchQueue{ - ID: k, - Size: v.size, - Packets: uint64(len(v.packets)), - Port: uint64(port), - } - switchqueues.Queues = append(switchqueues.Queues, queue) - } - } - } - phony.Block(&c.switchTable, getSwitchQueues) - return switchqueues -} - // GetSessions returns a list of open sessions from this node to other nodes. 
func (c *Core) GetSessions() []Session { var sessions []Session diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 978e8eab..15017993 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -62,7 +62,7 @@ type linkInterface struct { keepAliveTimer *time.Timer // Fires to send keep-alive traffic stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen closeTimer *time.Timer // Fires when the link has been idle so long we need to close it - inSwitch bool // True if the switch is tracking this link + isIdle bool // True if the peer actor knows the link is idle stalled bool // True if we haven't been receiving any response traffic unstalled bool // False if an idle notification to the switch hasn't been sent because we stalled (or are first starting up) } @@ -278,7 +278,7 @@ const ( func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { intf.Act(&intf.writer, func() { if !isLinkTraffic { - intf.inSwitch = false + intf.isIdle = false } intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf._cancelStallTimer() @@ -311,7 +311,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { intf.sendTimer.Stop() intf.sendTimer = nil if !isLinkTraffic { - intf._notifySwitch() + intf._notifyIdle() } if size > 0 && intf.stallTimer == nil { intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) @@ -320,15 +320,13 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { } // Notify the switch that we're ready for more traffic, assuming we're not in a stalled state -func (intf *linkInterface) _notifySwitch() { - if !intf.inSwitch { +func (intf *linkInterface) _notifyIdle() { + if !intf.isIdle { if intf.stalled { intf.unstalled = false } else { - intf.inSwitch = true - intf.link.core.switchTable.Act(intf, func() { - intf.link.core.switchTable._idleIn(intf.peer.port) - }) + intf.isIdle = true + intf.peer.Act(intf, intf.peer._handleIdle) } } } @@ -364,7 +362,7 @@ func (intf *linkInterface) notifyRead(size int) { } intf.stalled = false if !intf.unstalled { - intf._notifySwitch() + intf._notifyIdle() intf.unstalled = true } if size > 0 && intf.stallTimer == nil { diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go new file mode 100644 index 00000000..ac66c0da --- /dev/null +++ b/src/yggdrasil/packetqueue.go @@ -0,0 +1,39 @@ +package yggdrasil + +import "github.com/yggdrasil-network/yggdrasil-go/src/util" + +// TODO take max size from config +const MAX_PACKET_QUEUE_SIZE = 1048576 // 1 MB + +// TODO separate queues per e.g. 
traffic flow +type packetQueue struct { + packets [][]byte + size uint32 +} + +func (q *packetQueue) cleanup() { + for q.size > MAX_PACKET_QUEUE_SIZE { + if packet, success := q.pop(); success { + util.PutBytes(packet) + } else { + panic("attempted to drop packet from empty queue") + break + } + } +} + +func (q *packetQueue) push(packet []byte) { + q.packets = append(q.packets, packet) + q.size += uint32(len(packet)) + q.cleanup() +} + +func (q *packetQueue) pop() ([]byte, bool) { + if len(q.packets) > 0 { + packet := q.packets[0] + q.packets = q.packets[1:] + q.size -= uint32(len(packet)) + return packet, true + } + return nil, false +} diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 9acb9321..bc9de04c 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -100,6 +100,8 @@ type peer struct { bytesRecvd uint64 ports map[switchPort]*peer table *lookupTable + queue packetQueue + idle bool } func (ps *peers) updateTables(from phony.Actor, table *lookupTable) { @@ -243,6 +245,13 @@ func (p *peer) _handlePacket(packet []byte) { } } +// Get the coords of a packet without decoding +func peer_getPacketCoords(packet []byte) []byte { + _, pTypeLen := wire_decode_uint64(packet) + coords, _ := wire_decode_coords(packet[pTypeLen:]) + return coords +} + // Called to handle traffic or protocolTraffic packets. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. func (p *peer) _handleTraffic(packet []byte) { @@ -250,7 +259,7 @@ func (p *peer) _handleTraffic(packet []byte) { // Drop traffic if the peer isn't in the switch return } - coords := switch_getPacketCoords(packet) + coords := peer_getPacketCoords(packet) next := p.table.lookup(coords) if nPeer, isIn := p.ports[next]; isIn { nPeer.sendPacketsFrom(p, [][]byte{packet}) @@ -264,17 +273,33 @@ func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { }) } -// This just calls p.out(packet) for now. func (p *peer) _sendPackets(packets [][]byte) { - // Is there ever a case where something more complicated is needed? - // What if p.out blocks? - var size int for _, packet := range packets { - size += len(packet) + p.queue.push(packet) + } + if p.idle { + p.idle = false + p._handleIdle() + } +} + +func (p *peer) _handleIdle() { + var packets [][]byte + var size uint64 + for size < 65535 { + if packet, success := p.queue.pop(); success { + packets = append(packets, packet) + size += uint64(len(packet)) + } else { + break + } + } + if len(packets) > 0 { + p.bytesSent += uint64(size) + p.out(packets) + } else { + p.idle = true } - p.bytesSent += uint64(size) - // FIXME need to manage queues here or else things can block! - p.out(packets) } // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 40b8303f..1be94661 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -67,7 +67,14 @@ func (r *router) init(core *Core) { // FIXME don't block here! 
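
A rough, single-threaded sketch of the flow the packetqueue.go and peer.go changes above set up: pushes append to a per-peer FIFO that head-drops once it passes the 1 MB cap, and each idle notification drains at most ~64 KiB to the link. In the real code these steps run on phony actors; the names here are illustrative.

package main

import "fmt"

// fifo is a cut-down packetQueue: append on push, drop from the head once the
// byte total passes the cap.
type fifo struct {
	packets [][]byte
	size    int
	max     int
}

func (q *fifo) push(p []byte) {
	q.packets = append(q.packets, p)
	q.size += len(p)
	for q.size > q.max && len(q.packets) > 0 { // head drop, oldest first
		q.size -= len(q.packets[0])
		q.packets = q.packets[1:]
	}
}

func (q *fifo) pop() ([]byte, bool) {
	if len(q.packets) == 0 {
		return nil, false
	}
	p := q.packets[0]
	q.packets = q.packets[1:]
	q.size -= len(p)
	return p, true
}

// drainOnIdle mirrors peer._handleIdle: batch up to ~64 KiB per link wakeup and
// report whether the peer should return to the idle state.
func drainOnIdle(q *fifo, send func([][]byte)) (idle bool) {
	var batch [][]byte
	var size int
	for size < 65535 {
		p, ok := q.pop()
		if !ok {
			break
		}
		batch = append(batch, p)
		size += len(p)
	}
	if len(batch) == 0 {
		return true
	}
	send(batch)
	return false
}

func main() {
	q := &fifo{max: 1048576}
	q.push(make([]byte, 1500))
	q.push(make([]byte, 1500))
	idle := drainOnIdle(q, func(b [][]byte) { fmt.Println("sent", len(b), "packets") })
	fmt.Println("idle:", idle)
}
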
p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) }) - p.out = func(packets [][]byte) { r.handlePackets(p, packets) } + p.out = func(packets [][]byte) { + r.handlePackets(p, packets) + r.Act(p, func() { + // after the router handle the packets, notify the peer that it's ready for more + p.Act(r, p._handleIdle) + }) + } + p.Act(r, p._handleIdle) r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) r.core.config.Mutex.RLock() diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 2661b460..091596b5 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -164,13 +164,11 @@ type switchData struct { type switchTable struct { core *Core key crypto.SigPubKey // Our own key + phony.Inbox // Owns the below time time.Time // Time when locator.tstamp was last updated drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root parent switchPort // Port of whatever peer is our parent, or self if we're root data switchData // - phony.Inbox // Owns the below - queues switch_buffers // Queues - not atomic so ONLY use through the actor - idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor } // Minimum allowed total size of switch queues. @@ -185,18 +183,7 @@ func (t *switchTable) init(core *Core) { peers := make(map[switchPort]peerInfo) t.data = switchData{locator: locator, peers: peers} t.drop = make(map[crypto.SigPubKey]int64) - phony.Block(t, func() { - core.config.Mutex.RLock() - if core.config.Current.SwitchOptions.MaxTotalQueueSize > SwitchQueueTotalMinSize { - t.queues.totalMaxSize = core.config.Current.SwitchOptions.MaxTotalQueueSize - } else { - t.queues.totalMaxSize = SwitchQueueTotalMinSize - } - core.config.Mutex.RUnlock() - t.queues.bufs = make(map[switchPort]map[string]switch_buffer) - t.idle = make(map[switchPort]struct{}) - }) - t._updateTable() + phony.Block(t, t._updateTable) } func (t *switchTable) reconfigure() { @@ -557,73 +544,6 @@ func (t *switchTable) start() error { return nil } -type closerInfo struct { - elem tableElem - dist int -} - -// Return a map of ports onto distance, keeping only ports closer to the destination than this node -// If the map is empty (or nil), then no peer is closer -/* -func (t *switchTable) getCloser(dest []byte) []closerInfo { - table := t.getTable() - myDist := table.self.dist(dest) - if myDist == 0 { - // Skip the iteration step if it's impossible to be closer - return nil - } - var closer []closerInfo - for _, info := range table.elems { - dist := info.locator.dist(dest) - if dist < myDist { - closer = append(closer, closerInfo{info, dist}) - } - } - return closer -} -*/ - -// Returns true if the peer is closer to the destination than ourself -/* -func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { - table := t.getTable() - if info, isIn := table.elems[port]; isIn { - theirDist := info.locator.dist(dest) - myDist := table.self.dist(dest) - return theirDist < myDist - } else { - return false - } -} -*/ - -// Get the coords of a packet without decoding -func switch_getPacketCoords(packet []byte) []byte { - _, pTypeLen := wire_decode_uint64(packet) - coords, _ := wire_decode_coords(packet[pTypeLen:]) - return coords -} - -// Returns a unique string for each stream of traffic -// Equal to coords -// The sender may append arbitrary info to the end of coords (as long as it's begins with a 0x00) to designate separate traffic streams -// Currently, it's the IPv6 next header type and the first 2 
uint16 of the next header -// This is equivalent to the TCP/UDP protocol numbers and the source / dest ports -// TODO figure out if something else would make more sense (other transport protocols?) -func switch_getPacketStreamID(packet []byte) string { - return string(switch_getPacketCoords(packet)) -} - -// Returns the flowlabel from a given set of coords -func switch_getFlowLabelFromCoords(in []byte) []byte { - for i, v := range in { - if v == 0 { - return in[i+1:] - } - } - return []byte{} -} - // Find the best port to forward to for a given set of coords func (t *lookupTable) lookup(coords []byte) switchPort { var bestPort switchPort @@ -660,210 +580,3 @@ func (t *lookupTable) lookup(coords []byte) switchPort { } return bestPort } - -// Handle an incoming packet -// Either send it to ourself, or to the first idle peer that's free -// Returns true if the packet has been handled somehow, false if it should be queued -func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) (bool, switchPort) { - /* - coords := switch_getPacketCoords(packet) - table := t.getTable() - port := table.lookup(coords) - ports := t.core.peers.getPorts() - peer := ports[port] - if peer == nil { - // FIXME hack, if the peer disappeared durring a race then don't buffer - return true, 0 - } - if _, isIdle := idle[port]; isIdle || port == 0 { - // Either no closer peers, or the closest peer is idle - delete(idle, port) - peer.sendPacketsFrom(t, [][]byte{packet}) - return true, port - } - // There's a closer peer, but it's not idle, so buffer it - return false, port - */ - return true, 0 -} - -// Info about a buffered packet -type switch_packetInfo struct { - bytes []byte - time time.Time // Timestamp of when the packet arrived -} - -// Used to keep track of buffered packets -type switch_buffer struct { - packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large - size uint64 // Total queue size in bytes -} - -type switch_buffers struct { - totalMaxSize uint64 - bufs map[switchPort]map[string]switch_buffer // Buffers indexed by port and StreamID - size uint64 // Total size of all buffers, in bytes - maxbufs int - maxsize uint64 -} - -func (b *switch_buffers) _cleanup(t *switchTable) { - /* - for port, pbufs := range b.bufs { - for streamID, buf := range pbufs { - // Remove queues for which we have no next hop - packet := buf.packets[0] - coords := switch_getPacketCoords(packet.bytes) - if len(t.getCloser(coords)) == 0 { - for _, packet := range buf.packets { - util.PutBytes(packet.bytes) - } - b.size -= buf.size - delete(pbufs, streamID) - } - } - if len(pbufs) == 0 { - delete(b.bufs, port) - } - } - - for b.size > b.totalMaxSize { - // Drop a random queue - target := rand.Uint64() % b.size - var size uint64 // running total - for port, pbufs := range b.bufs { - for streamID, buf := range pbufs { - size += buf.size - if size < target { - continue - } - var packet switch_packetInfo - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - b.size -= uint64(len(packet.bytes)) - util.PutBytes(packet.bytes) - if len(buf.packets) == 0 { - delete(pbufs, streamID) - if len(pbufs) == 0 { - delete(b.bufs, port) - } - } else { - // Need to update the map, since buf was retrieved by value - pbufs[streamID] = buf - } - break - } - } - } - */ -} - -// Handles incoming idle notifications -// Loops over packets and sends the newest one that's OK for this peer to send -// Returns true if the peer is no longer idle, false if it 
should be added to the idle list -func (t *switchTable) _handleIdle(port switchPort) bool { - // TODO? only send packets for which this is the best next hop that isn't currently blocked sending - /* - to := t.core.peers.getPorts()[port] - if to == nil { - return true - } - var packets [][]byte - var psize int - t.queues._cleanup(t) - now := time.Now() - pbufs := t.queues.bufs[port] - for psize < 65535 { - var best *string - var bestPriority float64 - for streamID, buf := range pbufs { - // Filter over the streams that this node is closer to - // Keep the one with the smallest queue - packet := buf.packets[0] - priority := float64(now.Sub(packet.time)) / float64(buf.size) - if priority >= bestPriority { - b := streamID // copy since streamID is mutated in the loop - best = &b - bestPriority = priority - } - } - if best != nil { - buf := pbufs[*best] - var packet switch_packetInfo - // TODO decide if this should be LIFO or FIFO - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - t.queues.size -= uint64(len(packet.bytes)) - if len(buf.packets) == 0 { - delete(pbufs, *best) - if len(pbufs) == 0 { - delete(t.queues.bufs, port) - } - } else { - // Need to update the map, since buf was retrieved by value - pbufs[*best] = buf - - } - packets = append(packets, packet.bytes) - psize += len(packet.bytes) - } else { - // Finished finding packets - break - } - } - if len(packets) > 0 { - to.sendPacketsFrom(t, packets) - return true - } - return false - */ - return false -} - -func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) { - t.Act(from, func() { - t._packetIn(bytes) - }) -} - -func (t *switchTable) _packetIn(bytes []byte) { - // Try to send it somewhere (or drop it if it's corrupt or at a dead end) - if sent, best := t._handleIn(bytes, t.idle); !sent { - // There's nobody free to take it right now, so queue it for later - packet := switch_packetInfo{bytes, time.Now()} - streamID := switch_getPacketStreamID(packet.bytes) - if _, isIn := t.queues.bufs[best]; !isIn { - t.queues.bufs[best] = make(map[string]switch_buffer) - } - buf, bufExists := t.queues.bufs[best][streamID] - buf.packets = append(buf.packets, packet) - buf.size += uint64(len(packet.bytes)) - t.queues.size += uint64(len(packet.bytes)) - // Keep a track of the max total queue size - if t.queues.size > t.queues.maxsize { - t.queues.maxsize = t.queues.size - } - t.queues.bufs[best][streamID] = buf - if !bufExists { - // Keep a track of the max total queue count. 
Only recalculate this - // when the queue is new because otherwise repeating len(dict) might - // cause unnecessary processing overhead - var count int - for _, pbufs := range t.queues.bufs { - count += len(pbufs) - } - if count > t.queues.maxbufs { - t.queues.maxbufs = count - } - } - t.queues._cleanup(t) - } -} - -func (t *switchTable) _idleIn(port switchPort) { - // Try to find something to send to this peer - if !t._handleIdle(port) { - // Didn't find anything ready to send yet, so stay idle - t.idle[port] = struct{}{} - } -} From 09efdfef9a5ec99ac8ce38c179063127bb6cebad Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 3 Apr 2020 19:26:48 -0500 Subject: [PATCH 8/9] fix bug in switch actor's cleanRoot, strict nonce handling at the session level, and add separate queues per stream to the packetqueue code --- src/yggdrasil/packetqueue.go | 115 ++++++++++++++++++++++++++++++----- src/yggdrasil/session.go | 12 +--- src/yggdrasil/switch.go | 4 +- 3 files changed, 105 insertions(+), 26 deletions(-) diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index ac66c0da..ff717258 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -1,38 +1,125 @@ package yggdrasil -import "github.com/yggdrasil-network/yggdrasil-go/src/util" +import ( + "time" + + "github.com/yggdrasil-network/yggdrasil-go/src/util" +) // TODO take max size from config -const MAX_PACKET_QUEUE_SIZE = 1048576 // 1 MB +const MAX_PACKET_QUEUE_SIZE = 4 * 1048576 // 4 MB + +type pqStreamID string + +type pqPacketInfo struct { + packet []byte + time time.Time +} + +type pqStream struct { + infos []pqPacketInfo + size uint64 +} // TODO separate queues per e.g. traffic flow type packetQueue struct { - packets [][]byte - size uint32 + streams map[pqStreamID]pqStream + size uint64 } func (q *packetQueue) cleanup() { for q.size > MAX_PACKET_QUEUE_SIZE { - if packet, success := q.pop(); success { - util.PutBytes(packet) + // TODO? drop from a random stream + // odds proportional to size? bandwidth? 
+ // always using the worst is exploitable -> flood 1 packet per random stream + // find the stream that's using the most bandwidth + now := time.Now() + var worst pqStreamID + for id := range q.streams { + worst = id + break // get a random ID to start + } + worstStream := q.streams[worst] + worstSize := float64(worstStream.size) + worstAge := now.Sub(worstStream.infos[0].time).Seconds() + for id, stream := range q.streams { + thisSize := float64(stream.size) + thisAge := now.Sub(stream.infos[0].time).Seconds() + // cross multiply to avoid division by zero issues + if worstSize*thisAge < thisSize*worstAge { + // worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth + worst = id + worstStream = stream + worstSize = thisSize + worstAge = thisAge + } + } + // Drop the oldest packet from the worst stream + packet := worstStream.infos[0].packet + worstStream.infos = worstStream.infos[1:] + worstStream.size -= uint64(len(packet)) + q.size -= uint64(len(packet)) + util.PutBytes(packet) + // save the modified stream to queues + if len(worstStream.infos) > 0 { + q.streams[worst] = worstStream } else { - panic("attempted to drop packet from empty queue") - break + delete(q.streams, worst) } } } func (q *packetQueue) push(packet []byte) { - q.packets = append(q.packets, packet) - q.size += uint32(len(packet)) + if q.streams == nil { + q.streams = make(map[pqStreamID]pqStream) + } + // get stream + id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now + stream := q.streams[id] + // update stream + stream.infos = append(stream.infos, pqPacketInfo{packet, time.Now()}) + stream.size += uint64(len(packet)) + // save update to queues + q.streams[id] = stream + q.size += uint64(len(packet)) q.cleanup() } func (q *packetQueue) pop() ([]byte, bool) { - if len(q.packets) > 0 { - packet := q.packets[0] - q.packets = q.packets[1:] - q.size -= uint32(len(packet)) + if len(q.streams) > 0 { + // get the stream that uses the least bandwidth + now := time.Now() + var best pqStreamID + for id := range q.streams { + best = id + break // get a random ID to start + } + bestStream := q.streams[best] + bestSize := float64(bestStream.size) + bestAge := now.Sub(bestStream.infos[0].time).Seconds() + for id, stream := range q.streams { + thisSize := float64(stream.size) + thisAge := now.Sub(stream.infos[0].time).Seconds() + // cross multiply to avoid division by zero issues + if bestSize*thisAge > thisSize*bestAge { + // bestSize/bestAge > thisSize/thisAge -> this uses less bandwidth + best = id + bestStream = stream + bestSize = thisSize + bestAge = thisAge + } + } + // get the oldest packet from the best stream + packet := bestStream.infos[0].packet + bestStream.infos = bestStream.infos[1:] + bestStream.size -= uint64(len(packet)) + q.size -= uint64(len(packet)) + // save the modified stream to queues + if len(bestStream.infos) > 0 { + q.streams[best] = bestStream + } else { + delete(q.streams, best) + } return packet, true } return nil, false diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 01c2cdfb..223ea33f 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -16,9 +16,6 @@ import ( "github.com/Arceliar/phony" ) -// Duration that we keep track of old nonces per session, to allow some out-of-order packet delivery -const nonceWindow = time.Second - // All the information we know about an active session. 
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { @@ -394,14 +391,9 @@ func (sinfo *sessionInfo) _getMTU() MTU { return sinfo.myMTU } -// Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received. +// Checks if a packet's nonce is newer than any previously received func (sinfo *sessionInfo) _nonceIsOK(theirNonce *crypto.BoxNonce) bool { - // The bitmask is to allow for some non-duplicate out-of-order packets - if theirNonce.Minus(&sinfo.theirNonce) > 0 { - // This is newer than the newest nonce we've seen - return true - } - return time.Since(sinfo.time) < nonceWindow + return theirNonce.Minus(&sinfo.theirNonce) > 0 } // Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 091596b5..4f9044cd 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -227,10 +227,10 @@ func (t *switchTable) _cleanRoot() { t.time = now if t.data.locator.root != t.key { t.data.seq++ - defer t._updateTable() - t.core.router.reset(nil) + defer t.core.router.reset(nil) } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} + t._updateTable() // updates base copy of switch msg in lookupTable t.core.peers.sendSwitchMsgs(t) } } From 9d0969db2be1bff624a641158544db687f3d2427 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 5 Apr 2020 14:57:05 -0500 Subject: [PATCH 9/9] prevent a hypothetical block on link message sending --- src/yggdrasil/link.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 15017993..78986286 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -229,10 +229,18 @@ func (intf *linkInterface) handler() error { intf.peer.Act(nil, intf.peer._removeSelf) }() intf.peer.out = func(msgs [][]byte) { - intf.writer.sendFrom(intf.peer, msgs, false) + // nil to prevent it from blocking if the link is somehow frozen + // this is safe because another packet won't be sent until the link notifies + // the peer that it's ready for one + intf.writer.sendFrom(nil, msgs, false) } intf.peer.linkOut = func(bs []byte) { - intf.writer.sendFrom(intf.peer, [][]byte{bs}, true) + // nil to prevent it from blocking if the link is somehow frozen + // FIXME this is hypothetically not safe, the peer shouldn't be sending + // additional packets until this one finishes, otherwise this could leak + // memory if writing happens slower than link packets are generated... + // that seems unlikely, so it's a lesser evil than deadlocking for now + intf.writer.sendFrom(nil, [][]byte{bs}, true) } themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String()
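
A small sketch of the stream-selection rule used by the per-stream packetqueue above: cleanup() drops from the stream with the highest size/age ratio (the heaviest bandwidth user) and pop() sends from the lowest, with the ratios compared by cross-multiplication so a zero age never divides. The streams and numbers below are made up for illustration.

package main

import "fmt"

// stream is a stand-in for pqStream: total queued bytes plus the age, in
// seconds, of its oldest packet.
type stream struct {
	name string
	size float64
	age  float64
}

// usesMoreBandwidth reports a.size/a.age > b.size/b.age without dividing,
// matching the comparison in cleanup(); pop() uses the opposite direction to
// find the lightest stream.
func usesMoreBandwidth(a, b stream) bool {
	return a.size*b.age > b.size*a.age
}

func main() {
	streams := []stream{
		{"bulk", 900000, 0.5}, // ~1.8 MB/s queued
		{"voip", 2000, 0.5},   // ~4 kB/s queued
	}
	worst := streams[0]
	for _, s := range streams[1:] {
		if usesMoreBandwidth(s, worst) {
			worst = s
		}
	}
	fmt.Println("drop from:", worst.name) // the bulk flow loses packets first
}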