Increased bad fragment tolerance to keep data flowing

This commit is contained in:
frekky 2015-11-10 20:49:44 +08:00
parent 9dec2de448
commit 8d25974867

View File

@ -24,6 +24,7 @@
#include <stdlib.h> #include <stdlib.h>
#include "common.h" #include "common.h"
#include "util.h"
#include "window.h" #include "window.h"
int window_debug = 0; int window_debug = 0;
@ -123,33 +124,49 @@ window_append_fragment(struct frag_buffer *w, fragment *src)
return 1; return 1;
} }
ssize_t
window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
/* Handles fragment received from the sending side (RECV) /* Handles fragment received from the sending side (RECV)
* Returns index of fragment in window or <0 if dropped * Returns index of fragment in window or <0 if dropped
* The next ACK MUST be for this fragment */ * The next ACK MUST be for this fragment */
ssize_t
window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
{ {
/* Check if packet is in window */ /* Check if packet is in window */
unsigned startid, endid; unsigned startid, endid, offset;
fragment *fd; fragment *fd;
startid = w->start_seq_id; startid = w->start_seq_id;
endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID; endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID;
offset = SEQ_OFFSET(startid, f->seqID);
if (!INWINDOW_SEQ(startid, endid, f->seqID)) { if (!INWINDOW_SEQ(startid, endid, f->seqID)) {
WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n", f->seqID, startid, endid);
w->oos++; w->oos++;
return -1; if (offset > MIN(w->length - w->numitems, MAX_SEQ_ID / 2)) {
/* Only drop the fragment if it is ancient */
WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)", f->seqID, startid, endid);
return -1;
} else {
/* Save "new" fragments to avoid causing other end to advance
* when this fragment is ACK'd despite being dropped */
WDEBUG("WARNING: Got future fragment (%u), offset %u from start %u (wsize %u).",
f->seqID, offset, startid, w->windowsize);
}
} }
/* Place fragment into correct location in buffer */ /* Place fragment into correct location in buffer */
ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID)); ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID));
WDEBUG(" Putting frag seq %u into frags[%lu + %u = %lu]", WDEBUG(" Putting frag seq %u into frags[%lu + %u = %lu]",
f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest); f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest);
/* Check if fragment already received */ /* Check if fragment already received */
fd = &w->frags[dest]; fd = &w->frags[dest];
if (fd->len != 0) { if (fd->len != 0) {
WDEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID); WDEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID);
if (f->seqID == fd->seqID) if (f->seqID == fd->seqID) {
/* use retries as counter for dupes */
fd->retries ++;
return -1; return -1;
}
} }
memcpy(fd, f, sizeof(fragment)); memcpy(fd, f, sizeof(fragment));
w->numitems ++; w->numitems ++;
@ -173,22 +190,24 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int
if (w->direction != WINDOW_RECVING) if (w->direction != WINDOW_RECVING)
return 0; return 0;
if (w->frags[w->chunk_start].start == 0 && w->numitems > 0) { if (w->frags[w->chunk_start].start == 0 && w->numitems > 0) {
WDEBUG("chunk_start (%lu)pointing to non-start fragment (seq %u, len %lu)!", WDEBUG("chunk_start (%lu) pointing to non-start fragment (seq %u, len %lu)!",
w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len); w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len);
return 0; return 0;
} }
if (compression) *compression = 1; if (compression) *compression = 1;
fragment *f; fragment *f;
size_t i, curseq; size_t i;
unsigned curseq;
int end = 0; int end = 0;
curseq = w->frags[w->chunk_start].seqID; curseq = w->frags[w->chunk_start].seqID;
for (i = 0; i < w->numitems; ++i) { for (i = 0; i < w->numitems; ++i) {
woffs = WRAP(w->chunk_start + i); woffs = WRAP(w->chunk_start + i);
f = &w->frags[woffs]; f = &w->frags[woffs];
fraglen = f->len; fraglen = f->len;
if (fraglen == 0 || !f->data || f->seqID != curseq) { if (fraglen == 0 || f->seqID != curseq) {
WDEBUG("data missing! Not reassembling!"); WDEBUG("Missing next frag %u [%lu], got seq %u (%lu bytes) instead! Not reassembling!",
curseq, woffs, f->seqID, fraglen);
return 0; return 0;
} }
@ -200,8 +219,7 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int
if (compression) { if (compression) {
*compression &= f->compressed & 1; *compression &= f->compressed & 1;
if (f->compressed != *compression) { if (f->compressed != *compression) {
WDEBUG("Inconsistent compression flags in chunk. Not reassembling!"); WDEBUG("Inconsistent compression flags in chunk. Will reassemble anyway!");
return 0;
} }
} }
if (fraglen > maxlen) { if (fraglen > maxlen) {
@ -218,13 +236,18 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int
break; break;
} }
/* Move position counters and expected next seqID */
maxlen -= fraglen; maxlen -= fraglen;
curseq = (curseq + 1) % MAX_SEQ_ID; curseq = (curseq + 1) % MAX_SEQ_ID;
} }
if (end == 0) { /* no end of chunk found but reached end of data */
if (end == 0) {
/* no end of chunk found because the window buffer has no more frags
* meaning they haven't been received yet. */
return 0; return 0;
} }
WDEBUG("Reassembling %lu bytes of data from %lu frags; compression %d!", datalen, i + 1, *compression);
WDEBUG("Reassembled %lu bytes from %lu frags; %scompressed!", datalen, i + 1, *compression ? "" : "un");
/* Clear all used fragments */ /* Clear all used fragments */
size_t p; size_t p;
ITER_FORWARD(w->chunk_start, WRAP(w->chunk_start + i + 1), w->length, p, ITER_FORWARD(w->chunk_start, WRAP(w->chunk_start + i + 1), w->length, p,
@ -235,15 +258,19 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int
return datalen; return datalen;
} }
/* Returns number of fragments that can be sent immediately; effectively
* the same as window_get_next_sending_fragment but without changing anything. */
size_t size_t
window_sending(struct frag_buffer *w) window_sending(struct frag_buffer *w, struct timeval *nextresend)
/* Returns number of fragments that can be sent immediately; effectively
the same as window_get_next_sending_fragment but without doing anything.
*nextresend is time before the next frag will be resent */
{ {
struct timeval timeout, now; struct timeval age, now, oldest;
fragment *f; fragment *f;
size_t tosend = 0; size_t tosend = 0;
oldest.tv_sec = 0;
oldest.tv_usec = 0;
if (w->numitems == 0) if (w->numitems == 0)
return 0; return 0;
@ -253,12 +280,30 @@ window_sending(struct frag_buffer *w)
f = &w->frags[WRAP(w->window_start + i)]; f = &w->frags[WRAP(w->window_start + i)];
if (f->len == 0 || f->acks >= 1) continue; if (f->len == 0 || f->acks >= 1) continue;
timeradd(&w->timeout, &f->lastsent, &timeout); if (f->retries < 1 || f->lastsent.tv_sec == 0) {
if (f->retries < 1 || !timercmp(&now, &timeout, <)) { /* Sending frag for first time
/* Fragment not sent or timed out (to be re-sent) */ * Note: if retries==0 then lastsent MUST also be 0 */
tosend++; tosend++;
} else {
/* Frag has been sent before so lastsent is a valid timestamp */
timersub(&now, &f->lastsent, &age);
if (!timercmp(&age, &w->timeout, <)) {
/* ACK timeout: Frag will be resent */
tosend++;
} else if (timercmp(&age, &oldest, >)) {
/* Hasn't timed out yet and is oldest so far */
oldest = age;
}
} }
} }
if (nextresend) {
/* nextresend = time before oldest fragment (not being sent now)
* will be re-sent = timeout - age */
timersub(&w->timeout, &oldest, nextresend);
}
return tosend; return tosend;
} }
@ -267,7 +312,7 @@ window_sending(struct frag_buffer *w)
fragment * fragment *
window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
{ {
struct timeval timeout, now; struct timeval age, now;
fragment *f = NULL; fragment *f = NULL;
if (*other_ack >= MAX_SEQ_ID || *other_ack < 0) if (*other_ack >= MAX_SEQ_ID || *other_ack < 0)
@ -277,13 +322,14 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
for (size_t i = 0; i < w->windowsize; i++) { for (size_t i = 0; i < w->windowsize; i++) {
f = &w->frags[WRAP(w->window_start + i)]; f = &w->frags[WRAP(w->window_start + i)];
if (f->acks >= 1) continue; if (f->acks >= 1 || f->len == 0) continue;
timeradd(&w->timeout, &f->lastsent, &timeout); timersub(&now, &f->lastsent, &age);
if (f->retries >= 1 && !timercmp(&now, &timeout, <)) { if (f->retries >= 1 && !timercmp(&age, &w->timeout, <)) {
/* Fragment sent before, not ACK'd */ /* Resending fragment due to ACK timeout */
WDEBUG("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends); WDEBUG("Retrying frag %u (%ld ms old/timeout %ld ms), retries: %u/total %u",
f->seqID, timeval_to_ms(&age), timeval_to_ms(&w->timeout), f->retries, w->resends);
w->resends ++; w->resends ++;
goto found; goto found;
} else if (f->retries == 0 && f->len > 0) { } else if (f->retries == 0 && f->len > 0) {