Server-side sliding window implementation mostly finished. Requires

testing.
This commit is contained in:
frekky 2015-08-21 23:23:24 +08:00
parent 844abefcf8
commit d8c08191cc
3 changed files with 134 additions and 96 deletions

View File

@ -294,6 +294,8 @@ main(int argc, char **argv)
retval = 0;
server_init();
#ifdef WINDOWS32
WSAStartup(req_version, &wsa_data);
#endif

0
src/osflags Executable file → Normal file
View File

View File

@ -44,6 +44,7 @@
#include "tun.h"
#include "fw_query.h"
#include "server.h"
#include "window.h"
#ifdef HAVE_SYSTEMD
# include <systemd/sd-daemon.h>
@ -119,15 +120,6 @@ send_raw(int fd, uint8_t *buf, size_t buflen, int user, int cmd, struct query *q
sendto(fd, packet, len, 0, (struct sockaddr *) &q->from, q->fromlen);
}
static void
start_new_outpacket(int userid, uint8_t *data, size_t datalen)
/* Copies data to .outpacket and resets all counters.
data is expected to be compressed already. */
{
// TODO: window add to outgoing data
}
int
answer_from_qmem(int dns_fd, struct query *q, unsigned char *qmem_cmc,
unsigned short *qmem_type, int qmem_len,
@ -378,10 +370,41 @@ forward_query(int bind_fd, struct query *q)
}
}
void
send_ping_response(int dns_fd, int userid, struct query *q)
{
static uint8_t pkt[10];
size_t datalen = 5;
/* Build downstream data + ping header (see doc/proto_xxxxxxxx.txt) for details */
pkt[0] = users[userid].outgoing->start_seq_id & 0xFF;
pkt[1] = users[userid].incoming->start_seq_id & 0xFF;
pkt[2] = (1 << 5); /* ping flag */
pkt[3] = users[userid].outgoing->windowsize & 0xFF;
pkt[4] = users[userid].incoming->windowsize & 0xFF;
write_dns(dns_fd, q, (char *)pkt, datalen, users[userid].downenc);
if (q->id2 != 0) { /* rotate pending duplicate queries */
q->id = q->id2;
q->fromlen = q->fromlen2;
memcpy(&(q->from), &(q->from2), q->fromlen2);
if (debug >= 1)
warnx("OUT again to last duplicate");
write_dns(dns_fd, q, (char *)pkt, datalen, users[userid].downenc);
}
save_to_qmem_pingordata(userid, q);
#ifdef DNSCACHE_LEN
save_to_dnscache(userid, q, (char *)pkt, datalen + 2);
#endif
q->id = 0; /* this query is used */
}
static int
send_frag_or_dataless(int dns_fd, int userid, struct query *q)
/* Sends current fragment to user, or dataless packet if there is no
current fragment available (-> normal "quiet" ping reply).
/* Sends current fragment to user, or a ping if no data available.
Does not update anything, except:
- discards q always (query is used)
- forgets entire users[userid].outpacket if it was sent in one go,
@ -390,43 +413,60 @@ send_frag_or_dataless(int dns_fd, int userid, struct query *q)
0 = don't call us again for now.
*/
{
char pkt[4096];
int datalen = 0;
static uint8_t pkt[MAX_FRAGSIZE + DOWNSTREAM_HDR];
int ping = 0;
size_t datalen;
fragment *f;
struct frag_buffer *out;
/* TODO: If re-sent too many times, drop entire packet */
out = users[userid].outgoing;
/* Build downstream data header (see doc/proto_xxxxxxxx.txt) */
f = window_get_next_sending_fragment(out, users[userid].next_upstream_ack);
if (!f && user_sending(userid)) {
/* Need to tell client to sync stuff - send data header with ping stuff */
ping = 1;
} /* TODO: If re-sent too many times, drop stuff */
// /* First byte is 1 bit compression flag, 3 bits upstream seqno, 4 bits upstream fragment */
// pkt[0] = (1<<7) | ((users[userid].inpacket.seqno & 7) << 4) |
// (users[userid].inpacket.fragment & 15);
// /* Second byte is 3 bits downstream seqno, 4 bits downstream fragment, 1 bit last flag */
// pkt[1] = ((users[userid].outpacket.seqno & 7) << 5) |
// ((users[userid].outpacket.fragment & 15) << 1) | (last & 1);
/* Build downstream data header (see doc/proto_xxxxxxxx.txt) for details */
pkt[0] = f->seqID & 0xFF;
pkt[1] = f->ack_other & 0xFF;
pkt[2] = ((ping & 1) << 5) | ((f->compressed & 1) << 4) | ((f->ack_other < 0 ? 0 : 1) << 3)
| (f->is_nack << 2) | (f->start << 1) | f->end;
if (debug >= 1) {
// TODO: display packet data
if (ping) {
pkt[3] = out->windowsize & 0xFF;
pkt[4] = users[userid].incoming->windowsize & 0xFF;
datalen = 5;
} else {
datalen = DOWNSTREAM_HDR + f->len;
if (datalen > sizeof(pkt)) {
warnx("send_frag_or_dataless: fragment too large to send!");
return 0;
}
write_dns(dns_fd, q, pkt, datalen + 2, users[userid].downenc);
memcpy(&pkt, f->data, f->len);
}
write_dns(dns_fd, q, (char *)pkt, datalen, users[userid].downenc);
if (q->id2 != 0) {
if (q->id2 != 0) { /* reply to any duplicates */
q->id = q->id2;
q->fromlen = q->fromlen2;
memcpy(&(q->from), &(q->from2), q->fromlen2);
if (debug >= 1)
warnx("OUT again to last duplicate");
write_dns(dns_fd, q, pkt, datalen + 2, users[userid].downenc);
write_dns(dns_fd, q, (char *)pkt, datalen, users[userid].downenc);
}
save_to_qmem_pingordata(userid, q);
#ifdef DNSCACHE_LEN
save_to_dnscache(userid, q, pkt, datalen + 2);
save_to_dnscache(userid, q, (char *)pkt, datalen + 2);
#endif
/* this query has been */
q->id = 0;
window_tick(out);
q->id = 0; /* this query is used */
return 0; /* don't call us again */
/* call again if we have more things to send */
return users[userid].outgoing->numitems > 0;
}
static void
@ -446,6 +486,7 @@ send_version_response(int fd, version_ack_t ack, uint32_t payload, int userid, s
break;
}
// TODO: use htonl for compatibility with big-endian systems
out[4] = ((payload >> 24) & 0xff);
out[5] = ((payload >> 16) & 0xff);
out[6] = ((payload >> 8) & 0xff);
@ -526,7 +567,7 @@ tunnel_tun(int tun_fd, struct dnsfd *dns_fds)
if (users[userid].conn == CONN_DNS_NULL) {
start_new_outpacket(userid, out, outlen);
window_add_outgoing_data(users[userid].outgoing, out, outlen, 1);
/* Start sending immediately if query is waiting */
if (users[userid].q_sendrealsoon.id != 0) {
@ -633,15 +674,14 @@ server_tunnel(int tun_fd, struct dnsfd *dns_fds, int bind_fd, int max_idle_time)
tv.tv_usec = 0;
/* Adjust timeout if there is anything to send realsoon.
Clients won't be sending new data until we send our ack,
Clients won't be sending new data until we send our ack, TODO: adjust stuff in this function
so don't keep them waiting long. This only triggers at
final upstream fragments, which is about once per eight
requests during heavy upstream traffic.
20msec: ~8 packs every 1/50sec = ~400 DNSreq/sec,
20msec: ~8 packets every 1/50sec = ~400 DNSreq/sec,
or ~1200bytes every 1/50sec = ~0.5 Mbit/sec upstream */
for (userid = 0; userid < created_users; userid++) {
if (users[userid].active && !users[userid].disabled &&
users[userid].last_pkt + 60 > time(NULL)) {
if (user_active(userid)) {
users[userid].q_sendrealsoon_new = 0;
if (users[userid].q_sendrealsoon.id != 0) {
tv.tv_sec = 0;
@ -683,7 +723,7 @@ server_tunnel(int tun_fd, struct dnsfd *dns_fds, int bind_fd, int max_idle_time)
return 1;
}
if (i==0) {
if (i == 0) {
if (max_idle_time) {
/* only trigger the check if that's worth ( ie, no need to loop over if there
is something to send */
@ -714,11 +754,8 @@ server_tunnel(int tun_fd, struct dnsfd *dns_fds, int bind_fd, int max_idle_time)
/* Send realsoon's if tun or dns didn't already */
for (userid = 0; userid < created_users; userid++)
if (users[userid].active && !users[userid].disabled &&
users[userid].last_pkt + 60 > time(NULL) &&
users[userid].q_sendrealsoon.id != 0 &&
users[userid].conn == CONN_DNS_NULL &&
!users[userid].q_sendrealsoon_new) {
if (user_active(userid) && users[userid].q_sendrealsoon.id != 0 &&
users[userid].conn == CONN_DNS_NULL && !users[userid].q_sendrealsoon_new) {
int dns_fd = get_dns_fd(dns_fds, &users[userid].q_sendrealsoon.from);
send_frag_or_dataless(dns_fd, userid, &users[userid].q_sendrealsoon);
}
@ -749,7 +786,7 @@ handle_full_packet(int tun_fd, struct dnsfd *dns_fds, int userid, uint8_t *data,
/* send the compressed(!) packet to other client */
if (users[touser].conn == CONN_DNS_NULL) {
if (window_buffer_available(users[touser].outgoing) * users[touser].outgoing->maxfraglen >= len) {
start_new_outpacket(touser, data, len);
window_add_outgoing_data(users[touser].outgoing, data, len, 1);
/* Start sending immediately if query is waiting */
if (users[touser].q_sendrealsoon.id != 0) {
@ -781,9 +818,7 @@ handle_raw_login(uint8_t *packet, size_t len, struct query *q, int fd, int useri
/* can't use check_authenticated_user_and_ip() since IP address will be different,
so duplicate here except IP address */
if (userid < 0 || userid >= created_users) return;
if (!users[userid].active || users[userid].disabled) return;
if (!users[userid].authenticated) return;
if (users[userid].last_pkt + 60 < time(NULL)) return;
if (!check_authenticated_user_and_ip(userid, q)) return;
if (debug >= 1) {
fprintf(stderr, "IN login raw, len %lu, from user %d\n", len, userid);
@ -1128,10 +1163,10 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
char in[512];
char logindata[16];
char out[64*1024];
char unpacked[64*1024];
static char unpacked[64*1024];
char *tmp[2];
int userid;
int read;
size_t read;
userid = -1;
@ -1181,6 +1216,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
// TODO: client specified window size
users[userid].incoming = window_buffer_init(128, 10, MAX_FRAGSIZE, WINDOW_RECVING);
users[userid].outgoing = window_buffer_init(16, 10, users[userid].fragsize, WINDOW_SENDING);
users[userid].next_upstream_ack = -1;
#ifdef DNSCACHE_LEN
{
for (i = 0; i < DNSCACHE_LEN; i++) {
@ -1517,9 +1553,9 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
}
return;
} else if(in[0] == 'P' || in[0] == 'p') {
int dn_seq;
int dn_frag;
int dn_seq, up_seq, dn_wins, up_wins;
int didsend = 0;
int respond;
/* We can't handle id=0, that's "no packet" to us. So drop
request completely. Note that DNS servers rewrite the id.
@ -1531,7 +1567,7 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
return;
read = unpack_data(unpacked, sizeof(unpacked), &(in[1]), domain_len - 1, b32);
if (read < 4)
if (read < 7)
return;
/* Ping packet, store userid */
@ -1591,22 +1627,22 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
return;
}
// TODO new ping header
dn_seq = unpacked[1] >> 4;
dn_frag = unpacked[1] & 15;
dn_seq = unpacked[1];
up_seq = unpacked[2];
up_wins = unpacked[3];
dn_wins = unpacked[4];
respond = unpacked[5] & 1;
if (debug >= 1) {
fprintf(stderr, "PING pkt from user %d, ack for downstream %d/%d\n",
userid, dn_seq, dn_frag);
fprintf(stderr, "PING pkt from user %d, down %d/%d, up %d/%d\n", userid, dn_seq, dn_wins, up_seq, up_wins);
}
// TODO: process incoming ACK for downstream data
/* If there is a query that must be returned real soon, do it.
May contain new downstream data if the ping had a new ack.
Otherwise, may also be re-sending old data. */
if (users[userid].q_sendrealsoon.id != 0) {
send_frag_or_dataless(dns_fd, userid, &users[userid].q_sendrealsoon);
if (respond) send_ping_response(dns_fd, userid, &users[userid].q_sendrealsoon);
else send_frag_or_dataless(dns_fd, userid, &users[userid].q_sendrealsoon);
}
/* We need to store a new query, so if there still is an
@ -1616,6 +1652,9 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
(This is duplicate data if we had q_sendrealsoon above.) */
if (users[userid].q.id != 0) {
didsend = 1;
if (respond)
send_ping_response(dns_fd, userid, &users[userid].q);
else
if (send_frag_or_dataless(dns_fd, userid, &users[userid].q) == 1)
/* new packet from queue, send immediately */
didsend = 0;
@ -1634,9 +1673,9 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
} else if((in[0] >= '0' && in[0] <= '9') /* Data packet */
|| (in[0] >= 'a' && in[0] <= 'f')
|| (in[0] >= 'A' && in[0] <= 'F')) {
int upstream_ok = 1;
int didsend = 0;
int code = -1;
static fragment f;
/* Need 6 char header + >=1 char data */
if (domain_len < 7)
@ -1705,42 +1744,43 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
}
users[userid].q_sendrealsoon.id2 = q->id;
users[userid].q_sendrealsoon.fromlen2 = q->fromlen;
memcpy(&(users[userid].q_sendrealsoon.from2),
&(q->from), q->fromlen);
memcpy(&(users[userid].q_sendrealsoon.from2), &(q->from), q->fromlen);
return;
}
/* Decode data header */
// up_seq = (b32_8to5(in[1]) >> 2) & 7;
// up_frag = ((b32_8to5(in[1]) & 3) << 2) | ((b32_8to5(in[2]) >> 3) & 3);
// dn_seq = (b32_8to5(in[2]) & 7);
// dn_frag = b32_8to5(in[3]) >> 1;
// lastfrag = b32_8to5(in[3]) & 1; TODO: new data header
/* Decode upstream data header - see docs/proto_XXXXXXXX.txt */
/* First byte (after userid) = CMC (ignored?) */
f.seqID = (b32_8to5(in[2]) << 2) | (b32_8to5(in[3]) >> 2);
f.ack_other = (b32_8to5(in[5]) & 8) ? ((b32_8to5(in[3]) & 3) << 6)
| (b32_8to5(in[4]) << 1) | (b32_8to5(in[5]) & 1) : -1;
f.is_nack = (b32_8to5(in[5]) >> 2) & 1;
f.start = (b32_8to5(in[5]) >> 1) & 1;
f.end = b32_8to5(in[5]) & 1;
// TODO: handle following scenarios
/* Got repeated old packet _with data_, probably
because client didn't receive our ack. So re-send
our ack(+data) immediately to keep things flowing
fast.
If it's a _really_ old frag, it's a nameserver
that tries again, and sending our current (non-
matching) fragno won't be a problem. */
/* Decode remainder of data with user encoding */
read = unpack_data(unpacked, sizeof(unpacked), in + UPSTREAM_HDR,
domain_len - UPSTREAM_HDR, users[userid].encoder);
/* Duplicate of recent upstream data packet; probably
need to answer this to keep DNS server happy */
// upstream_ok = 0;
f.len = MIN(read, MAX_FRAGSIZE);
memcpy(f.data, unpacked, read);
/* Really new packet has arrived, no recent duplicate */
/* Forget any old packet, even if incomplete */
window_process_incoming_fragment(users[userid].incoming, &f);
if (upstream_ok) {
/* decode with this user's encoding */
read = unpack_data(unpacked, sizeof(unpacked), &(in[UPSTREAM_HDR]),
domain_len - 5, users[userid].encoder);
// TODO append fragment to window
window_ack(users[userid].outgoing, f.ack_other);
/* if waiting for an ACK to be sent back upstream (on incoming buffer) */
if (users[userid].next_upstream_ack < 0) {
users[userid].next_upstream_ack = window_get_next_ack(users[userid].incoming);
}
// TODO reassemble data
read = window_reassemble_data(users[userid].incoming, (uint8_t *)unpacked, sizeof(unpacked), NULL);
if (read > 0) { /* Data reassembled successfully + cleared out of buffer */
handle_full_packet(tun_fd, dns_fds, userid, (uint8_t *)unpacked, read);
}
window_tick(users[userid].incoming);
/* If there is a query that must be returned real soon, do it.
Includes an ack of the just received upstream fragment,
@ -1801,13 +1841,9 @@ handle_null_request(int tun_fd, int dns_fd, struct dnsfd *dns_fds, struct query
if (!didsend) // TODO: also check if sending
send_frag_or_dataless(dns_fd, userid, &users[userid].q);
else if (!didsend || !users[userid].lazy) {
if (upstream_ok) { /* rotate pending queries */
memcpy(&(users[userid].q_sendrealsoon), &(users[userid].q), sizeof(struct query));
users[userid].q_sendrealsoon_new = 1;
users[userid].q.id = 0; /* used */
} else {
send_frag_or_dataless(dns_fd, userid, &users[userid].q);
}
}
}
}