Updated fragment timeouts to use timeval for better precision

This commit is contained in:
frekky 2015-10-17 22:30:46 +08:00
parent 4983dadfdd
commit 8354ce28aa
2 changed files with 57 additions and 39 deletions

View File

@ -17,6 +17,7 @@
#include <unistd.h> #include <unistd.h>
#include <stdint.h> #include <stdint.h>
#include <time.h> #include <time.h>
#include <sys/time.h>
#include <stdio.h> #include <stdio.h>
#include <err.h> #include <err.h>
#include <string.h> #include <string.h>
@ -51,6 +52,8 @@ window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int di
buf->maxfraglen = fragsize; buf->maxfraglen = fragsize;
buf->window_end = AFTER(buf, windowsize); buf->window_end = AFTER(buf, windowsize);
buf->direction = dir; buf->direction = dir;
buf->timeout.tv_sec = 5;
buf->timeout.tv_usec = 0;
return buf; return buf;
} }
@ -121,8 +124,9 @@ window_append_fragment(struct frag_buffer *w, fragment *src)
} }
/* Handles fragment received from the sending side (RECV) /* Handles fragment received from the sending side (RECV)
* Returns seq ID of packet to be ACKed immediately */ * Returns index of fragment in window or <0 if dropped
int * The next ACK MUST be for this fragment */
ssize_t
window_process_incoming_fragment(struct frag_buffer *w, fragment *f) window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
{ {
/* Check if packet is in window */ /* Check if packet is in window */
@ -131,30 +135,31 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
startid = w->start_seq_id; startid = w->start_seq_id;
endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID; endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID;
if (!INWINDOW_SEQ(startid, endid, f->seqID)) { if (!INWINDOW_SEQ(startid, endid, f->seqID)) {
WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n", WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n", f->seqID, startid, endid);
f->seqID, startid, endid);
w->oos++; w->oos++;
/* ACK duplicate so sender can move on ASAP */ return -1;
return f->seqID;
} }
/* Place fragment into correct location in buffer */ /* Place fragment into correct location in buffer */
size_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID)); ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID));
WDEBUG(" Putting frag seq %u into frags[%lu + %u = %lu]", f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest); WDEBUG(" Putting frag seq %u into frags[%lu + %u = %lu]",
f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest);
/* Check if fragment already received */ /* Check if fragment already received */
fd = &w->frags[dest]; fd = &w->frags[dest];
if (fd->len != 0) { if (fd->len != 0) {
WDEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID); WDEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID);
if (f->seqID == fd->seqID) if (f->seqID == fd->seqID)
return f->seqID; return -1;
} }
memcpy(fd, f, sizeof(fragment)); memcpy(fd, f, sizeof(fragment));
fd->retries = 0;
fd->ack_other = -1;
/* We assume this packet gets ACKed immediately on return of this function */
fd->acks = 1;
w->numitems ++; w->numitems ++;
return f->seqID; fd->retries = 0;
fd->ack_other = -1;
/* We assume this packet gets ACKed immediately on return of this function */
fd->acks = 1;
return dest;
} }
/* Reassembles first complete sequence of fragments into data. (RECV) /* Reassembles first complete sequence of fragments into data. (RECV)
@ -235,14 +240,21 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int
size_t size_t
window_sending(struct frag_buffer *w) window_sending(struct frag_buffer *w)
{ {
struct timeval timeout, now;
fragment *f; fragment *f;
size_t tosend = 0; size_t tosend = 0;
if (w->numitems == 0) if (w->numitems == 0)
return 0; return 0;
gettimeofday(&now, NULL);
for (size_t i = 0; i < w->windowsize; i++) { for (size_t i = 0; i < w->windowsize; i++) {
f = &w->frags[WRAP(w->window_start + i)]; f = &w->frags[WRAP(w->window_start + i)];
if (f->len == 0 || f->acks >= 1) continue; if (f->len == 0 || f->acks >= 1) continue;
if (f->retries < 1 || difftime(time(NULL), f->lastsent) >= ACK_TIMEOUT) {
timeradd(&w->timeout, &f->lastsent, &timeout);
if (f->retries < 1 || !timercmp(&now, &timeout, <)) {
/* Fragment not sent or timed out (to be re-sent) */ /* Fragment not sent or timed out (to be re-sent) */
tosend++; tosend++;
} }
@ -255,14 +267,21 @@ window_sending(struct frag_buffer *w)
fragment * fragment *
window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
{ {
struct timeval timeout, now;
fragment *f = NULL; fragment *f = NULL;
if (*other_ack >= MAX_SEQ_ID || *other_ack < 0) if (*other_ack >= MAX_SEQ_ID || *other_ack < 0)
*other_ack = -1; *other_ack = -1;
gettimeofday(&now, NULL);
for (size_t i = 0; i < w->windowsize; i++) { for (size_t i = 0; i < w->windowsize; i++) {
f = &w->frags[WRAP(w->window_start + i)]; f = &w->frags[WRAP(w->window_start + i)];
if (f->acks >= 1) continue; if (f->acks >= 1) continue;
/* TODO: use timeval for more precise timeouts */
if (f->retries >= 1 && difftime(time(NULL), f->lastsent) >= ACK_TIMEOUT) { timeradd(&w->timeout, &f->lastsent, &timeout);
if (f->retries >= 1 && !timercmp(&now, &timeout, <)) {
/* Fragment sent before, not ACK'd */ /* Fragment sent before, not ACK'd */
WDEBUG("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends); WDEBUG("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends);
w->resends ++; w->resends ++;
@ -271,7 +290,6 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
/* Fragment not sent */ /* Fragment not sent */
goto found; goto found;
} }
} }
if (f) if (f)
WDEBUG("Not sending any fragments (last frag checked: retries %u, seqid %u, len %lu)", WDEBUG("Not sending any fragments (last frag checked: retries %u, seqid %u, len %lu)",
@ -286,7 +304,7 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
f->start &= 1; f->start &= 1;
f->end &= 1; f->end &= 1;
f->retries++; f->retries++;
time(&f->lastsent); gettimeofday(&f->lastsent, NULL);
return f; return f;
} }

View File

@ -19,7 +19,6 @@
#define MAX_SEQ_ID 256 #define MAX_SEQ_ID 256
#define MAX_FRAGSIZE 4096 #define MAX_FRAGSIZE 4096
#define ACK_TIMEOUT 5
#define WINDOW_SENDING 1 #define WINDOW_SENDING 1
#define WINDOW_RECVING 0 #define WINDOW_RECVING 0
@ -29,33 +28,34 @@
//#define WINDOW_DEBUG //#define WINDOW_DEBUG
typedef struct fragment { typedef struct fragment {
size_t len; /* Length of fragment data (0 if fragment unused) */ size_t len; /* Length of fragment data (0 if fragment unused) */
unsigned seqID; /* fragment sequence ID */ unsigned seqID; /* fragment sequence ID */
int ack_other; /* other way ACK seqID (>=0) or unset (<0) */ int ack_other; /* other way ACK seqID (>=0) or unset (<0) */
int compressed; /* compression flag */ int compressed; /* compression flag */
uint8_t start; /* start of chunk flag */ uint8_t start; /* start of chunk flag */
uint8_t end; /* end of chunk flag */ uint8_t end; /* end of chunk flag */
uint8_t data[MAX_FRAGSIZE]; /* fragment data */ uint8_t data[MAX_FRAGSIZE]; /* fragment data */
unsigned retries; /* number of times fragment has been sent */ unsigned retries; /* number of times fragment has been sent */
time_t lastsent; /* timestamp of most recent send attempt TODO: millisecond precision*/ struct timeval lastsent; /* timestamp of most recent send attempt */
int acks; /* number of times packet has been ack'd (should be <= 1) */ int acks; /* number of times packet has been ack'd (should be <= 1) */
} fragment; } fragment;
struct frag_buffer { struct frag_buffer {
fragment *frags; /* pointer to array of data fragments */ fragment *frags; /* pointer to array of data fragments */
unsigned windowsize; /* Max number of packets in flight */ unsigned windowsize; /* Max number of packets in flight */
unsigned maxfraglen; /* Max fragment size */ unsigned maxfraglen; /* Max fragment size */
size_t length; /* Length of buffer */ size_t length; /* Length of buffer */
size_t numitems; /* number of non-empty fragments stored in buffer */ size_t numitems; /* number of non-empty fragments stored in buffer */
size_t window_start; /* Start of window */ size_t window_start; /* Start of window */
size_t window_end; /* End of window (index) */ size_t window_end; /* End of window (index) */
size_t last_write; /* Last fragment read/written */ size_t last_write; /* Last fragment read/written */
size_t chunk_start; /* Start of current chunk of fragments, ie where fragno = 0 */ size_t chunk_start; /* Start of current chunk of fragments, ie where fragno = 0 */
unsigned cur_seq_id; /* Most recent sequence ID */ unsigned cur_seq_id; /* Most recent sequence ID */
unsigned start_seq_id; /* Start of window sequence ID */ unsigned start_seq_id; /* Start of window sequence ID */
unsigned resends; /* number of fragments resent */ unsigned resends; /* number of fragments resent */
unsigned oos; /* Number of out-of-sequence fragments received */ unsigned oos; /* Number of out-of-sequence fragments received */
int direction; /* Sending or recving */ int direction; /* Sending or recving */
struct timeval timeout; /* Fragment timeout before resend */
}; };
extern int window_debug; extern int window_debug;
@ -113,7 +113,7 @@ size_t window_buffer_available(struct frag_buffer *w);
int window_append_fragment(struct frag_buffer *w, fragment *src); int window_append_fragment(struct frag_buffer *w, fragment *src);
/* Handles fragment received from the sending side (RECV) */ /* Handles fragment received from the sending side (RECV) */
int window_process_incoming_fragment(struct frag_buffer *w, fragment *f); ssize_t window_process_incoming_fragment(struct frag_buffer *w, fragment *f);
/* Reassembles first complete sequence of fragments into data. (RECV) /* Reassembles first complete sequence of fragments into data. (RECV)
* Returns length of data reassembled, or 0 if no data reassembled */ * Returns length of data reassembled, or 0 if no data reassembled */