From 8354ce28aaedad1a2126d447b534bb985808f09c Mon Sep 17 00:00:00 2001 From: frekky Date: Sat, 17 Oct 2015 22:30:46 +0800 Subject: [PATCH] Updated fragment timeouts to use timeval for better precision --- src/window.c | 56 ++++++++++++++++++++++++++++++++++------------------ src/window.h | 40 ++++++++++++++++++------------------- 2 files changed, 57 insertions(+), 39 deletions(-) diff --git a/src/window.c b/src/window.c index 0ca76c2..4cd6c6d 100644 --- a/src/window.c +++ b/src/window.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -51,6 +52,8 @@ window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int di buf->maxfraglen = fragsize; buf->window_end = AFTER(buf, windowsize); buf->direction = dir; + buf->timeout.tv_sec = 5; + buf->timeout.tv_usec = 0; return buf; } @@ -121,8 +124,9 @@ window_append_fragment(struct frag_buffer *w, fragment *src) } /* Handles fragment received from the sending side (RECV) - * Returns seq ID of packet to be ACKed immediately */ -int + * Returns index of fragment in window or <0 if dropped + * The next ACK MUST be for this fragment */ +ssize_t window_process_incoming_fragment(struct frag_buffer *w, fragment *f) { /* Check if packet is in window */ @@ -131,30 +135,31 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f) startid = w->start_seq_id; endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID; if (!INWINDOW_SEQ(startid, endid, f->seqID)) { - WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n", - f->seqID, startid, endid); + WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n", f->seqID, startid, endid); w->oos++; - /* ACK duplicate so sender can move on ASAP */ - return f->seqID; + return -1; } /* Place fragment into correct location in buffer */ - size_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID)); - WDEBUG(" Putting frag seq %u into frags[%lu + %u = %lu]", f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest); + ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID)); + WDEBUG(" Putting frag seq %u into frags[%lu + %u = %lu]", + f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest); /* Check if fragment already received */ fd = &w->frags[dest]; if (fd->len != 0) { WDEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID); if (f->seqID == fd->seqID) - return f->seqID; + return -1; } memcpy(fd, f, sizeof(fragment)); - fd->retries = 0; - fd->ack_other = -1; - /* We assume this packet gets ACKed immediately on return of this function */ - fd->acks = 1; w->numitems ++; - return f->seqID; + fd->retries = 0; + fd->ack_other = -1; + + /* We assume this packet gets ACKed immediately on return of this function */ + fd->acks = 1; + + return dest; } /* Reassembles first complete sequence of fragments into data. (RECV) @@ -235,14 +240,21 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int size_t window_sending(struct frag_buffer *w) { + struct timeval timeout, now; fragment *f; size_t tosend = 0; + if (w->numitems == 0) return 0; + + gettimeofday(&now, NULL); + for (size_t i = 0; i < w->windowsize; i++) { f = &w->frags[WRAP(w->window_start + i)]; if (f->len == 0 || f->acks >= 1) continue; - if (f->retries < 1 || difftime(time(NULL), f->lastsent) >= ACK_TIMEOUT) { + + timeradd(&w->timeout, &f->lastsent, &timeout); + if (f->retries < 1 || !timercmp(&now, &timeout, <)) { /* Fragment not sent or timed out (to be re-sent) */ tosend++; } @@ -255,14 +267,21 @@ window_sending(struct frag_buffer *w) fragment * window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) { + struct timeval timeout, now; fragment *f = NULL; + if (*other_ack >= MAX_SEQ_ID || *other_ack < 0) *other_ack = -1; + + gettimeofday(&now, NULL); + for (size_t i = 0; i < w->windowsize; i++) { f = &w->frags[WRAP(w->window_start + i)]; if (f->acks >= 1) continue; - /* TODO: use timeval for more precise timeouts */ - if (f->retries >= 1 && difftime(time(NULL), f->lastsent) >= ACK_TIMEOUT) { + + timeradd(&w->timeout, &f->lastsent, &timeout); + + if (f->retries >= 1 && !timercmp(&now, &timeout, <)) { /* Fragment sent before, not ACK'd */ WDEBUG("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends); w->resends ++; @@ -271,7 +290,6 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) /* Fragment not sent */ goto found; } - } if (f) WDEBUG("Not sending any fragments (last frag checked: retries %u, seqid %u, len %lu)", @@ -286,7 +304,7 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) f->start &= 1; f->end &= 1; f->retries++; - time(&f->lastsent); + gettimeofday(&f->lastsent, NULL); return f; } diff --git a/src/window.h b/src/window.h index d5831dd..520d0eb 100644 --- a/src/window.h +++ b/src/window.h @@ -19,7 +19,6 @@ #define MAX_SEQ_ID 256 #define MAX_FRAGSIZE 4096 -#define ACK_TIMEOUT 5 #define WINDOW_SENDING 1 #define WINDOW_RECVING 0 @@ -29,33 +28,34 @@ //#define WINDOW_DEBUG typedef struct fragment { - size_t len; /* Length of fragment data (0 if fragment unused) */ - unsigned seqID; /* fragment sequence ID */ - int ack_other; /* other way ACK seqID (>=0) or unset (<0) */ - int compressed; /* compression flag */ - uint8_t start; /* start of chunk flag */ - uint8_t end; /* end of chunk flag */ + size_t len; /* Length of fragment data (0 if fragment unused) */ + unsigned seqID; /* fragment sequence ID */ + int ack_other; /* other way ACK seqID (>=0) or unset (<0) */ + int compressed; /* compression flag */ + uint8_t start; /* start of chunk flag */ + uint8_t end; /* end of chunk flag */ uint8_t data[MAX_FRAGSIZE]; /* fragment data */ - unsigned retries; /* number of times fragment has been sent */ - time_t lastsent; /* timestamp of most recent send attempt TODO: millisecond precision*/ - int acks; /* number of times packet has been ack'd (should be <= 1) */ + unsigned retries; /* number of times fragment has been sent */ + struct timeval lastsent; /* timestamp of most recent send attempt */ + int acks; /* number of times packet has been ack'd (should be <= 1) */ } fragment; struct frag_buffer { - fragment *frags; /* pointer to array of data fragments */ + fragment *frags; /* pointer to array of data fragments */ unsigned windowsize; /* Max number of packets in flight */ unsigned maxfraglen; /* Max fragment size */ - size_t length; /* Length of buffer */ - size_t numitems; /* number of non-empty fragments stored in buffer */ + size_t length; /* Length of buffer */ + size_t numitems; /* number of non-empty fragments stored in buffer */ size_t window_start; /* Start of window */ - size_t window_end; /* End of window (index) */ - size_t last_write; /* Last fragment read/written */ - size_t chunk_start; /* Start of current chunk of fragments, ie where fragno = 0 */ + size_t window_end; /* End of window (index) */ + size_t last_write; /* Last fragment read/written */ + size_t chunk_start; /* Start of current chunk of fragments, ie where fragno = 0 */ unsigned cur_seq_id; /* Most recent sequence ID */ unsigned start_seq_id; /* Start of window sequence ID */ - unsigned resends; /* number of fragments resent */ - unsigned oos; /* Number of out-of-sequence fragments received */ - int direction; /* Sending or recving */ + unsigned resends; /* number of fragments resent */ + unsigned oos; /* Number of out-of-sequence fragments received */ + int direction; /* Sending or recving */ + struct timeval timeout; /* Fragment timeout before resend */ }; extern int window_debug; @@ -113,7 +113,7 @@ size_t window_buffer_available(struct frag_buffer *w); int window_append_fragment(struct frag_buffer *w, fragment *src); /* Handles fragment received from the sending side (RECV) */ -int window_process_incoming_fragment(struct frag_buffer *w, fragment *f); +ssize_t window_process_incoming_fragment(struct frag_buffer *w, fragment *f); /* Reassembles first complete sequence of fragments into data. (RECV) * Returns length of data reassembled, or 0 if no data reassembled */