diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 98f9cfb2..7f1de6b2 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -126,6 +126,8 @@ func (ps *peers) newPeer(box *boxPubKey, func (p *peer) linkLoop(in <-chan []byte) { ticker := time.NewTicker(time.Second) defer ticker.Stop() + var counter uint8 + var lastRSeq uint64 for { select { case packet, ok := <-in: @@ -139,9 +141,25 @@ func (p *peer) linkLoop(in <-chan []byte) { if p.port == 0 { continue } // Don't send announces on selfInterface - // Maybe we shouldn't time out, and instead wait for a kill signal? p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port) - p.sendSwitchAnnounce() + var update bool + switch { + case p.msgAnc == nil: + update = true + case lastRSeq != p.msgAnc.seq: + update = true + case p.msgAnc.rseq != p.myMsg.seq: + update = true + case counter%4 == 0: + update = true + } + if update { + if p.msgAnc != nil { + lastRSeq = p.msgAnc.seq + } + p.sendSwitchAnnounce() + } + counter = (counter + 1) % 4 } } } @@ -186,11 +204,12 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { if to == nil { return } - newTTLLen := wire_uint64_len(newTTL) // This mutates the packet in-place if the length of the TTL changes! + ttlSlice := wire_encode_uint64(newTTL) + newTTLLen := len(ttlSlice) shift := ttlLen - newTTLLen - wire_put_uint64(newTTL, packet[ttlBegin+shift:]) copy(packet[shift:], packet[:pTypeLen]) + copy(packet[ttlBegin+shift:], ttlSlice) packet = packet[shift:] to.sendPacket(packet) } @@ -204,8 +223,6 @@ func (p *peer) sendPacket(packet []byte) { func (p *peer) sendLinkPacket(packet []byte) { bs, nonce := boxSeal(&p.shared, packet, nil) linkPacket := wire_linkProtoTrafficPacket{ - toKey: p.box, - fromKey: p.core.boxPub, nonce: *nonce, payload: bs, } @@ -218,12 +235,6 @@ func (p *peer) handleLinkTraffic(bs []byte) { if !packet.decode(bs) { return } - if packet.toKey != p.core.boxPub { - return - } - if packet.fromKey != p.box { - return - } payload, isOK := boxOpen(&p.shared, packet.payload, &packet.nonce) if !isOK { return @@ -418,7 +429,9 @@ func (p *peer) sendSwitchAnnounce() { anc.seq = p.myMsg.seq anc.len = uint64(len(p.myMsg.locator.coords)) //anc.Deg = p.myMsg.Degree - //anc.RSeq = p.myMsg.RSeq + if p.msgAnc != nil { + anc.rseq = p.msgAnc.seq + } packet := anc.encode() p.sendLinkPacket(packet) } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 8ca5d2b9..c25d64a7 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -295,9 +295,7 @@ func (r *router) handleDHTReq(bs []byte, fromKey *boxPubKey) { if !req.decode(bs) { return } - if req.key != *fromKey { - return - } + req.key = *fromKey r.core.dht.handleReq(&req) } @@ -306,9 +304,7 @@ func (r *router) handleDHTRes(bs []byte, fromKey *boxPubKey) { if !res.decode(bs) { return } - if res.key != *fromKey { - return - } + res.key = *fromKey r.core.dht.handleRes(&res) } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 47a4e987..5aeeacb6 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -152,6 +152,15 @@ func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo { sinfo.myNonce = *newBoxNonce() sinfo.theirMTU = 1280 sinfo.myMTU = uint16(ss.core.tun.mtu) + if sinfo.myMTU > 2048 { + // FIXME this is a temporary workaround to an issue with UDP peers + // UDP links need to fragment packets (within ygg) to get them over the wire + // For some reason, TCP streams over UDP peers can get stuck in a bad state + // When this happens, TCP throttles back, and each TCP retransmission loses fragments + // On my wifi network, it seems to happen around the 22nd-23rd fragment of a large packet + // By setting the path MTU to something small, this should (hopefully) mitigate the issue + sinfo.myMTU = 2048 + } higher := false for idx := range ss.core.boxPub { if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] { diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 04b86709..a297e893 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -11,11 +11,6 @@ package yggdrasil // TODO? use a pre-computed lookup table (python version had this) // A little annoying to do with constant changes from bandwidth estimates -// FIXME (!) throttle how often root updates are accepted -// If the root starts spaming with new timestamps, it should only affect their neighbors -// The rest of the network should see announcements at a somewhat reasonable rate -// Maybe no faster than 2x the usual update interval - import "time" import "sync" import "sync/atomic" @@ -23,6 +18,8 @@ import "sync/atomic" //import "fmt" const switch_timeout = time.Minute +const switch_updateInterval = switch_timeout / 2 +const switch_throttle = switch_updateInterval / 2 // You should be able to provide crypto signatures for this // 1 signature per coord, from the *sender* to that coord @@ -219,7 +216,7 @@ func (t *switchTable) cleanRoot() { } // Or, if we are the root, possibly update our timestamp if t.data.locator.root == t.key && - now.Sub(t.time) > switch_timeout/2 { + now.Sub(t.time) > switch_updateInterval { //fmt.Println("root is self and old, updating", t.data.locator.Root) doUpdate = true } @@ -343,9 +340,11 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig updateRoot = true case cost < pCost: updateRoot = true - case sender.port == t.parent && - (msg.locator.tstamp > t.data.locator.tstamp || - !equiv(&msg.locator, &t.data.locator)): + case sender.port != t.parent: // do nothing + case !equiv(&msg.locator, &t.data.locator): + updateRoot = true + case now.Sub(t.time) < switch_throttle: // do nothing + case msg.locator.tstamp > t.data.locator.tstamp: updateRoot = true } if updateRoot { @@ -390,6 +389,9 @@ func (t *switchTable) updateTable() { } for _, pinfo := range t.data.peers { //if !pinfo.forward { continue } + if pinfo.locator.root != newTable.self.root { + continue + } loc := pinfo.locator.clone() loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link newTable.elems = append(newTable.elems, tableElem{ @@ -422,9 +424,6 @@ func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) { // score is in units of bandwidth / distance bestScore := float64(-1) for _, info := range table.elems { - if info.locator.root != table.self.root { - continue - } dist := info.locator.dist(dest) //getDist(info.locator.coords) if !(dist < myDist) { continue diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index a190679f..9e01dc6c 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -15,6 +15,7 @@ import "time" import "errors" import "sync" import "fmt" +import "bufio" const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense @@ -23,11 +24,14 @@ type tcpInterface struct { serv *net.TCPListener mutex sync.Mutex // Protecting the below calls map[string]struct{} + conns map[tcpInfo](chan struct{}) } -type tcpKeys struct { - box boxPubKey - sig sigPubKey +type tcpInfo struct { + box boxPubKey + sig sigPubKey + localAddr string // net.IPAddr.String(), not TCPAddr, don't care about port + remoteAddr string } func (iface *tcpInterface) init(core *Core, addr string) { @@ -41,6 +45,7 @@ func (iface *tcpInterface) init(core *Core, addr string) { panic(err) } iface.calls = make(map[string]struct{}) + iface.conns = make(map[tcpInfo](chan struct{})) go iface.listener() } @@ -102,8 +107,8 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { if n < len(keys) { /*panic("Partial key packet?") ;*/ return } - ks := tcpKeys{} - if !tcp_chop_keys(&ks.box, &ks.sig, &keys) { /*panic("Invalid key packet?") ;*/ + info := tcpInfo{} + if !tcp_chop_keys(&info.box, &info.sig, &keys) { /*panic("Invalid key packet?") ;*/ return } // Quit the parent call if this is a connection to ourself @@ -115,21 +120,73 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { } return true } - if equiv(ks.box[:], iface.core.boxPub[:]) { + if equiv(info.box[:], iface.core.boxPub[:]) { return } // testing - if equiv(ks.sig[:], iface.core.sigPub[:]) { + if equiv(info.sig[:], iface.core.sigPub[:]) { return } + // Check if we already have a connection to this node, close and block if yes + local := sock.LocalAddr().(*net.TCPAddr) + laddr := net.IPAddr{ + IP: local.IP, + Zone: local.Zone, + } + info.localAddr = laddr.String() + remote := sock.RemoteAddr().(*net.TCPAddr) + raddr := net.IPAddr{ + IP: remote.IP, + Zone: remote.Zone, + } + info.remoteAddr = raddr.String() + iface.mutex.Lock() + if blockChan, isIn := iface.conns[info]; isIn { + iface.mutex.Unlock() + sock.Close() + <-blockChan + return + } + blockChan := make(chan struct{}) + iface.conns[info] = blockChan + iface.mutex.Unlock() + defer func() { + iface.mutex.Lock() + delete(iface.conns, info) + iface.mutex.Unlock() + close(blockChan) + }() // Note that multiple connections to the same node are allowed // E.g. over different interfaces linkIn := make(chan []byte, 1) - p := iface.core.peers.newPeer(&ks.box, &ks.sig) //, in, out) + p := iface.core.peers.newPeer(&info.box, &info.sig) //, in, out) in := func(bs []byte) { p.handlePacket(bs, linkIn) } out := make(chan []byte, 32) // TODO? what size makes sense defer close(out) + buf := bufio.NewWriterSize(sock, tcp_msgSize) + send := func(msg []byte) { + msgLen := wire_encode_uint64(uint64(len(msg))) + before := buf.Buffered() + start := time.Now() + buf.Write(tcp_msg[:]) + buf.Write(msgLen) + buf.Write(msg) + timed := time.Since(start) + after := buf.Buffered() + written := (before + len(tcp_msg) + len(msgLen) + len(msg)) - after + if written > 0 { + p.updateBandwidth(written, timed) + } + util_putBytes(msg) + } + flush := func() { + size := buf.Buffered() + start := time.Now() + buf.Flush() + timed := time.Since(start) + p.updateBandwidth(size, timed) + } go func() { var stack [][]byte put := func(msg []byte) { @@ -139,25 +196,6 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { stack = stack[1:] } } - send := func() { - msg := stack[len(stack)-1] - stack = stack[:len(stack)-1] - buf := net.Buffers{tcp_msg[:], - wire_encode_uint64(uint64(len(msg))), - msg} - size := 0 - for _, bs := range buf { - size += len(bs) - } - start := time.Now() - buf.WriteTo(sock) - timed := time.Since(start) - pType, _ := wire_decode_uint64(msg) - if pType == wire_LinkProtocolTraffic { - p.updateBandwidth(size, timed) - } - util_putBytes(msg) - } for msg := range out { put(msg) for len(stack) > 0 { @@ -165,13 +203,17 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { select { case msg, ok := <-out: if !ok { + flush() return } put(msg) default: - send() + msg := stack[len(stack)-1] + stack = stack[:len(stack)-1] + send(msg) } } + flush() } }() p.out = func(msg []byte) { @@ -197,8 +239,8 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) { p.core.peers.mutex.Unlock() close(linkIn) }() - them := sock.RemoteAddr() - themNodeID := getNodeID(&ks.box) + them := sock.RemoteAddr().(*net.TCPAddr) + themNodeID := getNodeID(&info.box) themAddr := address_addrForNodeID(themNodeID) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) diff --git a/src/yggdrasil/udp.go b/src/yggdrasil/udp.go index 990a0890..0a04e7f0 100644 --- a/src/yggdrasil/udp.go +++ b/src/yggdrasil/udp.go @@ -46,16 +46,17 @@ func (c *connAddr) toUDPAddr() *net.UDPAddr { } type connInfo struct { - name string - addr connAddr - peer *peer - linkIn chan []byte - keysIn chan *udpKeys - timeout int // count of how many heartbeats have been missed - in func([]byte) - out chan []byte - countIn uint8 - countOut uint8 + name string + addr connAddr + peer *peer + linkIn chan []byte + keysIn chan *udpKeys + timeout int // count of how many heartbeats have been missed + in func([]byte) + out chan []byte + countIn uint8 + countOut uint8 + chunkSize uint16 } type udpKeys struct { @@ -96,7 +97,6 @@ func (iface *udpInterface) startConn(info *connInfo) { defer ticker.Stop() defer func() { // Cleanup - // FIXME this still leaks a peer struct iface.mutex.Lock() delete(iface.conns, info.addr) iface.mutex.Unlock() @@ -162,56 +162,31 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, udpAddr.String()) conn = &connInfo{ - name: themString, - addr: connAddr(addr), - peer: iface.core.peers.newPeer(&ks.box, &ks.sig), - linkIn: make(chan []byte, 1), - keysIn: make(chan *udpKeys, 1), - out: make(chan []byte, 32), + name: themString, + addr: connAddr(addr), + peer: iface.core.peers.newPeer(&ks.box, &ks.sig), + linkIn: make(chan []byte, 1), + keysIn: make(chan *udpKeys, 1), + out: make(chan []byte, 32), + chunkSize: 576 - 60 - 8 - 3, // max safe - max ip - udp header - chunk overhead + } + if udpAddr.IP.IsLinkLocalUnicast() { + ifce, err := net.InterfaceByName(udpAddr.Zone) + if ifce != nil && err == nil { + conn.chunkSize = uint16(ifce.MTU) - 60 - 8 - 3 + } } - /* - conn.in = func (msg []byte) { conn.peer.handlePacket(msg, conn.linkIn) } - conn.peer.out = func (msg []byte) { - start := time.Now() - iface.sock.WriteToUDP(msg, udpAddr) - timed := time.Since(start) - conn.peer.updateBandwidth(len(msg), timed) - util_putBytes(msg) - } // Old version, always one syscall per packet - //*/ - /* - conn.peer.out = func (msg []byte) { - defer func() { recover() }() - select { - case conn.out<-msg: - default: util_putBytes(msg) - } - } - go func () { - for msg := range conn.out { - start := time.Now() - iface.sock.WriteToUDP(msg, udpAddr) - timed := time.Since(start) - conn.peer.updateBandwidth(len(msg), timed) - util_putBytes(msg) - } - }() - //*/ - //* var inChunks uint8 var inBuf []byte conn.in = func(bs []byte) { //defer util_putBytes(bs) chunks, chunk, count, payload := udp_decode(bs) - //iface.core.log.Println("DEBUG:", addr, chunks, chunk, count, len(payload)) - //iface.core.log.Println("DEBUG: payload:", payload) if count != conn.countIn { inChunks = 0 inBuf = inBuf[:0] conn.countIn = count } if chunk <= chunks && chunk == inChunks+1 { - //iface.core.log.Println("GOING:", addr, chunks, chunk, count, len(payload)) inChunks += 1 inBuf = append(inBuf, payload...) if chunks != chunk { @@ -219,7 +194,6 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { } msg := append(util_getBytes(), inBuf...) conn.peer.handlePacket(msg, conn.linkIn) - //iface.core.log.Println("DONE:", addr, chunks, chunk, count, len(payload)) } } conn.peer.out = func(msg []byte) { @@ -236,11 +210,10 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { for msg := range conn.out { chunks = chunks[:0] bs := msg - for len(bs) > udp_chunkSize { - chunks, bs = append(chunks, bs[:udp_chunkSize]), bs[udp_chunkSize:] + for len(bs) > int(conn.chunkSize) { + chunks, bs = append(chunks, bs[:conn.chunkSize]), bs[conn.chunkSize:] } chunks = append(chunks, bs) - //iface.core.log.Println("DEBUG: out chunks:", len(chunks), len(msg)) if len(chunks) > 255 { continue } @@ -284,19 +257,14 @@ func (iface *udpInterface) handlePacket(msg []byte, addr connAddr) { } func (iface *udpInterface) reader() { - bs := make([]byte, 2048) // This needs to be large enough for everything... + bs := make([]byte, 65536) // This needs to be large enough for everything... for { - //iface.core.log.Println("Starting read") n, udpAddr, err := iface.sock.ReadFromUDP(bs) - //iface.core.log.Println("Read", n, udpAddr.String(), err) + //iface.core.log.Println("DEBUG: read:", bs[0], bs[1], bs[2], n) if err != nil { panic(err) break } - if n > 1500 { - panic(n) - } - //msg := append(util_getBytes(), bs[:n]...) msg := bs[:n] var addr connAddr addr.fromUDPAddr(udpAddr) @@ -319,8 +287,6 @@ func (iface *udpInterface) reader() { //////////////////////////////////////////////////////////////////////////////// -const udp_chunkSize = 508 // Apparently the maximum guaranteed safe IPv4 size - func udp_decode(bs []byte) (chunks, chunk, count uint8, payload []byte) { if len(bs) >= 3 { chunks, chunk, count, payload = bs[0], bs[1], bs[2], bs[3:] diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 6d574541..167a97d5 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -21,7 +21,6 @@ const ( wire_DHTLookupResponse // inside protocol traffic header wire_SearchRequest // inside protocol traffic header wire_SearchResponse // inside protocol traffic header - //wire_Keys // udp key packet (boxPub, sigPub) ) // Encode uint64 using a variable length scheme @@ -112,8 +111,6 @@ func wire_put_coords(coords []byte, bs []byte) []byte { func wire_decode_coords(packet []byte) ([]byte, int) { coordLen, coordBegin := wire_decode_uint64(packet) coordEnd := coordBegin + int(coordLen) - //if coordBegin == 0 { panic("No coords found") } // Testing - //if coordEnd > len(packet) { panic("Packet too short") } // Testing if coordBegin == 0 || coordEnd > len(packet) { return nil, 0 } @@ -129,7 +126,7 @@ type msgAnnounce struct { seq uint64 len uint64 //Deg uint64 - //RSeq uint64 + rseq uint64 } func (m *msgAnnounce) encode() []byte { @@ -138,8 +135,7 @@ func (m *msgAnnounce) encode() []byte { bs = append(bs, wire_encode_uint64(wire_intToUint(m.tstamp))...) bs = append(bs, wire_encode_uint64(m.seq)...) bs = append(bs, wire_encode_uint64(m.len)...) - //bs = append(bs, wire_encode_uint64(m.Deg)...) - //bs = append(bs, wire_encode_uint64(m.RSeq)...) + bs = append(bs, wire_encode_uint64(m.rseq)...) return bs } @@ -159,8 +155,8 @@ func (m *msgAnnounce) decode(bs []byte) bool { return false case !wire_chop_uint64(&m.len, &bs): return false - //case !wire_chop_uint64(&m.Deg, &bs): return false - //case !wire_chop_uint64(&m.RSeq, &bs): return false + case !wire_chop_uint64(&m.rseq, &bs): + return false } m.tstamp = wire_intFromUint(tstamp) return true @@ -380,16 +376,12 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { } type wire_linkProtoTrafficPacket struct { - toKey boxPubKey - fromKey boxPubKey nonce boxNonce payload []byte } func (p *wire_linkProtoTrafficPacket) encode() []byte { bs := wire_encode_uint64(wire_LinkProtocolTraffic) - bs = append(bs, p.toKey[:]...) - bs = append(bs, p.fromKey[:]...) bs = append(bs, p.nonce[:]...) bs = append(bs, p.payload...) return bs @@ -402,10 +394,6 @@ func (p *wire_linkProtoTrafficPacket) decode(bs []byte) bool { return false case pType != wire_LinkProtocolTraffic: return false - case !wire_chop_slice(p.toKey[:], &bs): - return false - case !wire_chop_slice(p.fromKey[:], &bs): - return false case !wire_chop_slice(p.nonce[:], &bs): return false } @@ -467,7 +455,6 @@ func (p *sessionPing) decode(bs []byte) bool { func (r *dhtReq) encode() []byte { coords := wire_encode_coords(r.coords) bs := wire_encode_uint64(wire_DHTLookupRequest) - bs = append(bs, r.key[:]...) bs = append(bs, coords...) bs = append(bs, r.dest[:]...) return bs @@ -480,8 +467,6 @@ func (r *dhtReq) decode(bs []byte) bool { return false case pType != wire_DHTLookupRequest: return false - case !wire_chop_slice(r.key[:], &bs): - return false case !wire_chop_coords(&r.coords, &bs): return false case !wire_chop_slice(r.dest[:], &bs): @@ -494,7 +479,6 @@ func (r *dhtReq) decode(bs []byte) bool { func (r *dhtRes) encode() []byte { coords := wire_encode_coords(r.coords) bs := wire_encode_uint64(wire_DHTLookupResponse) - bs = append(bs, r.key[:]...) bs = append(bs, coords...) bs = append(bs, r.dest[:]...) for _, info := range r.infos { @@ -512,8 +496,6 @@ func (r *dhtRes) decode(bs []byte) bool { return false case pType != wire_DHTLookupResponse: return false - case !wire_chop_slice(r.key[:], &bs): - return false case !wire_chop_coords(&r.coords, &bs): return false case !wire_chop_slice(r.dest[:], &bs): diff --git a/yggdrasil.go b/yggdrasil.go index 5abd6b09..50a9c63c 100644 --- a/yggdrasil.go +++ b/yggdrasil.go @@ -118,7 +118,7 @@ func generateConfig() *nodeConfig { cfg.Multicast = true cfg.LinkLocal = "" cfg.IfName = "auto" - cfg.IfMTU = 65535 + cfg.IfMTU = 1280 if runtime.GOOS == "windows" { cfg.IfTAPMode = true } else { @@ -188,8 +188,8 @@ func (n *node) announce() { panic(err) } var anAddr net.TCPAddr - tcpAddr := n.core.DEBUG_getGlobalTCPAddr() - anAddr.Port = tcpAddr.Port + myAddr := n.core.DEBUG_getGlobalTCPAddr() + anAddr.Port = myAddr.Port destAddr, err := net.ResolveUDPAddr("udp6", multicastAddr) if err != nil { panic(err)