From 690d29435d7a4c88e413a1d8e470cccf062c9468 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 6 Jun 2018 17:44:10 -0500 Subject: [PATCH 01/23] adjust link packet logic so they bypass the lifo stack and are delivered first --- src/yggdrasil/debug.go | 12 ++++++------ src/yggdrasil/peer.go | 21 +++++++++------------ src/yggdrasil/router.go | 2 +- src/yggdrasil/tcp.go | 30 ++++++++++++++++++++---------- 4 files changed, 36 insertions(+), 29 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index cd42560c..599ee90c 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -453,16 +453,16 @@ func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) { func DEBUG_simLinkPeers(p, q *peer) { // Sets q.out() to point to p and starts p.linkLoop() - plinkIn := make(chan []byte, 1) - qlinkIn := make(chan []byte, 1) + p.linkIn, q.linkIn = make(chan []byte, 32), make(chan []byte, 32) + p.linkOut, q.linkOut = q.linkIn, p.linkIn p.out = func(bs []byte) { - go q.handlePacket(bs, qlinkIn) + go q.handlePacket(bs) } q.out = func(bs []byte) { - go p.handlePacket(bs, plinkIn) + go p.handlePacket(bs) } - go p.linkLoop(plinkIn) - go q.linkLoop(qlinkIn) + go p.linkLoop() + go q.linkLoop() } func (c *Core) DEBUG_simFixMTU() { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index fa1a2789..0113470e 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -111,6 +111,9 @@ type peer struct { close func() // To allow the peer to call close if idle for too long lastAnc time.Time + // used for protocol traffic (to bypass queues) + linkIn (chan []byte) // handlePacket sends, linkLoop recvs + linkOut (chan []byte) } const peer_Throttle = 1 @@ -123,8 +126,7 @@ func (p *peer) updateQueueSize(delta int64) { atomic.AddInt64(&p.queueSize, delta) } -func (ps *peers) newPeer(box *boxPubKey, - sig *sigPubKey) *peer { +func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey) *peer { now := time.Now() p := peer{box: *box, sig: *sig, @@ -170,14 +172,14 @@ func (ps *peers) removePeer(port switchPort) { } } -func (p *peer) linkLoop(in <-chan []byte) { +func (p *peer) linkLoop() { ticker := time.NewTicker(time.Second) defer ticker.Stop() var counter uint8 var lastRSeq uint64 for { select { - case packet, ok := <-in: + case packet, ok := <-p.linkIn: if !ok { return } @@ -214,7 +216,7 @@ func (p *peer) linkLoop(in <-chan []byte) { } } -func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) { +func (p *peer) handlePacket(packet []byte) { // TODO See comment in sendPacket about atomics technically being done wrong atomic.AddUint64(&p.bytesRecvd, uint64(len(packet))) pType, pTypeLen := wire_decode_uint64(packet) @@ -227,12 +229,7 @@ func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) { case wire_ProtocolTraffic: p.handleTraffic(packet, pTypeLen) case wire_LinkProtocolTraffic: - { - select { - case linkIn <- packet: - default: - } - } + p.linkIn <- packet default: /*panic(pType) ;*/ return } @@ -284,7 +281,7 @@ func (p *peer) sendLinkPacket(packet []byte) { Payload: bs, } packet = linkPacket.encode() - p.sendPacket(packet) + p.linkOut <- packet } func (p *peer) handleLinkTraffic(bs []byte) { diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index b848a792..a8797d5f 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -55,7 +55,7 @@ func (r *router) init(core *Core) { } } r.in = in - r.out = func(packet []byte) { p.handlePacket(packet, nil) } // The caller is responsible for go-ing if it needs to not block + r.out = func(packet []byte) { p.handlePacket(packet) } // The caller is responsible for go-ing if it needs to not block recv := make(chan []byte, 32) send := make(chan []byte, 32) r.recv = recv diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index acde0344..e21522b8 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -208,10 +208,11 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { }() // Note that multiple connections to the same node are allowed // E.g. over different interfaces - linkIn := make(chan []byte, 1) - p := iface.core.peers.newPeer(&info.box, &info.sig) //, in, out) + p := iface.core.peers.newPeer(&info.box, &info.sig) + p.linkIn = make(chan []byte, 1) + p.linkOut = make(chan []byte, 1) in := func(bs []byte) { - p.handlePacket(bs, linkIn) + p.handlePacket(bs) } out := make(chan []byte, 32) // TODO? what size makes sense defer close(out) @@ -221,10 +222,10 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { buf.Write(tcp_msg[:]) buf.Write(msgLen) buf.Write(msg) - p.updateQueueSize(-1) util_putBytes(msg) } go func() { + defer buf.Flush() var stack [][]byte put := func(msg []byte) { stack = append(stack, msg) @@ -234,14 +235,22 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { p.updateQueueSize(-1) } } - for msg := range out { - put(msg) + for { + select { + case msg := <-p.linkOut: + send(msg) + case msg, ok := <-out: + if !ok { + return + } + put(msg) + } for len(stack) > 0 { - // Keep trying to fill the stack (LIFO order) while sending select { + case msg := <-p.linkOut: + send(msg) case msg, ok := <-out: if !ok { - buf.Flush() return } put(msg) @@ -249,6 +258,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { msg := stack[len(stack)-1] stack = stack[:len(stack)-1] send(msg) + p.updateQueueSize(-1) } } buf.Flush() @@ -265,11 +275,11 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } p.close = func() { sock.Close() } setNoDelay(sock, true) - go p.linkLoop(linkIn) + go p.linkLoop() defer func() { // Put all of our cleanup here... p.core.peers.removePeer(p.port) - close(linkIn) + close(p.linkIn) }() them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) themNodeID := getNodeID(&info.box) From 6811759fc9f3fd5e103546c3d6505c410503602c Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 6 Jun 2018 21:11:10 -0500 Subject: [PATCH 02/23] add new switchMsg struct to replace old anc/hop/res approach --- src/yggdrasil/peer.go | 90 ++++++++++++++++++++++++++++++++++++++----- src/yggdrasil/wire.go | 56 +++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 9 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 0113470e..1656d392 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -205,11 +205,13 @@ func (p *peer) linkLoop() { case counter%4 == 0: update = true } - if update { + if true || update { + // TODO change update logic, the new switchMsg works differently if p.msgAnc != nil { lastRSeq = p.msgAnc.Seq } - p.sendSwitchAnnounce() + p.sendSwitchMsg() + //p.sendSwitchAnnounce() } counter = (counter + 1) % 4 } @@ -236,10 +238,11 @@ func (p *peer) handlePacket(packet []byte) { } func (p *peer) handleTraffic(packet []byte, pTypeLen int) { - if p.port != 0 && p.msgAnc == nil { - // Drop traffic until the peer manages to send us at least one anc - return - } + //if p.port != 0 && p.msgAnc == nil { + // // Drop traffic until the peer manages to send us at least one anc + // // TODO? equivalent for new switch format? + // return + //} ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:]) ttlBegin := pTypeLen ttlEnd := pTypeLen + ttlLen @@ -298,6 +301,8 @@ func (p *peer) handleLinkTraffic(bs []byte) { return } switch pType { + case wire_SwitchMsg: + p.handleSwitchMsg(payload) case wire_SwitchAnnounce: p.handleSwitchAnnounce(payload) case wire_SwitchHopRequest: @@ -467,12 +472,79 @@ func (p *peer) processSwitchMessage() { if len(coords) == 0 { return } - // Reuse locator, set the coords to the peer's coords, to use in dht - msg.locator.coords = coords[:len(coords)-1] // Pass a mesage to the dht informing it that this peer (still) exists + l := msg.locator + l.coords = l.coords[:len(l.coords)-1] dinfo := dhtInfo{ key: p.box, - coords: msg.locator.getCoords(), + coords: l.getCoords(), + } + p.core.dht.peers <- &dinfo + p.core.log.Println("DEBUG: peers<-&dhtInfo", dinfo, p.box, msg) +} + +func (p *peer) sendSwitchMsg() { + info, sigs := p.core.switchTable.createMessage(p.port) + var msg switchMsg + msg.Root, msg.TStamp = info.locator.root, info.locator.tstamp + for idx, sig := range sigs { + hop := switchMsgHop{ + Port: info.locator.coords[idx], + Next: sig.next, + Sig: sig.sig, + } + msg.Hops = append(msg.Hops, hop) + } + msg.Hops = append(msg.Hops, switchMsgHop{ + Port: p.port, + Next: p.sig, + Sig: *sign(&p.core.sigPriv, getBytesForSig(&p.sig, &info.locator)), + }) + packet := msg.encode() + var test switchMsg + test.decode(packet) + //p.core.log.Println("Encoded msg:", msg, "; bytes:", packet) + p.sendLinkPacket(packet) +} + +func (p *peer) handleSwitchMsg(packet []byte) { + var msg switchMsg + msg.decode(packet) + //p.core.log.Println("Decoded msg:", msg, "; bytes:", packet) + if len(msg.Hops) < 1 { + p.throttle++ + panic("FIXME testing") + return + } + var info switchMessage + var sigs []sigInfo + info.locator.root = msg.Root + info.locator.tstamp = msg.TStamp + thisHopKey := &msg.Root + for _, hop := range msg.Hops { + var sig sigInfo + sig.next = hop.Next + sig.sig = hop.Sig + sigs = append(sigs, sig) + info.locator.coords = append(info.locator.coords, hop.Port) + // TODO check signatures + bs := getBytesForSig(&hop.Next, &info.locator) + if !p.core.sigs.check(thisHopKey, &hop.Sig, bs) { + //p.throttle++ + //panic("FIXME testing") + //return + } + thisHopKey = &hop.Next + } + info.from = p.sig + info.seq = uint64(time.Now().Unix()) + p.core.switchTable.handleMessage(&info, p.port, sigs) + // Pass a mesage to the dht informing it that this peer (still) exists + l := info.locator + l.coords = l.coords[:len(l.coords)-1] + dinfo := dhtInfo{ + key: p.box, + coords: l.getCoords(), } p.core.dht.peers <- &dinfo } diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 6b592e5b..bd298de9 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -12,6 +12,7 @@ const ( wire_Traffic = iota // data being routed somewhere, handle for crypto wire_ProtocolTraffic // protocol traffic, pub keys for crypto wire_LinkProtocolTraffic // link proto traffic, pub keys for crypto + wire_SwitchMsg // inside link protocol traffic header wire_SwitchAnnounce // inside protocol traffic header wire_SwitchHopRequest // inside protocol traffic header wire_SwitchHop // inside protocol traffic header @@ -117,6 +118,61 @@ func wire_decode_coords(packet []byte) ([]byte, int) { //////////////////////////////////////////////////////////////////////////////// +type switchMsg struct { + Root sigPubKey + TStamp int64 + Hops []switchMsgHop +} + +type switchMsgHop struct { + Port switchPort + Next sigPubKey + Sig sigBytes +} + +func (m *switchMsg) encode() []byte { + bs := wire_encode_uint64(wire_SwitchMsg) + bs = append(bs, m.Root[:]...) + bs = append(bs, wire_encode_uint64(wire_intToUint(m.TStamp))...) + for _, hop := range m.Hops { + bs = append(bs, wire_encode_uint64(uint64(hop.Port))...) + bs = append(bs, hop.Next[:]...) + bs = append(bs, hop.Sig[:]...) + } + return bs +} + +func (m *switchMsg) decode(bs []byte) bool { + var pType uint64 + var tstamp uint64 + switch { + case !wire_chop_uint64(&pType, &bs): + return false + case pType != wire_SwitchMsg: + return false + case !wire_chop_slice(m.Root[:], &bs): + return false + case !wire_chop_uint64(&tstamp, &bs): + return false + } + m.TStamp = wire_intFromUint(tstamp) + for len(bs) > 0 { + var hop switchMsgHop + switch { + case !wire_chop_uint64((*uint64)(&hop.Port), &bs): + return false + case !wire_chop_slice(hop.Next[:], &bs): + return false + case !wire_chop_slice(hop.Sig[:], &bs): + return false + } + m.Hops = append(m.Hops, hop) + } + return true +} + +//////////////////////////////////////////////////////////////////////////////// + // Announces that we can send parts of a Message with a particular seq type msgAnnounce struct { Root sigPubKey From 5fb33da3a2521319ff6a81c3f3c55d8141003b25 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 6 Jun 2018 21:18:21 -0500 Subject: [PATCH 03/23] remove old switch anc/hop/res code --- src/yggdrasil/peer.go | 241 ++---------------------------------------- src/yggdrasil/wire.go | 133 +---------------------- 2 files changed, 7 insertions(+), 367 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 1656d392..7ddf4bbb 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -96,13 +96,9 @@ type peer struct { //in <-chan []byte //out chan<- []byte //in func([]byte) - out func([]byte) - core *Core - port switchPort - msgAnc *msgAnnounce - msgHops []*msgHop - myMsg *switchMessage - mySigs []sigInfo + out func([]byte) + core *Core + port switchPort // This is used to limit how often we perform expensive operations // Specifically, processing switch messages, signing, and verifying sigs // Resets at the start of each tick @@ -175,8 +171,6 @@ func (ps *peers) removePeer(port switchPort) { func (p *peer) linkLoop() { ticker := time.NewTicker(time.Second) defer ticker.Stop() - var counter uint8 - var lastRSeq uint64 for { select { case packet, ok := <-p.linkIn: @@ -193,27 +187,8 @@ func (p *peer) linkLoop() { if p.port == 0 { continue } // Don't send announces on selfInterface - p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port) - var update bool - switch { - case p.msgAnc == nil: - update = true - case lastRSeq != p.msgAnc.Seq: - update = true - case p.msgAnc.Rseq != p.myMsg.seq: - update = true - case counter%4 == 0: - update = true - } - if true || update { - // TODO change update logic, the new switchMsg works differently - if p.msgAnc != nil { - lastRSeq = p.msgAnc.Seq - } - p.sendSwitchMsg() - //p.sendSwitchAnnounce() - } - counter = (counter + 1) % 4 + // TODO change update logic, the new switchMsg works differently, we only need to send if something changes + p.sendSwitchMsg() } } } @@ -303,186 +278,10 @@ func (p *peer) handleLinkTraffic(bs []byte) { switch pType { case wire_SwitchMsg: p.handleSwitchMsg(payload) - case wire_SwitchAnnounce: - p.handleSwitchAnnounce(payload) - case wire_SwitchHopRequest: - p.handleSwitchHopRequest(payload) - case wire_SwitchHop: - p.handleSwitchHop(payload) + default: // TODO?... } } -func (p *peer) handleSwitchAnnounce(packet []byte) { - //p.core.log.Println("DEBUG: handleSwitchAnnounce") - anc := msgAnnounce{} - //err := wire_decode_struct(packet, &anc) - //if err != nil { return } - if !anc.decode(packet) { - return - } - //if p.msgAnc != nil && anc.Seq != p.msgAnc.Seq { p.msgHops = nil } - if p.msgAnc == nil || - anc.Root != p.msgAnc.Root || - anc.Tstamp != p.msgAnc.Tstamp || - anc.Seq != p.msgAnc.Seq { - p.msgHops = nil - } - p.msgAnc = &anc - p.processSwitchMessage() - p.lastAnc = time.Now() -} - -func (p *peer) requestHop(hop uint64) { - //p.core.log.Println("DEBUG requestHop") - req := msgHopReq{} - req.Root = p.msgAnc.Root - req.Tstamp = p.msgAnc.Tstamp - req.Seq = p.msgAnc.Seq - req.Hop = hop - packet := req.encode() - p.sendLinkPacket(packet) -} - -func (p *peer) handleSwitchHopRequest(packet []byte) { - //p.core.log.Println("DEBUG: handleSwitchHopRequest") - if p.throttle > peer_Throttle { - return - } - if p.myMsg == nil { - return - } - req := msgHopReq{} - if !req.decode(packet) { - return - } - if req.Root != p.myMsg.locator.root { - return - } - if req.Tstamp != p.myMsg.locator.tstamp { - return - } - if req.Seq != p.myMsg.seq { - return - } - if uint64(len(p.myMsg.locator.coords)) <= req.Hop { - return - } - res := msgHop{} - res.Root = p.myMsg.locator.root - res.Tstamp = p.myMsg.locator.tstamp - res.Seq = p.myMsg.seq - res.Hop = req.Hop - res.Port = p.myMsg.locator.coords[res.Hop] - sinfo := p.getSig(res.Hop) - //p.core.log.Println("DEBUG sig:", sinfo) - res.Next = sinfo.next - res.Sig = sinfo.sig - packet = res.encode() - p.sendLinkPacket(packet) -} - -func (p *peer) handleSwitchHop(packet []byte) { - //p.core.log.Println("DEBUG: handleSwitchHop") - if p.throttle > peer_Throttle { - return - } - if p.msgAnc == nil { - return - } - res := msgHop{} - if !res.decode(packet) { - return - } - if res.Root != p.msgAnc.Root { - return - } - if res.Tstamp != p.msgAnc.Tstamp { - return - } - if res.Seq != p.msgAnc.Seq { - return - } - if res.Hop != uint64(len(p.msgHops)) { - return - } // always process in order - loc := switchLocator{coords: make([]switchPort, 0, len(p.msgHops)+1)} - loc.root = res.Root - loc.tstamp = res.Tstamp - for _, hop := range p.msgHops { - loc.coords = append(loc.coords, hop.Port) - } - loc.coords = append(loc.coords, res.Port) - thisHopKey := &res.Root - if res.Hop != 0 { - thisHopKey = &p.msgHops[res.Hop-1].Next - } - bs := getBytesForSig(&res.Next, &loc) - if p.core.sigs.check(thisHopKey, &res.Sig, bs) { - p.msgHops = append(p.msgHops, &res) - p.processSwitchMessage() - } else { - p.throttle++ - } -} - -func (p *peer) processSwitchMessage() { - //p.core.log.Println("DEBUG: processSwitchMessage") - if p.throttle > peer_Throttle { - return - } - if p.msgAnc == nil { - return - } - if uint64(len(p.msgHops)) < p.msgAnc.Len { - p.requestHop(uint64(len(p.msgHops))) - return - } - p.throttle++ - if p.msgAnc.Len != uint64(len(p.msgHops)) { - return - } - msg := switchMessage{} - coords := make([]switchPort, 0, len(p.msgHops)) - sigs := make([]sigInfo, 0, len(p.msgHops)) - for idx, hop := range p.msgHops { - // Consistency checks, should be redundant (already checked these...) - if hop.Root != p.msgAnc.Root { - return - } - if hop.Tstamp != p.msgAnc.Tstamp { - return - } - if hop.Seq != p.msgAnc.Seq { - return - } - if hop.Hop != uint64(idx) { - return - } - coords = append(coords, hop.Port) - sigs = append(sigs, sigInfo{next: hop.Next, sig: hop.Sig}) - } - msg.from = p.sig - msg.locator.root = p.msgAnc.Root - msg.locator.tstamp = p.msgAnc.Tstamp - msg.locator.coords = coords - msg.seq = p.msgAnc.Seq - //msg.RSeq = p.msgAnc.RSeq - //msg.Degree = p.msgAnc.Deg - p.core.switchTable.handleMessage(&msg, p.port, sigs) - if len(coords) == 0 { - return - } - // Pass a mesage to the dht informing it that this peer (still) exists - l := msg.locator - l.coords = l.coords[:len(l.coords)-1] - dinfo := dhtInfo{ - key: p.box, - coords: l.getCoords(), - } - p.core.dht.peers <- &dinfo - p.core.log.Println("DEBUG: peers<-&dhtInfo", dinfo, p.box, msg) -} - func (p *peer) sendSwitchMsg() { info, sigs := p.core.switchTable.createMessage(p.port) var msg switchMsg @@ -549,34 +348,6 @@ func (p *peer) handleSwitchMsg(packet []byte) { p.core.dht.peers <- &dinfo } -func (p *peer) sendSwitchAnnounce() { - anc := msgAnnounce{} - anc.Root = p.myMsg.locator.root - anc.Tstamp = p.myMsg.locator.tstamp - anc.Seq = p.myMsg.seq - anc.Len = uint64(len(p.myMsg.locator.coords)) - //anc.Deg = p.myMsg.Degree - if p.msgAnc != nil { - anc.Rseq = p.msgAnc.Seq - } - packet := anc.encode() - p.sendLinkPacket(packet) -} - -func (p *peer) getSig(hop uint64) sigInfo { - //p.core.log.Println("DEBUG getSig:", len(p.mySigs), hop) - if hop < uint64(len(p.mySigs)) { - return p.mySigs[hop] - } - bs := getBytesForSig(&p.sig, &p.myMsg.locator) - sig := sigInfo{} - sig.next = p.sig - sig.sig = *sign(&p.core.sigPriv, bs) - p.mySigs = append(p.mySigs, sig) - //p.core.log.Println("DEBUG sig bs:", bs) - return sig -} - func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte { //bs, err := wire_encode_locator(loc) //if err != nil { panic(err) } diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index bd298de9..9344d902 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -13,9 +13,6 @@ const ( wire_ProtocolTraffic // protocol traffic, pub keys for crypto wire_LinkProtocolTraffic // link proto traffic, pub keys for crypto wire_SwitchMsg // inside link protocol traffic header - wire_SwitchAnnounce // inside protocol traffic header - wire_SwitchHopRequest // inside protocol traffic header - wire_SwitchHop // inside protocol traffic header wire_SessionPing // inside protocol traffic header wire_SessionPong // inside protocol traffic header wire_DHTLookupRequest // inside protocol traffic header @@ -173,136 +170,8 @@ func (m *switchMsg) decode(bs []byte) bool { //////////////////////////////////////////////////////////////////////////////// -// Announces that we can send parts of a Message with a particular seq -type msgAnnounce struct { - Root sigPubKey - Tstamp int64 - Seq uint64 - Len uint64 - //Deg uint64 - Rseq uint64 -} - -func (m *msgAnnounce) encode() []byte { - bs := wire_encode_uint64(wire_SwitchAnnounce) - bs = append(bs, m.Root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...) - bs = append(bs, wire_encode_uint64(m.Seq)...) - bs = append(bs, wire_encode_uint64(m.Len)...) - bs = append(bs, wire_encode_uint64(m.Rseq)...) - return bs -} - -func (m *msgAnnounce) decode(bs []byte) bool { - var pType uint64 - var tstamp uint64 - switch { - case !wire_chop_uint64(&pType, &bs): - return false - case pType != wire_SwitchAnnounce: - return false - case !wire_chop_slice(m.Root[:], &bs): - return false - case !wire_chop_uint64(&tstamp, &bs): - return false - case !wire_chop_uint64(&m.Seq, &bs): - return false - case !wire_chop_uint64(&m.Len, &bs): - return false - case !wire_chop_uint64(&m.Rseq, &bs): - return false - } - m.Tstamp = wire_intFromUint(tstamp) - return true -} - -type msgHopReq struct { - Root sigPubKey - Tstamp int64 - Seq uint64 - Hop uint64 -} - -func (m *msgHopReq) encode() []byte { - bs := wire_encode_uint64(wire_SwitchHopRequest) - bs = append(bs, m.Root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...) - bs = append(bs, wire_encode_uint64(m.Seq)...) - bs = append(bs, wire_encode_uint64(m.Hop)...) - return bs -} - -func (m *msgHopReq) decode(bs []byte) bool { - var pType uint64 - var tstamp uint64 - switch { - case !wire_chop_uint64(&pType, &bs): - return false - case pType != wire_SwitchHopRequest: - return false - case !wire_chop_slice(m.Root[:], &bs): - return false - case !wire_chop_uint64(&tstamp, &bs): - return false - case !wire_chop_uint64(&m.Seq, &bs): - return false - case !wire_chop_uint64(&m.Hop, &bs): - return false - } - m.Tstamp = wire_intFromUint(tstamp) - return true -} - -type msgHop struct { - Root sigPubKey - Tstamp int64 - Seq uint64 - Hop uint64 - Port switchPort - Next sigPubKey - Sig sigBytes -} - -func (m *msgHop) encode() []byte { - bs := wire_encode_uint64(wire_SwitchHop) - bs = append(bs, m.Root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...) - bs = append(bs, wire_encode_uint64(m.Seq)...) - bs = append(bs, wire_encode_uint64(m.Hop)...) - bs = append(bs, wire_encode_uint64(uint64(m.Port))...) - bs = append(bs, m.Next[:]...) - bs = append(bs, m.Sig[:]...) - return bs -} - -func (m *msgHop) decode(bs []byte) bool { - var pType uint64 - var tstamp uint64 - switch { - case !wire_chop_uint64(&pType, &bs): - return false - case pType != wire_SwitchHop: - return false - case !wire_chop_slice(m.Root[:], &bs): - return false - case !wire_chop_uint64(&tstamp, &bs): - return false - case !wire_chop_uint64(&m.Seq, &bs): - return false - case !wire_chop_uint64(&m.Hop, &bs): - return false - case !wire_chop_uint64((*uint64)(&m.Port), &bs): - return false - case !wire_chop_slice(m.Next[:], &bs): - return false - case !wire_chop_slice(m.Sig[:], &bs): - return false - } - m.Tstamp = wire_intFromUint(tstamp) - return true -} - // Format used to check signatures only, so no need to also support decoding +// TODO something else for signatures func wire_encode_locator(loc *switchLocator) []byte { coords := wire_encode_coords(loc.getCoords()) var bs []byte From 1e7d34492d3cc923ea250cacd71f6dcf4d3f0fbe Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 6 Jun 2018 22:39:22 -0500 Subject: [PATCH 04/23] fix signature checks and add some TODO reminder comments --- src/yggdrasil/peer.go | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 7ddf4bbb..059ec739 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -100,13 +100,11 @@ type peer struct { core *Core port switchPort // This is used to limit how often we perform expensive operations - // Specifically, processing switch messages, signing, and verifying sigs - // Resets at the start of each tick - throttle uint8 + throttle uint8 // TODO apply this sanely // Called when a peer is removed, to close the underlying connection, or via admin api close func() // To allow the peer to call close if idle for too long - lastAnc time.Time + lastAnc time.Time // TODO? rename and use this // used for protocol traffic (to bypass queues) linkIn (chan []byte) // handlePacket sends, linkLoop recvs linkOut (chan []byte) @@ -149,7 +147,6 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey) *peer { } func (ps *peers) removePeer(port switchPort) { - // TODO? store linkIn in the peer struct, close it here? (once) if port == 0 { return } // Can't remove self peer @@ -181,6 +178,7 @@ func (p *peer) linkLoop() { case <-ticker.C: if time.Since(p.lastAnc) > 16*time.Second && p.close != nil { // Seems to have timed out, try to trigger a close + // FIXME this depends on lastAnc or something equivalent being updated p.close() } p.throttle = 0 @@ -215,7 +213,7 @@ func (p *peer) handlePacket(packet []byte) { func (p *peer) handleTraffic(packet []byte, pTypeLen int) { //if p.port != 0 && p.msgAnc == nil { // // Drop traffic until the peer manages to send us at least one anc - // // TODO? equivalent for new switch format? + // // TODO equivalent for new switch format, maybe add some bool flag? // return //} ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:]) @@ -294,14 +292,13 @@ func (p *peer) sendSwitchMsg() { } msg.Hops = append(msg.Hops, hop) } + bs := getBytesForSig(&p.sig, &info.locator) msg.Hops = append(msg.Hops, switchMsgHop{ Port: p.port, Next: p.sig, - Sig: *sign(&p.core.sigPriv, getBytesForSig(&p.sig, &info.locator)), + Sig: *sign(&p.core.sigPriv, bs), }) packet := msg.encode() - var test switchMsg - test.decode(packet) //p.core.log.Println("Encoded msg:", msg, "; bytes:", packet) p.sendLinkPacket(packet) } @@ -319,21 +316,22 @@ func (p *peer) handleSwitchMsg(packet []byte) { var sigs []sigInfo info.locator.root = msg.Root info.locator.tstamp = msg.TStamp - thisHopKey := &msg.Root + prevKey := msg.Root for _, hop := range msg.Hops { + // Build locator and signatures var sig sigInfo sig.next = hop.Next sig.sig = hop.Sig sigs = append(sigs, sig) info.locator.coords = append(info.locator.coords, hop.Port) - // TODO check signatures - bs := getBytesForSig(&hop.Next, &info.locator) - if !p.core.sigs.check(thisHopKey, &hop.Sig, bs) { - //p.throttle++ - //panic("FIXME testing") - //return + // Check signature + bs := getBytesForSig(&sig.next, &info.locator) + if !p.core.sigs.check(&prevKey, &sig.sig, bs) { + p.throttle++ + panic("FIXME testing") + return } - thisHopKey = &hop.Next + prevKey = sig.next } info.from = p.sig info.seq = uint64(time.Now().Unix()) From 5dc0cb5544744e5cc68a05ca96dbe9a07b4d6f01 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 6 Jun 2018 23:00:17 -0500 Subject: [PATCH 05/23] move wire_encode_locator logic into getBytesForSig, since that's the only place it's used --- src/yggdrasil/peer.go | 8 +++----- src/yggdrasil/wire.go | 11 ----------- 2 files changed, 3 insertions(+), 16 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 059ec739..47f29f81 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -347,11 +347,9 @@ func (p *peer) handleSwitchMsg(packet []byte) { } func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte { - //bs, err := wire_encode_locator(loc) - //if err != nil { panic(err) } bs := append([]byte(nil), next[:]...) - bs = append(bs, wire_encode_locator(loc)...) - //bs := wire_encode_locator(loc) - //bs = append(next[:], bs...) + bs = append(bs, loc.root[:]...) + bs = append(bs, wire_encode_uint64(wire_intToUint(loc.tstamp))...) + bs = append(bs, wire_encode_coords(loc.getCoords())...) return bs } diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 9344d902..be2fb4e6 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -170,17 +170,6 @@ func (m *switchMsg) decode(bs []byte) bool { //////////////////////////////////////////////////////////////////////////////// -// Format used to check signatures only, so no need to also support decoding -// TODO something else for signatures -func wire_encode_locator(loc *switchLocator) []byte { - coords := wire_encode_coords(loc.getCoords()) - var bs []byte - bs = append(bs, loc.root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(loc.tstamp))...) - bs = append(bs, coords...) - return bs -} - func wire_chop_slice(toSlice []byte, fromSlice *[]byte) bool { if len(*fromSlice) < len(toSlice) { return false From 3b783fbf974b1a5fc6a7c62242da105aa5137411 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 6 Jun 2018 23:10:33 -0500 Subject: [PATCH 06/23] move periodic switch maintenance into the router instead of its own goroutine --- src/yggdrasil/core.go | 5 ----- src/yggdrasil/debug.go | 4 ---- src/yggdrasil/router.go | 1 + src/yggdrasil/switch.go | 15 +-------------- 4 files changed, 2 insertions(+), 23 deletions(-) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index b94d1544..3b5fcc1a 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -103,11 +103,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - if err := c.switchTable.start(); err != nil { - c.log.Println("Failed to start switch table ticker") - return err - } - if err := c.admin.start(); err != nil { c.log.Println("Failed to start admin socket") return err diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 599ee90c..940db794 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -36,7 +36,6 @@ func (c *Core) Init() { spub, spriv := newSigKeys() c.init(bpub, bpriv, spub, spriv) c.router.start() - c.switchTable.start() } //////////////////////////////////////////////////////////////////////////////// @@ -310,9 +309,6 @@ func (c *Core) DEBUG_init(bpub []byte, panic(err) } - if err := c.switchTable.start(); err != nil { - panic(err) - } } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index a8797d5f..872a4734 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -91,6 +91,7 @@ func (r *router) mainLoop() { case <-ticker.C: { // Any periodic maintenance stuff goes here + r.core.switchTable.doMaintenance() r.core.dht.doMaintenance() util_getBytes() // To slowly drain things } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 8731e504..03921de2 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -170,26 +170,13 @@ func (t *switchTable) init(core *Core, key sigPubKey) { t.drop = make(map[sigPubKey]int64) } -func (t *switchTable) start() error { - doTicker := func() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for { - <-ticker.C - t.Tick() - } - } - go doTicker() - return nil -} - func (t *switchTable) getLocator() switchLocator { t.mutex.RLock() defer t.mutex.RUnlock() return t.data.locator.clone() } -func (t *switchTable) Tick() { +func (t *switchTable) doMaintenance() { // Periodic maintenance work to keep things internally consistent t.mutex.Lock() // Write lock defer t.mutex.Unlock() // Release lock when we're done From 85afe187ff79eb3dececbffb5075b664aa193408 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 6 Jun 2018 23:23:16 -0500 Subject: [PATCH 07/23] remove peer timeout logic from the switch, so switch peer entrires are only removed when the peer struct is removed --- src/yggdrasil/peer.go | 10 ++++++++-- src/yggdrasil/switch.go | 17 +++-------------- src/yggdrasil/tcp.go | 1 - 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 47f29f81..15371e3e 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -150,6 +150,9 @@ func (ps *peers) removePeer(port switchPort) { if port == 0 { return } // Can't remove self peer + ps.core.router.doAdmin(func() { + ps.core.switchTable.removePeer(port) + }) ps.mutex.Lock() oldPorts := ps.getPorts() p, isIn := oldPorts[port] @@ -160,8 +163,11 @@ func (ps *peers) removePeer(port switchPort) { delete(newPorts, port) ps.putPorts(newPorts) ps.mutex.Unlock() - if isIn && p.close != nil { - p.close() + if isIn { + if p.close != nil { + p.close() + } + close(p.linkIn) } } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 03921de2..d765f594 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -181,7 +181,6 @@ func (t *switchTable) doMaintenance() { t.mutex.Lock() // Write lock defer t.mutex.Unlock() // Release lock when we're done t.cleanRoot() - t.cleanPeers() t.cleanDropped() } @@ -227,19 +226,9 @@ func (t *switchTable) cleanRoot() { } } -func (t *switchTable) cleanPeers() { - now := time.Now() - changed := false - for idx, info := range t.data.peers { - if info.port != switchPort(0) && now.Sub(info.time) > 6*time.Second /*switch_timeout*/ { - //fmt.Println("peer timed out", t.key, info.locator) - delete(t.data.peers, idx) - changed = true - } - } - if changed { - t.updater.Store(&sync.Once{}) - } +func (t *switchTable) removePeer(port switchPort) { + delete(t.data.peers, port) + t.updater.Store(&sync.Once{}) } func (t *switchTable) cleanDropped() { diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index e21522b8..75be31b2 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -279,7 +279,6 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { defer func() { // Put all of our cleanup here... p.core.peers.removePeer(p.port) - close(p.linkIn) }() them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) themNodeID := getNodeID(&info.box) From ecf37cae8a1548ef2ebcb40dd53f0de2b51feeb1 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 00:16:47 -0500 Subject: [PATCH 08/23] make the switch react to peer coord changes immediately, and send out updates immediately --- src/yggdrasil/peer.go | 57 ++++++++++++++++++++++++++++++----------- src/yggdrasil/switch.go | 4 +++ 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 15371e3e..0efcbe3f 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -108,6 +108,8 @@ type peer struct { // used for protocol traffic (to bypass queues) linkIn (chan []byte) // handlePacket sends, linkLoop recvs linkOut (chan []byte) + lastMsg []byte // last switchMsg accepted + doSend (chan struct{}) // tell the linkLoop to send a switchMsg } const peer_Throttle = 1 @@ -127,6 +129,7 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey) *peer { shared: *getSharedKey(&ps.core.boxPriv, box), lastAnc: now, firstSeen: now, + doSend: make(chan struct{}, 1), core: ps.core} ps.mutex.Lock() defer ps.mutex.Unlock() @@ -171,8 +174,33 @@ func (ps *peers) removePeer(port switchPort) { } } +func (ps *peers) sendSwitchMsgs() { + ports := ps.getPorts() + for _, p := range ports { + if p.port == 0 { + continue + } + select { + case p.doSend <- struct{}{}: + default: + } + } +} + +func (ps *peers) fixSwitchAfterPeerDisconnect() { + // TODO something better, this is very wasteful + ports := ps.getPorts() + for _, p := range ports { + if p.lastMsg == nil { + continue + } + p.handleSwitchMsg(p.lastMsg) + } +} + func (p *peer) linkLoop() { - ticker := time.NewTicker(time.Second) + go func() { p.doSend <- struct{}{} }() + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { @@ -182,16 +210,15 @@ func (p *peer) linkLoop() { } p.handleLinkTraffic(packet) case <-ticker.C: - if time.Since(p.lastAnc) > 16*time.Second && p.close != nil { - // Seems to have timed out, try to trigger a close - // FIXME this depends on lastAnc or something equivalent being updated - p.close() - } p.throttle = 0 - if p.port == 0 { - continue - } // Don't send announces on selfInterface - // TODO change update logic, the new switchMsg works differently, we only need to send if something changes + if p.lastMsg != nil { + // TODO? remove ticker completely + // p.throttle isn't useful anymore (if they send a wrong message, remove peer instead) + // the handleMessage below is just for debugging, but it *shouldn't* be needed now that things react to state changes instantly + // The one case where it's maybe useful is if you get messages faster than the switch throttle, but that should fix itself after the next periodic update or timeout + p.handleSwitchMsg(p.lastMsg) + } + case <-p.doSend: p.sendSwitchMsg() } } @@ -217,11 +244,10 @@ func (p *peer) handlePacket(packet []byte) { } func (p *peer) handleTraffic(packet []byte, pTypeLen int) { - //if p.port != 0 && p.msgAnc == nil { - // // Drop traffic until the peer manages to send us at least one anc - // // TODO equivalent for new switch format, maybe add some bool flag? - // return - //} + if p.port != 0 && p.lastMsg == nil { + // Drop traffic until the peer manages to send us at least one good switchMsg + return + } ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:]) ttlBegin := pTypeLen ttlEnd := pTypeLen + ttlLen @@ -350,6 +376,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { coords: l.getCoords(), } p.core.dht.peers <- &dinfo + p.lastMsg = packet } func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte { diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d765f594..a9a0fcd6 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -223,12 +223,14 @@ func (t *switchTable) cleanRoot() { } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} t.data.sigs = nil + t.core.peers.sendSwitchMsgs() } } func (t *switchTable) removePeer(port switchPort) { delete(t.data.peers, port) t.updater.Store(&sync.Once{}) + t.core.peers.fixSwitchAfterPeerDisconnect() } func (t *switchTable) cleanDropped() { @@ -250,6 +252,7 @@ func (t *switchTable) createMessage(port switchPort) (*switchMessage, []sigInfo) } func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sigs []sigInfo) { + // TODO directly use a switchMsg instead of switchMessage + sigs t.mutex.Lock() defer t.mutex.Unlock() now := time.Now() @@ -344,6 +347,7 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig t.parent = sender.port t.data.sigs = sigs //t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords) + t.core.peers.sendSwitchMsgs() } if doUpdate { t.updater.Store(&sync.Once{}) From deb755e3e99768de613cb441bd18053378783b30 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 00:49:06 -0500 Subject: [PATCH 09/23] remove peer.linkIn channel and related logic --- src/yggdrasil/debug.go | 13 +++++++++++-- src/yggdrasil/peer.go | 30 +++++------------------------- src/yggdrasil/tcp.go | 1 - 3 files changed, 16 insertions(+), 28 deletions(-) diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index 940db794..f0808826 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -449,8 +449,17 @@ func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) { func DEBUG_simLinkPeers(p, q *peer) { // Sets q.out() to point to p and starts p.linkLoop() - p.linkIn, q.linkIn = make(chan []byte, 32), make(chan []byte, 32) - p.linkOut, q.linkOut = q.linkIn, p.linkIn + p.linkOut, q.linkOut = make(chan []byte, 1), make(chan []byte, 1) + go func() { + for bs := range p.linkOut { + q.handlePacket(bs) + } + }() + go func() { + for bs := range q.linkOut { + p.handlePacket(bs) + } + }() p.out = func(bs []byte) { go q.handlePacket(bs) } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 0efcbe3f..ce5cd0c2 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -106,7 +106,6 @@ type peer struct { // To allow the peer to call close if idle for too long lastAnc time.Time // TODO? rename and use this // used for protocol traffic (to bypass queues) - linkIn (chan []byte) // handlePacket sends, linkLoop recvs linkOut (chan []byte) lastMsg []byte // last switchMsg accepted doSend (chan struct{}) // tell the linkLoop to send a switchMsg @@ -170,7 +169,7 @@ func (ps *peers) removePeer(port switchPort) { if p.close != nil { p.close() } - close(p.linkIn) + close(p.doSend) } } @@ -200,27 +199,8 @@ func (ps *peers) fixSwitchAfterPeerDisconnect() { func (p *peer) linkLoop() { go func() { p.doSend <- struct{}{} }() - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - for { - select { - case packet, ok := <-p.linkIn: - if !ok { - return - } - p.handleLinkTraffic(packet) - case <-ticker.C: - p.throttle = 0 - if p.lastMsg != nil { - // TODO? remove ticker completely - // p.throttle isn't useful anymore (if they send a wrong message, remove peer instead) - // the handleMessage below is just for debugging, but it *shouldn't* be needed now that things react to state changes instantly - // The one case where it's maybe useful is if you get messages faster than the switch throttle, but that should fix itself after the next periodic update or timeout - p.handleSwitchMsg(p.lastMsg) - } - case <-p.doSend: - p.sendSwitchMsg() - } + for range p.doSend { + p.sendSwitchMsg() } } @@ -237,8 +217,8 @@ func (p *peer) handlePacket(packet []byte) { case wire_ProtocolTraffic: p.handleTraffic(packet, pTypeLen) case wire_LinkProtocolTraffic: - p.linkIn <- packet - default: /*panic(pType) ;*/ + p.handleLinkTraffic(packet) + default: return } } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 75be31b2..90fb80b0 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -209,7 +209,6 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { // Note that multiple connections to the same node are allowed // E.g. over different interfaces p := iface.core.peers.newPeer(&info.box, &info.sig) - p.linkIn = make(chan []byte, 1) p.linkOut = make(chan []byte, 1) in := func(bs []byte) { p.handlePacket(bs) From 3dab94be9f4cc500652fb0f9f97914da7be21794 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 10:58:24 -0500 Subject: [PATCH 10/23] keep dht peers alive --- src/yggdrasil/dht.go | 2 +- src/yggdrasil/peer.go | 33 +++++++++++++++++---------------- src/yggdrasil/router.go | 1 + src/yggdrasil/switch.go | 3 ++- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index e59017a4..3c9f61c9 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -81,7 +81,7 @@ type dht struct { func (t *dht) init(c *Core) { t.core = c t.nodeID = *t.core.GetNodeID() - t.peers = make(chan *dhtInfo, 1) + t.peers = make(chan *dhtInfo, 1024) t.reqs = make(map[boxPubKey]map[NodeID]time.Time) } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index ce5cd0c2..00f8a9c1 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -107,8 +107,8 @@ type peer struct { lastAnc time.Time // TODO? rename and use this // used for protocol traffic (to bypass queues) linkOut (chan []byte) - lastMsg []byte // last switchMsg accepted doSend (chan struct{}) // tell the linkLoop to send a switchMsg + dinfo *dhtInfo // used to keep the DHT working } const peer_Throttle = 1 @@ -186,21 +186,22 @@ func (ps *peers) sendSwitchMsgs() { } } -func (ps *peers) fixSwitchAfterPeerDisconnect() { - // TODO something better, this is very wasteful - ports := ps.getPorts() - for _, p := range ports { - if p.lastMsg == nil { - continue - } - p.handleSwitchMsg(p.lastMsg) - } -} - func (p *peer) linkLoop() { go func() { p.doSend <- struct{}{} }() - for range p.doSend { - p.sendSwitchMsg() + tick := time.NewTicker(time.Second) + defer tick.Stop() + for { + select { + case _, ok := <-p.doSend: + if !ok { + return + } + p.sendSwitchMsg() + case _ = <-tick.C: + if p.dinfo != nil { + p.core.dht.peers <- p.dinfo + } + } } } @@ -224,7 +225,7 @@ func (p *peer) handlePacket(packet []byte) { } func (p *peer) handleTraffic(packet []byte, pTypeLen int) { - if p.port != 0 && p.lastMsg == nil { + if p.port != 0 && p.dinfo == nil { // Drop traffic until the peer manages to send us at least one good switchMsg return } @@ -356,7 +357,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { coords: l.getCoords(), } p.core.dht.peers <- &dinfo - p.lastMsg = packet + p.dinfo = &dinfo } func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte { diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 872a4734..3246f63e 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -93,6 +93,7 @@ func (r *router) mainLoop() { // Any periodic maintenance stuff goes here r.core.switchTable.doMaintenance() r.core.dht.doMaintenance() + //r.core.peers.fixSwitchAfterPeerDisconnect() // FIXME makes sure dht peers get added quickly util_getBytes() // To slowly drain things } case f := <-r.admin: diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index a9a0fcd6..fec47af1 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -230,7 +230,7 @@ func (t *switchTable) cleanRoot() { func (t *switchTable) removePeer(port switchPort) { delete(t.data.peers, port) t.updater.Store(&sync.Once{}) - t.core.peers.fixSwitchAfterPeerDisconnect() + // TODO if parent, find a new peer to use as parent instead } func (t *switchTable) cleanDropped() { @@ -287,6 +287,7 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig doUpdate := false if !equiv(&msg.locator, &oldSender.locator) { doUpdate = true + //sender.firstSeen = now // TODO? uncomment to prevent flapping? } t.data.peers[fromPort] = sender updateRoot := false From 00e4da28c74155c3b8310a0a1d702b1aea974b99 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 13:56:11 -0500 Subject: [PATCH 11/23] use/store switchMsg in the switch instead of going through the old switchMessage --- src/yggdrasil/peer.go | 58 +++++++++++++++++------------------------ src/yggdrasil/switch.go | 54 ++++++++++++++++++++++++++++++++++++-- src/yggdrasil/wire.go | 12 --------- 3 files changed, 76 insertions(+), 48 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 00f8a9c1..042702e5 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -294,18 +294,11 @@ func (p *peer) handleLinkTraffic(bs []byte) { } func (p *peer) sendSwitchMsg() { - info, sigs := p.core.switchTable.createMessage(p.port) - var msg switchMsg - msg.Root, msg.TStamp = info.locator.root, info.locator.tstamp - for idx, sig := range sigs { - hop := switchMsgHop{ - Port: info.locator.coords[idx], - Next: sig.next, - Sig: sig.sig, - } - msg.Hops = append(msg.Hops, hop) + msg := p.core.switchTable.getMsg() + if msg == nil { + return } - bs := getBytesForSig(&p.sig, &info.locator) + bs := getBytesForSig(&p.sig, msg) msg.Hops = append(msg.Hops, switchMsgHop{ Port: p.port, Next: p.sig, @@ -313,6 +306,7 @@ func (p *peer) sendSwitchMsg() { }) packet := msg.encode() //p.core.log.Println("Encoded msg:", msg, "; bytes:", packet) + //fmt.Println("Encoded msg:", msg, "; bytes:", packet) p.sendLinkPacket(packet) } @@ -326,44 +320,40 @@ func (p *peer) handleSwitchMsg(packet []byte) { return } var info switchMessage - var sigs []sigInfo - info.locator.root = msg.Root - info.locator.tstamp = msg.TStamp + var loc switchLocator prevKey := msg.Root - for _, hop := range msg.Hops { - // Build locator and signatures - var sig sigInfo - sig.next = hop.Next - sig.sig = hop.Sig - sigs = append(sigs, sig) - info.locator.coords = append(info.locator.coords, hop.Port) - // Check signature - bs := getBytesForSig(&sig.next, &info.locator) - if !p.core.sigs.check(&prevKey, &sig.sig, bs) { + for idx, hop := range msg.Hops { + // Check signatures and collect coords for dht + sigMsg := msg + sigMsg.Hops = msg.Hops[:idx] + loc.coords = append(loc.coords, hop.Port) + bs := getBytesForSig(&hop.Next, &sigMsg) + if !p.core.sigs.check(&prevKey, &hop.Sig, bs) { p.throttle++ panic("FIXME testing") return } - prevKey = sig.next + prevKey = hop.Next } - info.from = p.sig - info.seq = uint64(time.Now().Unix()) - p.core.switchTable.handleMessage(&info, p.port, sigs) + p.core.switchTable.handleMsg(&msg, &info, p.port) // Pass a mesage to the dht informing it that this peer (still) exists - l := info.locator - l.coords = l.coords[:len(l.coords)-1] + loc.coords = loc.coords[:len(loc.coords)-1] dinfo := dhtInfo{ key: p.box, - coords: l.getCoords(), + coords: loc.getCoords(), } p.core.dht.peers <- &dinfo p.dinfo = &dinfo } -func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte { +func getBytesForSig(next *sigPubKey, msg *switchMsg) []byte { + var loc switchLocator + for _, hop := range msg.Hops { + loc.coords = append(loc.coords, hop.Port) + } bs := append([]byte(nil), next[:]...) - bs = append(bs, loc.root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(loc.tstamp))...) + bs = append(bs, msg.Root[:]...) + bs = append(bs, wire_encode_uint64(wire_intToUint(msg.TStamp))...) bs = append(bs, wire_encode_coords(loc.getCoords())...) return bs } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index fec47af1..09083262 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -118,6 +118,7 @@ type peerInfo struct { firstSeen time.Time port switchPort // Interface number of this peer seq uint64 // Seq number we last saw this peer advertise + smsg switchMsg // The wire switchMsg used } type switchMessage struct { @@ -144,6 +145,7 @@ type switchData struct { seq uint64 // Sequence number, reported to peers, so they know about changes peers map[switchPort]peerInfo sigs []sigInfo + msg *switchMsg } type switchTable struct { @@ -251,11 +253,58 @@ func (t *switchTable) createMessage(port switchPort) (*switchMessage, []sigInfo) return &msg, t.data.sigs } -func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sigs []sigInfo) { +type switchMsg struct { + Root sigPubKey + TStamp int64 + Hops []switchMsgHop +} + +type switchMsgHop struct { + Port switchPort + Next sigPubKey + Sig sigBytes +} + +func (t *switchTable) getMsg() *switchMsg { + t.mutex.RLock() + defer t.mutex.RUnlock() + if t.parent == 0 { + return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp} + } else if parent, isIn := t.data.peers[t.parent]; isIn { + msg := parent.smsg + msg.Hops = append([]switchMsgHop(nil), msg.Hops...) + return &msg + } else { + return nil + } +} + +func (t *switchTable) handleMsg(smsg *switchMsg, xmsg *switchMessage, fromPort switchPort) { // TODO directly use a switchMsg instead of switchMessage + sigs t.mutex.Lock() defer t.mutex.Unlock() now := time.Now() + + //* + var msg switchMessage + var sigs []sigInfo + msg.locator.root = smsg.Root + msg.locator.tstamp = smsg.TStamp + msg.from = smsg.Root + prevKey := msg.from + for _, hop := range smsg.Hops { + // Build locator and signatures + var sig sigInfo + sig.next = hop.Next + sig.sig = hop.Sig + sigs = append(sigs, sig) + msg.locator.coords = append(msg.locator.coords, hop.Port) + msg.from = prevKey + prevKey = hop.Next + } + msg.seq = uint64(now.Unix()) + //*/ + if len(msg.locator.coords) == 0 { return } // Should always have >=1 links @@ -269,7 +318,8 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig time: now, firstSeen: oldSender.firstSeen, port: fromPort, - seq: msg.seq} + seq: msg.seq, + smsg: *smsg} equiv := func(x *switchLocator, y *switchLocator) bool { if x.root != y.root { return false diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index be2fb4e6..305e5fca 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -115,18 +115,6 @@ func wire_decode_coords(packet []byte) ([]byte, int) { //////////////////////////////////////////////////////////////////////////////// -type switchMsg struct { - Root sigPubKey - TStamp int64 - Hops []switchMsgHop -} - -type switchMsgHop struct { - Port switchPort - Next sigPubKey - Sig sigBytes -} - func (m *switchMsg) encode() []byte { bs := wire_encode_uint64(wire_SwitchMsg) bs = append(bs, m.Root[:]...) From f8ba80e7d89b5dabb3eb965d3e84cd482c3e9b5b Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 14:13:31 -0500 Subject: [PATCH 12/23] remove old switchMessage and clean up related code --- src/yggdrasil/peer.go | 3 +- src/yggdrasil/switch.go | 95 ++++++++++++++--------------------------- 2 files changed, 32 insertions(+), 66 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 042702e5..f9aca08d 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -319,7 +319,6 @@ func (p *peer) handleSwitchMsg(packet []byte) { panic("FIXME testing") return } - var info switchMessage var loc switchLocator prevKey := msg.Root for idx, hop := range msg.Hops { @@ -335,7 +334,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { } prevKey = hop.Next } - p.core.switchTable.handleMsg(&msg, &info, p.port) + p.core.switchTable.handleMsg(&msg, p.port) // Pass a mesage to the dht informing it that this peer (still) exists loc.coords = loc.coords[:len(loc.coords)-1] dinfo := dhtInfo{ diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 09083262..aa9e3d30 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -113,18 +113,10 @@ type peerInfo struct { key sigPubKey // ID of this peer locator switchLocator // Should be able to respond with signatures upon request degree uint64 // Self-reported degree - coords []switchPort // Coords of this peer (taken from coords of the sent locator) time time.Time // Time this node was last seen firstSeen time.Time port switchPort // Interface number of this peer - seq uint64 // Seq number we last saw this peer advertise - smsg switchMsg // The wire switchMsg used -} - -type switchMessage struct { - from sigPubKey // key of the sender - locator switchLocator // Locator advertised for the receiver, not the sender's loc! - seq uint64 + msg switchMsg // The wire switchMsg used } type switchPort uint64 @@ -144,7 +136,6 @@ type switchData struct { locator switchLocator seq uint64 // Sequence number, reported to peers, so they know about changes peers map[switchPort]peerInfo - sigs []sigInfo msg *switchMsg } @@ -224,7 +215,6 @@ func (t *switchTable) cleanRoot() { } } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} - t.data.sigs = nil t.core.peers.sendSwitchMsgs() } } @@ -244,15 +234,6 @@ func (t *switchTable) cleanDropped() { } } -func (t *switchTable) createMessage(port switchPort) (*switchMessage, []sigInfo) { - t.mutex.RLock() - defer t.mutex.RUnlock() - msg := switchMessage{from: t.key, locator: t.data.locator.clone()} - msg.locator.coords = append(msg.locator.coords, port) - msg.seq = t.data.seq - return &msg, t.data.sigs -} - type switchMsg struct { Root sigPubKey TStamp int64 @@ -271,7 +252,7 @@ func (t *switchTable) getMsg() *switchMsg { if t.parent == 0 { return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp} } else if parent, isIn := t.data.peers[t.parent]; isIn { - msg := parent.smsg + msg := parent.msg msg.Hops = append([]switchMsgHop(nil), msg.Hops...) return &msg } else { @@ -279,47 +260,34 @@ func (t *switchTable) getMsg() *switchMsg { } } -func (t *switchTable) handleMsg(smsg *switchMsg, xmsg *switchMessage, fromPort switchPort) { +func (t *switchTable) handleMsg(msg *switchMsg, fromPort switchPort) { // TODO directly use a switchMsg instead of switchMessage + sigs t.mutex.Lock() defer t.mutex.Unlock() now := time.Now() - - //* - var msg switchMessage - var sigs []sigInfo - msg.locator.root = smsg.Root - msg.locator.tstamp = smsg.TStamp - msg.from = smsg.Root - prevKey := msg.from - for _, hop := range smsg.Hops { + // Set up the sender peerInfo + var sender peerInfo + sender.locator.root = msg.Root + sender.locator.tstamp = msg.TStamp + prevKey := msg.Root + for _, hop := range msg.Hops { // Build locator and signatures var sig sigInfo sig.next = hop.Next sig.sig = hop.Sig - sigs = append(sigs, sig) - msg.locator.coords = append(msg.locator.coords, hop.Port) - msg.from = prevKey + sender.locator.coords = append(sender.locator.coords, hop.Port) + sender.key = prevKey prevKey = hop.Next } - msg.seq = uint64(now.Unix()) - //*/ - - if len(msg.locator.coords) == 0 { - return - } // Should always have >=1 links + sender.msg = *msg oldSender, isIn := t.data.peers[fromPort] if !isIn { oldSender.firstSeen = now } - sender := peerInfo{key: msg.from, - locator: msg.locator, - coords: msg.locator.coords[:len(msg.locator.coords)-1], - time: now, - firstSeen: oldSender.firstSeen, - port: fromPort, - seq: msg.seq, - smsg: *smsg} + sender.firstSeen = oldSender.firstSeen + sender.port = fromPort + sender.time = now + // Decide what to do equiv := func(x *switchLocator, y *switchLocator) bool { if x.root != y.root { return false @@ -335,7 +303,7 @@ func (t *switchTable) handleMsg(smsg *switchMsg, xmsg *switchMessage, fromPort s return true } doUpdate := false - if !equiv(&msg.locator, &oldSender.locator) { + if !equiv(&sender.locator, &oldSender.locator) { doUpdate = true //sender.firstSeen = now // TODO? uncomment to prevent flapping? } @@ -344,12 +312,12 @@ func (t *switchTable) handleMsg(smsg *switchMsg, xmsg *switchMessage, fromPort s oldParent, isIn := t.data.peers[t.parent] noParent := !isIn noLoop := func() bool { - for idx := 0; idx < len(sigs)-1; idx++ { - if sigs[idx].next == t.core.sigPub { + for idx := 0; idx < len(msg.Hops)-1; idx++ { + if msg.Hops[idx].Next == t.core.sigPub { return false } } - if msg.locator.root == t.core.sigPub { + if sender.locator.root == t.core.sigPub { return false } return true @@ -358,30 +326,30 @@ func (t *switchTable) handleMsg(smsg *switchMsg, xmsg *switchMessage, fromPort s pTime := oldParent.time.Sub(oldParent.firstSeen) + switch_timeout // Really want to compare sLen/sTime and pLen/pTime // Cross multiplied to avoid divide-by-zero - cost := len(msg.locator.coords) * int(pTime.Seconds()) + cost := len(sender.locator.coords) * int(pTime.Seconds()) pCost := len(t.data.locator.coords) * int(sTime.Seconds()) - dropTstamp, isIn := t.drop[msg.locator.root] + dropTstamp, isIn := t.drop[sender.locator.root] // Here be dragons switch { case !noLoop: // do nothing - case isIn && dropTstamp >= msg.locator.tstamp: // do nothing - case firstIsBetter(&msg.locator.root, &t.data.locator.root): + case isIn && dropTstamp >= sender.locator.tstamp: // do nothing + case firstIsBetter(&sender.locator.root, &t.data.locator.root): updateRoot = true - case t.data.locator.root != msg.locator.root: // do nothing - case t.data.locator.tstamp > msg.locator.tstamp: // do nothing + case t.data.locator.root != sender.locator.root: // do nothing + case t.data.locator.tstamp > sender.locator.tstamp: // do nothing case noParent: updateRoot = true case cost < pCost: updateRoot = true case sender.port != t.parent: // do nothing - case !equiv(&msg.locator, &t.data.locator): + case !equiv(&sender.locator, &t.data.locator): updateRoot = true case now.Sub(t.time) < switch_throttle: // do nothing - case msg.locator.tstamp > t.data.locator.tstamp: + case sender.locator.tstamp > t.data.locator.tstamp: updateRoot = true } if updateRoot { - if !equiv(&msg.locator, &t.data.locator) { + if !equiv(&sender.locator, &t.data.locator) { doUpdate = true t.data.seq++ select { @@ -391,12 +359,11 @@ func (t *switchTable) handleMsg(smsg *switchMsg, xmsg *switchMessage, fromPort s //t.core.log.Println("Switch update:", msg.locator.root, msg.locator.tstamp, msg.locator.coords) //fmt.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords) } - if t.data.locator.tstamp != msg.locator.tstamp { + if t.data.locator.tstamp != sender.locator.tstamp { t.time = now } - t.data.locator = msg.locator + t.data.locator = sender.locator t.parent = sender.port - t.data.sigs = sigs //t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords) t.core.peers.sendSwitchMsgs() } From d46888214780baee629043b2b17e4abe6c707dd9 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 14:24:02 -0500 Subject: [PATCH 13/23] cleanup --- src/yggdrasil/peer.go | 49 +++++++++-------------------------------- src/yggdrasil/switch.go | 2 +- 2 files changed, 12 insertions(+), 39 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index f9aca08d..1834cfd2 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -11,17 +11,6 @@ package yggdrasil // It needs to ignore messages with a lower seq // Probably best to start setting seq to a timestamp in that case... -// FIXME (!?) if it takes too long to communicate all the msgHops, then things hit a horizon -// That could happen with a peer over a high-latency link, with many msgHops -// Possible workarounds: -// 1. Pre-emptively send all hops when one is requested, or after any change -// Maybe requires changing how the throttle works and msgHops are saved -// In case some arrive out of order or are dropped -// This is relatively easy to implement, but could be wasteful -// 2. Save your old locator, sigs, etc, so you can respond to older ancs -// And finish requesting an old anc before updating to a new one -// But that may lead to other issues if not done carefully... - import "time" import "sync" import "sync/atomic" @@ -83,36 +72,23 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) { } type peer struct { - // Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends - // use get/update methods only! (atomic accessors as float64) - queueSize int64 + queueSize int64 // used to track local backpressure bytesSent uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers // BUG: sync/atomic, 32 bit platforms need the above to be the first element - firstSeen time.Time // To track uptime for getPeers + core *Core + port switchPort box boxPubKey sig sigPubKey shared boxSharedKey - //in <-chan []byte - //out chan<- []byte - //in func([]byte) - out func([]byte) - core *Core - port switchPort - // This is used to limit how often we perform expensive operations - throttle uint8 // TODO apply this sanely - // Called when a peer is removed, to close the underlying connection, or via admin api - close func() - // To allow the peer to call close if idle for too long - lastAnc time.Time // TODO? rename and use this - // used for protocol traffic (to bypass queues) - linkOut (chan []byte) - doSend (chan struct{}) // tell the linkLoop to send a switchMsg - dinfo *dhtInfo // used to keep the DHT working + firstSeen time.Time // To track uptime for getPeers + linkOut (chan []byte) // used for protocol traffic (to bypass queues) + doSend (chan struct{}) // tell the linkLoop to send a switchMsg + dinfo *dhtInfo // used to keep the DHT working + out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes + close func() // Called when a peer is removed, to close the underlying connection, or via admin api } -const peer_Throttle = 1 - func (p *peer) getQueueSize() int64 { return atomic.LoadInt64(&p.queueSize) } @@ -126,7 +102,6 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey) *peer { p := peer{box: *box, sig: *sig, shared: *getSharedKey(&ps.core.boxPriv, box), - lastAnc: now, firstSeen: now, doSend: make(chan struct{}, 1), core: ps.core} @@ -315,9 +290,8 @@ func (p *peer) handleSwitchMsg(packet []byte) { msg.decode(packet) //p.core.log.Println("Decoded msg:", msg, "; bytes:", packet) if len(msg.Hops) < 1 { - p.throttle++ panic("FIXME testing") - return + p.core.peers.removePeer(p.port) } var loc switchLocator prevKey := msg.Root @@ -328,9 +302,8 @@ func (p *peer) handleSwitchMsg(packet []byte) { loc.coords = append(loc.coords, hop.Port) bs := getBytesForSig(&hop.Next, &sigMsg) if !p.core.sigs.check(&prevKey, &hop.Sig, bs) { - p.throttle++ panic("FIXME testing") - return + p.core.peers.removePeer(p.port) } prevKey = hop.Next } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index aa9e3d30..67d9d8fd 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -9,7 +9,7 @@ package yggdrasil // TODO document/comment everything in a lot more detail // TODO? use a pre-computed lookup table (python version had this) -// A little annoying to do with constant changes from bandwidth estimates +// A little annoying to do with constant changes from backpressure import "time" import "sort" From c1f8baf9b5f6ecff858374b74f70a49047ae6bd8 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 14:39:43 -0500 Subject: [PATCH 14/23] update dht.reset() to possibly play better with coord changes --- src/yggdrasil/dht.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 3c9f61c9..e77ab90d 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -520,9 +520,15 @@ func dht_firstCloserThanThird(first *NodeID, func (t *dht) reset() { // This is mostly so bootstrapping will reset to resend coords into the network + t.offset = 0 + t.rumorMill = nil // reset mill for _, b := range t.buckets_hidden { b.peers = b.peers[:0] + for _, info := range b.other { + // Add other nodes to the rumor mill so they'll be pinged soon + // This will hopefully tell them our coords and re-learn theirs quickly if they haven't changed + t.addToMill(info, info.getNodeID()) + } b.other = b.other[:0] } - t.offset = 0 } From 63feed8dc32a499aa316dcf404a33edab3433962 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 15:04:17 -0500 Subject: [PATCH 15/23] adjust tcp timeout and add shadow queues to track dropped packets --- src/yggdrasil/tcp.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 90fb80b0..6564dfda 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -225,16 +225,20 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } go func() { defer buf.Flush() + var shadow uint64 var stack [][]byte put := func(msg []byte) { stack = append(stack, msg) for len(stack) > 32 { util_putBytes(stack[0]) stack = stack[1:] - p.updateQueueSize(-1) + shadow++ } } for { + for ; shadow > 0; shadow-- { + p.updateQueueSize(-1) + } select { case msg := <-p.linkOut: send(msg) @@ -294,7 +298,7 @@ func (iface *tcpInterface) reader(sock net.Conn, in func([]byte)) { bs := make([]byte, 2*tcp_msgSize) frag := bs[:0] for { - timeout := time.Now().Add(6 * time.Second) + timeout := time.Now().Add(2 * time.Minute) sock.SetReadDeadline(timeout) n, err := sock.Read(bs[len(frag):]) if err != nil || n == 0 { From bcfeb2291518cd966dbbf345ea1188a1735cf034 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 16:49:51 -0500 Subject: [PATCH 16/23] more tcp debugging --- src/yggdrasil/router.go | 2 +- src/yggdrasil/switch.go | 10 +++++++++- src/yggdrasil/tcp.go | 31 +++++++++++++++++++------------ 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 3246f63e..ddb48486 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -93,7 +93,7 @@ func (r *router) mainLoop() { // Any periodic maintenance stuff goes here r.core.switchTable.doMaintenance() r.core.dht.doMaintenance() - //r.core.peers.fixSwitchAfterPeerDisconnect() // FIXME makes sure dht peers get added quickly + //r.core.peers.sendSwitchMsgs() // FIXME debugging util_getBytes() // To slowly drain things } case f := <-r.admin: diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 67d9d8fd..de79f4a3 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -223,6 +223,9 @@ func (t *switchTable) removePeer(port switchPort) { delete(t.data.peers, port) t.updater.Store(&sync.Once{}) // TODO if parent, find a new peer to use as parent instead + for _, info := range t.data.peers { + t.unlockedHandleMsg(&info.msg, info.port) + } } func (t *switchTable) cleanDropped() { @@ -261,9 +264,13 @@ func (t *switchTable) getMsg() *switchMsg { } func (t *switchTable) handleMsg(msg *switchMsg, fromPort switchPort) { - // TODO directly use a switchMsg instead of switchMessage + sigs t.mutex.Lock() defer t.mutex.Unlock() + t.unlockedHandleMsg(msg, fromPort) +} + +func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) { + // TODO directly use a switchMsg instead of switchMessage + sigs now := time.Now() // Set up the sender peerInfo var sender peerInfo @@ -433,6 +440,7 @@ func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) { } } //t.core.log.Println("DEBUG: sending to", best, "bandwidth", getBandwidth(best)) + //t.core.log.Println("DEBUG: sending to", best, "cost", bestCost) return best, uint64(myDist) } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 6564dfda..e7b682c9 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -15,7 +15,6 @@ import "time" import "errors" import "sync" import "fmt" -import "bufio" import "golang.org/x/net/proxy" const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense @@ -215,16 +214,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } out := make(chan []byte, 32) // TODO? what size makes sense defer close(out) - buf := bufio.NewWriterSize(sock, tcp_msgSize) - send := func(msg []byte) { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf.Write(tcp_msg[:]) - buf.Write(msgLen) - buf.Write(msg) - util_putBytes(msg) - } go func() { - defer buf.Flush() var shadow uint64 var stack [][]byte put := func(msg []byte) { @@ -235,11 +225,29 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { shadow++ } } + send := func(msg []byte) { + msgLen := wire_encode_uint64(uint64(len(msg))) + buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf.WriteTo(sock) + util_putBytes(msg) + } + timerInterval := 4 * time.Second + timer := time.NewTimer(timerInterval) + defer timer.Stop() for { for ; shadow > 0; shadow-- { p.updateQueueSize(-1) } + timer.Stop() select { + case <-timer.C: + default: + } + timer.Reset(timerInterval) + select { + case _ = <-timer.C: + //iface.core.log.Println("DEBUG: sending keep-alive:", sock.RemoteAddr().String()) + send(nil) // TCP keep-alive traffic case msg := <-p.linkOut: send(msg) case msg, ok := <-out: @@ -264,7 +272,6 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { p.updateQueueSize(-1) } } - buf.Flush() } }() p.out = func(msg []byte) { @@ -298,7 +305,7 @@ func (iface *tcpInterface) reader(sock net.Conn, in func([]byte)) { bs := make([]byte, 2*tcp_msgSize) frag := bs[:0] for { - timeout := time.Now().Add(2 * time.Minute) + timeout := time.Now().Add(6 * time.Second) sock.SetReadDeadline(timeout) n, err := sock.Read(bs[len(frag):]) if err != nil || n == 0 { From ec1c173ca55a134646750404ba81ce560d6824ff Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 16:53:39 -0500 Subject: [PATCH 17/23] it helps to check that messages decoded correctly --- src/yggdrasil/peer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 1834cfd2..27be44b3 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -287,7 +287,9 @@ func (p *peer) sendSwitchMsg() { func (p *peer) handleSwitchMsg(packet []byte) { var msg switchMsg - msg.decode(packet) + if !msg.decode(packet) { + return + } //p.core.log.Println("Decoded msg:", msg, "; bytes:", packet) if len(msg.Hops) < 1 { panic("FIXME testing") From fe12e1509ade0a3a9f78eb4dadb26765d6b87721 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 17:55:43 -0500 Subject: [PATCH 18/23] add a throttle to nodes in the dht. the throttle is gradually increased each time the node is pinged. it determines the minimum amount of time to wait between using the node in a bootstrapping search --- src/yggdrasil/dht.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index e77ab90d..95f6bb1e 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -36,6 +36,7 @@ type dhtInfo struct { send time.Time // When we last sent a message recv time.Time // When we last received a message pings int // Decide when to drop + throttle uint8 // Number of seconds to wait before pinging a node to bootstrap buckets, gradually increases up to 1 minute } func (info *dhtInfo) getNodeID() *NodeID { @@ -116,10 +117,11 @@ func (t *dht) handleRes(res *dhtRes) { return } rinfo := dhtInfo{ - key: res.Key, - coords: res.Coords, - send: time.Now(), // Technically wrong but should be OK... - recv: time.Now(), + key: res.Key, + coords: res.Coords, + send: time.Now(), // Technically wrong but should be OK... + recv: time.Now(), + throttle: 1, } // If they're already in the table, then keep the correct send time bidx, isOK := t.getBucketIndex(rinfo.getNodeID()) @@ -130,11 +132,13 @@ func (t *dht) handleRes(res *dhtRes) { for _, oldinfo := range b.peers { if oldinfo.key == rinfo.key { rinfo.send = oldinfo.send + rinfo.throttle += oldinfo.throttle } } for _, oldinfo := range b.other { if oldinfo.key == rinfo.key { rinfo.send = oldinfo.send + rinfo.throttle += oldinfo.throttle } } // Insert into table @@ -231,6 +235,9 @@ func (t *dht) insert(info *dhtInfo, isPeer bool) { // This speeds up bootstrapping info.recv = info.recv.Add(-time.Hour) } + if isPeer || info.throttle > 60 { + info.throttle = 60 + } // First drop any existing entry from the bucket b.drop(&info.key) // Now add to the *end* of the bucket @@ -460,7 +467,7 @@ func (t *dht) doMaintenance() { } target := t.getTarget(t.offset) for _, info := range t.lookup(target, true) { - if time.Since(info.recv) > time.Minute { + if time.Since(info.recv) > time.Duration(info.throttle)*time.Second { t.addToMill(info, target) t.offset++ break From 84c13fac90df2018e449e05f6f6e514ff676b6af Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 20:18:13 -0500 Subject: [PATCH 19/23] don't use TTL --- misc/sim/treesim.go | 4 ++++ src/yggdrasil/peer.go | 12 ++---------- src/yggdrasil/switch.go | 5 ++++- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index d8859261..91b8e0ef 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -207,6 +207,10 @@ func testPaths(store map[[32]byte]*Node) bool { // This is sufficient to check for routing loops or blackholes //break } + if here == next { + fmt.Println("Drop2:", source.index, here.index, dest.index, oldTTL) + return false + } here = next } } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 27be44b3..ee25b072 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -204,15 +204,14 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { // Drop traffic until the peer manages to send us at least one good switchMsg return } - ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:]) - ttlBegin := pTypeLen + _, ttlLen := wire_decode_uint64(packet[pTypeLen:]) ttlEnd := pTypeLen + ttlLen coords, coordLen := wire_decode_coords(packet[ttlEnd:]) coordEnd := ttlEnd + coordLen if coordEnd == len(packet) { return } // No payload - toPort, newTTL := p.core.switchTable.lookup(coords, ttl) + toPort, _ := p.core.switchTable.lookup(coords, 0) if toPort == p.port { return } @@ -220,13 +219,6 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { if to == nil { return } - // This mutates the packet in-place if the length of the TTL changes! - ttlSlice := wire_encode_uint64(newTTL) - newTTLLen := len(ttlSlice) - shift := ttlLen - newTTLLen - copy(packet[shift:], packet[:pTypeLen]) - copy(packet[ttlBegin+shift:], ttlSlice) - packet = packet[shift:] to.sendPacket(packet) } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index de79f4a3..be3027aa 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -418,6 +418,9 @@ func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) { table := t.table.Load().(lookupTable) myDist := table.self.dist(dest) //getDist(table.self.coords) if !(uint64(myDist) < ttl) { + //return 0, 0 + } + if myDist == 0 { return 0, 0 } // cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow @@ -441,7 +444,7 @@ func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) { } //t.core.log.Println("DEBUG: sending to", best, "bandwidth", getBandwidth(best)) //t.core.log.Println("DEBUG: sending to", best, "cost", bestCost) - return best, uint64(myDist) + return best, ttl //uint64(myDist) } //////////////////////////////////////////////////////////////////////////////// From bced15b1389a648dc4694f953304103bf5dc8ad9 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 20:29:22 -0500 Subject: [PATCH 20/23] remove TTL from traffic packets --- misc/sim/treesim.go | 12 +++--------- src/yggdrasil/debug.go | 4 ++-- src/yggdrasil/dht.go | 2 -- src/yggdrasil/peer.go | 9 +++------ src/yggdrasil/session.go | 2 -- src/yggdrasil/switch.go | 14 +++++--------- src/yggdrasil/wire.go | 8 -------- 7 files changed, 13 insertions(+), 38 deletions(-) diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index 91b8e0ef..83548e81 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -160,17 +160,11 @@ func testPaths(store map[[32]byte]*Node) bool { ttl := ^uint64(0) oldTTL := ttl for here := source; here != dest; { - if ttl == 0 { - fmt.Println("Drop:", source.index, here.index, dest.index, oldTTL) - return false - } temp++ if temp > 4096 { panic("Loop?") } - oldTTL = ttl - nextPort, newTTL := here.core.DEBUG_switchLookup(coords, ttl) - ttl = newTTL + nextPort := here.core.DEBUG_switchLookup(coords) // First check if "here" is accepting packets from the previous node // TODO explain how this works ports := here.core.DEBUG_getPeers().DEBUG_getPorts() @@ -208,7 +202,7 @@ func testPaths(store map[[32]byte]*Node) bool { //break } if here == next { - fmt.Println("Drop2:", source.index, here.index, dest.index, oldTTL) + fmt.Println("Drop:", source.index, here.index, dest.index, oldTTL) return false } here = next @@ -231,7 +225,7 @@ func stressTest(store map[[32]byte]*Node) { start := time.Now() for _, source := range store { for _, coords := range dests { - source.core.DEBUG_switchLookup(coords, ^uint64(0)) + source.core.DEBUG_switchLookup(coords) lookups++ } } diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index f0808826..6b8211cb 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -126,8 +126,8 @@ func (l *switchLocator) DEBUG_getCoords() []byte { return l.getCoords() } -func (c *Core) DEBUG_switchLookup(dest []byte, ttl uint64) (switchPort, uint64) { - return c.switchTable.lookup(dest, ttl) +func (c *Core) DEBUG_switchLookup(dest []byte) switchPort { + return c.switchTable.lookup(dest) } /* diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 95f6bb1e..33933914 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -326,7 +326,6 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &dest.key) payload, nonce := boxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ - TTL: ^uint64(0), Coords: dest.coords, ToKey: dest.key, FromKey: t.core.boxPub, @@ -352,7 +351,6 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) { shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.Key) payload, nonce := boxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ - TTL: ^uint64(0), Coords: req.Coords, ToKey: req.Key, FromKey: t.core.boxPub, diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index ee25b072..497024ca 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -204,14 +204,11 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { // Drop traffic until the peer manages to send us at least one good switchMsg return } - _, ttlLen := wire_decode_uint64(packet[pTypeLen:]) - ttlEnd := pTypeLen + ttlLen - coords, coordLen := wire_decode_coords(packet[ttlEnd:]) - coordEnd := ttlEnd + coordLen - if coordEnd == len(packet) { + coords, coordLen := wire_decode_coords(packet[pTypeLen:]) + if coordLen >= len(packet) { return } // No payload - toPort, _ := p.core.switchTable.lookup(coords, 0) + toPort := p.core.switchTable.lookup(coords) if toPort == p.port { return } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 8bbbc33c..33111424 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -255,7 +255,6 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) { shared := ss.getSharedKey(&ss.core.boxPriv, &sinfo.theirPermPub) payload, nonce := boxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ - TTL: ^uint64(0), Coords: sinfo.coords, ToKey: sinfo.theirPermPub, FromKey: ss.core.boxPub, @@ -383,7 +382,6 @@ func (sinfo *sessionInfo) doSend(bs []byte) { payload, nonce := boxSeal(&sinfo.sharedSesKey, bs, &sinfo.myNonce) defer util_putBytes(payload) p := wire_trafficPacket{ - TTL: ^uint64(0), Coords: sinfo.coords, Handle: sinfo.theirHandle, Nonce: *nonce, diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index be3027aa..4db4c67f 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -413,22 +413,19 @@ func (t *switchTable) updateTable() { t.table.Store(newTable) } -func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) { +func (t *switchTable) lookup(dest []byte) switchPort { t.updater.Load().(*sync.Once).Do(t.updateTable) table := t.table.Load().(lookupTable) - myDist := table.self.dist(dest) //getDist(table.self.coords) - if !(uint64(myDist) < ttl) { - //return 0, 0 - } + myDist := table.self.dist(dest) if myDist == 0 { - return 0, 0 + return 0 } // cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow ports := t.core.peers.getPorts() var best switchPort bestCost := int64(^uint64(0) >> 1) for _, info := range table.elems { - dist := info.locator.dist(dest) //getDist(info.locator.coords) + dist := info.locator.dist(dest) if !(dist < myDist) { continue } @@ -442,9 +439,8 @@ func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) { bestCost = cost } } - //t.core.log.Println("DEBUG: sending to", best, "bandwidth", getBandwidth(best)) //t.core.log.Println("DEBUG: sending to", best, "cost", bestCost) - return best, ttl //uint64(myDist) + return best } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 305e5fca..3b43143b 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -192,7 +192,6 @@ func wire_chop_uint64(toUInt64 *uint64, fromSlice *[]byte) bool { // Wire traffic packets type wire_trafficPacket struct { - TTL uint64 Coords []byte Handle handle Nonce boxNonce @@ -203,7 +202,6 @@ type wire_trafficPacket struct { func (p *wire_trafficPacket) encode() []byte { bs := util_getBytes() bs = wire_put_uint64(wire_Traffic, bs) - bs = wire_put_uint64(p.TTL, bs) bs = wire_put_coords(p.Coords, bs) bs = append(bs, p.Handle[:]...) bs = append(bs, p.Nonce[:]...) @@ -219,8 +217,6 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { return false case pType != wire_Traffic: return false - case !wire_chop_uint64(&p.TTL, &bs): - return false case !wire_chop_coords(&p.Coords, &bs): return false case !wire_chop_slice(p.Handle[:], &bs): @@ -233,7 +229,6 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { } type wire_protoTrafficPacket struct { - TTL uint64 Coords []byte ToKey boxPubKey FromKey boxPubKey @@ -244,7 +239,6 @@ type wire_protoTrafficPacket struct { func (p *wire_protoTrafficPacket) encode() []byte { coords := wire_encode_coords(p.Coords) bs := wire_encode_uint64(wire_ProtocolTraffic) - bs = append(bs, wire_encode_uint64(p.TTL)...) bs = append(bs, coords...) bs = append(bs, p.ToKey[:]...) bs = append(bs, p.FromKey[:]...) @@ -260,8 +254,6 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { return false case pType != wire_ProtocolTraffic: return false - case !wire_chop_uint64(&p.TTL, &bs): - return false case !wire_chop_coords(&p.Coords, &bs): return false case !wire_chop_slice(p.ToKey[:], &bs): From 6bdc9a7eb634036f9a8883f797aea5fab1e3636d Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 21:06:30 -0500 Subject: [PATCH 21/23] fix the sim, part of it bypasses queues so it's expected to see loops in those cases while things are in the middle of updating --- misc/sim/treesim.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index 83548e81..0316b8fd 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -162,7 +162,9 @@ func testPaths(store map[[32]byte]*Node) bool { for here := source; here != dest; { temp++ if temp > 4096 { - panic("Loop?") + fmt.Println("Loop?") + time.Sleep(time.Second) + return false } nextPort := here.core.DEBUG_switchLookup(coords) // First check if "here" is accepting packets from the previous node @@ -195,7 +197,7 @@ func testPaths(store map[[32]byte]*Node) bool { source.index, source.core.DEBUG_getLocator(), here.index, here.core.DEBUG_getLocator(), dest.index, dest.core.DEBUG_getLocator()) - here.core.DEBUG_getSwitchTable().DEBUG_dumpTable() + //here.core.DEBUG_getSwitchTable().DEBUG_dumpTable() } if here != source { // This is sufficient to check for routing loops or blackholes From ea1d21f7e55c51dc2b0553b89a8c5fe1dcafed67 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 21:28:08 -0500 Subject: [PATCH 22/23] don't change dhtInfo.pings when sending a search, to prevent multiple different searches from evicting a node --- src/yggdrasil/search.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 4b9cd03a..d9dc76e3 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -107,7 +107,10 @@ func (s *searches) doSearchStep(sinfo *searchInfo) { // Send to the next search target var next *dhtInfo next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] + var oldPings int + oldPings, next.pings = next.pings, 0 s.core.dht.ping(next, &sinfo.dest) + next.pings = oldPings // Don't evict a node for searching with it too much sinfo.visited[*next.getNodeID()] = true } } From 495891d9e82a2ba2ffe4716a7ebd96c5ec7c35b3 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Thu, 7 Jun 2018 22:32:01 -0500 Subject: [PATCH 23/23] remove testing panics --- src/yggdrasil/peer.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 497024ca..51175ef3 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -281,7 +281,6 @@ func (p *peer) handleSwitchMsg(packet []byte) { } //p.core.log.Println("Decoded msg:", msg, "; bytes:", packet) if len(msg.Hops) < 1 { - panic("FIXME testing") p.core.peers.removePeer(p.port) } var loc switchLocator @@ -293,7 +292,6 @@ func (p *peer) handleSwitchMsg(packet []byte) { loc.coords = append(loc.coords, hop.Port) bs := getBytesForSig(&hop.Next, &sigMsg) if !p.core.sigs.check(&prevKey, &hop.Sig, bs) { - panic("FIXME testing") p.core.peers.removePeer(p.port) } prevKey = hop.Next