diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go
index 5fecc15d..9d3866f8 100644
--- a/src/yggdrasil/admin.go
+++ b/src/yggdrasil/admin.go
@@ -90,6 +90,10 @@ func (a *admin) init(c *Core, listenaddr string) {
 		}
 		return admin_info{"switchpeers": switchpeers}, nil
 	})
+	a.addHandler("getSwitchQueues", []string{}, func(in admin_info) (admin_info, error) {
+		queues := a.getData_getSwitchQueues()
+		return admin_info{"switchqueues": queues.asMap()}, nil
+	})
 	a.addHandler("getDHT", []string{}, func(in admin_info) (admin_info, error) {
 		sort := "ip"
 		dht := make(admin_info)
@@ -518,6 +522,35 @@ func (a *admin) getData_getSwitchPeers() []admin_nodeInfo {
 	return peerInfos
 }
 
+// getData_getSwitchQueues returns queue info from Core.switchTable for an admin response.
+func (a *admin) getData_getSwitchQueues() admin_nodeInfo {
+	var peerInfos admin_nodeInfo
+	switchTable := a.core.switchTable
+	getSwitchQueues := func() {
+		queues := make([]map[string]interface{}, 0)
+		for k, v := range switchTable.queues.bufs {
+			nexthop := switchTable.bestPortForCoords([]byte(k))
+			queue := map[string]interface{}{
+				"queue_id":      k,
+				"queue_size":    v.size,
+				"queue_packets": len(v.packets),
+				"queue_port":    nexthop,
+			}
+			queues = append(queues, queue)
+		}
+		peerInfos = admin_nodeInfo{
+			{"queues", queues},
+			{"queues_count", len(switchTable.queues.bufs)},
+			{"queues_size", switchTable.queues.size},
+			{"highest_queues_count", switchTable.queues.maxbufs},
+			{"highest_queues_size", switchTable.queues.maxsize},
+			{"maximum_queues_size", switch_buffer_maxSize},
+		}
+	}
+	a.core.switchTable.doAdmin(getSwitchQueues)
+	return peerInfos
+}
+
 // getData_getDHT returns info from Core.dht for an admin response.
 func (a *admin) getData_getDHT() []admin_nodeInfo {
 	var infos []admin_nodeInfo
diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go
index 63380dad..d6e981e7 100644
--- a/src/yggdrasil/switch.go
+++ b/src/yggdrasil/switch.go
@@ -161,11 +161,13 @@ type switchTable struct {
 	parent   switchPort          // Port of whatever peer is our parent, or self if we're root
 	drop     map[sigPubKey]int64 // Tstamp associated with a dropped root
 	mutex    sync.RWMutex        // Lock for reads/writes of switchData
-	data     switchData
-	updater  atomic.Value //*sync.Once
-	table    atomic.Value //lookupTable
-	packetIn chan []byte     // Incoming packets for the worker to handle
-	idleIn   chan switchPort // Incoming idle notifications from peer links
+	data     switchData          //
+	updater  atomic.Value        // *sync.Once
+	table    atomic.Value        // lookupTable
+	packetIn chan []byte         // Incoming packets for the worker to handle
+	idleIn   chan switchPort     // Incoming idle notifications from peer links
+	admin    chan func()         // Pass a lambda for the admin socket to query stuff
+	queues   switch_buffers      // Queues - not atomic so ONLY use through admin chan
 }
 
 // Initializes the switchTable struct.
@@ -181,6 +183,7 @@ func (t *switchTable) init(core *Core, key sigPubKey) {
 	t.drop = make(map[sigPubKey]int64)
 	t.packetIn = make(chan []byte, 1024)
 	t.idleIn = make(chan switchPort, 1024)
+	t.admin = make(chan func())
 }
 
 // Safely gets a copy of this node's locator.
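Note on the new admin channel above: switchTable.queues is owned by the switch worker's goroutine, so the patch exposes it to the admin socket by sending a closure over the admin channel (see doAdmin further down) rather than adding a lock. A minimal, self-contained sketch of that pattern is below; the worker/doAdmin names and types here are illustrative only, not the real switchTable fields.

package main

import "fmt"

// worker owns its state; other goroutines query it by sending a closure.
type worker struct {
	queues map[string]int // owned by run(); only touch via the admin channel
	work   chan string
	admin  chan func()
}

func (w *worker) run() {
	for {
		select {
		case s := <-w.work:
			w.queues[s]++ // normal work path
		case f := <-w.admin:
			f() // admin path: runs in this goroutine, so reads are race-free
		}
	}
}

// doAdmin blocks until the worker goroutine has executed f.
func (w *worker) doAdmin(f func()) {
	done := make(chan struct{})
	w.admin <- func() {
		f()
		close(done)
	}
	<-done
}

func main() {
	w := &worker{queues: make(map[string]int), work: make(chan string, 16), admin: make(chan func())}
	go w.run()
	w.work <- "stream-a"
	var snapshot int
	w.doAdmin(func() { snapshot = len(w.queues) })
	fmt.Println("queues seen so far:", snapshot) // 0 or 1, depending on which message the worker handles first
}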
@@ -538,6 +541,22 @@ func switch_getPacketStreamID(packet []byte) string {
 	return string(switch_getPacketCoords(packet))
 }
 
+// Find the best port for a given set of coords
+func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
+	table := t.getTable()
+	var best switchPort
+	bestDist := table.self.dist(coords)
+	for to, elem := range table.elems {
+		dist := elem.locator.dist(coords)
+		if !(dist < bestDist) {
+			continue
+		}
+		best = to
+		bestDist = dist
+	}
+	return best
+}
+
 // Handle an incoming packet
 // Either send it to ourself, or to the first idle peer that's free
 // Returns true if the packet has been handled somehow, false if it should be queued
@@ -582,6 +601,8 @@ type switch_packetInfo struct {
 	time  time.Time // Timestamp of when the packet arrived
 }
 
+const switch_buffer_maxSize = 4 * 1048576 // Maximum 4 MB
+
 // Used to keep track of buffered packets
 type switch_buffer struct {
 	packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large
@@ -589,8 +610,10 @@ type switch_buffer struct {
 }
 
 type switch_buffers struct {
-	bufs map[string]switch_buffer // Buffers indexed by StreamID
-	size uint64                   // Total size of all buffers, in bytes
+	bufs    map[string]switch_buffer // Buffers indexed by StreamID
+	size    uint64                   // Total size of all buffers, in bytes
+	maxbufs int                      // Highest number of buffers seen
+	maxsize uint64                   // Highest total size seen, in bytes
 }
 
 func (b *switch_buffers) cleanup(t *switchTable) {
@@ -606,8 +629,8 @@ func (b *switch_buffers) cleanup(t *switchTable) {
 			delete(b.bufs, streamID)
 		}
 	}
-	const maxSize = 4 * 1048576 // Maximum 4 MB
-	for b.size > maxSize {
+
+	for b.size > switch_buffer_maxSize {
 		// Drop a random queue
 		target := rand.Uint64() % b.size
 		var size uint64 // running total
@@ -635,16 +658,16 @@ func (b *switch_buffers) cleanup(t *switchTable) {
 // Handles incoming idle notifications
 // Loops over packets and sends the newest one that's OK for this peer to send
 // Returns true if the peer is no longer idle, false if it should be added to the idle list
-func (t *switchTable) handleIdle(port switchPort, bufs *switch_buffers) bool {
+func (t *switchTable) handleIdle(port switchPort) bool {
 	to := t.core.peers.getPorts()[port]
 	if to == nil {
 		return true
 	}
 	var best string
 	var bestPriority float64
-	bufs.cleanup(t)
+	t.queues.cleanup(t)
 	now := time.Now()
-	for streamID, buf := range bufs.bufs {
+	for streamID, buf := range t.queues.bufs {
 		// Filter over the streams that this node is closer to
 		// Keep the one with the smallest queue
 		packet := buf.packets[0]
@@ -656,17 +679,17 @@ func (t *switchTable) handleIdle(port switchPort, bufs *switch_buffers) bool {
 		}
 	}
 	if bestPriority != 0 {
-		buf := bufs.bufs[best]
+		buf := t.queues.bufs[best]
 		var packet switch_packetInfo
 		// TODO decide if this should be LIFO or FIFO
 		packet, buf.packets = buf.packets[0], buf.packets[1:]
 		buf.size -= uint64(len(packet.bytes))
-		bufs.size -= uint64(len(packet.bytes))
+		t.queues.size -= uint64(len(packet.bytes))
 		if len(buf.packets) == 0 {
-			delete(bufs.bufs, best)
+			delete(t.queues.bufs, best)
 		} else {
 			// Need to update the map, since buf was retrieved by value
-			bufs.bufs[best] = buf
+			t.queues.bufs[best] = buf
 		}
 		to.sendPacket(packet.bytes)
 		return true
@@ -677,9 +700,8 @@
 
 // The switch worker does routing lookups and sends packets to where they need to be
 func (t *switchTable) doWorker() {
-	var bufs switch_buffers
-	bufs.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
-	idle := make(map[switchPort]struct{})      // this is to deduplicate things
+	t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
+	idle := make(map[switchPort]struct{})          // this is to deduplicate things
 	for {
 		select {
 		case bytes := <-t.packetIn:
@@ -688,19 +710,47 @@ func (t *switchTable) doWorker() {
 			// There's nobody free to take it right now, so queue it for later
 			packet := switch_packetInfo{bytes, time.Now()}
 			streamID := switch_getPacketStreamID(packet.bytes)
-			buf := bufs.bufs[streamID]
+			buf, bufExists := t.queues.bufs[streamID]
 			buf.packets = append(buf.packets, packet)
 			buf.size += uint64(len(packet.bytes))
-			bufs.size += uint64(len(packet.bytes))
-			bufs.bufs[streamID] = buf
-			bufs.cleanup(t)
+			t.queues.size += uint64(len(packet.bytes))
+			// Keep track of the highest total queue size seen
+			if t.queues.size > t.queues.maxsize {
+				t.queues.maxsize = t.queues.size
+			}
+			t.queues.bufs[streamID] = buf
+			if !bufExists {
+				// Keep track of the highest queue count seen. Only recalculate
+				// this when a new queue is created, since calling len() for
+				// every packet would add unnecessary processing overhead
+				if len(t.queues.bufs) > t.queues.maxbufs {
+					t.queues.maxbufs = len(t.queues.bufs)
+				}
+			}
+			t.queues.cleanup(t)
 			}
 		case port := <-t.idleIn:
 			// Try to find something to send to this peer
-			if !t.handleIdle(port, &bufs) {
+			if !t.handleIdle(port) {
 				// Didn't find anything ready to send yet, so stay idle
 				idle[port] = struct{}{}
 			}
+		case f := <-t.admin:
+			f()
 		}
 	}
 }
+
+// Passes a function to the switch worker to call.
+// This will send the function over t.admin and block until it finishes.
+func (t *switchTable) doAdmin(f func()) {
+	// Wrap the function so that it signals completion, then hand it to the
+	// switch worker's main goroutine and wait for the worker to run it
+	done := make(chan struct{})
+	newF := func() {
+		f()
+		close(done)
+	}
+	t.admin <- newF
+	<-done
+}
diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go
index 82673b9c..1942acd0 100644
--- a/src/yggdrasil/tcp.go
+++ b/src/yggdrasil/tcp.go
@@ -165,7 +165,7 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) {
 					return
 				} else {
 					if ief.Flags & net.FlagUp == 0 {
-						return
+						return
 					}
 					addrs, err := ief.Addrs()
 					if err == nil {
diff --git a/yggdrasilctl.go b/yggdrasilctl.go
index 9d3fbb14..d98386b7 100644
--- a/yggdrasilctl.go
+++ b/yggdrasilctl.go
@@ -183,6 +183,54 @@ func main() {
 				fmt.Println("Coords:", coords)
 			}
 		}
+	case "getswitchqueues":
+		maximumqueuesize := float64(4194304)
+		portqueues := make(map[float64]float64)
+		portqueuesize := make(map[float64]float64)
+		portqueuepackets := make(map[float64]float64)
+		v := res["switchqueues"].(map[string]interface{})
+		if queuecount, ok := v["queues_count"].(float64); ok {
+			fmt.Printf("Active queue count: %d queues\n", uint(queuecount))
+		}
+		if queuesize, ok := v["queues_size"].(float64); ok {
+			fmt.Printf("Active queue size: %d bytes\n", uint(queuesize))
+		}
+		if highestqueuecount, ok := v["highest_queues_count"].(float64); ok {
+			fmt.Printf("Highest queue count: %d queues\n", uint(highestqueuecount))
+		}
+		if highestqueuesize, ok := v["highest_queues_size"].(float64); ok {
+			fmt.Printf("Highest queue size: %d bytes\n", uint(highestqueuesize))
+		}
+		if m, ok := v["maximum_queues_size"].(float64); ok {
+			maximumqueuesize = m
+			fmt.Printf("Maximum queue size: %d bytes\n", uint(maximumqueuesize))
+		}
+		if queues, ok := v["queues"].([]interface{}); ok {
+			if len(queues) != 0 {
+				fmt.Println("Active queues:")
+				for _, v := range queues {
+					queueport := v.(map[string]interface{})["queue_port"].(float64)
+					queuesize := v.(map[string]interface{})["queue_size"].(float64)
+					queuepackets := v.(map[string]interface{})["queue_packets"].(float64)
+					queueid := v.(map[string]interface{})["queue_id"].(string)
+					portqueues[queueport] += 1
+					portqueuesize[queueport] += queuesize
+					portqueuepackets[queueport] += queuepackets
+					queuesizepercent := (100 / maximumqueuesize) * queuesize
+					fmt.Printf("- Switch port %d, Stream ID: %v, size: %d bytes (%d%% full), %d packets\n",
+						uint(queueport), []byte(queueid), uint(queuesize),
+						uint(queuesizepercent), uint(queuepackets))
+				}
+			}
+		}
+		if len(portqueuesize) > 0 && len(portqueuepackets) > 0 {
+			fmt.Println("Aggregated statistics by switch port:")
+			for k, v := range portqueuesize {
+				queuesizepercent := (100 / (portqueues[k] * maximumqueuesize)) * v
+				fmt.Printf("- Switch port %d, size: %d bytes (%d%% full), %d packets\n",
+					uint(k), uint(v), uint(queuesizepercent), uint(portqueuepackets[k]))
+			}
+		}
 	case "addpeer", "removepeer", "addallowedencryptionpublickey", "removeallowedencryptionpublickey":
 		if _, ok := res["added"]; ok {
 			for _, v := range res["added"].([]interface{}) {
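For reference, the percentages printed by the new getswitchqueues handler are plain proportions of the 4 MB cap reported as maximum_queues_size: per queue, 100 / cap * size; per port, 100 / (queue count * cap) * summed size. The sketch below restates that arithmetic with hypothetical sample figures, not real admin socket output.

package main

import "fmt"

func main() {
	const maximumQueueSize = 4 * 1048576 // mirrors switch_buffer_maxSize (4 MB)

	// Hypothetical per-queue figures shaped like the "queues" entries in the
	// getSwitchQueues response: queue_port and queue_size (bytes).
	queues := []struct {
		port uint64
		size float64
	}{
		{port: 2, size: 524288},
		{port: 2, size: 262144},
		{port: 3, size: 1048576},
	}

	// Per-queue fill, as printed for each stream: size as a share of the cap.
	for _, q := range queues {
		percent := (100 / float64(maximumQueueSize)) * q.size
		fmt.Printf("port %d: %d bytes (%.1f%% of cap)\n", q.port, uint(q.size), percent)
	}

	// Per-port aggregation, as in the "Aggregated statistics" section:
	// sizes are summed and compared against (queues on that port) * cap.
	type agg struct{ count, size float64 }
	perPort := map[uint64]agg{}
	for _, q := range queues {
		a := perPort[q.port]
		a.count++
		a.size += q.size
		perPort[q.port] = a
	}
	for port, a := range perPort {
		percent := (100 / (a.count * maximumQueueSize)) * a.size
		fmt.Printf("port %d total: %d bytes (%.1f%% full)\n", port, uint(a.size), percent)
	}
}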