Fixed option handshake and query tracking (now works)

This commit is contained in:
frekky 2015-10-03 22:09:34 +08:00
parent f60660a7ce
commit 1b85d23087
2 changed files with 75 additions and 64 deletions

View File

@ -57,6 +57,7 @@
#include "tun.h" #include "tun.h"
#include "version.h" #include "version.h"
#include "window.h" #include "window.h"
#include "util.h"
#include "client.h" #include "client.h"
int debug; int debug;
@ -86,6 +87,7 @@ static int next_downstream_ack;
/* Remembering queries we sent for tracking purposes */ /* Remembering queries we sent for tracking purposes */
static struct query_tuple *pending_queries; static struct query_tuple *pending_queries;
static size_t num_pending;
static time_t max_timeout_ms; static time_t max_timeout_ms;
/* Server response timeout in ms */ /* Server response timeout in ms */
@ -310,49 +312,74 @@ client_rotate_nameserver()
} }
/* Client-side query tracking for lazy mode */ /* Client-side query tracking for lazy mode */
static int
num_pending() /* Handy macro for printing stats with messages */
#define QTRACK_DEBUG(l, ...) \
if (debug >= l) {\
fprintf(stderr, "[QTRACK (%lu/%lu), ? %lu, TO %lu, S %lu/%lu] ", num_pending, PENDING_QUERIES_LENGTH, \
num_untracked, num_timeouts, window_sending(outbuf), outbuf->numitems); \
fprintf(stderr, __VA_ARGS__);\
fprintf(stderr, "\n");\
}
static void
check_pending_queries()
/* Updates pending queries list */
{ {
int num = 0; num_pending = 0;
struct timeval now, qtimeout, max_timeout; struct timeval now, qtimeout, max_timeout;
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
/* Max timeout for queries is max interval + 1 second extra */ /* Max timeout for queries is max interval + 1 second extra */
max_timeout.tv_sec = (max_timeout_ms / 1000) + 1; max_timeout = ms_to_timeval(max_timeout_ms);
max_timeout.tv_usec = (max_timeout_ms - max_timeout.tv_sec * 1000) * 1000;
for (int i = 0; i < PENDING_QUERIES_LENGTH; i++) { for (int i = 0; i < PENDING_QUERIES_LENGTH; i++) {
if (pending_queries[i].time.tv_sec > 0) { if (pending_queries[i].time.tv_sec > 0 && pending_queries[i].id >= 0) {
timeradd(&pending_queries[i].time, &max_timeout, &qtimeout); timeradd(&pending_queries[i].time, &max_timeout, &qtimeout);
if (timercmp(&qtimeout, &now, >)) { if (!timercmp(&qtimeout, &now, >)) {
num++;
} else {
/* Query has timed out, clear it */ /* Query has timed out, clear it */
pending_queries[i].time.tv_sec = 0; pending_queries[i].time.tv_sec = 0;
num_timeouts++; num_timeouts++;
} }
num_pending++;
} }
} }
return num;
} }
static void static void
query_sent_now(int id) query_sent_now(int id)
{ {
int i = 0, found = 0;
if (!pending_queries) if (!pending_queries)
return; return;
if (id < 0 || id > 65535) if (id < 0 || id > 65535)
return; return;
for (int i = 0; i < PENDING_QUERIES_LENGTH; i++) { /* Replace any empty queries first, then timed out ones if necessary */
if (pending_queries[i].time.tv_sec == 0) { for (i = 0; i < PENDING_QUERIES_LENGTH; i++) {
pending_queries[i].id = id; if (pending_queries[i].id < 0) {
gettimeofday(&pending_queries[i].time, NULL); found = 1;
id = -1;
break; break;
} }
} }
if (id > 0 && debug >= 1) if (!found) {
warnx("Too many queries sent! Failed to add id %d.", id); for (i = 0; i < PENDING_QUERIES_LENGTH; i++) {
if (pending_queries[i].time.tv_sec == 0) {
found = 1;
break;
}
}
}
/* if no slots found after both checks */
if (!found) {
QTRACK_DEBUG(1, "Buffer full! Failed to add id %d.", id);
} else {
/* Add query into found location */
pending_queries[i].id = id;
gettimeofday(&pending_queries[i].time, NULL);
num_pending ++;
QTRACK_DEBUG(4, "Adding query id %d into pending_queries[%d]", id, i);
id = -1;
}
} }
static void static void
@ -363,14 +390,16 @@ got_response(int id, int immediate)
static size_t num_rtt_timeouts; static size_t num_rtt_timeouts;
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
if (debug >= 4) QTRACK_DEBUG(4, "Got answer id %d (%s)", id, immediate ? "immediate" : "lazy");
warnx("got_response: request id %d (%s)", id, immediate ? "immediate" : "lazy");
for (int i = 0; i < PENDING_QUERIES_LENGTH; i++) { for (int i = 0; i < PENDING_QUERIES_LENGTH; i++) {
if (id >= 0 && pending_queries[i].id == id) { if (id >= 0 && pending_queries[i].id == id) {
if (num_pending > 0)
num_pending--;
QTRACK_DEBUG(5, " found answer id %d in pending queries", id);
id = -1; id = -1;
if (pending_queries[i].time.tv_sec == 0 && num_timeouts > 0) { if (pending_queries[i].time.tv_sec == 0 && num_timeouts > 0) {
/* If query has timed out but is still stored */ /* If query has timed out but is still stored - just in case */
num_timeouts --; num_timeouts --;
immediate = 0; immediate = 0;
} }
@ -380,8 +409,7 @@ got_response(int id, int immediate)
This lets us determine and adjust server lazy response time This lets us determine and adjust server lazy response time
during the session much more accurately. */ during the session much more accurately. */
timersub(&now, &pending_queries[i].time, &rtt); timersub(&now, &pending_queries[i].time, &rtt);
rtt_ms = rtt.tv_sec * 1000 + rtt.tv_usec / 1000; rtt_total_ms += timeval_to_ms(&rtt);
rtt_total_ms += rtt_ms;
num_immediate++; num_immediate++;
if (autodetect_server_timeout) { if (autodetect_server_timeout) {
@ -404,11 +432,17 @@ got_response(int id, int immediate)
} }
} }
} }
/* Remove query info from buffer to mark it as answered */
pending_queries[i].id = -1;
pending_queries[i].time.tv_sec = 0;
break; break;
} }
} }
if (id > 0) if (id > 0) {
QTRACK_DEBUG(4, " got untracked response to id %d.", id);
num_untracked++; num_untracked++;
}
} }
static int static int
@ -521,12 +555,6 @@ send_packet(int fd, char cmd, const uint8_t *data, const size_t datalen)
return send_query(fd, buf); return send_query(fd, buf);
} }
static inline int
is_sending()
{
return window_sending(outbuf) > 0;
}
void void
send_ping(int fd, int ping_response, int ack) send_ping(int fd, int ping_response, int ack)
{ {
@ -1014,7 +1042,8 @@ tunnel_tun(int tun_fd, int dns_fd)
/* Check if outgoing buffer can hold data */ /* Check if outgoing buffer can hold data */
if (window_buffer_available(outbuf) < (read / MAX_FRAGSIZE) + 1) { if (window_buffer_available(outbuf) < (read / MAX_FRAGSIZE) + 1) {
if (debug >= 2) if (debug >= 2)
fprintf(stderr, " Outgoing buffer full (%lu/%lu), not adding data!\n", outbuf->numitems, outbuf->length); fprintf(stderr, " Outgoing buffer full (%lu/%lu), not adding data!\n",
outbuf->numitems, outbuf->length);
return -1; return -1;
} }
@ -1127,6 +1156,7 @@ tunnel_dns(int tun_fd, int dns_fd)
got_response(q.id, immediate); got_response(q.id, immediate);
window_ack(outbuf, f.ack_other); window_ack(outbuf, f.ack_other);
window_tick(outbuf);
/* In lazy mode, we shouldn't get immediate replies to our most-recent /* In lazy mode, we shouldn't get immediate replies to our most-recent
query, only during heavy data transfer. Since this means the server query, only during heavy data transfer. Since this means the server
@ -1185,7 +1215,7 @@ client_tunnel(int tun_fd, int dns_fd)
fd_set fds; fd_set fds;
int rv; int rv;
int i; int i;
int sending, pending; int sending;
if (conn != CONN_DNS_NULL) { if (conn != CONN_DNS_NULL) {
compression_up = 1; compression_up = 1;
@ -1206,31 +1236,24 @@ client_tunnel(int tun_fd, int dns_fd)
num_immediate = 1; num_immediate = 1;
num_timeouts = 0; num_timeouts = 0;
num_untracked = 0; num_untracked = 0;
num_pending = 0;
send_query_recvcnt = 0; send_query_recvcnt = 0;
send_query_sendcnt = 0; send_query_sendcnt = 0;
/* set default server timeout */ if (debug >= 5)
if (debug >= 4)
window_debug = debug - 3; window_debug = debug - 3;
while (running) { while (running) {
tv.tv_sec = max_timeout_ms / 1000; tv = ms_to_timeval(max_timeout_ms);
tv.tv_usec = (max_timeout_ms - tv.tv_sec * 1000) * 1000;
/* TODO: adjust min send interval based on DNS server droppiness /* TODO: adjust min send interval based on DNS server droppiness
* (eg. from sending lots of requests simultaneously) * (eg. from sending lots of requests simultaneously)
* TODO: adjust number of pending queries based on current data rate */ * TODO: adjust number of pending queries based on current data rate */
sending = window_sending(outbuf); sending = window_sending(outbuf);
pending = num_pending(); check_pending_queries();
if (sending || (pending < windowsize_down && lazymode) ) { if (sending || (num_pending < windowsize_down && lazymode) || next_downstream_ack >= 0) {
if (debug >= 3) {
warnx("Waiting to send %d frags or fill server lazy buffer with (%d - %lu) queries.",
sending, pending, windowsize_down);
}
/* Upstream data traffic */ /* Upstream data traffic */
if (sending) { if (sending > 0) {
/* More to send - next fragment */ /* More to send - next fragment */
send_next_frag(dns_fd); send_next_frag(dns_fd);
} else { } else {
@ -1238,6 +1261,7 @@ client_tunnel(int tun_fd, int dns_fd)
send_ping(dns_fd, 0, next_downstream_ack); send_ping(dns_fd, 0, next_downstream_ack);
next_downstream_ack = -1; next_downstream_ack = -1;
} }
QTRACK_DEBUG(3, "Sent a query to fill server lazy buffer to %lu.", windowsize_down);
tv.tv_sec = 0; tv.tv_sec = 0;
tv.tv_usec = 2000; tv.tv_usec = 2000;
@ -1271,20 +1295,7 @@ client_tunnel(int tun_fd, int dns_fd)
err(1, "select < 0"); err(1, "select < 0");
if (i == 0) { if (i == 0) {
/* TODO improve timeout handling based on stats */ /* TODO check number of timeouts and do something about it */
if (!window_sending(outbuf) && outbuf->numitems > 0) {
if (outbuf->resends < 3) {
send_next_frag(dns_fd);
} else {
outbuf->resends = 0;
send_ping(dns_fd, 1, -1);
}
} else {
send_ping(dns_fd, 0, next_downstream_ack);
next_downstream_ack = -1;
}
send_ping_soon = 0;
} else { } else {
if (FD_ISSET(tun_fd, &fds)) { if (FD_ISSET(tun_fd, &fds)) {
@ -2195,9 +2206,6 @@ handshake_switch_options(int dns_fd, int lazy, int compression, char denc)
} else if (strncmp("BADCODEC", in, 8) == 0) { } else if (strncmp("BADCODEC", in, 8) == 0) {
fprintf(stderr, "Server rejected the selected options.\n"); fprintf(stderr, "Server rejected the selected options.\n");
goto opt_revert; goto opt_revert;
} else if (strncasecmp(opts, in + 3, 3) != 0) {
fprintf(stderr, "Server failed to change options.\n");
goto opt_revert;
} }
fprintf(stderr, "Switched server options successfully. (%s)\n", opts); fprintf(stderr, "Switched server options successfully. (%s)\n", opts);
lazymode = lazy; lazymode = lazy;
@ -2214,8 +2222,11 @@ handshake_switch_options(int dns_fd, int lazy, int compression, char denc)
fprintf(stderr, "No reply from server on codec switch.\n"); fprintf(stderr, "No reply from server on codec switch.\n");
opt_revert: opt_revert:
fprintf(stderr, "Falling back to previous configuration, downstream codec %s.\n", comp_status = compression_down ? "enabled" : "disabled";
dataenc->name); lazy_status = lazymode ? "lazy" : "immediate";
fprintf(stderr, "Falling back to previous configuration: downstream codec %s, %s mode, compression %s.\n",
dataenc->name, lazy_status, comp_status);
} }
static int static int

View File

@ -20,7 +20,7 @@
extern int debug; extern int debug;
#define PENDING_QUERIES_LENGTH (MAX(windowsize_up, windowsize_down) * 2) #define PENDING_QUERIES_LENGTH (MAX(windowsize_up, windowsize_down) * 3)
struct query_tuple { struct query_tuple {
int id; /* DNS query / response ID */ int id; /* DNS query / response ID */