diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 31ece6b8..66ee9b81 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -123,10 +123,10 @@ func (c *Core) GetPeers() []Peer { var info Peer phony.Block(p, func() { info = Peer{ - Endpoint: p.intf.name, + Endpoint: p.intf.name(), BytesSent: p.bytesSent, BytesRecvd: p.bytesRecvd, - Protocol: p.intf.info.linkType, + Protocol: p.intf.interfaceType(), Port: uint64(port), Uptime: time.Since(p.firstSeen), } @@ -163,8 +163,8 @@ func (c *Core) GetSwitchPeers() []SwitchPeer { BytesSent: peer.bytesSent, BytesRecvd: peer.bytesRecvd, Port: uint64(elem.port), - Protocol: peer.intf.info.linkType, - Endpoint: peer.intf.info.remote, + Protocol: peer.intf.interfaceType(), + Endpoint: peer.intf.remote(), } copy(info.PublicKey[:], peer.box[:]) }) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 539d0488..3b3cfdb6 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -47,7 +47,7 @@ type linkInterfaceMsgIO interface { } type linkInterface struct { - name string + lname string link *link peer *peer msgIO linkInterfaceMsgIO @@ -125,7 +125,7 @@ func (l *link) listen(uri string) error { func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { // Technically anything unique would work for names, but let's pick something human readable, just for debugging intf := linkInterface{ - name: name, + lname: name, link: l, msgIO: msgIO, info: linkInfo{ @@ -178,7 +178,7 @@ func (intf *linkInterface) handler() error { } base := version_getBaseMetadata() if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { - intf.link.core.log.Errorln("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + intf.link.core.log.Errorln("Failed to connect to node: " + intf.lname + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) return errors.New("failed to connect: wrong version") } // Check if we're authorized to connect to this key / IP @@ -217,23 +217,9 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - out := func(msgs [][]byte) { - // nil to prevent it from blocking if the link is somehow frozen - // this is safe because another packet won't be sent until the link notifies - // the peer that it's ready for one - intf.writer.sendFrom(nil, msgs, false) - } - linkOut := func(bs []byte) { - // nil to prevent it from blocking if the link is somehow frozen - // FIXME this is hypothetically not safe, the peer shouldn't be sending - // additional packets until this one finishes, otherwise this could leak - // memory if writing happens slower than link packets are generated... - // that seems unlikely, so it's a lesser evil than deadlocking for now - intf.writer.sendFrom(nil, [][]byte{bs}, true) - } phony.Block(&intf.link.core.peers, func() { // FIXME don't use phony.Block, it's bad practice, even if it's safe here - intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }, out, linkOut) + intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf) }) if intf.peer == nil { return errors.New("failed to create peer") @@ -275,6 +261,58 @@ func (intf *linkInterface) handler() error { //////////////////////////////////////////////////////////////////////////////// +// linkInterface needs to match the peerInterface type needed by the peers + +func (intf *linkInterface) out(bss [][]byte) { + intf.Act(nil, func() { + // nil to prevent it from blocking if the link is somehow frozen + // this is safe because another packet won't be sent until the link notifies + // the peer that it's ready for one + intf.writer.sendFrom(nil, bss, false) + }) +} + +func (intf *linkInterface) linkOut(bs []byte) { + intf.Act(nil, func() { + // nil to prevent it from blocking if the link is somehow frozen + // FIXME this is hypothetically not safe, the peer shouldn't be sending + // additional packets until this one finishes, otherwise this could leak + // memory if writing happens slower than link packets are generated... + // that seems unlikely, so it's a lesser evil than deadlocking for now + intf.writer.sendFrom(nil, [][]byte{bs}, true) + }) +} + +func (intf *linkInterface) notifyQueued(seq uint64) { + // This is the part where we want non-nil 'from' fields + intf.Act(intf.peer, func() { + if !intf.isIdle { + intf.peer.dropFromQueue(intf, seq) + } + }) +} + +func (intf *linkInterface) close() { + intf.Act(nil, func() { intf.msgIO.close() }) +} + +func (intf *linkInterface) name() string { + return intf.lname +} + +func (intf *linkInterface) local() string { + return intf.info.local +} + +func (intf *linkInterface) remote() string { + return intf.info.remote +} + +func (intf *linkInterface) interfaceType() string { + return intf.info.linkType +} + +//////////////////////////////////////////////////////////////////////////////// const ( sendTime = 1 * time.Second // How long to wait before deciding a send is blocked keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index 2000ffa6..7abdaea7 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -4,9 +4,6 @@ import ( "time" ) -// TODO take max size from config -const MAX_PACKET_QUEUE_SIZE = 4 * 1048576 // 4 MB - type pqStreamID string type pqPacketInfo struct { @@ -25,46 +22,50 @@ type packetQueue struct { size uint64 } -func (q *packetQueue) cleanup() { - for q.size > MAX_PACKET_QUEUE_SIZE { - // TODO? drop from a random stream - // odds proportional to size? bandwidth? - // always using the worst is exploitable -> flood 1 packet per random stream - // find the stream that's using the most bandwidth - now := time.Now() - var worst pqStreamID - for id := range q.streams { +// drop will remove a packet from the queue, returning it to the pool +// returns true if a packet was removed, false otherwise +func (q *packetQueue) drop() bool { + if q.size == 0 { + return false + } + // TODO? drop from a random stream + // odds proportional to size? bandwidth? + // always using the worst is exploitable -> flood 1 packet per random stream + // find the stream that's using the most bandwidth + now := time.Now() + var worst pqStreamID + for id := range q.streams { + worst = id + break // get a random ID to start + } + worstStream := q.streams[worst] + worstSize := float64(worstStream.size) + worstAge := now.Sub(worstStream.infos[0].time).Seconds() + for id, stream := range q.streams { + thisSize := float64(stream.size) + thisAge := now.Sub(stream.infos[0].time).Seconds() + // cross multiply to avoid division by zero issues + if worstSize*thisAge < thisSize*worstAge { + // worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth worst = id - break // get a random ID to start - } - worstStream := q.streams[worst] - worstSize := float64(worstStream.size) - worstAge := now.Sub(worstStream.infos[0].time).Seconds() - for id, stream := range q.streams { - thisSize := float64(stream.size) - thisAge := now.Sub(stream.infos[0].time).Seconds() - // cross multiply to avoid division by zero issues - if worstSize*thisAge < thisSize*worstAge { - // worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth - worst = id - worstStream = stream - worstSize = thisSize - worstAge = thisAge - } - } - // Drop the oldest packet from the worst stream - packet := worstStream.infos[0].packet - worstStream.infos = worstStream.infos[1:] - worstStream.size -= uint64(len(packet)) - q.size -= uint64(len(packet)) - pool_putBytes(packet) - // save the modified stream to queues - if len(worstStream.infos) > 0 { - q.streams[worst] = worstStream - } else { - delete(q.streams, worst) + worstStream = stream + worstSize = thisSize + worstAge = thisAge } } + // Drop the oldest packet from the worst stream + packet := worstStream.infos[0].packet + worstStream.infos = worstStream.infos[1:] + worstStream.size -= uint64(len(packet)) + q.size -= uint64(len(packet)) + pool_putBytes(packet) + // save the modified stream to queues + if len(worstStream.infos) > 0 { + q.streams[worst] = worstStream + } else { + delete(q.streams, worst) + } + return true } func (q *packetQueue) push(packet []byte) { @@ -80,7 +81,6 @@ func (q *packetQueue) push(packet []byte) { // save update to queues q.streams[id] = stream q.size += uint64(len(packet)) - q.cleanup() } func (q *packetQueue) pop() ([]byte, bool) { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 31bba661..31ea5f46 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -77,29 +77,38 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string { return ps.core.config.Current.AllowedEncryptionPublicKeys } +type peerInterface interface { + out([][]byte) + linkOut([]byte) + notifyQueued(uint64) + close() + // These next ones are only used by the API + name() string + local() string + remote() string + interfaceType() string +} + // Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic type peer struct { phony.Inbox core *Core - intf *linkInterface + intf peerInterface port switchPort box crypto.BoxPubKey sig crypto.SigPubKey shared crypto.BoxSharedKey linkShared crypto.BoxSharedKey endpoint string - firstSeen time.Time // To track uptime for getPeers - linkOut func([]byte) // used for protocol traffic (bypasses the switch) - dinfo *dhtInfo // used to keep the DHT working - out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes - done (chan struct{}) // closed to exit the linkLoop - close func() // Called when a peer is removed, to close the underlying connection, or via admin api + firstSeen time.Time // To track uptime for getPeers + dinfo *dhtInfo // used to keep the DHT working // The below aren't actually useful internally, they're just gathered for getPeers statistics bytesSent uint64 bytesRecvd uint64 ports map[switchPort]*peer table *lookupTable queue packetQueue + seq uint64 // this and idle are used to detect when to drop packets from queue idle bool } @@ -123,19 +132,15 @@ func (ps *peers) _updatePeers() { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func(), out func([][]byte), linkOut func([]byte)) *peer { +func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf peerInterface) *peer { now := time.Now() p := peer{box: *box, + core: ps.core, + intf: intf, sig: *sig, shared: *crypto.GetSharedKey(&ps.core.boxPriv, box), linkShared: *linkShared, firstSeen: now, - done: make(chan struct{}), - close: closer, - core: ps.core, - intf: intf, - out: out, - linkOut: linkOut, } oldPorts := ps.ports newPorts := make(map[switchPort]*peer) @@ -172,10 +177,7 @@ func (ps *peers) _removePeer(p *peer) { newPorts[k] = v } delete(newPorts, p.port) - if p.close != nil { - p.close() - } - close(p.done) + p.intf.close() ps.ports = newPorts ps._updatePeers() } @@ -295,12 +297,26 @@ func (p *peer) _handleIdle() { } if len(packets) > 0 { p.bytesSent += uint64(size) - p.out(packets) + p.intf.out(packets) } else { p.idle = true } } +func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { + p.Act(from, func() { + switch { + case seq != p.seq: + case p.queue.drop(): + p.intf.notifyQueued(p.seq) + } + if seq != p.seq { + return + } + + }) +} + // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // It sends it to p.linkOut, which bypasses the usual packet queues. func (p *peer) _sendLinkPacket(packet []byte) { @@ -316,7 +332,7 @@ func (p *peer) _sendLinkPacket(packet []byte) { Payload: bs, } packet = linkPacket.encode() - p.linkOut(packet) + p.intf.linkOut(packet) } // Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic. diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 1bb14c4c..303ada69 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -45,6 +45,8 @@ type router struct { nodeinfo nodeinfo searches searches sessions sessions + intf routerInterface + peer *peer table *lookupTable // has a copy of our locator } @@ -53,28 +55,17 @@ func (r *router) init(core *Core) { r.core = core r.addr = *address.AddrForNodeID(&r.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.dht.nodeID) - self := linkInterface{ - name: "(self)", - info: linkInfo{ - local: "(self)", - remote: "(self)", - linkType: "self", - }, - } - var p *peer - peerOut := func(packets [][]byte) { - r.handlePackets(p, packets) - r.Act(p, func() { - // after the router handle the packets, notify the peer that it's ready for more - p.Act(r, p._handleIdle) - }) - } + r.intf.router = r phony.Block(&r.core.peers, func() { // FIXME don't block here! - p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil, peerOut, nil) + r.peer = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &r.intf) }) - p.Act(r, p._handleIdle) - r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } + r.peer.Act(r, r.peer._handleIdle) + r.out = func(bs []byte) { + r.intf.Act(r, func() { + r.peer.handlePacketFrom(&r.intf, bs) + }) + } r.nodeinfo.init(r.core) r.core.config.Mutex.RLock() r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy) @@ -123,15 +114,6 @@ func (r *router) start() error { return nil } -// In practice, the switch will call this with 1 packet -func (r *router) handlePackets(from phony.Actor, packets [][]byte) { - r.Act(from, func() { - for _, packet := range packets { - r._handlePacket(packet) - } - }) -} - // Insert a peer info into the dht, TODO? make the dht a separate actor func (r *router) insertPeer(from phony.Actor, info *dhtInfo) { r.Act(from, func() { @@ -275,3 +257,58 @@ func (r *router) _handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) { req.SendPermPub = *fromKey r.nodeinfo.handleNodeInfo(r, &req) } + +//////////////////////////////////////////////////////////////////////////////// + +// routerInterface is a helper that implements peerInterface +type routerInterface struct { + phony.Inbox + router *router + busy bool +} + +func (intf *routerInterface) out(bss [][]byte) { + intf.Act(intf.router.peer, func() { + intf.router.Act(intf, func() { + for _, bs := range bss { + intf.router._handlePacket(bs) + } + // we may block due to the above + // so we send a message to ourself, that we'd handle after unblocking + // that message tells us to tell the interface that we're finally idle again + intf.router.Act(nil, func() { + intf.Act(intf.router, intf._handleIdle) + }) + intf.Act(intf.router, intf._handleBusy) + }) + }) +} + +func (intf *routerInterface) _handleBusy() { + intf.busy = true +} + +func (intf *routerInterface) _handleIdle() { + intf.busy = false + intf.router.peer.Act(intf, intf.router.peer._handleIdle) +} + +func (intf *routerInterface) linkOut(_ []byte) {} + +func (intf *routerInterface) notifyQueued(seq uint64) { + intf.Act(intf.router.peer, func() { + if intf.busy { + intf.router.peer.dropFromQueue(intf, seq) + } + }) +} + +func (intf *routerInterface) close() {} + +func (intf *routerInterface) name() string { return "(self)" } + +func (intf *routerInterface) local() string { return "(self)" } + +func (intf *routerInterface) remote() string { return "(self)" } + +func (intf *routerInterface) interfaceType() string { return "self" }