Fixed some windowing issues + made debug output optional (WINDOW_DEBUG)

This commit is contained in:
frekky 2015-08-29 20:06:53 +08:00
parent 02c2763c26
commit b6162241e6
2 changed files with 77 additions and 48 deletions

View File

@ -25,12 +25,6 @@
#include "common.h" #include "common.h"
#include "window.h" #include "window.h"
void
window_buffer_reset(struct frag_buffer *w)
{
}
struct frag_buffer * struct frag_buffer *
window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int dir) window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int dir)
{ {
@ -64,7 +58,7 @@ window_buffer_resize(struct frag_buffer *w, size_t length)
{ {
if (w->length == length) return; if (w->length == length) return;
if (w->numitems > 0) { if (w->numitems > 0) {
warnx("Resizing window buffer with things still in it! This will cause problems!"); DEBUG("Resizing window buffer with things still in it! This will cause problems!");
} }
if (w->frags) free(w->frags); if (w->frags) free(w->frags);
w->frags = calloc(length, sizeof(fragment)); w->frags = calloc(length, sizeof(fragment));
@ -115,19 +109,19 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
startid = w->start_seq_id; startid = w->start_seq_id;
endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID; endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID;
if (!INWINDOW_SEQ(startid, endid, f->seqID)) { if (!INWINDOW_SEQ(startid, endid, f->seqID)) {
warnx("Dropping frag with seqID %u: not in window (%u-%u)\n", f->seqID, startid, endid); DEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n", f->seqID, startid, endid);
return 0; return 0;
} }
/* Place fragment into correct location in buffer */ /* Place fragment into correct location in buffer */
size_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID)); size_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID));
// warnx(" Putting frag seq %u into frags[%lu + %u = %lu]", f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest); DEBUG(" Putting frag seq %u into frags[%lu + %u = %lu]", f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest);
/* Check if fragment already received */ /* Check if fragment already received */
fd = &w->frags[dest]; fd = &w->frags[dest];
if (fd->len != 0) { if (fd->len != 0) {
warnx("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID); DEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID);
return 0; return 0;
} }
memcpy(&fd, f, sizeof(fragment)); memcpy(fd, f, sizeof(fragment));
fd->retries = 0; fd->retries = 0;
fd->ack_other = -1; fd->ack_other = -1;
fd->acks = 0; fd->acks = 0;
@ -138,57 +132,71 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
/* Reassembles first complete sequence of fragments into data. (RECV) /* Reassembles first complete sequence of fragments into data. (RECV)
* Returns length of data reassembled, or 0 if no data reassembled */ * Returns length of data reassembled, or 0 if no data reassembled */
size_t size_t
window_reassemble_data(struct frag_buffer *w, uint8_t *data, unsigned maxlen, int *compression) window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int *compression)
{ {
size_t woffs, fraglen, datalen = 0; size_t woffs, fraglen, datalen = 0;
uint8_t *dest; //, *fdata_start; uint8_t *dest; //, *fdata_start;
dest = data; dest = data;
if (w->direction != WINDOW_RECVING)
return 0;
if (w->frags[w->chunk_start].start == 0) { if (w->frags[w->chunk_start].start == 0) {
// warnx("chunk_start pointing to non-start fragment (%u)!", w->frags[w->chunk_start].seqID); DEBUG("chunk_start (%lu)pointing to non-start fragment (seq %u, len %lu)!",
w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len);
return 0; return 0;
} }
if (compression) *compression = 1; if (compression) *compression = 1;
fragment *f; fragment *f;
size_t i, curseq; size_t i, curseq;
int end = 0;
curseq = w->frags[w->chunk_start].seqID; curseq = w->frags[w->chunk_start].seqID;
for (i = 0; i < w->numitems; ++i) { for (i = 0; i < w->numitems; ++i) {
woffs = WRAP(w->chunk_start + i); woffs = WRAP(w->chunk_start + i);
f = &w->frags[woffs]; f = &w->frags[woffs];
fraglen = f->len; fraglen = f->len;
if (fraglen == 0 || !f->data || f->seqID != curseq) { if (fraglen == 0 || !f->data || f->seqID != curseq) {
// warnx("data missing! Not reassembling!"); DEBUG("data missing! Not reassembling!");
return 0; return 0;
} }
// warnx(" Fragment seq %u, data length %u, data offset %lu, total len %u, maxlen %u", DEBUG(" Fragment seq %u, data length %lu, data offset %lu, total len %lu, maxlen %lu",
// f->seqID, fraglen, dest - data, datalen, maxlen); f->seqID, fraglen, dest - data, datalen, maxlen);
memcpy(dest, f->data, MIN(fraglen, maxlen)); memcpy(dest, f->data, MIN(fraglen, maxlen));
dest += fraglen; dest += fraglen;
datalen += fraglen; datalen += fraglen;
if (compression) *compression &= f->compressed & 1; if (compression) {
*compression &= f->compressed & 1;
if (f->compressed != *compression) { if (f->compressed != *compression) {
warnx("Inconsistent compression flags in chunk. Not reassembling!"); DEBUG("Inconsistent compression flags in chunk. Not reassembling!");
return 0; return 0;
} }
}
if (fraglen > maxlen) { if (fraglen > maxlen) {
warnx("Data buffer too small! Reassembled %lu bytes.", datalen); DEBUG("Data buffer too small! Reassembled %lu bytes.", datalen);
return 0; return datalen;
} }
/* Move window along to avoid weird issues */
window_tick(w);
if (f->end == 1) { if (f->end == 1) {
// warnx("Found end of chunk! (seqID %u, chunk len %u, datalen %u)", f->seqID, i, datalen); DEBUG("Found end of chunk! (seqID %u, chunk len %lu, datalen %lu)", f->seqID, i, datalen);
end = 1;
break; break;
} }
/* Move window along to avoid weird issues */
if (INWINDOW_INDEX(w, woffs)) {
window_tick(w);
}
/* Clear fragment */
memset(f, 0, sizeof(fragment));
maxlen -= fraglen; maxlen -= fraglen;
curseq = (curseq + 1) % MAX_SEQ_ID; curseq = (curseq + 1) % MAX_SEQ_ID;
} }
if (end == 0) { /* no end of chunk found but reached end of data */
return 0;
}
DEBUG("Reassembling %lu bytes of data from %lu frags!", datalen, i + 1);
/* Clear all used fragments */
size_t p;
ITER_FORWARD(w->chunk_start, WRAP(w->chunk_start + i + 1), w->length, p, {
memset(&w->frags[p], 0, sizeof(fragment));
});
w->chunk_start = WRAP(woffs + 1); w->chunk_start = WRAP(woffs + 1);
w->numitems -= i + 1; w->numitems -= i + 1;
return datalen; return datalen;
@ -197,17 +205,17 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, unsigned maxlen, in
/* Returns next fragment to be sent or NULL if nothing (SEND) /* Returns next fragment to be sent or NULL if nothing (SEND)
* This also handles packet resends, timeouts etc. */ * This also handles packet resends, timeouts etc. */
fragment * fragment *
window_get_next_sending_fragment(struct frag_buffer *w, int other_ack) window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
{ {
fragment *f; fragment *f;
if (other_ack >= MAX_SEQ_ID || other_ack < 0) if (*other_ack >= MAX_SEQ_ID || *other_ack < 0)
other_ack = -1; *other_ack = -1;
for (size_t i = 0; i < w->windowsize; i++) { for (size_t i = 0; i < w->windowsize; i++) {
f = &w->frags[WRAP(w->window_start + i)]; f = &w->frags[WRAP(w->window_start + i)];
if (f->acks >= 1) continue; if (f->acks >= 1) continue;
if (f->retries >= 1 && difftime(f->lastsent, time(NULL)) > ACK_TIMEOUT) { if (f->retries >= 1 && difftime(f->lastsent, time(NULL)) > ACK_TIMEOUT) {
/* Fragment sent before, not ACK'd */ /* Fragment sent before, not ACK'd */
warnx("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends); DEBUG("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends);
w->resends ++; w->resends ++;
goto found; goto found;
} else if (f->retries == 0 && f->len > 0) { } else if (f->retries == 0 && f->len > 0) {
@ -216,15 +224,15 @@ window_get_next_sending_fragment(struct frag_buffer *w, int other_ack)
} }
} }
// warnx("Not sending any fragments (last frag checked: retries %u, seqid %u, len %lu)", DEBUG("Not sending any fragments (last frag checked: retries %u, seqid %u, len %lu)",
// f->retries, f->seqID, f->len); f->retries, f->seqID, f->len);
// TODO: statistics for packet loss/not sending etc // TODO: statistics for packet loss/not sending etc
return NULL; return NULL;
found: found:
/* store other ACK into fragment so ACK is resent if fragment times out */ /* store other ACK into fragment so ACK is resent if fragment times out */
if (f->ack_other == -1) if (f->ack_other == -1)
f->ack_other = other_ack; f->ack_other = *other_ack, *other_ack = -1;
f->is_nack &= 1; f->is_nack &= 1;
f->start &= 1; f->start &= 1;
f->end &= 1; f->end &= 1;
@ -256,10 +264,11 @@ window_ack(struct frag_buffer *w, int seqid)
if (seqid < 0 || seqid > MAX_SEQ_ID) return; if (seqid < 0 || seqid > MAX_SEQ_ID) return;
for (size_t i = 0; i < w->windowsize; i++) { for (size_t i = 0; i < w->windowsize; i++) {
f = &w->frags[AFTER(w, i)]; f = &w->frags[AFTER(w, i)];
if (f->seqID == seqid) { if (f->seqID == seqid && f->len > 0) { /* ACK first non-empty frag */
if (f->acks > 0) warnx("Duplicate ack for seqId %u", seqid); if (f->acks > 0) DEBUG("ACK: %d ACKs for seqId %u", f->acks, seqid);
f->acks ++; f->acks ++;
// warnx(" ack frag seq %u, ACKs %u, len %lu, s %u e %u", f->seqID, f->ack, f->len, f->start, f->end); DEBUG(" ACK frag seq %u, ACKs %u, len %lu, s %u e %u", f->seqID, f->acks, f->len, f->start, f->end);
break;
} }
} }
} }
@ -271,9 +280,10 @@ window_tick(struct frag_buffer *w)
{ {
for (size_t i = 0; i < w->windowsize; i++) { for (size_t i = 0; i < w->windowsize; i++) {
if (w->frags[w->window_start].acks >= 1) { if (w->frags[w->window_start].acks >= 1) {
// warnx("moving window forwards 1; start = %lu-%lu, end = %lu-%lu, len = %lu", DEBUG("moving window forwards 1; start = %lu-%lu, end = %lu-%lu, len = %lu",
// w->window_start, AFTER(w, 1), w->window_end, AFTER(w, w->windowsize + 1), w->length); w->window_start, AFTER(w, 1), w->window_end, AFTER(w, w->windowsize + 1), w->length);
if (w->direction == WINDOW_SENDING) { if (w->direction == WINDOW_SENDING) {
DEBUG("Clearing old fragments in SENDING window.");
w->numitems --; /* Clear old fragments */ w->numitems --; /* Clear old fragments */
memset(&w->frags[w->window_start], 0, sizeof(fragment)); memset(&w->frags[w->window_start], 0, sizeof(fragment));
} }
@ -293,13 +303,13 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, int c
// Split data into thingies of <= fragsize // Split data into thingies of <= fragsize
size_t n = ((len - 1) / w->maxfraglen) + 1; size_t n = ((len - 1) / w->maxfraglen) + 1;
if (!data || n == 0 || len == 0 || n > window_buffer_available(w)) { if (!data || n == 0 || len == 0 || n > window_buffer_available(w)) {
warnx("Failed to append fragment (buffer too small!)"); DEBUG("Failed to append fragment (buffer too small!)");
return -1; return -1;
} }
compressed &= 1; compressed &= 1;
size_t offset = 0; size_t offset = 0;
static fragment f; static fragment f;
// warnx("add data len %lu, %lu frags, max fragsize %u", len, n, w->maxfraglen); // DEBUG("add data len %lu, %lu frags, max fragsize %u", len, n, w->maxfraglen);
for (size_t i = 0; i < n; i++) { for (size_t i = 0; i < n; i++) {
memset(&f, 0, sizeof(f)); memset(&f, 0, sizeof(f));
f.len = MIN(len - offset, w->maxfraglen); f.len = MIN(len - offset, w->maxfraglen);
@ -311,8 +321,8 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, int c
f.ack_other = -1; f.ack_other = -1;
window_append_fragment(w, &f); window_append_fragment(w, &f);
w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID; w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID;
// warnx(" a = %u, b = %u, a %% b = %u", (len - offset), (w->maxfraglen + 1), (len - offset) % (w->maxfraglen + 1)); // DEBUG(" a = %u, b = %u, a %% b = %u", (len - offset), (w->maxfraglen + 1), (len - offset) % (w->maxfraglen + 1));
// warnx(" fragment len %lu, seqID %u, s %u, end %u, dOffs %lu", f.len, f.seqID, f.start, f.end, offset); DEBUG(" fragment len %lu, seqID %u, s %u, end %u, dOffs %lu", f.len, f.seqID, f.start, f.end, offset);
offset += f.len; offset += f.len;
} }
return n; return n;

View File

@ -24,8 +24,12 @@
#define WINDOW_SENDING 1 #define WINDOW_SENDING 1
#define WINDOW_RECVING 0 #define WINDOW_RECVING 0
/* Enables LOTS of annoying debug output */
//#define WINDOW_DEBUG
typedef struct fragment { typedef struct fragment {
size_t len; /* Length of fragment data */ size_t len; /* Length of fragment data (0 if fragment unused) */
unsigned seqID; /* fragment sequence ID */ unsigned seqID; /* fragment sequence ID */
int ack_other; /* other way ACK seqID (>=0) or unset (<0) */ int ack_other; /* other way ACK seqID (>=0) or unset (<0) */
int is_nack; /* 1 if other way ACK is a NACK */ int is_nack; /* 1 if other way ACK is a NACK */
@ -54,6 +58,12 @@ struct frag_buffer {
int direction; /* Sending or recving */ int direction; /* Sending or recving */
}; };
#ifdef WINDOW_DEBUG
#define DEBUG(msg, ...) fprintf(stderr, "[WINDOW-DEBUG] (%s:%d) " msg "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#else
#define DEBUG(msg, ...)
#endif
#define AFTER(w, o) ((w->window_start + o) % w->length) #define AFTER(w, o) ((w->window_start + o) % w->length)
// Distance (going forwards) between a and b in window of length l // Distance (going forwards) between a and b in window of length l
@ -71,6 +81,15 @@ struct frag_buffer {
#define SEQ_OFFSET(start, a) ((a >= start) ? a - start : MAX_SEQ_ID + start - a - 1) #define SEQ_OFFSET(start, a) ((a >= start) ? a - start : MAX_SEQ_ID + start - a - 1)
#define WRAP(x) ((x) % w->length) #define WRAP(x) ((x) % w->length)
#define ITER_FORWARD(begin, end, max, pos, f) { \
if (end >= begin) \
for (pos = begin; pos < end && pos < max; pos++) {f}\
else {\
for (pos = begin; pos < max; pos++) {f}\
for (pos = 0; pos < end && pos < max; pos++) {f}\
}\
}
struct frag_buffer *window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int dir); struct frag_buffer *window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int dir);
void window_buffer_resize(struct frag_buffer *w, size_t length); void window_buffer_resize(struct frag_buffer *w, size_t length);
void window_buffer_destroy(struct frag_buffer *w); void window_buffer_destroy(struct frag_buffer *w);
@ -86,10 +105,10 @@ int window_process_incoming_fragment(struct frag_buffer *w, fragment *f);
/* Reassembles first complete sequence of fragments into data. (RECV) /* Reassembles first complete sequence of fragments into data. (RECV)
* Returns length of data reassembled, or 0 if no data reassembled */ * Returns length of data reassembled, or 0 if no data reassembled */
size_t window_reassemble_data(struct frag_buffer *w, uint8_t *data, unsigned maxlen, int *compression); size_t window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int *compression);
/* Returns next fragment to be sent or NULL if nothing (SEND) */ /* Returns next fragment to be sent or NULL if nothing (SEND) */
fragment *window_get_next_sending_fragment(struct frag_buffer *w, int other_ack); fragment *window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack);
/* Gets the seqid of next fragment to be ACK'd (RECV) */ /* Gets the seqid of next fragment to be ACK'd (RECV) */
int window_get_next_ack(struct frag_buffer *w); int window_get_next_ack(struct frag_buffer *w);