diff --git a/src/window.c b/src/window.c index a99e061..54cee54 100644 --- a/src/window.c +++ b/src/window.c @@ -25,12 +25,6 @@ #include "common.h" #include "window.h" -void -window_buffer_reset(struct frag_buffer *w) -{ - -} - struct frag_buffer * window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int dir) { @@ -64,7 +58,7 @@ window_buffer_resize(struct frag_buffer *w, size_t length) { if (w->length == length) return; if (w->numitems > 0) { - warnx("Resizing window buffer with things still in it! This will cause problems!"); + DEBUG("Resizing window buffer with things still in it! This will cause problems!"); } if (w->frags) free(w->frags); w->frags = calloc(length, sizeof(fragment)); @@ -115,19 +109,19 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f) startid = w->start_seq_id; endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID; if (!INWINDOW_SEQ(startid, endid, f->seqID)) { - warnx("Dropping frag with seqID %u: not in window (%u-%u)\n", f->seqID, startid, endid); + DEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n", f->seqID, startid, endid); return 0; } /* Place fragment into correct location in buffer */ size_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID)); -// warnx(" Putting frag seq %u into frags[%lu + %u = %lu]", f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest); + DEBUG(" Putting frag seq %u into frags[%lu + %u = %lu]", f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest); /* Check if fragment already received */ fd = &w->frags[dest]; if (fd->len != 0) { - warnx("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID); + DEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID); return 0; } - memcpy(&fd, f, sizeof(fragment)); + memcpy(fd, f, sizeof(fragment)); fd->retries = 0; fd->ack_other = -1; fd->acks = 0; @@ -138,57 +132,71 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f) /* Reassembles first complete sequence of fragments into data. (RECV) * Returns length of data reassembled, or 0 if no data reassembled */ size_t -window_reassemble_data(struct frag_buffer *w, uint8_t *data, unsigned maxlen, int *compression) +window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int *compression) { size_t woffs, fraglen, datalen = 0; uint8_t *dest; //, *fdata_start; dest = data; + if (w->direction != WINDOW_RECVING) + return 0; if (w->frags[w->chunk_start].start == 0) { -// warnx("chunk_start pointing to non-start fragment (%u)!", w->frags[w->chunk_start].seqID); + DEBUG("chunk_start (%lu)pointing to non-start fragment (seq %u, len %lu)!", + w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len); return 0; } if (compression) *compression = 1; fragment *f; size_t i, curseq; + int end = 0; curseq = w->frags[w->chunk_start].seqID; for (i = 0; i < w->numitems; ++i) { woffs = WRAP(w->chunk_start + i); f = &w->frags[woffs]; fraglen = f->len; if (fraglen == 0 || !f->data || f->seqID != curseq) { -// warnx("data missing! Not reassembling!"); + DEBUG("data missing! Not reassembling!"); return 0; } -// warnx(" Fragment seq %u, data length %u, data offset %lu, total len %u, maxlen %u", -// f->seqID, fraglen, dest - data, datalen, maxlen); + DEBUG(" Fragment seq %u, data length %lu, data offset %lu, total len %lu, maxlen %lu", + f->seqID, fraglen, dest - data, datalen, maxlen); memcpy(dest, f->data, MIN(fraglen, maxlen)); dest += fraglen; datalen += fraglen; - if (compression) *compression &= f->compressed & 1; - if (f->compressed != *compression) { - warnx("Inconsistent compression flags in chunk. Not reassembling!"); - return 0; + if (compression) { + *compression &= f->compressed & 1; + if (f->compressed != *compression) { + DEBUG("Inconsistent compression flags in chunk. Not reassembling!"); + return 0; + } } if (fraglen > maxlen) { - warnx("Data buffer too small! Reassembled %lu bytes.", datalen); - return 0; + DEBUG("Data buffer too small! Reassembled %lu bytes.", datalen); + return datalen; } + /* Move window along to avoid weird issues */ + window_tick(w); + if (f->end == 1) { -// warnx("Found end of chunk! (seqID %u, chunk len %u, datalen %u)", f->seqID, i, datalen); + DEBUG("Found end of chunk! (seqID %u, chunk len %lu, datalen %lu)", f->seqID, i, datalen); + end = 1; break; } - /* Move window along to avoid weird issues */ - if (INWINDOW_INDEX(w, woffs)) { - window_tick(w); - } - /* Clear fragment */ - memset(f, 0, sizeof(fragment)); + maxlen -= fraglen; curseq = (curseq + 1) % MAX_SEQ_ID; } + if (end == 0) { /* no end of chunk found but reached end of data */ + return 0; + } + DEBUG("Reassembling %lu bytes of data from %lu frags!", datalen, i + 1); + /* Clear all used fragments */ + size_t p; + ITER_FORWARD(w->chunk_start, WRAP(w->chunk_start + i + 1), w->length, p, { + memset(&w->frags[p], 0, sizeof(fragment)); + }); w->chunk_start = WRAP(woffs + 1); w->numitems -= i + 1; return datalen; @@ -197,17 +205,17 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, unsigned maxlen, in /* Returns next fragment to be sent or NULL if nothing (SEND) * This also handles packet resends, timeouts etc. */ fragment * -window_get_next_sending_fragment(struct frag_buffer *w, int other_ack) +window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) { fragment *f; - if (other_ack >= MAX_SEQ_ID || other_ack < 0) - other_ack = -1; + if (*other_ack >= MAX_SEQ_ID || *other_ack < 0) + *other_ack = -1; for (size_t i = 0; i < w->windowsize; i++) { f = &w->frags[WRAP(w->window_start + i)]; if (f->acks >= 1) continue; if (f->retries >= 1 && difftime(f->lastsent, time(NULL)) > ACK_TIMEOUT) { /* Fragment sent before, not ACK'd */ - warnx("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends); + DEBUG("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends); w->resends ++; goto found; } else if (f->retries == 0 && f->len > 0) { @@ -216,15 +224,15 @@ window_get_next_sending_fragment(struct frag_buffer *w, int other_ack) } } -// warnx("Not sending any fragments (last frag checked: retries %u, seqid %u, len %lu)", -// f->retries, f->seqID, f->len); + DEBUG("Not sending any fragments (last frag checked: retries %u, seqid %u, len %lu)", + f->retries, f->seqID, f->len); // TODO: statistics for packet loss/not sending etc return NULL; found: /* store other ACK into fragment so ACK is resent if fragment times out */ if (f->ack_other == -1) - f->ack_other = other_ack; + f->ack_other = *other_ack, *other_ack = -1; f->is_nack &= 1; f->start &= 1; f->end &= 1; @@ -256,10 +264,11 @@ window_ack(struct frag_buffer *w, int seqid) if (seqid < 0 || seqid > MAX_SEQ_ID) return; for (size_t i = 0; i < w->windowsize; i++) { f = &w->frags[AFTER(w, i)]; - if (f->seqID == seqid) { - if (f->acks > 0) warnx("Duplicate ack for seqId %u", seqid); + if (f->seqID == seqid && f->len > 0) { /* ACK first non-empty frag */ + if (f->acks > 0) DEBUG("ACK: %d ACKs for seqId %u", f->acks, seqid); f->acks ++; -// warnx(" ack frag seq %u, ACKs %u, len %lu, s %u e %u", f->seqID, f->ack, f->len, f->start, f->end); + DEBUG(" ACK frag seq %u, ACKs %u, len %lu, s %u e %u", f->seqID, f->acks, f->len, f->start, f->end); + break; } } } @@ -271,9 +280,10 @@ window_tick(struct frag_buffer *w) { for (size_t i = 0; i < w->windowsize; i++) { if (w->frags[w->window_start].acks >= 1) { -// warnx("moving window forwards 1; start = %lu-%lu, end = %lu-%lu, len = %lu", -// w->window_start, AFTER(w, 1), w->window_end, AFTER(w, w->windowsize + 1), w->length); + DEBUG("moving window forwards 1; start = %lu-%lu, end = %lu-%lu, len = %lu", + w->window_start, AFTER(w, 1), w->window_end, AFTER(w, w->windowsize + 1), w->length); if (w->direction == WINDOW_SENDING) { + DEBUG("Clearing old fragments in SENDING window."); w->numitems --; /* Clear old fragments */ memset(&w->frags[w->window_start], 0, sizeof(fragment)); } @@ -293,13 +303,13 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, int c // Split data into thingies of <= fragsize size_t n = ((len - 1) / w->maxfraglen) + 1; if (!data || n == 0 || len == 0 || n > window_buffer_available(w)) { - warnx("Failed to append fragment (buffer too small!)"); + DEBUG("Failed to append fragment (buffer too small!)"); return -1; } compressed &= 1; size_t offset = 0; static fragment f; -// warnx("add data len %lu, %lu frags, max fragsize %u", len, n, w->maxfraglen); +// DEBUG("add data len %lu, %lu frags, max fragsize %u", len, n, w->maxfraglen); for (size_t i = 0; i < n; i++) { memset(&f, 0, sizeof(f)); f.len = MIN(len - offset, w->maxfraglen); @@ -311,8 +321,8 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, int c f.ack_other = -1; window_append_fragment(w, &f); w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID; -// warnx(" a = %u, b = %u, a %% b = %u", (len - offset), (w->maxfraglen + 1), (len - offset) % (w->maxfraglen + 1)); -// warnx(" fragment len %lu, seqID %u, s %u, end %u, dOffs %lu", f.len, f.seqID, f.start, f.end, offset); +// DEBUG(" a = %u, b = %u, a %% b = %u", (len - offset), (w->maxfraglen + 1), (len - offset) % (w->maxfraglen + 1)); + DEBUG(" fragment len %lu, seqID %u, s %u, end %u, dOffs %lu", f.len, f.seqID, f.start, f.end, offset); offset += f.len; } return n; diff --git a/src/window.h b/src/window.h index fc9e506..8fe9e08 100644 --- a/src/window.h +++ b/src/window.h @@ -24,8 +24,12 @@ #define WINDOW_SENDING 1 #define WINDOW_RECVING 0 + +/* Enables LOTS of annoying debug output */ +//#define WINDOW_DEBUG + typedef struct fragment { - size_t len; /* Length of fragment data */ + size_t len; /* Length of fragment data (0 if fragment unused) */ unsigned seqID; /* fragment sequence ID */ int ack_other; /* other way ACK seqID (>=0) or unset (<0) */ int is_nack; /* 1 if other way ACK is a NACK */ @@ -54,6 +58,12 @@ struct frag_buffer { int direction; /* Sending or recving */ }; +#ifdef WINDOW_DEBUG +#define DEBUG(msg, ...) fprintf(stderr, "[WINDOW-DEBUG] (%s:%d) " msg "\n", __FILE__, __LINE__, ##__VA_ARGS__) +#else +#define DEBUG(msg, ...) +#endif + #define AFTER(w, o) ((w->window_start + o) % w->length) // Distance (going forwards) between a and b in window of length l @@ -71,6 +81,15 @@ struct frag_buffer { #define SEQ_OFFSET(start, a) ((a >= start) ? a - start : MAX_SEQ_ID + start - a - 1) #define WRAP(x) ((x) % w->length) +#define ITER_FORWARD(begin, end, max, pos, f) { \ + if (end >= begin) \ + for (pos = begin; pos < end && pos < max; pos++) {f}\ + else {\ + for (pos = begin; pos < max; pos++) {f}\ + for (pos = 0; pos < end && pos < max; pos++) {f}\ + }\ + } + struct frag_buffer *window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int dir); void window_buffer_resize(struct frag_buffer *w, size_t length); void window_buffer_destroy(struct frag_buffer *w); @@ -86,10 +105,10 @@ int window_process_incoming_fragment(struct frag_buffer *w, fragment *f); /* Reassembles first complete sequence of fragments into data. (RECV) * Returns length of data reassembled, or 0 if no data reassembled */ -size_t window_reassemble_data(struct frag_buffer *w, uint8_t *data, unsigned maxlen, int *compression); +size_t window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int *compression); /* Returns next fragment to be sent or NULL if nothing (SEND) */ -fragment *window_get_next_sending_fragment(struct frag_buffer *w, int other_ack); +fragment *window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack); /* Gets the seqid of next fragment to be ACK'd (RECV) */ int window_get_next_ack(struct frag_buffer *w);