Server-side query-answer logic now handled by qmem_max_wait

This commit is contained in:
frekky 2015-10-03 22:11:58 +08:00
parent 51bf36c863
commit faf7d277a8
2 changed files with 173 additions and 125 deletions

View File

@ -44,6 +44,7 @@
#include "login.h"
#include "tun.h"
#include "fw_query.h"
#include "util.h"
#include "server.h"
#include "window.h"
@ -191,12 +192,7 @@ qmem_append(int dns_fd, int userid, struct query *q)
buf->start = (buf->start + 1) % QMEM_LEN;
}
if (debug >= 5) {
time_t dnstimeout_ms;
dnstimeout_ms = users[userid].dns_timeout.tv_sec * 1000;
dnstimeout_ms += users[userid].dns_timeout.tv_usec / 1000;
QMEM_DEBUG(5, userid, "add query ID %d, timeout %lu ms", q->id, dnstimeout_ms);
}
QMEM_DEBUG(5, userid, "add query ID %d, timeout %lu ms", q->id, timeval_to_ms(&users[userid].dns_timeout));
/* Copy query into buffer */
memcpy(&buf->queries[buf->end], q, sizeof(struct query));
@ -207,84 +203,130 @@ qmem_append(int dns_fd, int userid, struct query *q)
static void
qmem_answered(int userid)
/* Call when oldest/first/earliest query added has been answered.
 * Advances the pending-queue head; does nothing (but complains) if no
 * query is actually pending. */
{
	struct query_buffer *buf;
	size_t answered;
	buf = &users[userid].qmem;

	if (buf->num_pending == 0) {
		/* Most likely caused by bugs somewhere else. */
		QMEM_DEBUG(3, userid, "can't answer query that has already been answered! Fix bugs.");
		return;
	}

	/* Remember the slot being answered BEFORE advancing the head,
	 * so the debug message reports the correct query ID. */
	answered = buf->start_pending;
	buf->start_pending = (buf->start_pending + 1) % QMEM_LEN;
	buf->num_pending -= 1;

	QMEM_DEBUG(3, userid, "query ID %d answered", buf->queries[answered].id);
}
static struct query *
struct query *
qmem_get_next_response(int userid)
/* Gets oldest query to be responded to (for lazy mode) or NULL if none available */
/* Gets oldest query to be responded to (for lazy mode) or NULL if none available
* The query is NOT marked as "answered" since that is done later. */
{
struct query_buffer *buf;
struct query *q;
buf = &users[userid].qmem;
if (buf->length == 0)
if (buf->length == 0 || buf->num_pending == 0)
return NULL;
q = &buf->queries[buf->start_pending];
buf->start_pending = (buf->start_pending + 1) % QMEM_LEN;
if (buf->num_pending > 0)
buf->num_pending -= 1;
QMEM_DEBUG(3, userid, "responding lazily to query ID %d", q->id);
QMEM_DEBUG(3, userid, "next response using cached query: ID %d", q->id);
return q;
}
static struct timeval
qmem_max_wait(int *touser, struct query **sendq)
/* Gets max interval before anything has to be sent to any user */
qmem_max_wait(struct dnsfd *dns_fds, int *touser, struct query **sendq)
/* Gets max interval before the next query has to be responded to
* Response(s) are sent automatically for queries if:
* - the query has timed out
* - the user has data to send, pending ACKs or ping and spare pending queries
* - the user has excess pending queries (>downstream window size)
* Returns largest safe time to wait before next timeout
* TODO respond to excess pending queries */
{
struct timeval now, timeout, soonest, tmp;
soonest.tv_sec = 10;
soonest.tv_usec = 0;
int userid, qnum, nextuser = -1;
struct query *q = NULL, *nextq = NULL;
size_t sending, total, sent;
struct tun_user *u;
gettimeofday(&now, NULL);
for (userid = 0; userid < created_users; userid++) {
if (!user_active(userid))
continue;
qnum = users[userid].qmem.start_pending;
if (users[userid].qmem.num_pending == 0 || !users[userid].lazy)
continue;
for (; qnum != users[userid].qmem.end; qnum = (qnum + 1) % QMEM_LEN) {
// TODO are queries always going to be in time order already?
q = &users[userid].qmem.queries[qnum];
timeradd(&q->time_recv, &users[userid].dns_timeout, &timeout);
if (!timercmp(&now, &timeout, <)) {
/* if timeout has been reached, must send ASAP */
soonest.tv_sec = 0;
soonest.tv_usec = 0;
nextuser = userid;
nextq = q;
/* no need to check other users */
userid = created_users;
break;
u = &users[userid];
qnum = u->qmem.start_pending;
if (u->qmem.num_pending == 0 || !u->lazy)
continue;
/* Keep track of how many fragments we can send */
total = window_sending(u->outgoing);
if (u->qmem.num_pending > u->outgoing->windowsize) {
/* calculate number of "excess" queries */
total = MAX(total, u->qmem.num_pending - u->outgoing->windowsize);
}
sending = total;
sent = 0;
for (; qnum != u->qmem.end; qnum = (qnum + 1) % QMEM_LEN) {
q = &u->qmem.queries[qnum];
/* queries will always be in time order */
timeradd(&q->time_recv, &u->dns_timeout, &timeout);
if (sending > 0 || !timercmp(&now, &timeout, <) ||
u->next_upstream_ack >= 0 || u->send_ping_next) {
/* respond to a query with ping/data if:
* - query has timed out (ping, or data if available)
* - user has pending data (always data)
* - user has pending ACK (either)
* - user has pending ping (always ping, with data if available) */
if (debug >= 3) {
struct timeval age;
timersub(&q->time_recv, &now, &age);
QMEM_DEBUG(3, userid, "Auto response to cached query: ID %d, %ld ms old, timeout %ld ms",
q->id, timeval_to_ms(&age), timeval_to_ms(&u->dns_timeout));
}
sent++;
QMEM_DEBUG(4, userid, "ANSWER q id %d, ping %d, ACK %d; sent %lu of %lu + sending another %lu",
q->id, u->send_ping_next, u->next_upstream_ack, sent, total, sending);
send_data_or_ping(dns_fds, userid, q, u->send_ping_next, 1, 0);
if (u->send_ping_next)
u->send_ping_next = 0;
if (sending > 0)
sending--;
continue;
}
timersub(&timeout, &now, &tmp);
if (timercmp(&tmp, &soonest, <)) {
/* time until timeout is smaller */
/* the oldest non-timed-out query in the buffer will be the
* soonest to timeout for this user; we can skip the rest */
soonest = tmp;
nextuser = userid;
nextq = q;
break;
}
}
}
if (debug >= 5) {
time_t soonest_ms = soonest.tv_sec * 1000;
soonest_ms += soonest.tv_usec / 1000;
if (nextq && nextuser > 0) {
time_t soonest_ms = timeval_to_ms(&soonest);
if (nextq && nextuser >= 0) {
QMEM_DEBUG(5, nextuser, "can wait for %lu ms, will send id %d", soonest_ms, nextq->id);
} else {
if (nextuser < 0)
nextuser = 0;
/* sanity check: soonest_ms should always be default value here (ie. 10000) */
QMEM_DEBUG(5, nextuser, "Don't need to send anything to any users, waiting %lu ms", soonest_ms);
}
}
@ -414,7 +456,7 @@ forward_query(int bind_fd, struct query *q)
myaddr->sin_port = htons(bind_port);
if (debug >= 2) {
fprintf(stderr, "TX: NS reply \n");
fprintf(stderr, "TX: NS reply\n");
}
if (sendto(bind_fd, buf, len, 0, (struct sockaddr*)&q->from, q->fromlen) <= 0) {
@ -449,35 +491,23 @@ send_version_response(int fd, version_ack_t ack, uint32_t payload, int userid, s
}
void
send_data_or_ping(int tun_fd, struct dnsfd *dns_fds, int userid, struct query *q,
send_data_or_ping(struct dnsfd *dns_fds, int userid, struct query *q,
int ping, int respond_now, int immediate)
/* Sends current fragment to user, or a ping if no data available.
ping: 1=force send ping (even if data available), 0=only send if no data.
respond_now: 1=must answer query now, 0=leave in qmem if no data available
immediate: 1=not from qmem (ie. fresh query), 0=query is from qmem
Updates next_upstream_ack if new ACK needed. */
immediate: 1=not from qmem (ie. fresh query), 0=query is from qmem */
{
uint8_t pkt[MAX_FRAGSIZE + DOWNSTREAM_PING_HDR];
size_t datalen, headerlen;
fragment *f;
int compressed = 0;
struct frag_buffer *out, *in;
in = users[userid].incoming;
out = users[userid].outgoing;
datalen = window_reassemble_data(in, pkt, sizeof(pkt), &compressed);
window_tick(in);
window_tick(out);
/* Update time info */
users[userid].last_pkt = time(NULL);
if (datalen > 0) {
/* Data reassembled successfully + cleared out of buffer */
handle_full_packet(tun_fd, dns_fds, userid, pkt, datalen, compressed);
}
f = window_get_next_sending_fragment(out, &users[userid].next_upstream_ack);
/* Build downstream data/ping header (see doc/proto_xxxxxxxx.txt) for details */
@ -529,40 +559,79 @@ send_data_or_ping(int tun_fd, struct dnsfd *dns_fds, int userid, struct query *q
save_to_dnscache(userid, q, (char *)pkt, datalen + headerlen);
#endif
#ifdef QMEM_LEN
/* mark query as answered */
qmem_answered(userid);
#endif
/* this query has been used */
q->id = 0;
window_tick(out);
}
static int
user_send_data(int userid, int tun_fd, struct dnsfd *dns_fds, uint8_t *data, size_t datalen, int compressed)
/* Appends data to a user's outgoing queue and sends it if queries are waiting */
void
user_process_incoming_data(int tun_fd, struct dnsfd *dns_fds, int userid, int ack)
{
struct query *q;
if (users[userid].conn == CONN_DNS_NULL) {
uint8_t pkt[65536];
size_t datalen;
int compressed = 0;
window_add_outgoing_data(users[userid].outgoing, data, datalen, compressed);
window_ack(users[userid].outgoing, ack);
window_tick(users[userid].outgoing);
/* Start sending immediately if queries are waiting */
#ifdef QMEM_LEN
while (users[userid].qmem.num_pending > 0 &&
window_sending(users[userid].outgoing)) {
q = qmem_get_next_response(userid);
if (q == NULL)
break;
send_data_or_ping(tun_fd, dns_fds, userid, q, 0, 1, 0);
datalen = window_reassemble_data(users[userid].incoming, pkt, sizeof(pkt), &compressed);
window_tick(users[userid].incoming);
/* Update time info */
users[userid].last_pkt = time(NULL);
if (datalen > 0) {
/* Data reassembled successfully + cleared out of buffer */
handle_full_packet(tun_fd, dns_fds, userid, pkt, datalen, compressed);
}
#endif
}
return datalen;
} else { /* CONN_RAW_UDP */
static int
user_send_data(int userid, struct dnsfd *dns_fds, uint8_t *indata,
	size_t len, int compressed)
/* DNS mode: appends data to the user's outgoing queue (responses are sent
 * later from qmem_max_wait). Raw UDP mode: sends the data immediately.
 * The payload is (de)compressed as needed to match the user's
 * down_compression setting.
 * Returns non-zero on success, 0 on failure. */
{
	size_t datalen;
	int ret = 0;
	uint8_t out[65536], *data;

	data = indata;
	datalen = len;

	/* use compressed or uncompressed packet to match user settings */
	if (users[userid].down_compression && !compressed) {
		datalen = sizeof(out);
		compress2(out, &datalen, indata, len, 9);
		data = out;
	} else if (!users[userid].down_compression && compressed) {
		datalen = sizeof(out);
		ret = uncompress(out, &datalen, indata, len);
		if (ret != Z_OK) {
			if (debug >= 1) {
				warnx("Uncompress == %d: %lu bytes to user %d!", ret, len, userid);
			}
			return 0;
		}
		/* BUGFIX: point at the decompressed buffer; otherwise the
		 * still-compressed input would be queued/sent below. */
		data = out;
	}

	compressed = users[userid].down_compression;

	if (users[userid].conn == CONN_DNS_NULL && data && datalen) {
		/* append new data to user's outgoing queue; sent later in qmem_max_wait */
		ret = window_add_outgoing_data(users[userid].outgoing, data, datalen, compressed);
	} else if (data && datalen) { /* CONN_RAW_UDP */
		if (!compressed && debug >= 1) {
			warnx("Sending in RAW mode uncompressed to user %d!", userid);
		}
		int dns_fd = get_dns_fd(dns_fds, &users[userid].host);
		send_raw(dns_fd, data, datalen, userid, RAW_HDR_CMD_DATA,
				&users[userid].host, users[userid].hostlen);
		ret = 1;
	}

	return ret;
}
static int
@ -615,9 +684,7 @@ tunnel_bind(int bind_fd, struct dnsfd *dns_fds)
static int
tunnel_tun(int tun_fd, struct dnsfd *dns_fds)
{
unsigned long outlen;
struct ip *header;
static uint8_t out[64*1024];
static uint8_t in[64*1024];
int userid;
int read;
@ -635,14 +702,7 @@ tunnel_tun(int tun_fd, struct dnsfd *dns_fds)
fprintf(stderr, "IN: %d byte pkt from tun to user %d; compression %d\n",
read, userid, users[userid].down_compression);
if (users[userid].down_compression) {
outlen = sizeof(out);
compress2(out, &outlen, in, read, 9);
return user_send_data(userid, tun_fd, dns_fds, out, outlen, 1);
} else {
return user_send_data(userid, tun_fd, dns_fds, in, read, 0);
}
return user_send_data(userid, dns_fds, in, read, 0);
}
static int
@ -736,13 +796,8 @@ server_tunnel(int tun_fd, struct dnsfd *dns_fds, int bind_fd, int max_idle_time)
while (running) {
int maxfd;
/* TODO: adjust time based on query timeouts (lazy mode) */
tv = qmem_max_wait(&userid, &answer_now);
if (tv.tv_sec == 0 && tv.tv_usec == 0) {
/* We need to respond to an old query immediately; do so now. */
send_data_or_ping(tun_fd, dns_fds, userid, answer_now, 0, 1, 0);
}
/* max wait time based on pending queries */
tv = qmem_max_wait(dns_fds, &userid, &answer_now);
FD_ZERO(&fds);
maxfd = 0;
@ -762,8 +817,7 @@ server_tunnel(int tun_fd, struct dnsfd *dns_fds, int bind_fd, int max_idle_time)
maxfd = MAX(bind_fd, maxfd);
}
/* Don't read from tun if no users can accept data anyway;
tun queue/TCP buffers are larger than our outgoing queues */
/* Don't read from tun if all users have filled outpacket queues */
if(!all_users_waiting_to_send()) {
FD_SET(tun_fd, &fds);
maxfd = MAX(tun_fd, maxfd);
@ -839,16 +893,11 @@ handle_full_packet(int tun_fd, struct dnsfd *dns_fds, int userid, uint8_t *data,
/* send the uncompressed packet to tun device */
write_tun(tun_fd, rawdata, rawlen);
} else {
/* use compressed or uncompressed packet to match user settings */
if (users[touser].down_compression) {
if (!compressed) {
len = sizeof(out);
compress2(out, &len, rawdata, rawlen, 9);
data = out;
}
user_send_data(touser, tun_fd, dns_fds, data, len, 1);
/* don't re-compress if possible */
if (users[touser].down_compression && compressed) {
user_send_data(touser, dns_fds, data, len, 1);
} else {
user_send_data(touser, tun_fd, dns_fds, rawdata, rawlen, 0);
user_send_data(touser, dns_fds, rawdata, rawlen, 0);
}
}
} else {
@ -936,7 +985,6 @@ raw_decode(uint8_t *packet, size_t len, struct query *q, int dns_fd, struct dnsf
{
int raw_user;
warnx("raw_decode len %lu", len);
/* minimum length */
if (len < RAW_HDR_LEN) return 0;
/* should start with header */
@ -1196,14 +1244,14 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
}
if(in[0] == 'V' || in[0] == 'v') { /* Version request */
uint32_t version;
uint32_t version = !PROTOCOL_VERSION;
read = unpack_data(unpacked, sizeof(unpacked), in + 1, domain_len - 1, b32);
/* Version greeting, compare and send ack/nak */
if (read > 4) {
/* Received V + 32bits version (network byte order) */
version = ntohl(*(uint32_t *) unpacked);
}
} /* if invalid pkt, just send VNAK */
if (version == PROTOCOL_VERSION) {
userid = find_available_user();
@ -1236,6 +1284,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
window_buffer_clear(u->outgoing);
window_buffer_clear(u->incoming);
u->next_upstream_ack = -1;
u->send_ping_next = 0;
#ifdef QMEM_LEN
qmem_init(userid);
#endif
@ -1390,7 +1439,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
} else if(in[0] == 'O' || in[0] == 'o') { /* Protocol options */
int bits = 0;
int numopts;
char num[2], *opts;
char *opts;
int tmp_lazy, tmp_downenc, tmp_comp;
if (domain_len < 7) { /* len at least 7, example: "oa1tcmc" */
@ -1405,12 +1454,11 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
return; /* illegal id */
}
num[0] = in[2];
num[1] = 0;
numopts = atoi(num);
numopts = in[2] - '0';
if (domain_len != numopts + 6 || numopts == 0) {
if (domain_len < numopts + 6 || numopts == 0 || numopts > 9) {
write_dns(dns_fd, q, "BADLEN", 6, 'T');
return; /* invalid packet */
}
/* Temporary variables: don't change anything until all options parsed */
@ -1605,12 +1653,10 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
unsigned timeout_ms;
struct timeval timeout;
/* We can't handle id=0, that's "no packet" to us. So drop
/* We can't handle id=0, that's "no packet" to the dnscache. So drop
request completely. Note that DNS servers rewrite the id.
We'll drop 1 in 64k times. If DNS server retransmits with
different id, then all okay.
Else client won't retransmit, and we'll just keep the
previous ping in cache, no problem either.
TODO don't use ID=0 to check if query */
if (q->id == 0)
return;
@ -1646,8 +1692,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
dn_seq = unpacked[5];
timeout_ms = ntohs(*(uint16_t *) (unpacked + 6));
timeout.tv_sec = timeout_ms / 1000;
timeout.tv_usec = (timeout_ms - timeout.tv_sec * 1000) * 1000;
timeout = ms_to_timeval(timeout_ms);
respond = unpacked[8] & 1;
@ -1661,10 +1706,11 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
userid, dn_seq, dn_winsize, up_seq, up_winsize, dn_ack, timeout_ms, unpacked[8]);
}
window_ack(users[userid].outgoing, dn_ack);
user_process_incoming_data(tun_fd, dns_fds, userid, dn_ack);
/* Send resonse; q can be left in qmem if no data (q is still fresh) */
send_data_or_ping(tun_fd, dns_fds, userid, q, respond, 0, 1);
/* Leave query in qmem, response is done in qmem_max_wait.
* Set the ping flag if it needs to respond */
users[userid].send_ping_next = respond;
} else if (isxdigit(in[0])) { /* Upstream data packet */
int code = 0;
@ -1729,17 +1775,17 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
warnx("frag seq %3u, datalen %5lu, ACK %3d, compression %1d, s%1d e%1d",
f.seqID, f.len, f.ack_other, f.compressed, f.start, f.end);
/* if waiting for an ACK to be sent back upstream (on incoming buffer) */
/* if already waiting for an ACK to be sent back upstream (on incoming buffer) */
if (users[userid].next_upstream_ack >= 0) {
/* Shouldn't normally happen; will always be reset after sending a packet. */
warnx("[WARNING] next_upstream_ack == %d for user %d.", users[userid].next_upstream_ack, userid);
}
users[userid].next_upstream_ack = window_process_incoming_fragment(users[userid].incoming, &f);
window_ack(users[userid].outgoing, f.ack_other);
user_process_incoming_data(tun_fd, dns_fds, userid, f.ack_other);
/* Respond/ACK data packet immediately; query is fresh */
send_data_or_ping(tun_fd, dns_fds, userid, q, 0, 1, 1);
/* Nothing to do. ACK (and response to this query) is sent
* later in qmem_max_wait. */
}
}
@ -1765,7 +1811,7 @@ handle_ns_request(int dns_fd, struct query *q)
}
if (debug >= 2) {
fprintf(stderr, "TX: client %s ID %5d, type %d, name %s, %d bytes NS reply\n",
fprintf(stderr, "TX: NS reply client %s ID %5d, type %d, name %s, %d bytes\n",
format_addr(&q->from, q->fromlen), q->id, q->type, q->name, len);
}
if (sendto(dns_fd, buf, len, 0, (struct sockaddr*)&q->from, q->fromlen) <= 0) {
@ -1799,7 +1845,7 @@ handle_a_request(int dns_fd, struct query *q, int fakeip)
}
if (debug >= 2) {
fprintf(stderr, "TX: client %s ID %5d, type %d, name %s, %d bytes A reply\n",
fprintf(stderr, "TX: A reply client %s ID %5d, type %d, name %s, %d bytes\n",
format_addr(&q->from, q->fromlen), q->id, q->type, q->name, len);
}
if (sendto(dns_fd, buf, len, 0, (struct sockaddr*)&q->from, q->fromlen) <= 0) {

View File

@ -124,4 +124,6 @@ void handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct q
void handle_ns_request(int dns_fd, struct query *q);
void handle_a_request(int dns_fd, struct query *q, int fakeip);
void send_data_or_ping(struct dnsfd *, int, struct query *, int, int, int);
#endif /* __SERVER_H__ */