From bd9966836e1f83e4f86e1f8dad009c419a8e01ec Mon Sep 17 00:00:00 2001 From: frekky Date: Mon, 28 Sep 2015 12:57:33 +0800 Subject: [PATCH] Added clear, reset and number of fragments to be sent; better debugging --- src/window.c | 105 +++++++++++++++++++++++++++++++++++++-------------- src/window.h | 17 +++++++-- 2 files changed, 91 insertions(+), 31 deletions(-) diff --git a/src/window.c b/src/window.c index 54cee54..09d47fe 100644 --- a/src/window.c +++ b/src/window.c @@ -25,6 +25,8 @@ #include "common.h" #include "window.h" +int window_debug = 0; + struct frag_buffer * window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int dir) { @@ -53,6 +55,20 @@ window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int di return buf; } +void +window_buffer_reset(struct frag_buffer *w) +{ + w->chunk_start = 0; + w->cur_seq_id = 0; + w->last_write = 0; + w->numitems = 0; + w->oos = 0; + w->resends = 0; + w->start_seq_id = 0; + w->window_start = 0; + w->window_end = AFTER(w, w->windowsize); +} + void window_buffer_resize(struct frag_buffer *w, size_t length) { @@ -66,11 +82,7 @@ window_buffer_resize(struct frag_buffer *w, size_t length) errx(1, "Failed to resize window buffer!"); } w->length = length; - w->numitems = 0; - w->window_start = 0; - w->start_seq_id = 0; - w->cur_seq_id = 0; - w->window_end = AFTER(w, w->windowsize); + window_buffer_reset(w); } void @@ -81,6 +93,15 @@ window_buffer_destroy(struct frag_buffer *w) free(w); } +void +window_buffer_clear(struct frag_buffer *w) +{ + if (!w) return; + + memset(w->frags, 0, w->length * sizeof(fragment)); + window_buffer_reset(w); +} + /* Returns number of available fragment slots (NOT BYTES) */ size_t window_buffer_available(struct frag_buffer *w) @@ -99,7 +120,8 @@ window_append_fragment(struct frag_buffer *w, fragment *src) return 1; } -/* Handles fragment received from the sending side (RECV) */ +/* Handles fragment received from the sending side (RECV) + * Returns seq ID of packet to be ACKed immediately */ int window_process_incoming_fragment(struct frag_buffer *w, fragment *f) { @@ -109,8 +131,11 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f) startid = w->start_seq_id; endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID; if (!INWINDOW_SEQ(startid, endid, f->seqID)) { - DEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n", f->seqID, startid, endid); - return 0; + DEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n", + f->seqID, startid, endid); + w->oos++; + /* ACK duplicate so sender can move on ASAP */ + return f->seqID; } /* Place fragment into correct location in buffer */ size_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID)); @@ -119,14 +144,17 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f) fd = &w->frags[dest]; if (fd->len != 0) { DEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID); - return 0; + if (f->seqID == fd->seqID) + return f->seqID; } memcpy(fd, f, sizeof(fragment)); fd->retries = 0; fd->ack_other = -1; - fd->acks = 0; + /* We assume this packet gets ACKed immediately on return of this function */ + fd->acks = 1; w->numitems ++; - return 1; + + return f->seqID; } /* Reassembles first complete sequence of fragments into data. (RECV) @@ -139,7 +167,7 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int dest = data; if (w->direction != WINDOW_RECVING) return 0; - if (w->frags[w->chunk_start].start == 0) { + if (w->frags[w->chunk_start].start == 0 && w->numitems > 0) { DEBUG("chunk_start (%lu)pointing to non-start fragment (seq %u, len %lu)!", w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len); return 0; @@ -191,29 +219,50 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int if (end == 0) { /* no end of chunk found but reached end of data */ return 0; } - DEBUG("Reassembling %lu bytes of data from %lu frags!", datalen, i + 1); + DEBUG("Reassembling %lu bytes of data from %lu frags; compression %d!", datalen, i + 1, *compression); /* Clear all used fragments */ size_t p; - ITER_FORWARD(w->chunk_start, WRAP(w->chunk_start + i + 1), w->length, p, { - memset(&w->frags[p], 0, sizeof(fragment)); - }); + ITER_FORWARD(w->chunk_start, WRAP(w->chunk_start + i + 1), w->length, p, + memset(&w->frags[p], 0, sizeof(fragment)); + ); w->chunk_start = WRAP(woffs + 1); w->numitems -= i + 1; return datalen; } +/* Returns number of fragments that can be sent immediately; effectively + * the same as window_get_next_sending_fragment but without changing anything. */ +int +window_sending(struct frag_buffer *w) +{ + fragment *f; + int tosend = 0; + if (w->numitems == 0) + return 0; + for (size_t i = 0; i < w->windowsize; i++) { + f = &w->frags[WRAP(w->window_start + i)]; + if (f->len == 0 || f->acks >= 1) continue; + if ((f->retries == 0) != (difftime(time(NULL), f->lastsent) > ACK_TIMEOUT)) { + /* Fragment not sent xor timed out (to be re-sent) */ + tosend++; + } + } + return tosend; +} + /* Returns next fragment to be sent or NULL if nothing (SEND) * This also handles packet resends, timeouts etc. */ fragment * window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) { - fragment *f; + fragment *f = NULL; if (*other_ack >= MAX_SEQ_ID || *other_ack < 0) *other_ack = -1; for (size_t i = 0; i < w->windowsize; i++) { f = &w->frags[WRAP(w->window_start + i)]; if (f->acks >= 1) continue; - if (f->retries >= 1 && difftime(f->lastsent, time(NULL)) > ACK_TIMEOUT) { + /* TODO: use timeval for more precise timeouts */ + if (f->retries >= 1 && difftime(time(NULL), f->lastsent) > ACK_TIMEOUT) { /* Fragment sent before, not ACK'd */ DEBUG("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends); w->resends ++; @@ -224,16 +273,16 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) } } - DEBUG("Not sending any fragments (last frag checked: retries %u, seqid %u, len %lu)", - f->retries, f->seqID, f->len); - // TODO: statistics for packet loss/not sending etc + if (f) + DEBUG("Not sending any fragments (last frag checked: retries %u, seqid %u, len %lu)", + f->retries, f->seqID, f->len); return NULL; found: - /* store other ACK into fragment so ACK is resent if fragment times out */ - if (f->ack_other == -1) - f->ack_other = *other_ack, *other_ack = -1; - f->is_nack &= 1; + /* store other ACK into fragment for sending; ignore any previous values. + Don't resend ACKs because by the time we do, the other end will have + resent the corresponding fragment so may as well not cause trouble. */ + f->ack_other = *other_ack, *other_ack = -1; f->start &= 1; f->end &= 1; f->retries++; @@ -265,7 +314,8 @@ window_ack(struct frag_buffer *w, int seqid) for (size_t i = 0; i < w->windowsize; i++) { f = &w->frags[AFTER(w, i)]; if (f->seqID == seqid && f->len > 0) { /* ACK first non-empty frag */ - if (f->acks > 0) DEBUG("ACK: %d ACKs for seqId %u", f->acks, seqid); + if (f->acks > 0) + DEBUG("DUPE ACK: %d ACKs for seqId %u", f->acks, seqid); f->acks ++; DEBUG(" ACK frag seq %u, ACKs %u, len %lu, s %u e %u", f->seqID, f->acks, f->len, f->start, f->end); break; @@ -309,7 +359,7 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, int c compressed &= 1; size_t offset = 0; static fragment f; -// DEBUG("add data len %lu, %lu frags, max fragsize %u", len, n, w->maxfraglen); + DEBUG("add data len %lu, %lu frags, max fragsize %u", len, n, w->maxfraglen); for (size_t i = 0; i < n; i++) { memset(&f, 0, sizeof(f)); f.len = MIN(len - offset, w->maxfraglen); @@ -321,7 +371,6 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, int c f.ack_other = -1; window_append_fragment(w, &f); w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID; -// DEBUG(" a = %u, b = %u, a %% b = %u", (len - offset), (w->maxfraglen + 1), (len - offset) % (w->maxfraglen + 1)); DEBUG(" fragment len %lu, seqID %u, s %u, end %u, dOffs %lu", f.len, f.seqID, f.start, f.end, offset); offset += f.len; } diff --git a/src/window.h b/src/window.h index 8fe9e08..999a6f5 100644 --- a/src/window.h +++ b/src/window.h @@ -32,7 +32,6 @@ typedef struct fragment { size_t len; /* Length of fragment data (0 if fragment unused) */ unsigned seqID; /* fragment sequence ID */ int ack_other; /* other way ACK seqID (>=0) or unset (<0) */ - int is_nack; /* 1 if other way ACK is a NACK */ int compressed; /* compression flag */ uint8_t start; /* start of chunk flag */ uint8_t end; /* end of chunk flag */ @@ -55,11 +54,14 @@ struct frag_buffer { unsigned cur_seq_id; /* Most recent sequence ID */ unsigned start_seq_id; /* Start of window sequence ID */ unsigned resends; /* number of fragments resent */ + unsigned oos; /* Number of out-of-sequence fragments received */ int direction; /* Sending or recving */ }; -#ifdef WINDOW_DEBUG -#define DEBUG(msg, ...) fprintf(stderr, "[WINDOW-DEBUG] (%s:%d) " msg "\n", __FILE__, __LINE__, ##__VA_ARGS__) +extern int window_debug; + +#ifdef DEBUG_BUILD +#define DEBUG(msg, ...) if (window_debug) fprintf(stderr, "[WINDOW-DEBUG] (%s:%d) " msg "\n", __FILE__, __LINE__, ##__VA_ARGS__) #else #define DEBUG(msg, ...) #endif @@ -94,6 +96,12 @@ struct frag_buffer *window_buffer_init(size_t length, unsigned windowsize, unsig void window_buffer_resize(struct frag_buffer *w, size_t length); void window_buffer_destroy(struct frag_buffer *w); +/* Clears fragments and resets window stats */ +void window_buffer_clear(struct frag_buffer *w); + +/* Resets window stats without clearing fragments */ +void window_buffer_reset(struct frag_buffer *w); + /* Returns number of available fragment slots (NOT BYTES) */ size_t window_buffer_available(struct frag_buffer *w); @@ -107,6 +115,9 @@ int window_process_incoming_fragment(struct frag_buffer *w, fragment *f); * Returns length of data reassembled, or 0 if no data reassembled */ size_t window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int *compression); +/* Returns number of fragments to be sent */ +int window_sending(struct frag_buffer *w); + /* Returns next fragment to be sent or NULL if nothing (SEND) */ fragment *window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack);