From 25f00c1c7a2165223a52ff69af0d77bd7cb1b17c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 17 Feb 2018 20:30:59 -0600 Subject: [PATCH 01/12] disable keys in link protocol traffic, since these are already known, *breaks backwards compat* --- src/yggdrasil/peer.go | 16 ++++++++-------- src/yggdrasil/wire.go | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 98f9cfb2..d42430a5 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -204,8 +204,8 @@ func (p *peer) sendPacket(packet []byte) { func (p *peer) sendLinkPacket(packet []byte) { bs, nonce := boxSeal(&p.shared, packet, nil) linkPacket := wire_linkProtoTrafficPacket{ - toKey: p.box, - fromKey: p.core.boxPub, + //toKey: p.box, + //fromKey: p.core.boxPub, nonce: *nonce, payload: bs, } @@ -218,12 +218,12 @@ func (p *peer) handleLinkTraffic(bs []byte) { if !packet.decode(bs) { return } - if packet.toKey != p.core.boxPub { - return - } - if packet.fromKey != p.box { - return - } + //if packet.toKey != p.core.boxPub { + // return + //} + //if packet.fromKey != p.box { + // return + //} payload, isOK := boxOpen(&p.shared, packet.payload, &packet.nonce) if !isOK { return diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 6d574541..f50ec431 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -380,16 +380,16 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { } type wire_linkProtoTrafficPacket struct { - toKey boxPubKey - fromKey boxPubKey + //toKey boxPubKey + //fromKey boxPubKey nonce boxNonce payload []byte } func (p *wire_linkProtoTrafficPacket) encode() []byte { bs := wire_encode_uint64(wire_LinkProtocolTraffic) - bs = append(bs, p.toKey[:]...) - bs = append(bs, p.fromKey[:]...) + //bs = append(bs, p.toKey[:]...) + //bs = append(bs, p.fromKey[:]...) bs = append(bs, p.nonce[:]...) bs = append(bs, p.payload...) return bs @@ -402,10 +402,10 @@ func (p *wire_linkProtoTrafficPacket) decode(bs []byte) bool { return false case pType != wire_LinkProtocolTraffic: return false - case !wire_chop_slice(p.toKey[:], &bs): - return false - case !wire_chop_slice(p.fromKey[:], &bs): - return false + //case !wire_chop_slice(p.toKey[:], &bs): + // return false + //case !wire_chop_slice(p.fromKey[:], &bs): + // return false case !wire_chop_slice(p.nonce[:], &bs): return false } From 8ba11b86bbadbc8c8f6d5322225ff21f7c2ae892 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 17 Feb 2018 20:44:23 -0600 Subject: [PATCH 02/12] remove duplicate tcp connections --- src/yggdrasil/tcp.go | 48 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index a190679f..c5654132 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -23,11 +23,14 @@ type tcpInterface struct { serv *net.TCPListener mutex sync.Mutex // Protecting the below calls map[string]struct{} + conns map[tcpInfo](chan struct{}) } -type tcpKeys struct { - box boxPubKey - sig sigPubKey +type tcpInfo struct { + box boxPubKey + sig sigPubKey + localAddr string // net.IPAddr.String(), not TCPAddr, don't care about port + remoteAddr string } func (iface *tcpInterface) init(core *Core, addr string) { @@ -41,6 +44,7 @@ func (iface *tcpInterface) init(core *Core, addr string) { panic(err) } iface.calls = make(map[string]struct{}) + iface.conns = make(map[tcpInfo](chan struct{})) go iface.listener() } @@ -102,8 +106,8 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { if n < len(keys) { /*panic("Partial key packet?") ;*/ return } - ks := tcpKeys{} - if !tcp_chop_keys(&ks.box, &ks.sig, &keys) { /*panic("Invalid key packet?") ;*/ + info := tcpInfo{} + if !tcp_chop_keys(&info.box, &info.sig, &keys) { /*panic("Invalid key packet?") ;*/ return } // Quit the parent call if this is a connection to ourself @@ -115,16 +119,40 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { } return true } - if equiv(ks.box[:], iface.core.boxPub[:]) { + if equiv(info.box[:], iface.core.boxPub[:]) { return } // testing - if equiv(ks.sig[:], iface.core.sigPub[:]) { + if equiv(info.sig[:], iface.core.sigPub[:]) { return } + // Check if we already have a connection to this node, close and block if yes + local := sock.LocalAddr().(*net.TCPAddr) + laddr := net.IPAddr{ + IP: local.IP, + Zone: local.Zone, + } + info.localAddr = laddr.String() + remote := sock.RemoteAddr().(*net.TCPAddr) + raddr := net.IPAddr{ + IP: remote.IP, + Zone: remote.Zone, + } + info.remoteAddr = raddr.String() + iface.mutex.Lock() + if blockChan, isIn := iface.conns[info]; isIn { + iface.mutex.Unlock() + sock.Close() + <-blockChan + return + } + blockChan := make(chan struct{}) + iface.conns[info] = blockChan + iface.mutex.Unlock() + defer close(blockChan) // Note that multiple connections to the same node are allowed // E.g. over different interfaces linkIn := make(chan []byte, 1) - p := iface.core.peers.newPeer(&ks.box, &ks.sig) //, in, out) + p := iface.core.peers.newPeer(&info.box, &info.sig) //, in, out) in := func(bs []byte) { p.handlePacket(bs, linkIn) } @@ -197,8 +225,8 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { p.core.peers.mutex.Unlock() close(linkIn) }() - them := sock.RemoteAddr() - themNodeID := getNodeID(&ks.box) + them := sock.RemoteAddr().(*net.TCPAddr) + themNodeID := getNodeID(&info.box) themAddr := address_addrForNodeID(themNodeID) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) From 430d49d8a46665dc76e26c2740b6e6fe22190c2a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 17 Feb 2018 21:59:08 -0600 Subject: [PATCH 03/12] send fewer link announcements, fix a ttl encoding bug, and remove unnecessary key field from the wire format for dht req/res --- src/yggdrasil/peer.go | 32 +++++++++++++++++++++++++++----- src/yggdrasil/router.go | 14 ++++++++------ src/yggdrasil/wire.go | 21 +++++++++++---------- 3 files changed, 46 insertions(+), 21 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index d42430a5..37b87c03 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -73,6 +73,7 @@ type peer struct { // Specifically, processing switch messages, signing, and verifying sigs // Resets at the start of each tick throttle uint8 + lastSend time.Time // To throttle sends, use only from linkLoop goroutine } const peer_Throttle = 1 @@ -126,6 +127,8 @@ func (ps *peers) newPeer(box *boxPubKey, func (p *peer) linkLoop(in <-chan []byte) { ticker := time.NewTicker(time.Second) defer ticker.Stop() + p.lastSend = time.Now() + var lastRSeq uint64 for { select { case packet, ok := <-in: @@ -139,9 +142,25 @@ func (p *peer) linkLoop(in <-chan []byte) { if p.port == 0 { continue } // Don't send announces on selfInterface - // Maybe we shouldn't time out, and instead wait for a kill signal? p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port) - p.sendSwitchAnnounce() + var update bool + switch { + case p.msgAnc == nil: + update = true + case lastRSeq != p.msgAnc.seq: + update = true + case p.msgAnc.rseq != p.myMsg.seq: + update = true + case time.Since(p.lastSend) > 3*time.Second: + update = true + } + if update { + if p.msgAnc != nil { + lastRSeq = p.msgAnc.seq + } + p.lastSend = time.Now() + p.sendSwitchAnnounce() + } } } } @@ -186,11 +205,12 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { if to == nil { return } - newTTLLen := wire_uint64_len(newTTL) // This mutates the packet in-place if the length of the TTL changes! + ttlSlice := wire_encode_uint64(newTTL) + newTTLLen := len(ttlSlice) shift := ttlLen - newTTLLen - wire_put_uint64(newTTL, packet[ttlBegin+shift:]) copy(packet[shift:], packet[:pTypeLen]) + copy(packet[ttlBegin+shift:], ttlSlice) packet = packet[shift:] to.sendPacket(packet) } @@ -418,7 +438,9 @@ func (p *peer) sendSwitchAnnounce() { anc.seq = p.myMsg.seq anc.len = uint64(len(p.myMsg.locator.coords)) //anc.Deg = p.myMsg.Degree - //anc.RSeq = p.myMsg.RSeq + if p.msgAnc != nil { + anc.rseq = p.msgAnc.seq + } packet := anc.encode() p.sendLinkPacket(packet) } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 8ca5d2b9..8bfa73b8 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -295,9 +295,10 @@ func (r *router) handleDHTReq(bs []byte, fromKey *boxPubKey) { if !req.decode(bs) { return } - if req.key != *fromKey { - return - } + //if req.key != *fromKey { + // return + //} + req.key = *fromKey r.core.dht.handleReq(&req) } @@ -306,9 +307,10 @@ func (r *router) handleDHTRes(bs []byte, fromKey *boxPubKey) { if !res.decode(bs) { return } - if res.key != *fromKey { - return - } + //if res.key != *fromKey { + // return + //} + res.key = *fromKey r.core.dht.handleRes(&res) } diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index f50ec431..c4a6485b 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -129,7 +129,7 @@ type msgAnnounce struct { seq uint64 len uint64 //Deg uint64 - //RSeq uint64 + rseq uint64 } func (m *msgAnnounce) encode() []byte { @@ -139,7 +139,7 @@ func (m *msgAnnounce) encode() []byte { bs = append(bs, wire_encode_uint64(m.seq)...) bs = append(bs, wire_encode_uint64(m.len)...) //bs = append(bs, wire_encode_uint64(m.Deg)...) - //bs = append(bs, wire_encode_uint64(m.RSeq)...) + bs = append(bs, wire_encode_uint64(m.rseq)...) return bs } @@ -159,8 +159,9 @@ func (m *msgAnnounce) decode(bs []byte) bool { return false case !wire_chop_uint64(&m.len, &bs): return false - //case !wire_chop_uint64(&m.Deg, &bs): return false - //case !wire_chop_uint64(&m.RSeq, &bs): return false + //case !wire_chop_uint64(&m.Deg, &bs): return false + case !wire_chop_uint64(&m.rseq, &bs): + return false } m.tstamp = wire_intFromUint(tstamp) return true @@ -467,7 +468,7 @@ func (p *sessionPing) decode(bs []byte) bool { func (r *dhtReq) encode() []byte { coords := wire_encode_coords(r.coords) bs := wire_encode_uint64(wire_DHTLookupRequest) - bs = append(bs, r.key[:]...) + //bs = append(bs, r.key[:]...) bs = append(bs, coords...) bs = append(bs, r.dest[:]...) return bs @@ -480,8 +481,8 @@ func (r *dhtReq) decode(bs []byte) bool { return false case pType != wire_DHTLookupRequest: return false - case !wire_chop_slice(r.key[:], &bs): - return false + //case !wire_chop_slice(r.key[:], &bs): + // return false case !wire_chop_coords(&r.coords, &bs): return false case !wire_chop_slice(r.dest[:], &bs): @@ -494,7 +495,7 @@ func (r *dhtReq) decode(bs []byte) bool { func (r *dhtRes) encode() []byte { coords := wire_encode_coords(r.coords) bs := wire_encode_uint64(wire_DHTLookupResponse) - bs = append(bs, r.key[:]...) + //bs = append(bs, r.key[:]...) bs = append(bs, coords...) bs = append(bs, r.dest[:]...) for _, info := range r.infos { @@ -512,8 +513,8 @@ func (r *dhtRes) decode(bs []byte) bool { return false case pType != wire_DHTLookupResponse: return false - case !wire_chop_slice(r.key[:], &bs): - return false + //case !wire_chop_slice(r.key[:], &bs): + // return false case !wire_chop_coords(&r.coords, &bs): return false case !wire_chop_slice(r.dest[:], &bs): From 0fc74016354106ed985c064db58c2bd52c37a075 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 17 Feb 2018 23:14:23 -0600 Subject: [PATCH 04/12] check root before adding peers to the switch lookupTable, instead of during each lookup --- src/yggdrasil/switch.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 04b86709..e24217de 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -390,6 +390,9 @@ func (t *switchTable) updateTable() { } for _, pinfo := range t.data.peers { //if !pinfo.forward { continue } + if pinfo.locator.root != newTable.self.root { + continue + } loc := pinfo.locator.clone() loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link newTable.elems = append(newTable.elems, tableElem{ @@ -422,9 +425,6 @@ func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) { // score is in units of bandwidth / distance bestScore := float64(-1) for _, info := range table.elems { - if info.locator.root != table.self.root { - continue - } dist := info.locator.dist(dest) //getDist(info.locator.coords) if !(dist < myDist) { continue From a66a29779a2567d698e12ae46dc0b34a2daf901e Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 17 Feb 2018 23:41:42 -0600 Subject: [PATCH 05/12] Slightly nicer way to throttle peer announcements --- src/yggdrasil/peer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 37b87c03..297e613d 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -127,7 +127,7 @@ func (ps *peers) newPeer(box *boxPubKey, func (p *peer) linkLoop(in <-chan []byte) { ticker := time.NewTicker(time.Second) defer ticker.Stop() - p.lastSend = time.Now() + var counter uint8 var lastRSeq uint64 for { select { @@ -151,7 +151,7 @@ func (p *peer) linkLoop(in <-chan []byte) { update = true case p.msgAnc.rseq != p.myMsg.seq: update = true - case time.Since(p.lastSend) > 3*time.Second: + case counter%4 == 0: update = true } if update { @@ -161,6 +161,7 @@ func (p *peer) linkLoop(in <-chan []byte) { p.lastSend = time.Now() p.sendSwitchAnnounce() } + counter = (counter + 1) % 4 } } } From 71150fcb8697d936a0c5f41337d8526aed744a4f Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 17 Feb 2018 23:57:24 -0600 Subject: [PATCH 06/12] ignore root tstamp updates if we just updated, to throttle the rate at which updates (and new signatures) can propagate --- src/yggdrasil/switch.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index e24217de..a297e893 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -11,11 +11,6 @@ package yggdrasil // TODO? use a pre-computed lookup table (python version had this) // A little annoying to do with constant changes from bandwidth estimates -// FIXME (!) throttle how often root updates are accepted -// If the root starts spaming with new timestamps, it should only affect their neighbors -// The rest of the network should see announcements at a somewhat reasonable rate -// Maybe no faster than 2x the usual update interval - import "time" import "sync" import "sync/atomic" @@ -23,6 +18,8 @@ import "sync/atomic" //import "fmt" const switch_timeout = time.Minute +const switch_updateInterval = switch_timeout / 2 +const switch_throttle = switch_updateInterval / 2 // You should be able to provide crypto signatures for this // 1 signature per coord, from the *sender* to that coord @@ -219,7 +216,7 @@ func (t *switchTable) cleanRoot() { } // Or, if we are the root, possibly update our timestamp if t.data.locator.root == t.key && - now.Sub(t.time) > switch_timeout/2 { + now.Sub(t.time) > switch_updateInterval { //fmt.Println("root is self and old, updating", t.data.locator.Root) doUpdate = true } @@ -343,9 +340,11 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig updateRoot = true case cost < pCost: updateRoot = true - case sender.port == t.parent && - (msg.locator.tstamp > t.data.locator.tstamp || - !equiv(&msg.locator, &t.data.locator)): + case sender.port != t.parent: // do nothing + case !equiv(&msg.locator, &t.data.locator): + updateRoot = true + case now.Sub(t.time) < switch_throttle: // do nothing + case msg.locator.tstamp > t.data.locator.tstamp: updateRoot = true } if updateRoot { From 4045597516eaaefc46e69f0da5802e6fa15b024c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 19 Feb 2018 19:34:51 -0600 Subject: [PATCH 07/12] Use larger UDP chunks for link-local IP and let the OS fragment it. Switch to UDP for link-local peers. Minor code cleanup for TCP. --- src/yggdrasil/tcp.go | 40 ++++++++++++++++----------------- src/yggdrasil/udp.go | 52 ++++++++++++++++++++++++------------------- src/yggdrasil/wire.go | 2 +- yggdrasil.go | 6 ++--- 4 files changed, 53 insertions(+), 47 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index c5654132..8bada261 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -158,6 +158,23 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { } out := make(chan []byte, 32) // TODO? what size makes sense defer close(out) + send := func(msg []byte) { + buf := net.Buffers{tcp_msg[:], + wire_encode_uint64(uint64(len(msg))), + msg} + size := 0 + for _, bs := range buf { + size += len(bs) + } + start := time.Now() + buf.WriteTo(sock) + timed := time.Since(start) + pType, _ := wire_decode_uint64(msg) + if pType == wire_LinkProtocolTraffic { + p.updateBandwidth(size, timed) + } + util_putBytes(msg) + } go func() { var stack [][]byte put := func(msg []byte) { @@ -167,25 +184,6 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { stack = stack[1:] } } - send := func() { - msg := stack[len(stack)-1] - stack = stack[:len(stack)-1] - buf := net.Buffers{tcp_msg[:], - wire_encode_uint64(uint64(len(msg))), - msg} - size := 0 - for _, bs := range buf { - size += len(bs) - } - start := time.Now() - buf.WriteTo(sock) - timed := time.Since(start) - pType, _ := wire_decode_uint64(msg) - if pType == wire_LinkProtocolTraffic { - p.updateBandwidth(size, timed) - } - util_putBytes(msg) - } for msg := range out { put(msg) for len(stack) > 0 { @@ -197,7 +195,9 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { } put(msg) default: - send() + msg := stack[len(stack)-1] + stack = stack[:len(stack)-1] + send(msg) } } } diff --git a/src/yggdrasil/udp.go b/src/yggdrasil/udp.go index 990a0890..5c2a7ab1 100644 --- a/src/yggdrasil/udp.go +++ b/src/yggdrasil/udp.go @@ -46,16 +46,17 @@ func (c *connAddr) toUDPAddr() *net.UDPAddr { } type connInfo struct { - name string - addr connAddr - peer *peer - linkIn chan []byte - keysIn chan *udpKeys - timeout int // count of how many heartbeats have been missed - in func([]byte) - out chan []byte - countIn uint8 - countOut uint8 + name string + addr connAddr + peer *peer + linkIn chan []byte + keysIn chan *udpKeys + timeout int // count of how many heartbeats have been missed + in func([]byte) + out chan []byte + countIn uint8 + countOut uint8 + chunkSize uint16 } type udpKeys struct { @@ -73,6 +74,8 @@ func (iface *udpInterface) init(core *Core, addr string) { if err != nil { panic(err) } + //iface.sock.SetReadBuffer(1048576) + //iface.sock.SetWriteBuffer(1048576) iface.conns = make(map[connAddr]*connInfo) go iface.reader() } @@ -162,12 +165,16 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, udpAddr.String()) conn = &connInfo{ - name: themString, - addr: connAddr(addr), - peer: iface.core.peers.newPeer(&ks.box, &ks.sig), - linkIn: make(chan []byte, 1), - keysIn: make(chan *udpKeys, 1), - out: make(chan []byte, 32), + name: themString, + addr: connAddr(addr), + peer: iface.core.peers.newPeer(&ks.box, &ks.sig), + linkIn: make(chan []byte, 1), + keysIn: make(chan *udpKeys, 1), + out: make(chan []byte, 32), + chunkSize: 576 - 60 - 8 - 3, // max save - max ip - udp header - chunk overhead + } + if udpAddr.IP.IsLinkLocalUnicast() { + conn.chunkSize = 65535 - 8 - 3 } /* conn.in = func (msg []byte) { conn.peer.handlePacket(msg, conn.linkIn) } @@ -236,8 +243,8 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { for msg := range conn.out { chunks = chunks[:0] bs := msg - for len(bs) > udp_chunkSize { - chunks, bs = append(chunks, bs[:udp_chunkSize]), bs[udp_chunkSize:] + for len(bs) > int(conn.chunkSize) { + chunks, bs = append(chunks, bs[:conn.chunkSize]), bs[conn.chunkSize:] } chunks = append(chunks, bs) //iface.core.log.Println("DEBUG: out chunks:", len(chunks), len(msg)) @@ -284,7 +291,7 @@ func (iface *udpInterface) handlePacket(msg []byte, addr connAddr) { } func (iface *udpInterface) reader() { - bs := make([]byte, 2048) // This needs to be large enough for everything... + bs := make([]byte, 65536) // This needs to be large enough for everything... for { //iface.core.log.Println("Starting read") n, udpAddr, err := iface.sock.ReadFromUDP(bs) @@ -293,9 +300,7 @@ func (iface *udpInterface) reader() { panic(err) break } - if n > 1500 { - panic(n) - } + //iface.core.log.Println("DEBUG: recv len:", n) //msg := append(util_getBytes(), bs[:n]...) msg := bs[:n] var addr connAddr @@ -319,7 +324,8 @@ func (iface *udpInterface) reader() { //////////////////////////////////////////////////////////////////////////////// -const udp_chunkSize = 508 // Apparently the maximum guaranteed safe IPv4 size +//const udp_chunkSize = 508 // Apparently the maximum guaranteed safe IPv4 size +//const udp_chunkSize = 65535 - 3 - 8 func udp_decode(bs []byte) (chunks, chunk, count uint8, payload []byte) { if len(bs) >= 3 { diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index c4a6485b..379ef249 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -21,7 +21,7 @@ const ( wire_DHTLookupResponse // inside protocol traffic header wire_SearchRequest // inside protocol traffic header wire_SearchResponse // inside protocol traffic header - //wire_Keys // udp key packet (boxPub, sigPub) + wire_Keys // udp key packet (boxPub, sigPub) ) // Encode uint64 using a variable length scheme diff --git a/yggdrasil.go b/yggdrasil.go index 5abd6b09..959ba474 100644 --- a/yggdrasil.go +++ b/yggdrasil.go @@ -177,7 +177,7 @@ func (n *node) listen() { saddr := addr.String() //if _, isIn := n.peers[saddr]; isIn { continue } //n.peers[saddr] = struct{}{} - n.core.DEBUG_addTCPConn(saddr) // FIXME? can result in 2 connections per peer + n.core.DEBUG_maybeSendUDPKeys(saddr) // FIXME? can result in 2 connections per peer //fmt.Println("DEBUG:", "added multicast peer:", saddr) } } @@ -188,8 +188,8 @@ func (n *node) announce() { panic(err) } var anAddr net.TCPAddr - tcpAddr := n.core.DEBUG_getGlobalTCPAddr() - anAddr.Port = tcpAddr.Port + myAddr := n.core.DEBUG_getGlobalUDPAddr() + anAddr.Port = myAddr.Port destAddr, err := net.ResolveUDPAddr("udp6", multicastAddr) if err != nil { panic(err) From a21a039b5709e3ea5b244a14eb5ee99f5ad14125 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 19 Feb 2018 19:47:11 -0600 Subject: [PATCH 08/12] remove unused field from peer --- src/yggdrasil/peer.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 297e613d..c122ab1c 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -73,7 +73,6 @@ type peer struct { // Specifically, processing switch messages, signing, and verifying sigs // Resets at the start of each tick throttle uint8 - lastSend time.Time // To throttle sends, use only from linkLoop goroutine } const peer_Throttle = 1 @@ -158,7 +157,6 @@ func (p *peer) linkLoop(in <-chan []byte) { if p.msgAnc != nil { lastRSeq = p.msgAnc.seq } - p.lastSend = time.Now() p.sendSwitchAnnounce() } counter = (counter + 1) % 4 From a81c361484234f4a5e157d0f96dd24458d28bd44 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 19 Feb 2018 23:22:36 -0600 Subject: [PATCH 09/12] tcp reconnect bufix, test with bufio, and switch back to tcp auto-peering by default to continue testing --- src/yggdrasil/tcp.go | 38 ++++++++++++++++++++++++++------------ yggdrasil.go | 6 +++--- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 8bada261..803ca7be 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -15,6 +15,7 @@ import "time" import "errors" import "sync" import "fmt" +import "bufio" const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense @@ -148,7 +149,12 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { blockChan := make(chan struct{}) iface.conns[info] = blockChan iface.mutex.Unlock() - defer close(blockChan) + defer func() { + iface.mutex.Lock() + delete(iface.conns, info) + iface.mutex.Unlock() + close(blockChan) + }() // Note that multiple connections to the same node are allowed // E.g. over different interfaces linkIn := make(chan []byte, 1) @@ -158,23 +164,29 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { } out := make(chan []byte, 32) // TODO? what size makes sense defer close(out) + buf := bufio.NewWriterSize(sock, 65535) send := func(msg []byte) { - buf := net.Buffers{tcp_msg[:], - wire_encode_uint64(uint64(len(msg))), - msg} - size := 0 - for _, bs := range buf { - size += len(bs) - } + msgLen := wire_encode_uint64(uint64(len(msg))) + before := buf.Buffered() start := time.Now() - buf.WriteTo(sock) + buf.Write(tcp_msg[:]) + buf.Write(msgLen) + buf.Write(msg) timed := time.Since(start) - pType, _ := wire_decode_uint64(msg) - if pType == wire_LinkProtocolTraffic { - p.updateBandwidth(size, timed) + after := buf.Buffered() + written := (before + len(tcp_msg) + len(msgLen) + len(msg)) - after + if written > 0 { + p.updateBandwidth(written, timed) } util_putBytes(msg) } + flush := func() { + size := buf.Buffered() + start := time.Now() + buf.Flush() + timed := time.Since(start) + p.updateBandwidth(size, timed) + } go func() { var stack [][]byte put := func(msg []byte) { @@ -191,6 +203,7 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { select { case msg, ok := <-out: if !ok { + flush() return } put(msg) @@ -200,6 +213,7 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { send(msg) } } + flush() } }() p.out = func(msg []byte) { diff --git a/yggdrasil.go b/yggdrasil.go index 959ba474..110cd37f 100644 --- a/yggdrasil.go +++ b/yggdrasil.go @@ -118,7 +118,7 @@ func generateConfig() *nodeConfig { cfg.Multicast = true cfg.LinkLocal = "" cfg.IfName = "auto" - cfg.IfMTU = 65535 + cfg.IfMTU = 1280 //65535 if runtime.GOOS == "windows" { cfg.IfTAPMode = true } else { @@ -177,7 +177,7 @@ func (n *node) listen() { saddr := addr.String() //if _, isIn := n.peers[saddr]; isIn { continue } //n.peers[saddr] = struct{}{} - n.core.DEBUG_maybeSendUDPKeys(saddr) // FIXME? can result in 2 connections per peer + n.core.DEBUG_addTCPConn(saddr) // FIXME? can result in 2 connections per peer //fmt.Println("DEBUG:", "added multicast peer:", saddr) } } @@ -188,7 +188,7 @@ func (n *node) announce() { panic(err) } var anAddr net.TCPAddr - myAddr := n.core.DEBUG_getGlobalUDPAddr() + myAddr := n.core.DEBUG_getGlobalTCPAddr() anAddr.Port = myAddr.Port destAddr, err := net.ResolveUDPAddr("udp6", multicastAddr) if err != nil { From d3dc7765f2a00cfc5a798861a83386f39f6c8d0b Mon Sep 17 00:00:00 2001 From: Arceliar Date: Tue, 20 Feb 2018 17:31:12 -0600 Subject: [PATCH 10/12] trying to debug UDP+large MTU issues --- src/yggdrasil/tcp.go | 2 +- src/yggdrasil/udp.go | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 803ca7be..9e01dc6c 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -164,7 +164,7 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { } out := make(chan []byte, 32) // TODO? what size makes sense defer close(out) - buf := bufio.NewWriterSize(sock, 65535) + buf := bufio.NewWriterSize(sock, tcp_msgSize) send := func(msg []byte) { msgLen := wire_encode_uint64(uint64(len(msg))) before := buf.Buffered() diff --git a/src/yggdrasil/udp.go b/src/yggdrasil/udp.go index 5c2a7ab1..fc7ed49f 100644 --- a/src/yggdrasil/udp.go +++ b/src/yggdrasil/udp.go @@ -171,10 +171,14 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { linkIn: make(chan []byte, 1), keysIn: make(chan *udpKeys, 1), out: make(chan []byte, 32), - chunkSize: 576 - 60 - 8 - 3, // max save - max ip - udp header - chunk overhead + chunkSize: 576 - 60 - 8 - 3, // max safe - max ip - udp header - chunk overhead } if udpAddr.IP.IsLinkLocalUnicast() { - conn.chunkSize = 65535 - 8 - 3 + ifce, err := net.InterfaceByName(udpAddr.Zone) + if ifce != nil && err == nil { + conn.chunkSize = uint16(ifce.MTU) - 60 - 8 - 3 + } + //conn.chunkSize = 65535 - 8 - 3 } /* conn.in = func (msg []byte) { conn.peer.handlePacket(msg, conn.linkIn) } @@ -262,6 +266,7 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { conn.countOut += 1 conn.peer.updateBandwidth(len(msg), timed) util_putBytes(msg) + //iface.core.log.Println("DEBUG: sent:", len(msg)) } }() //*/ @@ -296,6 +301,7 @@ func (iface *udpInterface) reader() { //iface.core.log.Println("Starting read") n, udpAddr, err := iface.sock.ReadFromUDP(bs) //iface.core.log.Println("Read", n, udpAddr.String(), err) + //iface.core.log.Println("DEBUG: read:", bs[0], bs[1], bs[2], n) if err != nil { panic(err) break From bb3ae8b39beeb3aedf247add9153c1cd0e737938 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 23 Feb 2018 12:46:22 -0600 Subject: [PATCH 11/12] temporarily limit PMTU to 2048, as a workaround to some pathelogical behavior where a TCP stream carried over a UDP peer can throttle down to 0 in the presence of significant packet loss --- src/yggdrasil/session.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 47a4e987..5aeeacb6 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -152,6 +152,15 @@ func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo { sinfo.myNonce = *newBoxNonce() sinfo.theirMTU = 1280 sinfo.myMTU = uint16(ss.core.tun.mtu) + if sinfo.myMTU > 2048 { + // FIXME this is a temporary workaround to an issue with UDP peers + // UDP links need to fragment packets (within ygg) to get them over the wire + // For some reason, TCP streams over UDP peers can get stuck in a bad state + // When this happens, TCP throttles back, and each TCP retransmission loses fragments + // On my wifi network, it seems to happen around the 22nd-23rd fragment of a large packet + // By setting the path MTU to something small, this should (hopefully) mitigate the issue + sinfo.myMTU = 2048 + } higher := false for idx := range ss.core.boxPub { if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] { From 0470f6f1c11fa17f520b753355c48fa7fe3210db Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 23 Feb 2018 13:04:52 -0600 Subject: [PATCH 12/12] cleanup --- src/yggdrasil/peer.go | 8 ------- src/yggdrasil/router.go | 6 ------ src/yggdrasil/udp.go | 46 ----------------------------------------- src/yggdrasil/wire.go | 19 ----------------- yggdrasil.go | 2 +- 5 files changed, 1 insertion(+), 80 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index c122ab1c..7f1de6b2 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -223,8 +223,6 @@ func (p *peer) sendPacket(packet []byte) { func (p *peer) sendLinkPacket(packet []byte) { bs, nonce := boxSeal(&p.shared, packet, nil) linkPacket := wire_linkProtoTrafficPacket{ - //toKey: p.box, - //fromKey: p.core.boxPub, nonce: *nonce, payload: bs, } @@ -237,12 +235,6 @@ func (p *peer) handleLinkTraffic(bs []byte) { if !packet.decode(bs) { return } - //if packet.toKey != p.core.boxPub { - // return - //} - //if packet.fromKey != p.box { - // return - //} payload, isOK := boxOpen(&p.shared, packet.payload, &packet.nonce) if !isOK { return diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 8bfa73b8..c25d64a7 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -295,9 +295,6 @@ func (r *router) handleDHTReq(bs []byte, fromKey *boxPubKey) { if !req.decode(bs) { return } - //if req.key != *fromKey { - // return - //} req.key = *fromKey r.core.dht.handleReq(&req) } @@ -307,9 +304,6 @@ func (r *router) handleDHTRes(bs []byte, fromKey *boxPubKey) { if !res.decode(bs) { return } - //if res.key != *fromKey { - // return - //} res.key = *fromKey r.core.dht.handleRes(&res) } diff --git a/src/yggdrasil/udp.go b/src/yggdrasil/udp.go index fc7ed49f..0a04e7f0 100644 --- a/src/yggdrasil/udp.go +++ b/src/yggdrasil/udp.go @@ -74,8 +74,6 @@ func (iface *udpInterface) init(core *Core, addr string) { if err != nil { panic(err) } - //iface.sock.SetReadBuffer(1048576) - //iface.sock.SetWriteBuffer(1048576) iface.conns = make(map[connAddr]*connInfo) go iface.reader() } @@ -99,7 +97,6 @@ func (iface *udpInterface) startConn(info *connInfo) { defer ticker.Stop() defer func() { // Cleanup - // FIXME this still leaks a peer struct iface.mutex.Lock() delete(iface.conns, info.addr) iface.mutex.Unlock() @@ -178,51 +175,18 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { if ifce != nil && err == nil { conn.chunkSize = uint16(ifce.MTU) - 60 - 8 - 3 } - //conn.chunkSize = 65535 - 8 - 3 } - /* - conn.in = func (msg []byte) { conn.peer.handlePacket(msg, conn.linkIn) } - conn.peer.out = func (msg []byte) { - start := time.Now() - iface.sock.WriteToUDP(msg, udpAddr) - timed := time.Since(start) - conn.peer.updateBandwidth(len(msg), timed) - util_putBytes(msg) - } // Old version, always one syscall per packet - //*/ - /* - conn.peer.out = func (msg []byte) { - defer func() { recover() }() - select { - case conn.out<-msg: - default: util_putBytes(msg) - } - } - go func () { - for msg := range conn.out { - start := time.Now() - iface.sock.WriteToUDP(msg, udpAddr) - timed := time.Since(start) - conn.peer.updateBandwidth(len(msg), timed) - util_putBytes(msg) - } - }() - //*/ - //* var inChunks uint8 var inBuf []byte conn.in = func(bs []byte) { //defer util_putBytes(bs) chunks, chunk, count, payload := udp_decode(bs) - //iface.core.log.Println("DEBUG:", addr, chunks, chunk, count, len(payload)) - //iface.core.log.Println("DEBUG: payload:", payload) if count != conn.countIn { inChunks = 0 inBuf = inBuf[:0] conn.countIn = count } if chunk <= chunks && chunk == inChunks+1 { - //iface.core.log.Println("GOING:", addr, chunks, chunk, count, len(payload)) inChunks += 1 inBuf = append(inBuf, payload...) if chunks != chunk { @@ -230,7 +194,6 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { } msg := append(util_getBytes(), inBuf...) conn.peer.handlePacket(msg, conn.linkIn) - //iface.core.log.Println("DONE:", addr, chunks, chunk, count, len(payload)) } } conn.peer.out = func(msg []byte) { @@ -251,7 +214,6 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { chunks, bs = append(chunks, bs[:conn.chunkSize]), bs[conn.chunkSize:] } chunks = append(chunks, bs) - //iface.core.log.Println("DEBUG: out chunks:", len(chunks), len(msg)) if len(chunks) > 255 { continue } @@ -266,7 +228,6 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { conn.countOut += 1 conn.peer.updateBandwidth(len(msg), timed) util_putBytes(msg) - //iface.core.log.Println("DEBUG: sent:", len(msg)) } }() //*/ @@ -298,16 +259,12 @@ func (iface *udpInterface) handlePacket(msg []byte, addr connAddr) { func (iface *udpInterface) reader() { bs := make([]byte, 65536) // This needs to be large enough for everything... for { - //iface.core.log.Println("Starting read") n, udpAddr, err := iface.sock.ReadFromUDP(bs) - //iface.core.log.Println("Read", n, udpAddr.String(), err) //iface.core.log.Println("DEBUG: read:", bs[0], bs[1], bs[2], n) if err != nil { panic(err) break } - //iface.core.log.Println("DEBUG: recv len:", n) - //msg := append(util_getBytes(), bs[:n]...) msg := bs[:n] var addr connAddr addr.fromUDPAddr(udpAddr) @@ -330,9 +287,6 @@ func (iface *udpInterface) reader() { //////////////////////////////////////////////////////////////////////////////// -//const udp_chunkSize = 508 // Apparently the maximum guaranteed safe IPv4 size -//const udp_chunkSize = 65535 - 3 - 8 - func udp_decode(bs []byte) (chunks, chunk, count uint8, payload []byte) { if len(bs) >= 3 { chunks, chunk, count, payload = bs[0], bs[1], bs[2], bs[3:] diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 379ef249..167a97d5 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -21,7 +21,6 @@ const ( wire_DHTLookupResponse // inside protocol traffic header wire_SearchRequest // inside protocol traffic header wire_SearchResponse // inside protocol traffic header - wire_Keys // udp key packet (boxPub, sigPub) ) // Encode uint64 using a variable length scheme @@ -112,8 +111,6 @@ func wire_put_coords(coords []byte, bs []byte) []byte { func wire_decode_coords(packet []byte) ([]byte, int) { coordLen, coordBegin := wire_decode_uint64(packet) coordEnd := coordBegin + int(coordLen) - //if coordBegin == 0 { panic("No coords found") } // Testing - //if coordEnd > len(packet) { panic("Packet too short") } // Testing if coordBegin == 0 || coordEnd > len(packet) { return nil, 0 } @@ -138,7 +135,6 @@ func (m *msgAnnounce) encode() []byte { bs = append(bs, wire_encode_uint64(wire_intToUint(m.tstamp))...) bs = append(bs, wire_encode_uint64(m.seq)...) bs = append(bs, wire_encode_uint64(m.len)...) - //bs = append(bs, wire_encode_uint64(m.Deg)...) bs = append(bs, wire_encode_uint64(m.rseq)...) return bs } @@ -159,7 +155,6 @@ func (m *msgAnnounce) decode(bs []byte) bool { return false case !wire_chop_uint64(&m.len, &bs): return false - //case !wire_chop_uint64(&m.Deg, &bs): return false case !wire_chop_uint64(&m.rseq, &bs): return false } @@ -381,16 +376,12 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { } type wire_linkProtoTrafficPacket struct { - //toKey boxPubKey - //fromKey boxPubKey nonce boxNonce payload []byte } func (p *wire_linkProtoTrafficPacket) encode() []byte { bs := wire_encode_uint64(wire_LinkProtocolTraffic) - //bs = append(bs, p.toKey[:]...) - //bs = append(bs, p.fromKey[:]...) bs = append(bs, p.nonce[:]...) bs = append(bs, p.payload...) return bs @@ -403,10 +394,6 @@ func (p *wire_linkProtoTrafficPacket) decode(bs []byte) bool { return false case pType != wire_LinkProtocolTraffic: return false - //case !wire_chop_slice(p.toKey[:], &bs): - // return false - //case !wire_chop_slice(p.fromKey[:], &bs): - // return false case !wire_chop_slice(p.nonce[:], &bs): return false } @@ -468,7 +455,6 @@ func (p *sessionPing) decode(bs []byte) bool { func (r *dhtReq) encode() []byte { coords := wire_encode_coords(r.coords) bs := wire_encode_uint64(wire_DHTLookupRequest) - //bs = append(bs, r.key[:]...) bs = append(bs, coords...) bs = append(bs, r.dest[:]...) return bs @@ -481,8 +467,6 @@ func (r *dhtReq) decode(bs []byte) bool { return false case pType != wire_DHTLookupRequest: return false - //case !wire_chop_slice(r.key[:], &bs): - // return false case !wire_chop_coords(&r.coords, &bs): return false case !wire_chop_slice(r.dest[:], &bs): @@ -495,7 +479,6 @@ func (r *dhtReq) decode(bs []byte) bool { func (r *dhtRes) encode() []byte { coords := wire_encode_coords(r.coords) bs := wire_encode_uint64(wire_DHTLookupResponse) - //bs = append(bs, r.key[:]...) bs = append(bs, coords...) bs = append(bs, r.dest[:]...) for _, info := range r.infos { @@ -513,8 +496,6 @@ func (r *dhtRes) decode(bs []byte) bool { return false case pType != wire_DHTLookupResponse: return false - //case !wire_chop_slice(r.key[:], &bs): - // return false case !wire_chop_coords(&r.coords, &bs): return false case !wire_chop_slice(r.dest[:], &bs): diff --git a/yggdrasil.go b/yggdrasil.go index 110cd37f..50a9c63c 100644 --- a/yggdrasil.go +++ b/yggdrasil.go @@ -118,7 +118,7 @@ func generateConfig() *nodeConfig { cfg.Multicast = true cfg.LinkLocal = "" cfg.IfName = "auto" - cfg.IfMTU = 1280 //65535 + cfg.IfMTU = 1280 if runtime.GOOS == "windows" { cfg.IfTAPMode = true } else {