From faf7d277a8dad9943b835c1c394d9162652063a2 Mon Sep 17 00:00:00 2001 From: frekky Date: Sat, 3 Oct 2015 22:11:58 +0800 Subject: [PATCH] Server-side query-answer logic now handled by qmem_max_wait --- src/server.c | 296 +++++++++++++++++++++++++++++---------------------- src/server.h | 2 + 2 files changed, 173 insertions(+), 125 deletions(-) diff --git a/src/server.c b/src/server.c index f26e076..5b9de9f 100644 --- a/src/server.c +++ b/src/server.c @@ -44,6 +44,7 @@ #include "login.h" #include "tun.h" #include "fw_query.h" +#include "util.h" #include "server.h" #include "window.h" @@ -191,12 +192,7 @@ qmem_append(int dns_fd, int userid, struct query *q) buf->start = (buf->start + 1) % QMEM_LEN; } - if (debug >= 5) { - time_t dnstimeout_ms; - dnstimeout_ms = users[userid].dns_timeout.tv_sec * 1000; - dnstimeout_ms += users[userid].dns_timeout.tv_usec / 1000; - QMEM_DEBUG(5, userid, "add query ID %d, timeout %lu ms", q->id, dnstimeout_ms); - } + QMEM_DEBUG(5, userid, "add query ID %d, timeout %lu ms", q->id, timeval_to_ms(&users[userid].dns_timeout)); /* Copy query into buffer */ memcpy(&buf->queries[buf->end], q, sizeof(struct query)); @@ -207,84 +203,130 @@ qmem_append(int dns_fd, int userid, struct query *q) static void qmem_answered(int userid) -/* Last query added has been answered */ +/* Call when oldest/first/earliest query added has been answered */ { struct query_buffer *buf; + size_t answered; buf = &users[userid].qmem; - buf->start_pending = (buf->start_pending + 1) % QMEM_LEN; - if (buf->num_pending > 0) - buf->num_pending -= 1; - QMEM_DEBUG(3, userid, "query ID %d answered", buf->queries[buf->start_pending].id); + if (buf->num_pending == 0) { + /* Most likely caused by bugs somewhere else. */ + QMEM_DEBUG(3, userid, "can't answer query that has already been answered! Fix bugs."); + return; + } + answered = buf->start_pending; + buf->start_pending = (buf->start_pending + 1) % QMEM_LEN; + buf->num_pending -= 1; + + QMEM_DEBUG(3, userid, "query ID %d answered", buf->queries[answered].id); } -static struct query * +struct query * qmem_get_next_response(int userid) -/* Gets oldest query to be responded to (for lazy mode) or NULL if none available */ +/* Gets oldest query to be responded to (for lazy mode) or NULL if none available + * The query is NOT marked as "answered" since that is done later. */ { struct query_buffer *buf; struct query *q; buf = &users[userid].qmem; - if (buf->length == 0) + if (buf->length == 0 || buf->num_pending == 0) return NULL; q = &buf->queries[buf->start_pending]; - buf->start_pending = (buf->start_pending + 1) % QMEM_LEN; - if (buf->num_pending > 0) - buf->num_pending -= 1; - QMEM_DEBUG(3, userid, "responding lazily to query ID %d", q->id); + QMEM_DEBUG(3, userid, "next response using cached query: ID %d", q->id); return q; } static struct timeval -qmem_max_wait(int *touser, struct query **sendq) -/* Gets max interval before anything has to be sent to any user */ +qmem_max_wait(struct dnsfd *dns_fds, int *touser, struct query **sendq) +/* Gets max interval before the next query has to be responded to + * Response(s) are sent automatically for queries if: + * - the query has timed out + * - the user has data to send, pending ACKs or ping and spare pending queries + * - the user has excess pending queries (>downstream window size) + * Returns largest safe time to wait before next timeout + * TODO respond to excess pending queries */ { struct timeval now, timeout, soonest, tmp; soonest.tv_sec = 10; soonest.tv_usec = 0; int userid, qnum, nextuser = -1; struct query *q = NULL, *nextq = NULL; + size_t sending, total, sent; + struct tun_user *u; gettimeofday(&now, NULL); for (userid = 0; userid < created_users; userid++) { if (!user_active(userid)) continue; - qnum = users[userid].qmem.start_pending; - if (users[userid].qmem.num_pending == 0 || !users[userid].lazy) - continue; - for (; qnum != users[userid].qmem.end; qnum = (qnum + 1) % QMEM_LEN) { - // TODO are queries always going to be in time order already? - q = &users[userid].qmem.queries[qnum]; - timeradd(&q->time_recv, &users[userid].dns_timeout, &timeout); - if (!timercmp(&now, &timeout, <)) { - /* if timeout has been reached, must send ASAP */ - soonest.tv_sec = 0; - soonest.tv_usec = 0; - nextuser = userid; - nextq = q; - /* no need to check other users */ - userid = created_users; - break; + u = &users[userid]; + qnum = u->qmem.start_pending; + + if (u->qmem.num_pending == 0 || !u->lazy) + continue; + /* Keep track of how many fragments we can send */ + total = window_sending(u->outgoing); + if (u->qmem.num_pending > u->outgoing->windowsize) { + /* calculate number of "excess" queries */ + total = MAX(total, u->qmem.num_pending - u->outgoing->windowsize); + } + sending = total; + sent = 0; + + for (; qnum != u->qmem.end; qnum = (qnum + 1) % QMEM_LEN) { + q = &u->qmem.queries[qnum]; + + /* queries will always be in time order */ + timeradd(&q->time_recv, &u->dns_timeout, &timeout); + if (sending > 0 || !timercmp(&now, &timeout, <) || + u->next_upstream_ack >= 0 || u->send_ping_next) { + /* respond to a query with ping/data if: + * - query has timed out (ping, or data if available) + * - user has pending data (always data) + * - user has pending ACK (either) + * - user has pending ping (always ping, with data if available) */ + + if (debug >= 3) { + struct timeval age; + timersub(&q->time_recv, &now, &age); + QMEM_DEBUG(3, userid, "Auto response to cached query: ID %d, %ld ms old, timeout %ld ms", + q->id, timeval_to_ms(&age), timeval_to_ms(&u->dns_timeout)); + } + + sent++; + QMEM_DEBUG(4, userid, "ANSWER q id %d, ping %d, ACK %d; sent %lu of %lu + sending another %lu", + q->id, u->send_ping_next, u->next_upstream_ack, sent, total, sending); + + send_data_or_ping(dns_fds, userid, q, u->send_ping_next, 1, 0); + + if (u->send_ping_next) + u->send_ping_next = 0; + + if (sending > 0) + sending--; + continue; } + timersub(&timeout, &now, &tmp); if (timercmp(&tmp, &soonest, <)) { - /* time until timeout is smaller */ + /* the oldest non-timed-out query in the buffer will be the + * soonest to timeout for this user; we can skip the rest */ soonest = tmp; nextuser = userid; nextq = q; + break; } } } if (debug >= 5) { - time_t soonest_ms = soonest.tv_sec * 1000; - soonest_ms += soonest.tv_usec / 1000; - if (nextq && nextuser > 0) { + time_t soonest_ms = timeval_to_ms(&soonest); + if (nextq && nextuser >= 0) { QMEM_DEBUG(5, nextuser, "can wait for %lu ms, will send id %d", soonest_ms, nextq->id); } else { if (nextuser < 0) nextuser = 0; + /* sanity check: soonest_ms should always be default value here (ie. 10000) */ QMEM_DEBUG(5, nextuser, "Don't need to send anything to any users, waiting %lu ms", soonest_ms); } } @@ -414,7 +456,7 @@ forward_query(int bind_fd, struct query *q) myaddr->sin_port = htons(bind_port); if (debug >= 2) { - fprintf(stderr, "TX: NS reply \n"); + fprintf(stderr, "TX: NS reply\n"); } if (sendto(bind_fd, buf, len, 0, (struct sockaddr*)&q->from, q->fromlen) <= 0) { @@ -449,35 +491,23 @@ send_version_response(int fd, version_ack_t ack, uint32_t payload, int userid, s } void -send_data_or_ping(int tun_fd, struct dnsfd *dns_fds, int userid, struct query *q, +send_data_or_ping(struct dnsfd *dns_fds, int userid, struct query *q, int ping, int respond_now, int immediate) /* Sends current fragment to user, or a ping if no data available. ping: 1=force send ping (even if data available), 0=only send if no data. respond_now: 1=must answer query now, 0=leave in qmem if no data available - immediate: 1=not from qmem (ie. fresh query), 0=query is from qmem - Updates next_upstream_ack if new ACK needed. */ + immediate: 1=not from qmem (ie. fresh query), 0=query is from qmem */ { uint8_t pkt[MAX_FRAGSIZE + DOWNSTREAM_PING_HDR]; size_t datalen, headerlen; fragment *f; - int compressed = 0; struct frag_buffer *out, *in; in = users[userid].incoming; out = users[userid].outgoing; - datalen = window_reassemble_data(in, pkt, sizeof(pkt), &compressed); - window_tick(in); window_tick(out); - /* Update time info */ - users[userid].last_pkt = time(NULL); - - if (datalen > 0) { - /* Data reassembled successfully + cleared out of buffer */ - handle_full_packet(tun_fd, dns_fds, userid, pkt, datalen, compressed); - } - f = window_get_next_sending_fragment(out, &users[userid].next_upstream_ack); /* Build downstream data/ping header (see doc/proto_xxxxxxxx.txt) for details */ @@ -529,40 +559,79 @@ send_data_or_ping(int tun_fd, struct dnsfd *dns_fds, int userid, struct query *q save_to_dnscache(userid, q, (char *)pkt, datalen + headerlen); #endif #ifdef QMEM_LEN + /* mark query as answered */ qmem_answered(userid); #endif - /* this query has been used */ - q->id = 0; window_tick(out); } -static int -user_send_data(int userid, int tun_fd, struct dnsfd *dns_fds, uint8_t *data, size_t datalen, int compressed) -/* Appends data to a user's outgoing queue and sends it if queries are waiting */ +void +user_process_incoming_data(int tun_fd, struct dnsfd *dns_fds, int userid, int ack) { - struct query *q; - if (users[userid].conn == CONN_DNS_NULL) { + uint8_t pkt[65536]; + size_t datalen; + int compressed = 0; - window_add_outgoing_data(users[userid].outgoing, data, datalen, compressed); + window_ack(users[userid].outgoing, ack); + window_tick(users[userid].outgoing); - /* Start sending immediately if queries are waiting */ -#ifdef QMEM_LEN - while (users[userid].qmem.num_pending > 0 && - window_sending(users[userid].outgoing)) { - q = qmem_get_next_response(userid); - if (q == NULL) - break; - send_data_or_ping(tun_fd, dns_fds, userid, q, 0, 1, 0); + datalen = window_reassemble_data(users[userid].incoming, pkt, sizeof(pkt), &compressed); + window_tick(users[userid].incoming); + + /* Update time info */ + users[userid].last_pkt = time(NULL); + + if (datalen > 0) { + /* Data reassembled successfully + cleared out of buffer */ + handle_full_packet(tun_fd, dns_fds, userid, pkt, datalen, compressed); + } +} + +static int +user_send_data(int userid, struct dnsfd *dns_fds, uint8_t *indata, + size_t len, int compressed) +/* Appends data to a user's outgoing queue and sends it (in raw mode only) */ +{ + size_t datalen; + int ret = 0; + uint8_t out[65536], *data; + + data = indata; + datalen = len; + + /* use compressed or uncompressed packet to match user settings */ + if (users[userid].down_compression && !compressed) { + datalen = sizeof(out); + compress2(out, &datalen, indata, len, 9); + data = out; + } else if (!users[userid].down_compression && compressed) { + datalen = sizeof(out); + ret = uncompress(out, &datalen, indata, len); + if (ret != Z_OK) { + if (debug >= 1) { + warnx("Uncompress == %d: %lu bytes to user %d!", ret, len, userid); + } + return 0; } -#endif + } - return datalen; - } else { /* CONN_RAW_UDP */ + compressed = users[userid].down_compression; + + if (users[userid].conn == CONN_DNS_NULL && data && datalen) { + /* append new data to user's outgoing queue; sent later in qmem_max_wait */ + ret = window_add_outgoing_data(users[userid].outgoing, data, datalen, compressed); + + } else if (data && datalen) { /* CONN_RAW_UDP */ + if (!compressed && debug >= 1) { + warnx("Sending in RAW mode uncompressed to user %d!", userid); + } int dns_fd = get_dns_fd(dns_fds, &users[userid].host); send_raw(dns_fd, data, datalen, userid, RAW_HDR_CMD_DATA, &users[userid].host, users[userid].hostlen); - return datalen; + ret = 1; } + + return ret; } static int @@ -615,9 +684,7 @@ tunnel_bind(int bind_fd, struct dnsfd *dns_fds) static int tunnel_tun(int tun_fd, struct dnsfd *dns_fds) { - unsigned long outlen; struct ip *header; - static uint8_t out[64*1024]; static uint8_t in[64*1024]; int userid; int read; @@ -635,14 +702,7 @@ tunnel_tun(int tun_fd, struct dnsfd *dns_fds) fprintf(stderr, "IN: %d byte pkt from tun to user %d; compression %d\n", read, userid, users[userid].down_compression); - if (users[userid].down_compression) { - outlen = sizeof(out); - compress2(out, &outlen, in, read, 9); - - return user_send_data(userid, tun_fd, dns_fds, out, outlen, 1); - } else { - return user_send_data(userid, tun_fd, dns_fds, in, read, 0); - } + return user_send_data(userid, dns_fds, in, read, 0); } static int @@ -736,13 +796,8 @@ server_tunnel(int tun_fd, struct dnsfd *dns_fds, int bind_fd, int max_idle_time) while (running) { int maxfd; - /* TODO: adjust time based on query timeouts (lazy mode) */ - tv = qmem_max_wait(&userid, &answer_now); - - if (tv.tv_sec == 0 && tv.tv_usec == 0) { - /* We need to respond to an old query immediately; do so now. */ - send_data_or_ping(tun_fd, dns_fds, userid, answer_now, 0, 1, 0); - } + /* max wait time based on pending queries */ + tv = qmem_max_wait(dns_fds, &userid, &answer_now); FD_ZERO(&fds); maxfd = 0; @@ -762,8 +817,7 @@ server_tunnel(int tun_fd, struct dnsfd *dns_fds, int bind_fd, int max_idle_time) maxfd = MAX(bind_fd, maxfd); } - /* Don't read from tun if no users can accept data anyway; - tun queue/TCP buffers are larger than our outgoing queues */ + /* Don't read from tun if all users have filled outpacket queues */ if(!all_users_waiting_to_send()) { FD_SET(tun_fd, &fds); maxfd = MAX(tun_fd, maxfd); @@ -839,16 +893,11 @@ handle_full_packet(int tun_fd, struct dnsfd *dns_fds, int userid, uint8_t *data, /* send the uncompressed packet to tun device */ write_tun(tun_fd, rawdata, rawlen); } else { - /* use compressed or uncompressed packet to match user settings */ - if (users[touser].down_compression) { - if (!compressed) { - len = sizeof(out); - compress2(out, &len, rawdata, rawlen, 9); - data = out; - } - user_send_data(touser, tun_fd, dns_fds, data, len, 1); + /* don't re-compress if possible */ + if (users[touser].down_compression && compressed) { + user_send_data(touser, dns_fds, data, len, 1); } else { - user_send_data(touser, tun_fd, dns_fds, rawdata, rawlen, 0); + user_send_data(touser, dns_fds, rawdata, rawlen, 0); } } } else { @@ -936,7 +985,6 @@ raw_decode(uint8_t *packet, size_t len, struct query *q, int dns_fd, struct dnsf { int raw_user; - warnx("raw_decode len %lu", len); /* minimum length */ if (len < RAW_HDR_LEN) return 0; /* should start with header */ @@ -1196,14 +1244,14 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query } if(in[0] == 'V' || in[0] == 'v') { /* Version request */ - uint32_t version; + uint32_t version = !PROTOCOL_VERSION; read = unpack_data(unpacked, sizeof(unpacked), in + 1, domain_len - 1, b32); /* Version greeting, compare and send ack/nak */ if (read > 4) { /* Received V + 32bits version (network byte order) */ version = ntohl(*(uint32_t *) unpacked); - } + } /* if invalid pkt, just send VNAK */ if (version == PROTOCOL_VERSION) { userid = find_available_user(); @@ -1236,6 +1284,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query window_buffer_clear(u->outgoing); window_buffer_clear(u->incoming); u->next_upstream_ack = -1; + u->send_ping_next = 0; #ifdef QMEM_LEN qmem_init(userid); #endif @@ -1390,7 +1439,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query } else if(in[0] == 'O' || in[0] == 'o') { /* Protocol options */ int bits = 0; int numopts; - char num[2], *opts; + char *opts; int tmp_lazy, tmp_downenc, tmp_comp; if (domain_len < 7) { /* len at least 7, example: "oa1tcmc" */ @@ -1405,12 +1454,11 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query return; /* illegal id */ } - num[0] = in[2]; - num[1] = 0; - numopts = atoi(num); + numopts = in[2] - '0'; - if (domain_len != numopts + 6 || numopts == 0) { + if (domain_len < numopts + 6 || numopts == 0 || numopts > 9) { write_dns(dns_fd, q, "BADLEN", 6, 'T'); + return; /* invalid packet */ } /* Temporary variables: don't change anything until all options parsed */ @@ -1605,12 +1653,10 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query unsigned timeout_ms; struct timeval timeout; - /* We can't handle id=0, that's "no packet" to us. So drop + /* We can't handle id=0, that's "no packet" to the dnscache. So drop request completely. Note that DNS servers rewrite the id. We'll drop 1 in 64k times. If DNS server retransmits with different id, then all okay. - Else client won't retransmit, and we'll just keep the - previous ping in cache, no problem either. TODO don't use ID=0 to check if query */ if (q->id == 0) return; @@ -1646,8 +1692,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query dn_seq = unpacked[5]; timeout_ms = ntohs(*(uint16_t *) (unpacked + 6)); - timeout.tv_sec = timeout_ms / 1000; - timeout.tv_usec = (timeout_ms - timeout.tv_sec * 1000) * 1000; + timeout = ms_to_timeval(timeout_ms); respond = unpacked[8] & 1; @@ -1661,10 +1706,11 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query userid, dn_seq, dn_winsize, up_seq, up_winsize, dn_ack, timeout_ms, unpacked[8]); } - window_ack(users[userid].outgoing, dn_ack); + user_process_incoming_data(tun_fd, dns_fds, userid, dn_ack); - /* Send resonse; q can be left in qmem if no data (q is still fresh) */ - send_data_or_ping(tun_fd, dns_fds, userid, q, respond, 0, 1); + /* Leave query in qmem, response is done in qmem_max_wait. + * Set the ping flag if it needs to respond */ + users[userid].send_ping_next = respond; } else if (isxdigit(in[0])) { /* Upstream data packet */ int code = 0; @@ -1729,17 +1775,17 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query warnx("frag seq %3u, datalen %5lu, ACK %3d, compression %1d, s%1d e%1d", f.seqID, f.len, f.ack_other, f.compressed, f.start, f.end); - /* if waiting for an ACK to be sent back upstream (on incoming buffer) */ + /* if already waiting for an ACK to be sent back upstream (on incoming buffer) */ if (users[userid].next_upstream_ack >= 0) { /* Shouldn't normally happen; will always be reset after sending a packet. */ warnx("[WARNING] next_upstream_ack == %d for user %d.", users[userid].next_upstream_ack, userid); } users[userid].next_upstream_ack = window_process_incoming_fragment(users[userid].incoming, &f); - window_ack(users[userid].outgoing, f.ack_other); + user_process_incoming_data(tun_fd, dns_fds, userid, f.ack_other); - /* Respond/ACK data packet immediately; query is fresh */ - send_data_or_ping(tun_fd, dns_fds, userid, q, 0, 1, 1); + /* Nothing to do. ACK (and response to this query) is sent + * later in qmem_max_wait. */ } } @@ -1765,7 +1811,7 @@ handle_ns_request(int dns_fd, struct query *q) } if (debug >= 2) { - fprintf(stderr, "TX: client %s ID %5d, type %d, name %s, %d bytes NS reply\n", + fprintf(stderr, "TX: NS reply client %s ID %5d, type %d, name %s, %d bytes\n", format_addr(&q->from, q->fromlen), q->id, q->type, q->name, len); } if (sendto(dns_fd, buf, len, 0, (struct sockaddr*)&q->from, q->fromlen) <= 0) { @@ -1799,7 +1845,7 @@ handle_a_request(int dns_fd, struct query *q, int fakeip) } if (debug >= 2) { - fprintf(stderr, "TX: client %s ID %5d, type %d, name %s, %d bytes A reply\n", + fprintf(stderr, "TX: A reply client %s ID %5d, type %d, name %s, %d bytes\n", format_addr(&q->from, q->fromlen), q->id, q->type, q->name, len); } if (sendto(dns_fd, buf, len, 0, (struct sockaddr*)&q->from, q->fromlen) <= 0) { diff --git a/src/server.h b/src/server.h index 193df54..a539eaf 100644 --- a/src/server.h +++ b/src/server.h @@ -124,4 +124,6 @@ void handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct q void handle_ns_request(int dns_fd, struct query *q); void handle_a_request(int dns_fd, struct query *q, int fakeip); +void send_data_or_ping(struct dnsfd *, int, struct query *, int, int, int); + #endif /* __SERVER_H__ */