From 761ae531cb7eddf5c88779c746a0f41b67934b6d Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 25 May 2020 15:19:32 -0500 Subject: [PATCH 1/3] work-in-progress faster queue logic --- src/yggdrasil/packetqueue.go | 85 +++++++----------------------------- 1 file changed, 16 insertions(+), 69 deletions(-) diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index 464bc6ce..d91a18ef 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -1,10 +1,15 @@ package yggdrasil +/* import ( "math/rand" "time" ) +*/ +// TODO separate queues per e.g. traffic flow +// For now, we put everything in queue +/* type pqStreamID string type pqPacketInfo struct { @@ -13,13 +18,15 @@ type pqPacketInfo struct { } type pqStream struct { + id string infos []pqPacketInfo - size uint64 + size int } +*/ -// TODO separate queues per e.g. traffic flow type packetQueue struct { - streams map[pqStreamID]pqStream + //streams []pqStream + packets [][]byte size uint64 } @@ -29,83 +36,23 @@ func (q *packetQueue) drop() bool { if q.size == 0 { return false } - // select a random stream, odds based on stream size - offset := rand.Uint64() % q.size - var worst pqStreamID - var size uint64 - for id, stream := range q.streams { - worst = id - size += stream.size - if size >= offset { - break - } - } - // drop the oldest packet from the stream - worstStream := q.streams[worst] - packet := worstStream.infos[0].packet - worstStream.infos = worstStream.infos[1:] - worstStream.size -= uint64(len(packet)) + packet := q.packets[0] + q.packets = q.packets[1:] q.size -= uint64(len(packet)) pool_putBytes(packet) - // save the modified stream to queues - if len(worstStream.infos) > 0 { - q.streams[worst] = worstStream - } else { - delete(q.streams, worst) - } return true } func (q *packetQueue) push(packet []byte) { - if q.streams == nil { - q.streams = make(map[pqStreamID]pqStream) - } - // get stream - id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now - stream := q.streams[id] - // update stream - stream.infos = append(stream.infos, pqPacketInfo{packet, time.Now()}) - stream.size += uint64(len(packet)) - // save update to queues - q.streams[id] = stream + q.packets = append(q.packets, packet) q.size += uint64(len(packet)) } func (q *packetQueue) pop() ([]byte, bool) { - if len(q.streams) > 0 { - // get the stream that uses the least bandwidth - now := time.Now() - var best pqStreamID - for id := range q.streams { - best = id - break // get a random ID to start - } - bestStream := q.streams[best] - bestSize := float64(bestStream.size) - bestAge := now.Sub(bestStream.infos[0].time).Seconds() - for id, stream := range q.streams { - thisSize := float64(stream.size) - thisAge := now.Sub(stream.infos[0].time).Seconds() - // cross multiply to avoid division by zero issues - if bestSize*thisAge > thisSize*bestAge { - // bestSize/bestAge > thisSize/thisAge -> this uses less bandwidth - best = id - bestStream = stream - bestSize = thisSize - bestAge = thisAge - } - } - // get the oldest packet from the best stream - packet := bestStream.infos[0].packet - bestStream.infos = bestStream.infos[1:] - bestStream.size -= uint64(len(packet)) + if q.size > 0 { + packet := q.packets[0] + q.packets = q.packets[1:] q.size -= uint64(len(packet)) - // save the modified stream to queues - if len(bestStream.infos) > 0 { - q.streams[best] = bestStream - } else { - delete(q.streams, best) - } return packet, true } return nil, false From 1f65ffb310c12343bcd56a53399661046f5490a7 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 25 May 2020 16:07:56 -0500 Subject: [PATCH 2/3] work-in-progress heap-based queue structure --- src/yggdrasil/packetqueue.go | 89 ++++++++++++++++++++++++++++-------- 1 file changed, 70 insertions(+), 19 deletions(-) diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index d91a18ef..358aaeb1 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -1,15 +1,13 @@ package yggdrasil -/* import ( - "math/rand" + "container/heap" "time" ) -*/ // TODO separate queues per e.g. traffic flow // For now, we put everything in queue -/* + type pqStreamID string type pqPacketInfo struct { @@ -18,15 +16,13 @@ type pqPacketInfo struct { } type pqStream struct { - id string + id pqStreamID infos []pqPacketInfo - size int + size uint64 } -*/ type packetQueue struct { - //streams []pqStream - packets [][]byte + streams []pqStream size uint64 } @@ -36,24 +32,79 @@ func (q *packetQueue) drop() bool { if q.size == 0 { return false } - packet := q.packets[0] - q.packets = q.packets[1:] - q.size -= uint64(len(packet)) - pool_putBytes(packet) + var longestIdx int + for idx := range q.streams { + if q.streams[idx].size > q.streams[longestIdx].size { + longestIdx = idx + } + } + stream := heap.Remove(q, longestIdx).(pqStream) + info := stream.infos[0] + if len(stream.infos) > 1 { + stream.infos = stream.infos[1:] + stream.size -= uint64(len(info.packet)) + heap.Push(q, stream) + } + pool_putBytes(info.packet) return true } func (q *packetQueue) push(packet []byte) { - q.packets = append(q.packets, packet) - q.size += uint64(len(packet)) + id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now + info := pqPacketInfo{packet: packet, time: time.Now()} + for idx := range q.streams { + if q.streams[idx].id == id { + q.streams[idx].infos = append(q.streams[idx].infos, info) + q.streams[idx].size += uint64(len(packet)) + q.size += uint64(len(packet)) + return + } + } + stream := pqStream{id: id, size: uint64(len(packet))} + stream.infos = append(stream.infos, info) + heap.Push(q, stream) } func (q *packetQueue) pop() ([]byte, bool) { if q.size > 0 { - packet := q.packets[0] - q.packets = q.packets[1:] - q.size -= uint64(len(packet)) - return packet, true + stream := heap.Pop(q).(pqStream) + info := stream.infos[0] + if len(stream.infos) > 1 { + stream.infos = stream.infos[1:] + stream.size -= uint64(len(info.packet)) + heap.Push(q, stream) + } + return info.packet, true } return nil, false } + +//////////////////////////////////////////////////////////////////////////////// + +// Interface methods for packetQueue to satisfy heap.Interface + +func (q *packetQueue) Len() int { + return len(q.streams) +} + +func (q *packetQueue) Less(i, j int) bool { + return q.streams[i].infos[0].time.Before(q.streams[j].infos[0].time) +} + +func (q *packetQueue) Swap(i, j int) { + q.streams[i], q.streams[j] = q.streams[j], q.streams[i] +} + +func (q *packetQueue) Push(x interface{}) { + stream := x.(pqStream) + q.streams = append(q.streams, stream) + q.size += stream.size +} + +func (q *packetQueue) Pop() interface{} { + idx := len(q.streams) - 1 + stream := q.streams[idx] + q.streams = q.streams[:idx] + q.size -= stream.size + return stream +} From 09f9f4e8e4fdc2aafb65bbc31a88551341dded80 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 25 May 2020 20:09:57 -0500 Subject: [PATCH 3/3] use heap.Fix instead of heap.Remove + heap.Push when updating queues, this is theoretically faster --- src/yggdrasil/packetqueue.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index 358aaeb1..6273e6c8 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -38,12 +38,16 @@ func (q *packetQueue) drop() bool { longestIdx = idx } } - stream := heap.Remove(q, longestIdx).(pqStream) + stream := q.streams[longestIdx] info := stream.infos[0] if len(stream.infos) > 1 { stream.infos = stream.infos[1:] stream.size -= uint64(len(info.packet)) - heap.Push(q, stream) + q.streams[longestIdx] = stream + q.size -= uint64(len(info.packet)) + heap.Fix(q, longestIdx) + } else { + heap.Remove(q, longestIdx) } pool_putBytes(info.packet) return true @@ -67,12 +71,16 @@ func (q *packetQueue) push(packet []byte) { func (q *packetQueue) pop() ([]byte, bool) { if q.size > 0 { - stream := heap.Pop(q).(pqStream) + stream := q.streams[0] info := stream.infos[0] if len(stream.infos) > 1 { stream.infos = stream.infos[1:] stream.size -= uint64(len(info.packet)) - heap.Push(q, stream) + q.streams[0] = stream + q.size -= uint64(len(info.packet)) + heap.Fix(q, 0) + } else { + heap.Remove(q, 0) } return info.packet, true }