diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index d42430a5..37b87c03 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -73,6 +73,7 @@ type peer struct { // Specifically, processing switch messages, signing, and verifying sigs // Resets at the start of each tick throttle uint8 + lastSend time.Time // To throttle sends, use only from linkLoop goroutine } const peer_Throttle = 1 @@ -126,6 +127,8 @@ func (ps *peers) newPeer(box *boxPubKey, func (p *peer) linkLoop(in <-chan []byte) { ticker := time.NewTicker(time.Second) defer ticker.Stop() + p.lastSend = time.Now() + var lastRSeq uint64 for { select { case packet, ok := <-in: @@ -139,9 +142,25 @@ func (p *peer) linkLoop(in <-chan []byte) { if p.port == 0 { continue } // Don't send announces on selfInterface - // Maybe we shouldn't time out, and instead wait for a kill signal? p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port) - p.sendSwitchAnnounce() + var update bool + switch { + case p.msgAnc == nil: + update = true + case lastRSeq != p.msgAnc.seq: + update = true + case p.msgAnc.rseq != p.myMsg.seq: + update = true + case time.Since(p.lastSend) > 3*time.Second: + update = true + } + if update { + if p.msgAnc != nil { + lastRSeq = p.msgAnc.seq + } + p.lastSend = time.Now() + p.sendSwitchAnnounce() + } } } } @@ -186,11 +205,12 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { if to == nil { return } - newTTLLen := wire_uint64_len(newTTL) // This mutates the packet in-place if the length of the TTL changes! + ttlSlice := wire_encode_uint64(newTTL) + newTTLLen := len(ttlSlice) shift := ttlLen - newTTLLen - wire_put_uint64(newTTL, packet[ttlBegin+shift:]) copy(packet[shift:], packet[:pTypeLen]) + copy(packet[ttlBegin+shift:], ttlSlice) packet = packet[shift:] to.sendPacket(packet) } @@ -418,7 +438,9 @@ func (p *peer) sendSwitchAnnounce() { anc.seq = p.myMsg.seq anc.len = uint64(len(p.myMsg.locator.coords)) //anc.Deg = p.myMsg.Degree - //anc.RSeq = p.myMsg.RSeq + if p.msgAnc != nil { + anc.rseq = p.msgAnc.seq + } packet := anc.encode() p.sendLinkPacket(packet) } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 8ca5d2b9..8bfa73b8 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -295,9 +295,10 @@ func (r *router) handleDHTReq(bs []byte, fromKey *boxPubKey) { if !req.decode(bs) { return } - if req.key != *fromKey { - return - } + //if req.key != *fromKey { + // return + //} + req.key = *fromKey r.core.dht.handleReq(&req) } @@ -306,9 +307,10 @@ func (r *router) handleDHTRes(bs []byte, fromKey *boxPubKey) { if !res.decode(bs) { return } - if res.key != *fromKey { - return - } + //if res.key != *fromKey { + // return + //} + res.key = *fromKey r.core.dht.handleRes(&res) } diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index f50ec431..c4a6485b 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -129,7 +129,7 @@ type msgAnnounce struct { seq uint64 len uint64 //Deg uint64 - //RSeq uint64 + rseq uint64 } func (m *msgAnnounce) encode() []byte { @@ -139,7 +139,7 @@ func (m *msgAnnounce) encode() []byte { bs = append(bs, wire_encode_uint64(m.seq)...) bs = append(bs, wire_encode_uint64(m.len)...) //bs = append(bs, wire_encode_uint64(m.Deg)...) - //bs = append(bs, wire_encode_uint64(m.RSeq)...) + bs = append(bs, wire_encode_uint64(m.rseq)...) return bs } @@ -159,8 +159,9 @@ func (m *msgAnnounce) decode(bs []byte) bool { return false case !wire_chop_uint64(&m.len, &bs): return false - //case !wire_chop_uint64(&m.Deg, &bs): return false - //case !wire_chop_uint64(&m.RSeq, &bs): return false + //case !wire_chop_uint64(&m.Deg, &bs): return false + case !wire_chop_uint64(&m.rseq, &bs): + return false } m.tstamp = wire_intFromUint(tstamp) return true @@ -467,7 +468,7 @@ func (p *sessionPing) decode(bs []byte) bool { func (r *dhtReq) encode() []byte { coords := wire_encode_coords(r.coords) bs := wire_encode_uint64(wire_DHTLookupRequest) - bs = append(bs, r.key[:]...) + //bs = append(bs, r.key[:]...) bs = append(bs, coords...) bs = append(bs, r.dest[:]...) return bs @@ -480,8 +481,8 @@ func (r *dhtReq) decode(bs []byte) bool { return false case pType != wire_DHTLookupRequest: return false - case !wire_chop_slice(r.key[:], &bs): - return false + //case !wire_chop_slice(r.key[:], &bs): + // return false case !wire_chop_coords(&r.coords, &bs): return false case !wire_chop_slice(r.dest[:], &bs): @@ -494,7 +495,7 @@ func (r *dhtReq) decode(bs []byte) bool { func (r *dhtRes) encode() []byte { coords := wire_encode_coords(r.coords) bs := wire_encode_uint64(wire_DHTLookupResponse) - bs = append(bs, r.key[:]...) + //bs = append(bs, r.key[:]...) bs = append(bs, coords...) bs = append(bs, r.dest[:]...) for _, info := range r.infos { @@ -512,8 +513,8 @@ func (r *dhtRes) decode(bs []byte) bool { return false case pType != wire_DHTLookupResponse: return false - case !wire_chop_slice(r.key[:], &bs): - return false + //case !wire_chop_slice(r.key[:], &bs): + // return false case !wire_chop_coords(&r.coords, &bs): return false case !wire_chop_slice(r.dest[:], &bs):