diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 6c5fd21d..5b72620c 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -586,36 +586,66 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool } } +// Info about a buffered packet +type switch_packetInfo struct { + bytes []byte + time time.Time // Timestamp of when the packet arrived +} + +// Used to keep track of buffered packets +type switch_buffer struct { + packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large + count uint64 // Total queue size, including dropped packets +} + +func (b *switch_buffer) dropTimedOut() { + // TODO figure out what timeout makes sense + const timeout = 25 * time.Millisecond + now := time.Now() + for len(b.packets) > 0 && now.Sub(b.packets[0].time) > timeout { + util_putBytes(b.packets[0].bytes) + b.packets = b.packets[1:] + } +} + // Handles incoming idle notifications // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list -func (t *switchTable) handleIdle(port switchPort, buffs map[string][][]byte) bool { +func (t *switchTable) handleIdle(port switchPort, buffs map[string]switch_buffer) bool { to := t.core.peers.getPorts()[port] if to == nil { return true } var best string - var bestSize int - for streamID, packets := range buffs { + var bestSize uint64 + for streamID, buf := range buffs { // Filter over the streams that this node is closer to // Keep the one with the smallest queue - packet := packets[0] - coords := switch_getPacketCoords(packet) - if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(coords, port) { + buf.dropTimedOut() + if len(buf.packets) == 0 { + delete(buffs, streamID) + continue + } + buffs[streamID] = buf + packet := buf.packets[0] + coords := switch_getPacketCoords(packet.bytes) + if (bestSize == 0 || buf.count < bestSize) && t.portIsCloser(coords, port) { best = streamID - bestSize = len(packets) + bestSize = buf.count } } if bestSize != 0 { - packets := buffs[best] - var packet []byte - packet, packets = packets[0], packets[1:] - if len(packets) == 0 { + buf := buffs[best] + var packet switch_packetInfo + // TODO decide if this should be LIFO or FIFO + packet, buf.packets = buf.packets[0], buf.packets[1:] + buf.count-- + if len(buf.packets) == 0 { delete(buffs, best) } else { - buffs[best] = packets + buffs[best] = buf } - to.sendPacket(packet) + to.sendPacket(packet.bytes) return true } else { return false @@ -624,8 +654,8 @@ func (t *switchTable) handleIdle(port switchPort, buffs map[string][][]byte) boo // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { - buffs := make(map[string][][]byte) // Packets per PacketStreamID (string) - idle := make(map[switchPort]struct{}) // this is to deduplicate things + buffs := make(map[string]switch_buffer) // Packets per PacketStreamID (string) + idle := make(map[switchPort]struct{}) // this is to deduplicate things for { select { case packet := <-t.packetIn: @@ -633,12 +663,12 @@ func (t *switchTable) doWorker() { if !t.handleIn(packet, idle) { // There's nobody free to take it right now, so queue it for later streamID := switch_getPacketStreamID(packet) - packets := append(buffs[streamID], packet) - for len(packets) > 32 { - util_putBytes(packets[0]) - packets = packets[1:] - } - buffs[streamID] = packets + buf := buffs[streamID] + buf.dropTimedOut() + pinfo := switch_packetInfo{packet, time.Now()} + buf.packets = append(buf.packets, pinfo) + buf.count++ + buffs[streamID] = buf } case port := <-t.idleIn: // Try to find something to send to this peer