From 8d259748672d2cb8470e096bd01133d0a9908242 Mon Sep 17 00:00:00 2001 From: frekky Date: Tue, 10 Nov 2015 20:49:44 +0800 Subject: [PATCH] Increased bad fragment tolerance to keep data flowing --- src/window.c | 100 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 27 deletions(-) diff --git a/src/window.c b/src/window.c index 1e20d93..36f4946 100644 --- a/src/window.c +++ b/src/window.c @@ -24,6 +24,7 @@ #include #include "common.h" +#include "util.h" #include "window.h" int window_debug = 0; @@ -123,33 +124,49 @@ window_append_fragment(struct frag_buffer *w, fragment *src) return 1; } + +ssize_t +window_process_incoming_fragment(struct frag_buffer *w, fragment *f) /* Handles fragment received from the sending side (RECV) * Returns index of fragment in window or <0 if dropped * The next ACK MUST be for this fragment */ -ssize_t -window_process_incoming_fragment(struct frag_buffer *w, fragment *f) { /* Check if packet is in window */ - unsigned startid, endid; + unsigned startid, endid, offset; fragment *fd; startid = w->start_seq_id; endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID; + offset = SEQ_OFFSET(startid, f->seqID); + if (!INWINDOW_SEQ(startid, endid, f->seqID)) { - WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n", f->seqID, startid, endid); w->oos++; - return -1; + if (offset > MIN(w->length - w->numitems, MAX_SEQ_ID / 2)) { + /* Only drop the fragment if it is ancient */ + WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)", f->seqID, startid, endid); + return -1; + } else { + /* Save "new" fragments to avoid causing other end to advance + * when this fragment is ACK'd despite being dropped */ + WDEBUG("WARNING: Got future fragment (%u), offset %u from start %u (wsize %u).", + f->seqID, offset, startid, w->windowsize); + } } /* Place fragment into correct location in buffer */ ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID)); WDEBUG(" Putting frag seq %u into frags[%lu + %u = %lu]", f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest); + /* Check if fragment already received */ fd = &w->frags[dest]; if (fd->len != 0) { WDEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID); - if (f->seqID == fd->seqID) + if (f->seqID == fd->seqID) { + /* use retries as counter for dupes */ + fd->retries ++; return -1; + } } + memcpy(fd, f, sizeof(fragment)); w->numitems ++; @@ -173,22 +190,24 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int if (w->direction != WINDOW_RECVING) return 0; if (w->frags[w->chunk_start].start == 0 && w->numitems > 0) { - WDEBUG("chunk_start (%lu)pointing to non-start fragment (seq %u, len %lu)!", + WDEBUG("chunk_start (%lu) pointing to non-start fragment (seq %u, len %lu)!", w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len); return 0; } if (compression) *compression = 1; fragment *f; - size_t i, curseq; + size_t i; + unsigned curseq; int end = 0; curseq = w->frags[w->chunk_start].seqID; for (i = 0; i < w->numitems; ++i) { woffs = WRAP(w->chunk_start + i); f = &w->frags[woffs]; fraglen = f->len; - if (fraglen == 0 || !f->data || f->seqID != curseq) { - WDEBUG("data missing! Not reassembling!"); + if (fraglen == 0 || f->seqID != curseq) { + WDEBUG("Missing next frag %u [%lu], got seq %u (%lu bytes) instead! Not reassembling!", + curseq, woffs, f->seqID, fraglen); return 0; } @@ -200,8 +219,7 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int if (compression) { *compression &= f->compressed & 1; if (f->compressed != *compression) { - WDEBUG("Inconsistent compression flags in chunk. Not reassembling!"); - return 0; + WDEBUG("Inconsistent compression flags in chunk. Will reassemble anyway!"); } } if (fraglen > maxlen) { @@ -218,13 +236,18 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int break; } + /* Move position counters and expected next seqID */ maxlen -= fraglen; curseq = (curseq + 1) % MAX_SEQ_ID; } - if (end == 0) { /* no end of chunk found but reached end of data */ + + if (end == 0) { + /* no end of chunk found because the window buffer has no more frags + * meaning they haven't been received yet. */ return 0; } - WDEBUG("Reassembling %lu bytes of data from %lu frags; compression %d!", datalen, i + 1, *compression); + + WDEBUG("Reassembled %lu bytes from %lu frags; %scompressed!", datalen, i + 1, *compression ? "" : "un"); /* Clear all used fragments */ size_t p; ITER_FORWARD(w->chunk_start, WRAP(w->chunk_start + i + 1), w->length, p, @@ -235,15 +258,19 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int return datalen; } -/* Returns number of fragments that can be sent immediately; effectively - * the same as window_get_next_sending_fragment but without changing anything. */ size_t -window_sending(struct frag_buffer *w) +window_sending(struct frag_buffer *w, struct timeval *nextresend) +/* Returns number of fragments that can be sent immediately; effectively + the same as window_get_next_sending_fragment but without doing anything. + *nextresend is time before the next frag will be resent */ { - struct timeval timeout, now; + struct timeval age, now, oldest; fragment *f; size_t tosend = 0; + oldest.tv_sec = 0; + oldest.tv_usec = 0; + if (w->numitems == 0) return 0; @@ -253,12 +280,30 @@ window_sending(struct frag_buffer *w) f = &w->frags[WRAP(w->window_start + i)]; if (f->len == 0 || f->acks >= 1) continue; - timeradd(&w->timeout, &f->lastsent, &timeout); - if (f->retries < 1 || !timercmp(&now, &timeout, <)) { - /* Fragment not sent or timed out (to be re-sent) */ + if (f->retries < 1 || f->lastsent.tv_sec == 0) { + /* Sending frag for first time + * Note: if retries==0 then lastsent MUST also be 0 */ tosend++; + } else { + /* Frag has been sent before so lastsent is a valid timestamp */ + timersub(&now, &f->lastsent, &age); + + if (!timercmp(&age, &w->timeout, <)) { + /* ACK timeout: Frag will be resent */ + tosend++; + } else if (timercmp(&age, &oldest, >)) { + /* Hasn't timed out yet and is oldest so far */ + oldest = age; + } } } + + if (nextresend) { + /* nextresend = time before oldest fragment (not being sent now) + * will be re-sent = timeout - age */ + timersub(&w->timeout, &oldest, nextresend); + } + return tosend; } @@ -267,7 +312,7 @@ window_sending(struct frag_buffer *w) fragment * window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) { - struct timeval timeout, now; + struct timeval age, now; fragment *f = NULL; if (*other_ack >= MAX_SEQ_ID || *other_ack < 0) @@ -277,13 +322,14 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) for (size_t i = 0; i < w->windowsize; i++) { f = &w->frags[WRAP(w->window_start + i)]; - if (f->acks >= 1) continue; + if (f->acks >= 1 || f->len == 0) continue; - timeradd(&w->timeout, &f->lastsent, &timeout); + timersub(&now, &f->lastsent, &age); - if (f->retries >= 1 && !timercmp(&now, &timeout, <)) { - /* Fragment sent before, not ACK'd */ - WDEBUG("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends); + if (f->retries >= 1 && !timercmp(&age, &w->timeout, <)) { + /* Resending fragment due to ACK timeout */ + WDEBUG("Retrying frag %u (%ld ms old/timeout %ld ms), retries: %u/total %u", + f->seqID, timeval_to_ms(&age), timeval_to_ms(&w->timeout), f->retries, w->resends); w->resends ++; goto found; } else if (f->retries == 0 && f->len > 0) {