diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index 7abdaea7..464bc6ce 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -1,6 +1,7 @@ package yggdrasil import ( + "math/rand" "time" ) @@ -28,32 +29,19 @@ func (q *packetQueue) drop() bool { if q.size == 0 { return false } - // TODO? drop from a random stream - // odds proportional to size? bandwidth? - // always using the worst is exploitable -> flood 1 packet per random stream - // find the stream that's using the most bandwidth - now := time.Now() + // select a random stream, odds based on stream size + offset := rand.Uint64() % q.size var worst pqStreamID - for id := range q.streams { - worst = id - break // get a random ID to start - } - worstStream := q.streams[worst] - worstSize := float64(worstStream.size) - worstAge := now.Sub(worstStream.infos[0].time).Seconds() + var size uint64 for id, stream := range q.streams { - thisSize := float64(stream.size) - thisAge := now.Sub(stream.infos[0].time).Seconds() - // cross multiply to avoid division by zero issues - if worstSize*thisAge < thisSize*worstAge { - // worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth - worst = id - worstStream = stream - worstSize = thisSize - worstAge = thisAge + worst = id + size += stream.size + if size >= offset { + break } } - // Drop the oldest packet from the worst stream + // drop the oldest packet from the stream + worstStream := q.streams[worst] packet := worstStream.infos[0].packet worstStream.infos = worstStream.infos[1:] worstStream.size -= uint64(len(packet)) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 361a0ea1..31103074 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -110,6 +110,7 @@ type peer struct { queue packetQueue seq uint64 // this and idle are used to detect when to drop packets from queue idle bool + drop bool // set to true if we're dropping packets from the queue } func (ps *peers) updateTables(from phony.Actor, table *lookupTable) { @@ -275,13 +276,19 @@ func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { } func (p *peer) _sendPackets(packets [][]byte) { + size := p.queue.size for _, packet := range packets { p.queue.push(packet) } - if p.idle { + switch { + case p.idle: p.idle = false p._handleIdle() - } else { + case p.drop: + for p.queue.size > size { + p.queue.drop() + } + default: p.intf.notifyQueued(p.seq) } } @@ -303,17 +310,14 @@ func (p *peer) _handleIdle() { p.intf.out(packets) } else { p.idle = true + p.drop = false } } func (p *peer) dropFromQueue(from phony.Actor, seq uint64) { p.Act(from, func() { - switch { - case seq != p.seq: - case p.queue.size < streamMsgSize: - case p.queue.drop(): - p.core.log.Debugln("DEBUG dropped:", p.port, p.queue.size) - p.intf.notifyQueued(p.seq) + if seq == p.seq { + p.drop = true } }) }