mirror of
https://github.com/yarrick/iodine.git
synced 2024-11-25 11:05:15 +00:00
Added clear, reset and number of fragments to be sent; better debugging
This commit is contained in:
parent
33525e5086
commit
bd9966836e
105
src/window.c
105
src/window.c
@ -25,6 +25,8 @@
|
|||||||
#include "common.h"
|
#include "common.h"
|
||||||
#include "window.h"
|
#include "window.h"
|
||||||
|
|
||||||
|
int window_debug = 0;
|
||||||
|
|
||||||
struct frag_buffer *
|
struct frag_buffer *
|
||||||
window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int dir)
|
window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int dir)
|
||||||
{
|
{
|
||||||
@ -53,6 +55,20 @@ window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int di
|
|||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
window_buffer_reset(struct frag_buffer *w)
|
||||||
|
{
|
||||||
|
w->chunk_start = 0;
|
||||||
|
w->cur_seq_id = 0;
|
||||||
|
w->last_write = 0;
|
||||||
|
w->numitems = 0;
|
||||||
|
w->oos = 0;
|
||||||
|
w->resends = 0;
|
||||||
|
w->start_seq_id = 0;
|
||||||
|
w->window_start = 0;
|
||||||
|
w->window_end = AFTER(w, w->windowsize);
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
window_buffer_resize(struct frag_buffer *w, size_t length)
|
window_buffer_resize(struct frag_buffer *w, size_t length)
|
||||||
{
|
{
|
||||||
@ -66,11 +82,7 @@ window_buffer_resize(struct frag_buffer *w, size_t length)
|
|||||||
errx(1, "Failed to resize window buffer!");
|
errx(1, "Failed to resize window buffer!");
|
||||||
}
|
}
|
||||||
w->length = length;
|
w->length = length;
|
||||||
w->numitems = 0;
|
window_buffer_reset(w);
|
||||||
w->window_start = 0;
|
|
||||||
w->start_seq_id = 0;
|
|
||||||
w->cur_seq_id = 0;
|
|
||||||
w->window_end = AFTER(w, w->windowsize);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@ -81,6 +93,15 @@ window_buffer_destroy(struct frag_buffer *w)
|
|||||||
free(w);
|
free(w);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
window_buffer_clear(struct frag_buffer *w)
|
||||||
|
{
|
||||||
|
if (!w) return;
|
||||||
|
|
||||||
|
memset(w->frags, 0, w->length * sizeof(fragment));
|
||||||
|
window_buffer_reset(w);
|
||||||
|
}
|
||||||
|
|
||||||
/* Returns number of available fragment slots (NOT BYTES) */
|
/* Returns number of available fragment slots (NOT BYTES) */
|
||||||
size_t
|
size_t
|
||||||
window_buffer_available(struct frag_buffer *w)
|
window_buffer_available(struct frag_buffer *w)
|
||||||
@ -99,7 +120,8 @@ window_append_fragment(struct frag_buffer *w, fragment *src)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Handles fragment received from the sending side (RECV) */
|
/* Handles fragment received from the sending side (RECV)
|
||||||
|
* Returns seq ID of packet to be ACKed immediately */
|
||||||
int
|
int
|
||||||
window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
|
window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
|
||||||
{
|
{
|
||||||
@ -109,8 +131,11 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
|
|||||||
startid = w->start_seq_id;
|
startid = w->start_seq_id;
|
||||||
endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID;
|
endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID;
|
||||||
if (!INWINDOW_SEQ(startid, endid, f->seqID)) {
|
if (!INWINDOW_SEQ(startid, endid, f->seqID)) {
|
||||||
DEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n", f->seqID, startid, endid);
|
DEBUG("Dropping frag with seqID %u: not in window (%u-%u)\n",
|
||||||
return 0;
|
f->seqID, startid, endid);
|
||||||
|
w->oos++;
|
||||||
|
/* ACK duplicate so sender can move on ASAP */
|
||||||
|
return f->seqID;
|
||||||
}
|
}
|
||||||
/* Place fragment into correct location in buffer */
|
/* Place fragment into correct location in buffer */
|
||||||
size_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID));
|
size_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID));
|
||||||
@ -119,14 +144,17 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
|
|||||||
fd = &w->frags[dest];
|
fd = &w->frags[dest];
|
||||||
if (fd->len != 0) {
|
if (fd->len != 0) {
|
||||||
DEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID);
|
DEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID);
|
||||||
return 0;
|
if (f->seqID == fd->seqID)
|
||||||
|
return f->seqID;
|
||||||
}
|
}
|
||||||
memcpy(fd, f, sizeof(fragment));
|
memcpy(fd, f, sizeof(fragment));
|
||||||
fd->retries = 0;
|
fd->retries = 0;
|
||||||
fd->ack_other = -1;
|
fd->ack_other = -1;
|
||||||
fd->acks = 0;
|
/* We assume this packet gets ACKed immediately on return of this function */
|
||||||
|
fd->acks = 1;
|
||||||
w->numitems ++;
|
w->numitems ++;
|
||||||
return 1;
|
|
||||||
|
return f->seqID;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Reassembles first complete sequence of fragments into data. (RECV)
|
/* Reassembles first complete sequence of fragments into data. (RECV)
|
||||||
@ -139,7 +167,7 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int
|
|||||||
dest = data;
|
dest = data;
|
||||||
if (w->direction != WINDOW_RECVING)
|
if (w->direction != WINDOW_RECVING)
|
||||||
return 0;
|
return 0;
|
||||||
if (w->frags[w->chunk_start].start == 0) {
|
if (w->frags[w->chunk_start].start == 0 && w->numitems > 0) {
|
||||||
DEBUG("chunk_start (%lu)pointing to non-start fragment (seq %u, len %lu)!",
|
DEBUG("chunk_start (%lu)pointing to non-start fragment (seq %u, len %lu)!",
|
||||||
w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len);
|
w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len);
|
||||||
return 0;
|
return 0;
|
||||||
@ -191,29 +219,50 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int
|
|||||||
if (end == 0) { /* no end of chunk found but reached end of data */
|
if (end == 0) { /* no end of chunk found but reached end of data */
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
DEBUG("Reassembling %lu bytes of data from %lu frags!", datalen, i + 1);
|
DEBUG("Reassembling %lu bytes of data from %lu frags; compression %d!", datalen, i + 1, *compression);
|
||||||
/* Clear all used fragments */
|
/* Clear all used fragments */
|
||||||
size_t p;
|
size_t p;
|
||||||
ITER_FORWARD(w->chunk_start, WRAP(w->chunk_start + i + 1), w->length, p, {
|
ITER_FORWARD(w->chunk_start, WRAP(w->chunk_start + i + 1), w->length, p,
|
||||||
memset(&w->frags[p], 0, sizeof(fragment));
|
memset(&w->frags[p], 0, sizeof(fragment));
|
||||||
});
|
);
|
||||||
w->chunk_start = WRAP(woffs + 1);
|
w->chunk_start = WRAP(woffs + 1);
|
||||||
w->numitems -= i + 1;
|
w->numitems -= i + 1;
|
||||||
return datalen;
|
return datalen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Returns number of fragments that can be sent immediately; effectively
|
||||||
|
* the same as window_get_next_sending_fragment but without changing anything. */
|
||||||
|
int
|
||||||
|
window_sending(struct frag_buffer *w)
|
||||||
|
{
|
||||||
|
fragment *f;
|
||||||
|
int tosend = 0;
|
||||||
|
if (w->numitems == 0)
|
||||||
|
return 0;
|
||||||
|
for (size_t i = 0; i < w->windowsize; i++) {
|
||||||
|
f = &w->frags[WRAP(w->window_start + i)];
|
||||||
|
if (f->len == 0 || f->acks >= 1) continue;
|
||||||
|
if ((f->retries == 0) != (difftime(time(NULL), f->lastsent) > ACK_TIMEOUT)) {
|
||||||
|
/* Fragment not sent xor timed out (to be re-sent) */
|
||||||
|
tosend++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tosend;
|
||||||
|
}
|
||||||
|
|
||||||
/* Returns next fragment to be sent or NULL if nothing (SEND)
|
/* Returns next fragment to be sent or NULL if nothing (SEND)
|
||||||
* This also handles packet resends, timeouts etc. */
|
* This also handles packet resends, timeouts etc. */
|
||||||
fragment *
|
fragment *
|
||||||
window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
|
window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
|
||||||
{
|
{
|
||||||
fragment *f;
|
fragment *f = NULL;
|
||||||
if (*other_ack >= MAX_SEQ_ID || *other_ack < 0)
|
if (*other_ack >= MAX_SEQ_ID || *other_ack < 0)
|
||||||
*other_ack = -1;
|
*other_ack = -1;
|
||||||
for (size_t i = 0; i < w->windowsize; i++) {
|
for (size_t i = 0; i < w->windowsize; i++) {
|
||||||
f = &w->frags[WRAP(w->window_start + i)];
|
f = &w->frags[WRAP(w->window_start + i)];
|
||||||
if (f->acks >= 1) continue;
|
if (f->acks >= 1) continue;
|
||||||
if (f->retries >= 1 && difftime(f->lastsent, time(NULL)) > ACK_TIMEOUT) {
|
/* TODO: use timeval for more precise timeouts */
|
||||||
|
if (f->retries >= 1 && difftime(time(NULL), f->lastsent) > ACK_TIMEOUT) {
|
||||||
/* Fragment sent before, not ACK'd */
|
/* Fragment sent before, not ACK'd */
|
||||||
DEBUG("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends);
|
DEBUG("Sending fragment %u again, %u retries so far, %u resent overall\n", f->seqID, f->retries, w->resends);
|
||||||
w->resends ++;
|
w->resends ++;
|
||||||
@ -224,16 +273,16 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
DEBUG("Not sending any fragments (last frag checked: retries %u, seqid %u, len %lu)",
|
if (f)
|
||||||
f->retries, f->seqID, f->len);
|
DEBUG("Not sending any fragments (last frag checked: retries %u, seqid %u, len %lu)",
|
||||||
// TODO: statistics for packet loss/not sending etc
|
f->retries, f->seqID, f->len);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
found:
|
found:
|
||||||
/* store other ACK into fragment so ACK is resent if fragment times out */
|
/* store other ACK into fragment for sending; ignore any previous values.
|
||||||
if (f->ack_other == -1)
|
Don't resend ACKs because by the time we do, the other end will have
|
||||||
f->ack_other = *other_ack, *other_ack = -1;
|
resent the corresponding fragment so may as well not cause trouble. */
|
||||||
f->is_nack &= 1;
|
f->ack_other = *other_ack, *other_ack = -1;
|
||||||
f->start &= 1;
|
f->start &= 1;
|
||||||
f->end &= 1;
|
f->end &= 1;
|
||||||
f->retries++;
|
f->retries++;
|
||||||
@ -265,7 +314,8 @@ window_ack(struct frag_buffer *w, int seqid)
|
|||||||
for (size_t i = 0; i < w->windowsize; i++) {
|
for (size_t i = 0; i < w->windowsize; i++) {
|
||||||
f = &w->frags[AFTER(w, i)];
|
f = &w->frags[AFTER(w, i)];
|
||||||
if (f->seqID == seqid && f->len > 0) { /* ACK first non-empty frag */
|
if (f->seqID == seqid && f->len > 0) { /* ACK first non-empty frag */
|
||||||
if (f->acks > 0) DEBUG("ACK: %d ACKs for seqId %u", f->acks, seqid);
|
if (f->acks > 0)
|
||||||
|
DEBUG("DUPE ACK: %d ACKs for seqId %u", f->acks, seqid);
|
||||||
f->acks ++;
|
f->acks ++;
|
||||||
DEBUG(" ACK frag seq %u, ACKs %u, len %lu, s %u e %u", f->seqID, f->acks, f->len, f->start, f->end);
|
DEBUG(" ACK frag seq %u, ACKs %u, len %lu, s %u e %u", f->seqID, f->acks, f->len, f->start, f->end);
|
||||||
break;
|
break;
|
||||||
@ -309,7 +359,7 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, int c
|
|||||||
compressed &= 1;
|
compressed &= 1;
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
static fragment f;
|
static fragment f;
|
||||||
// DEBUG("add data len %lu, %lu frags, max fragsize %u", len, n, w->maxfraglen);
|
DEBUG("add data len %lu, %lu frags, max fragsize %u", len, n, w->maxfraglen);
|
||||||
for (size_t i = 0; i < n; i++) {
|
for (size_t i = 0; i < n; i++) {
|
||||||
memset(&f, 0, sizeof(f));
|
memset(&f, 0, sizeof(f));
|
||||||
f.len = MIN(len - offset, w->maxfraglen);
|
f.len = MIN(len - offset, w->maxfraglen);
|
||||||
@ -321,7 +371,6 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, int c
|
|||||||
f.ack_other = -1;
|
f.ack_other = -1;
|
||||||
window_append_fragment(w, &f);
|
window_append_fragment(w, &f);
|
||||||
w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID;
|
w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID;
|
||||||
// DEBUG(" a = %u, b = %u, a %% b = %u", (len - offset), (w->maxfraglen + 1), (len - offset) % (w->maxfraglen + 1));
|
|
||||||
DEBUG(" fragment len %lu, seqID %u, s %u, end %u, dOffs %lu", f.len, f.seqID, f.start, f.end, offset);
|
DEBUG(" fragment len %lu, seqID %u, s %u, end %u, dOffs %lu", f.len, f.seqID, f.start, f.end, offset);
|
||||||
offset += f.len;
|
offset += f.len;
|
||||||
}
|
}
|
||||||
|
17
src/window.h
17
src/window.h
@ -32,7 +32,6 @@ typedef struct fragment {
|
|||||||
size_t len; /* Length of fragment data (0 if fragment unused) */
|
size_t len; /* Length of fragment data (0 if fragment unused) */
|
||||||
unsigned seqID; /* fragment sequence ID */
|
unsigned seqID; /* fragment sequence ID */
|
||||||
int ack_other; /* other way ACK seqID (>=0) or unset (<0) */
|
int ack_other; /* other way ACK seqID (>=0) or unset (<0) */
|
||||||
int is_nack; /* 1 if other way ACK is a NACK */
|
|
||||||
int compressed; /* compression flag */
|
int compressed; /* compression flag */
|
||||||
uint8_t start; /* start of chunk flag */
|
uint8_t start; /* start of chunk flag */
|
||||||
uint8_t end; /* end of chunk flag */
|
uint8_t end; /* end of chunk flag */
|
||||||
@ -55,11 +54,14 @@ struct frag_buffer {
|
|||||||
unsigned cur_seq_id; /* Most recent sequence ID */
|
unsigned cur_seq_id; /* Most recent sequence ID */
|
||||||
unsigned start_seq_id; /* Start of window sequence ID */
|
unsigned start_seq_id; /* Start of window sequence ID */
|
||||||
unsigned resends; /* number of fragments resent */
|
unsigned resends; /* number of fragments resent */
|
||||||
|
unsigned oos; /* Number of out-of-sequence fragments received */
|
||||||
int direction; /* Sending or recving */
|
int direction; /* Sending or recving */
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef WINDOW_DEBUG
|
extern int window_debug;
|
||||||
#define DEBUG(msg, ...) fprintf(stderr, "[WINDOW-DEBUG] (%s:%d) " msg "\n", __FILE__, __LINE__, ##__VA_ARGS__)
|
|
||||||
|
#ifdef DEBUG_BUILD
|
||||||
|
#define DEBUG(msg, ...) if (window_debug) fprintf(stderr, "[WINDOW-DEBUG] (%s:%d) " msg "\n", __FILE__, __LINE__, ##__VA_ARGS__)
|
||||||
#else
|
#else
|
||||||
#define DEBUG(msg, ...)
|
#define DEBUG(msg, ...)
|
||||||
#endif
|
#endif
|
||||||
@ -94,6 +96,12 @@ struct frag_buffer *window_buffer_init(size_t length, unsigned windowsize, unsig
|
|||||||
void window_buffer_resize(struct frag_buffer *w, size_t length);
|
void window_buffer_resize(struct frag_buffer *w, size_t length);
|
||||||
void window_buffer_destroy(struct frag_buffer *w);
|
void window_buffer_destroy(struct frag_buffer *w);
|
||||||
|
|
||||||
|
/* Clears fragments and resets window stats */
|
||||||
|
void window_buffer_clear(struct frag_buffer *w);
|
||||||
|
|
||||||
|
/* Resets window stats without clearing fragments */
|
||||||
|
void window_buffer_reset(struct frag_buffer *w);
|
||||||
|
|
||||||
/* Returns number of available fragment slots (NOT BYTES) */
|
/* Returns number of available fragment slots (NOT BYTES) */
|
||||||
size_t window_buffer_available(struct frag_buffer *w);
|
size_t window_buffer_available(struct frag_buffer *w);
|
||||||
|
|
||||||
@ -107,6 +115,9 @@ int window_process_incoming_fragment(struct frag_buffer *w, fragment *f);
|
|||||||
* Returns length of data reassembled, or 0 if no data reassembled */
|
* Returns length of data reassembled, or 0 if no data reassembled */
|
||||||
size_t window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int *compression);
|
size_t window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int *compression);
|
||||||
|
|
||||||
|
/* Returns number of fragments to be sent */
|
||||||
|
int window_sending(struct frag_buffer *w);
|
||||||
|
|
||||||
/* Returns next fragment to be sent or NULL if nothing (SEND) */
|
/* Returns next fragment to be sent or NULL if nothing (SEND) */
|
||||||
fragment *window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack);
|
fragment *window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user