From d8c08191ccb2e72141036930e3e3a66fa18948c7 Mon Sep 17 00:00:00 2001 From: frekky Date: Fri, 21 Aug 2015 23:23:24 +0800 Subject: [PATCH] Server-side sliding window implementation mostly finished. Requires testing. --- src/iodined.c | 2 + src/osflags | 0 src/server.c | 228 +++++++++++++++++++++++++++++--------------------- 3 files changed, 134 insertions(+), 96 deletions(-) mode change 100755 => 100644 src/osflags diff --git a/src/iodined.c b/src/iodined.c index d0f3f7e..5e7edca 100644 --- a/src/iodined.c +++ b/src/iodined.c @@ -294,6 +294,8 @@ main(int argc, char **argv) retval = 0; + server_init(); + #ifdef WINDOWS32 WSAStartup(req_version, &wsa_data); #endif diff --git a/src/osflags b/src/osflags old mode 100755 new mode 100644 diff --git a/src/server.c b/src/server.c index 39be412..724917d 100644 --- a/src/server.c +++ b/src/server.c @@ -44,6 +44,7 @@ #include "tun.h" #include "fw_query.h" #include "server.h" +#include "window.h" #ifdef HAVE_SYSTEMD # include @@ -119,15 +120,6 @@ send_raw(int fd, uint8_t *buf, size_t buflen, int user, int cmd, struct query *q sendto(fd, packet, len, 0, (struct sockaddr *) &q->from, q->fromlen); } - -static void -start_new_outpacket(int userid, uint8_t *data, size_t datalen) -/* Copies data to .outpacket and resets all counters. - data is expected to be compressed already. */ -{ - // TODO: window add to outgoing data -} - int answer_from_qmem(int dns_fd, struct query *q, unsigned char *qmem_cmc, unsigned short *qmem_type, int qmem_len, @@ -378,10 +370,41 @@ forward_query(int bind_fd, struct query *q) } } +void +send_ping_response(int dns_fd, int userid, struct query *q) +{ + static uint8_t pkt[10]; + size_t datalen = 5; + /* Build downstream data + ping header (see doc/proto_xxxxxxxx.txt) for details */ + pkt[0] = users[userid].outgoing->start_seq_id & 0xFF; + pkt[1] = users[userid].incoming->start_seq_id & 0xFF; + pkt[2] = (1 << 5); /* ping flag */ + pkt[3] = users[userid].outgoing->windowsize & 0xFF; + pkt[4] = users[userid].incoming->windowsize & 0xFF; + + write_dns(dns_fd, q, (char *)pkt, datalen, users[userid].downenc); + + if (q->id2 != 0) { /* rotate pending duplicate queries */ + q->id = q->id2; + q->fromlen = q->fromlen2; + memcpy(&(q->from), &(q->from2), q->fromlen2); + if (debug >= 1) + warnx("OUT again to last duplicate"); + write_dns(dns_fd, q, (char *)pkt, datalen, users[userid].downenc); + } + + save_to_qmem_pingordata(userid, q); + +#ifdef DNSCACHE_LEN + save_to_dnscache(userid, q, (char *)pkt, datalen + 2); +#endif + + q->id = 0; /* this query is used */ +} + static int send_frag_or_dataless(int dns_fd, int userid, struct query *q) -/* Sends current fragment to user, or dataless packet if there is no - current fragment available (-> normal "quiet" ping reply). +/* Sends current fragment to user, or a ping if no data available. Does not update anything, except: - discards q always (query is used) - forgets entire users[userid].outpacket if it was sent in one go, @@ -390,43 +413,60 @@ send_frag_or_dataless(int dns_fd, int userid, struct query *q) 0 = don't call us again for now. */ { - char pkt[4096]; - int datalen = 0; + static uint8_t pkt[MAX_FRAGSIZE + DOWNSTREAM_HDR]; + int ping = 0; + size_t datalen; + fragment *f; + struct frag_buffer *out; - /* TODO: If re-sent too many times, drop entire packet */ + out = users[userid].outgoing; - /* Build downstream data header (see doc/proto_xxxxxxxx.txt) */ + f = window_get_next_sending_fragment(out, users[userid].next_upstream_ack); + if (!f && user_sending(userid)) { + /* Need to tell client to sync stuff - send data header with ping stuff */ + ping = 1; + } /* TODO: If re-sent too many times, drop stuff */ -// /* First byte is 1 bit compression flag, 3 bits upstream seqno, 4 bits upstream fragment */ -// pkt[0] = (1<<7) | ((users[userid].inpacket.seqno & 7) << 4) | -// (users[userid].inpacket.fragment & 15); -// /* Second byte is 3 bits downstream seqno, 4 bits downstream fragment, 1 bit last flag */ -// pkt[1] = ((users[userid].outpacket.seqno & 7) << 5) | -// ((users[userid].outpacket.fragment & 15) << 1) | (last & 1); + /* Build downstream data header (see doc/proto_xxxxxxxx.txt) for details */ + pkt[0] = f->seqID & 0xFF; + pkt[1] = f->ack_other & 0xFF; + pkt[2] = ((ping & 1) << 5) | ((f->compressed & 1) << 4) | ((f->ack_other < 0 ? 0 : 1) << 3) + | (f->is_nack << 2) | (f->start << 1) | f->end; - if (debug >= 1) { - // TODO: display packet data + if (ping) { + pkt[3] = out->windowsize & 0xFF; + pkt[4] = users[userid].incoming->windowsize & 0xFF; + datalen = 5; + } else { + datalen = DOWNSTREAM_HDR + f->len; + if (datalen > sizeof(pkt)) { + warnx("send_frag_or_dataless: fragment too large to send!"); + return 0; + } + memcpy(&pkt, f->data, f->len); } - write_dns(dns_fd, q, pkt, datalen + 2, users[userid].downenc); + write_dns(dns_fd, q, (char *)pkt, datalen, users[userid].downenc); - if (q->id2 != 0) { + if (q->id2 != 0) { /* reply to any duplicates */ q->id = q->id2; q->fromlen = q->fromlen2; memcpy(&(q->from), &(q->from2), q->fromlen2); if (debug >= 1) warnx("OUT again to last duplicate"); - write_dns(dns_fd, q, pkt, datalen + 2, users[userid].downenc); + write_dns(dns_fd, q, (char *)pkt, datalen, users[userid].downenc); } save_to_qmem_pingordata(userid, q); #ifdef DNSCACHE_LEN - save_to_dnscache(userid, q, pkt, datalen + 2); + save_to_dnscache(userid, q, (char *)pkt, datalen + 2); #endif + /* this query has been */ + q->id = 0; + window_tick(out); - q->id = 0; /* this query is used */ - - return 0; /* don't call us again */ + /* call again if we have more things to send */ + return users[userid].outgoing->numitems > 0; } static void @@ -446,6 +486,7 @@ send_version_response(int fd, version_ack_t ack, uint32_t payload, int userid, s break; } + // TODO: use htonl for compatibility with big-endian systems out[4] = ((payload >> 24) & 0xff); out[5] = ((payload >> 16) & 0xff); out[6] = ((payload >> 8) & 0xff); @@ -526,7 +567,7 @@ tunnel_tun(int tun_fd, struct dnsfd *dns_fds) if (users[userid].conn == CONN_DNS_NULL) { - start_new_outpacket(userid, out, outlen); + window_add_outgoing_data(users[userid].outgoing, out, outlen, 1); /* Start sending immediately if query is waiting */ if (users[userid].q_sendrealsoon.id != 0) { @@ -633,15 +674,14 @@ server_tunnel(int tun_fd, struct dnsfd *dns_fds, int bind_fd, int max_idle_time) tv.tv_usec = 0; /* Adjust timeout if there is anything to send realsoon. - Clients won't be sending new data until we send our ack, + Clients won't be sending new data until we send our ack, TODO: adjust stuff in this function so don't keep them waiting long. This only triggers at final upstream fragments, which is about once per eight requests during heavy upstream traffic. - 20msec: ~8 packs every 1/50sec = ~400 DNSreq/sec, + 20msec: ~8 packets every 1/50sec = ~400 DNSreq/sec, or ~1200bytes every 1/50sec = ~0.5 Mbit/sec upstream */ for (userid = 0; userid < created_users; userid++) { - if (users[userid].active && !users[userid].disabled && - users[userid].last_pkt + 60 > time(NULL)) { + if (user_active(userid)) { users[userid].q_sendrealsoon_new = 0; if (users[userid].q_sendrealsoon.id != 0) { tv.tv_sec = 0; @@ -683,7 +723,7 @@ server_tunnel(int tun_fd, struct dnsfd *dns_fds, int bind_fd, int max_idle_time) return 1; } - if (i==0) { + if (i == 0) { if (max_idle_time) { /* only trigger the check if that's worth ( ie, no need to loop over if there is something to send */ @@ -714,11 +754,8 @@ server_tunnel(int tun_fd, struct dnsfd *dns_fds, int bind_fd, int max_idle_time) /* Send realsoon's if tun or dns didn't already */ for (userid = 0; userid < created_users; userid++) - if (users[userid].active && !users[userid].disabled && - users[userid].last_pkt + 60 > time(NULL) && - users[userid].q_sendrealsoon.id != 0 && - users[userid].conn == CONN_DNS_NULL && - !users[userid].q_sendrealsoon_new) { + if (user_active(userid) && users[userid].q_sendrealsoon.id != 0 && + users[userid].conn == CONN_DNS_NULL && !users[userid].q_sendrealsoon_new) { int dns_fd = get_dns_fd(dns_fds, &users[userid].q_sendrealsoon.from); send_frag_or_dataless(dns_fd, userid, &users[userid].q_sendrealsoon); } @@ -749,7 +786,7 @@ handle_full_packet(int tun_fd, struct dnsfd *dns_fds, int userid, uint8_t *data, /* send the compressed(!) packet to other client */ if (users[touser].conn == CONN_DNS_NULL) { if (window_buffer_available(users[touser].outgoing) * users[touser].outgoing->maxfraglen >= len) { - start_new_outpacket(touser, data, len); + window_add_outgoing_data(users[touser].outgoing, data, len, 1); /* Start sending immediately if query is waiting */ if (users[touser].q_sendrealsoon.id != 0) { @@ -781,9 +818,7 @@ handle_raw_login(uint8_t *packet, size_t len, struct query *q, int fd, int useri /* can't use check_authenticated_user_and_ip() since IP address will be different, so duplicate here except IP address */ if (userid < 0 || userid >= created_users) return; - if (!users[userid].active || users[userid].disabled) return; - if (!users[userid].authenticated) return; - if (users[userid].last_pkt + 60 < time(NULL)) return; + if (!check_authenticated_user_and_ip(userid, q)) return; if (debug >= 1) { fprintf(stderr, "IN login raw, len %lu, from user %d\n", len, userid); @@ -1128,10 +1163,10 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query char in[512]; char logindata[16]; char out[64*1024]; - char unpacked[64*1024]; + static char unpacked[64*1024]; char *tmp[2]; int userid; - int read; + size_t read; userid = -1; @@ -1181,6 +1216,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query // TODO: client specified window size users[userid].incoming = window_buffer_init(128, 10, MAX_FRAGSIZE, WINDOW_RECVING); users[userid].outgoing = window_buffer_init(16, 10, users[userid].fragsize, WINDOW_SENDING); + users[userid].next_upstream_ack = -1; #ifdef DNSCACHE_LEN { for (i = 0; i < DNSCACHE_LEN; i++) { @@ -1517,9 +1553,9 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query } return; } else if(in[0] == 'P' || in[0] == 'p') { - int dn_seq; - int dn_frag; + int dn_seq, up_seq, dn_wins, up_wins; int didsend = 0; + int respond; /* We can't handle id=0, that's "no packet" to us. So drop request completely. Note that DNS servers rewrite the id. @@ -1531,7 +1567,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query return; read = unpack_data(unpacked, sizeof(unpacked), &(in[1]), domain_len - 1, b32); - if (read < 4) + if (read < 7) return; /* Ping packet, store userid */ @@ -1591,22 +1627,22 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query return; } - // TODO new ping header - dn_seq = unpacked[1] >> 4; - dn_frag = unpacked[1] & 15; + dn_seq = unpacked[1]; + up_seq = unpacked[2]; + up_wins = unpacked[3]; + dn_wins = unpacked[4]; + respond = unpacked[5] & 1; if (debug >= 1) { - fprintf(stderr, "PING pkt from user %d, ack for downstream %d/%d\n", - userid, dn_seq, dn_frag); + fprintf(stderr, "PING pkt from user %d, down %d/%d, up %d/%d\n", userid, dn_seq, dn_wins, up_seq, up_wins); } - // TODO: process incoming ACK for downstream data - /* If there is a query that must be returned real soon, do it. May contain new downstream data if the ping had a new ack. Otherwise, may also be re-sending old data. */ if (users[userid].q_sendrealsoon.id != 0) { - send_frag_or_dataless(dns_fd, userid, &users[userid].q_sendrealsoon); + if (respond) send_ping_response(dns_fd, userid, &users[userid].q_sendrealsoon); + else send_frag_or_dataless(dns_fd, userid, &users[userid].q_sendrealsoon); } /* We need to store a new query, so if there still is an @@ -1616,9 +1652,12 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query (This is duplicate data if we had q_sendrealsoon above.) */ if (users[userid].q.id != 0) { didsend = 1; - if (send_frag_or_dataless(dns_fd, userid, &users[userid].q) == 1) - /* new packet from queue, send immediately */ - didsend = 0; + if (respond) + send_ping_response(dns_fd, userid, &users[userid].q); + else + if (send_frag_or_dataless(dns_fd, userid, &users[userid].q) == 1) + /* new packet from queue, send immediately */ + didsend = 0; } /* Save new query and time info */ @@ -1634,9 +1673,9 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query } else if((in[0] >= '0' && in[0] <= '9') /* Data packet */ || (in[0] >= 'a' && in[0] <= 'f') || (in[0] >= 'A' && in[0] <= 'F')) { - int upstream_ok = 1; int didsend = 0; int code = -1; + static fragment f; /* Need 6 char header + >=1 char data */ if (domain_len < 7) @@ -1705,42 +1744,43 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query } users[userid].q_sendrealsoon.id2 = q->id; users[userid].q_sendrealsoon.fromlen2 = q->fromlen; - memcpy(&(users[userid].q_sendrealsoon.from2), - &(q->from), q->fromlen); + memcpy(&(users[userid].q_sendrealsoon.from2), &(q->from), q->fromlen); return; } - /* Decode data header */ -// up_seq = (b32_8to5(in[1]) >> 2) & 7; -// up_frag = ((b32_8to5(in[1]) & 3) << 2) | ((b32_8to5(in[2]) >> 3) & 3); -// dn_seq = (b32_8to5(in[2]) & 7); -// dn_frag = b32_8to5(in[3]) >> 1; -// lastfrag = b32_8to5(in[3]) & 1; TODO: new data header + /* Decode upstream data header - see docs/proto_XXXXXXXX.txt */ + /* First byte (after userid) = CMC (ignored?) */ + f.seqID = (b32_8to5(in[2]) << 2) | (b32_8to5(in[3]) >> 2); + f.ack_other = (b32_8to5(in[5]) & 8) ? ((b32_8to5(in[3]) & 3) << 6) + | (b32_8to5(in[4]) << 1) | (b32_8to5(in[5]) & 1) : -1; + f.is_nack = (b32_8to5(in[5]) >> 2) & 1; + f.start = (b32_8to5(in[5]) >> 1) & 1; + f.end = b32_8to5(in[5]) & 1; - // TODO: handle following scenarios - /* Got repeated old packet _with data_, probably - because client didn't receive our ack. So re-send - our ack(+data) immediately to keep things flowing - fast. - If it's a _really_ old frag, it's a nameserver - that tries again, and sending our current (non- - matching) fragno won't be a problem. */ + /* Decode remainder of data with user encoding */ + read = unpack_data(unpacked, sizeof(unpacked), in + UPSTREAM_HDR, + domain_len - UPSTREAM_HDR, users[userid].encoder); - /* Duplicate of recent upstream data packet; probably - need to answer this to keep DNS server happy */ -// upstream_ok = 0; + f.len = MIN(read, MAX_FRAGSIZE); + memcpy(f.data, unpacked, read); - /* Really new packet has arrived, no recent duplicate */ - /* Forget any old packet, even if incomplete */ + window_process_incoming_fragment(users[userid].incoming, &f); - if (upstream_ok) { - /* decode with this user's encoding */ - read = unpack_data(unpacked, sizeof(unpacked), &(in[UPSTREAM_HDR]), - domain_len - 5, users[userid].encoder); - // TODO append fragment to window + window_ack(users[userid].outgoing, f.ack_other); + + /* if waiting for an ACK to be sent back upstream (on incoming buffer) */ + if (users[userid].next_upstream_ack < 0) { + users[userid].next_upstream_ack = window_get_next_ack(users[userid].incoming); } - // TODO reassemble data + + read = window_reassemble_data(users[userid].incoming, (uint8_t *)unpacked, sizeof(unpacked), NULL); + + if (read > 0) { /* Data reassembled successfully + cleared out of buffer */ + handle_full_packet(tun_fd, dns_fds, userid, (uint8_t *)unpacked, read); + } + + window_tick(users[userid].incoming); /* If there is a query that must be returned real soon, do it. Includes an ack of the just received upstream fragment, @@ -1801,13 +1841,9 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query if (!didsend) // TODO: also check if sending send_frag_or_dataless(dns_fd, userid, &users[userid].q); else if (!didsend || !users[userid].lazy) { - if (upstream_ok) { /* rotate pending queries */ - memcpy(&(users[userid].q_sendrealsoon), &(users[userid].q), sizeof(struct query)); - users[userid].q_sendrealsoon_new = 1; - users[userid].q.id = 0; /* used */ - } else { - send_frag_or_dataless(dns_fd, userid, &users[userid].q); - } + memcpy(&(users[userid].q_sendrealsoon), &(users[userid].q), sizeof(struct query)); + users[userid].q_sendrealsoon_new = 1; + users[userid].q.id = 0; /* used */ } } }