send fewer link announcements, fix a ttl encoding bug, and remove unnecessary key field from the wire format for dht req/res

This commit is contained in:
Arceliar 2018-02-17 21:59:08 -06:00
parent 8ba11b86bb
commit 430d49d8a4
3 changed files with 46 additions and 21 deletions

View File

@ -73,6 +73,7 @@ type peer struct {
// Specifically, processing switch messages, signing, and verifying sigs // Specifically, processing switch messages, signing, and verifying sigs
// Resets at the start of each tick // Resets at the start of each tick
throttle uint8 throttle uint8
lastSend time.Time // To throttle sends, use only from linkLoop goroutine
} }
const peer_Throttle = 1 const peer_Throttle = 1
@ -126,6 +127,8 @@ func (ps *peers) newPeer(box *boxPubKey,
func (p *peer) linkLoop(in <-chan []byte) { func (p *peer) linkLoop(in <-chan []byte) {
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
defer ticker.Stop() defer ticker.Stop()
p.lastSend = time.Now()
var lastRSeq uint64
for { for {
select { select {
case packet, ok := <-in: case packet, ok := <-in:
@ -139,13 +142,29 @@ func (p *peer) linkLoop(in <-chan []byte) {
if p.port == 0 { if p.port == 0 {
continue continue
} // Don't send announces on selfInterface } // Don't send announces on selfInterface
// Maybe we shouldn't time out, and instead wait for a kill signal?
p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port) p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port)
var update bool
switch {
case p.msgAnc == nil:
update = true
case lastRSeq != p.msgAnc.seq:
update = true
case p.msgAnc.rseq != p.myMsg.seq:
update = true
case time.Since(p.lastSend) > 3*time.Second:
update = true
}
if update {
if p.msgAnc != nil {
lastRSeq = p.msgAnc.seq
}
p.lastSend = time.Now()
p.sendSwitchAnnounce() p.sendSwitchAnnounce()
} }
} }
} }
} }
}
func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) { func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) {
pType, pTypeLen := wire_decode_uint64(packet) pType, pTypeLen := wire_decode_uint64(packet)
@ -186,11 +205,12 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
if to == nil { if to == nil {
return return
} }
newTTLLen := wire_uint64_len(newTTL)
// This mutates the packet in-place if the length of the TTL changes! // This mutates the packet in-place if the length of the TTL changes!
ttlSlice := wire_encode_uint64(newTTL)
newTTLLen := len(ttlSlice)
shift := ttlLen - newTTLLen shift := ttlLen - newTTLLen
wire_put_uint64(newTTL, packet[ttlBegin+shift:])
copy(packet[shift:], packet[:pTypeLen]) copy(packet[shift:], packet[:pTypeLen])
copy(packet[ttlBegin+shift:], ttlSlice)
packet = packet[shift:] packet = packet[shift:]
to.sendPacket(packet) to.sendPacket(packet)
} }
@ -418,7 +438,9 @@ func (p *peer) sendSwitchAnnounce() {
anc.seq = p.myMsg.seq anc.seq = p.myMsg.seq
anc.len = uint64(len(p.myMsg.locator.coords)) anc.len = uint64(len(p.myMsg.locator.coords))
//anc.Deg = p.myMsg.Degree //anc.Deg = p.myMsg.Degree
//anc.RSeq = p.myMsg.RSeq if p.msgAnc != nil {
anc.rseq = p.msgAnc.seq
}
packet := anc.encode() packet := anc.encode()
p.sendLinkPacket(packet) p.sendLinkPacket(packet)
} }

View File

@ -295,9 +295,10 @@ func (r *router) handleDHTReq(bs []byte, fromKey *boxPubKey) {
if !req.decode(bs) { if !req.decode(bs) {
return return
} }
if req.key != *fromKey { //if req.key != *fromKey {
return // return
} //}
req.key = *fromKey
r.core.dht.handleReq(&req) r.core.dht.handleReq(&req)
} }
@ -306,9 +307,10 @@ func (r *router) handleDHTRes(bs []byte, fromKey *boxPubKey) {
if !res.decode(bs) { if !res.decode(bs) {
return return
} }
if res.key != *fromKey { //if res.key != *fromKey {
return // return
} //}
res.key = *fromKey
r.core.dht.handleRes(&res) r.core.dht.handleRes(&res)
} }

View File

@ -129,7 +129,7 @@ type msgAnnounce struct {
seq uint64 seq uint64
len uint64 len uint64
//Deg uint64 //Deg uint64
//RSeq uint64 rseq uint64
} }
func (m *msgAnnounce) encode() []byte { func (m *msgAnnounce) encode() []byte {
@ -139,7 +139,7 @@ func (m *msgAnnounce) encode() []byte {
bs = append(bs, wire_encode_uint64(m.seq)...) bs = append(bs, wire_encode_uint64(m.seq)...)
bs = append(bs, wire_encode_uint64(m.len)...) bs = append(bs, wire_encode_uint64(m.len)...)
//bs = append(bs, wire_encode_uint64(m.Deg)...) //bs = append(bs, wire_encode_uint64(m.Deg)...)
//bs = append(bs, wire_encode_uint64(m.RSeq)...) bs = append(bs, wire_encode_uint64(m.rseq)...)
return bs return bs
} }
@ -160,7 +160,8 @@ func (m *msgAnnounce) decode(bs []byte) bool {
case !wire_chop_uint64(&m.len, &bs): case !wire_chop_uint64(&m.len, &bs):
return false return false
//case !wire_chop_uint64(&m.Deg, &bs): return false //case !wire_chop_uint64(&m.Deg, &bs): return false
//case !wire_chop_uint64(&m.RSeq, &bs): return false case !wire_chop_uint64(&m.rseq, &bs):
return false
} }
m.tstamp = wire_intFromUint(tstamp) m.tstamp = wire_intFromUint(tstamp)
return true return true
@ -467,7 +468,7 @@ func (p *sessionPing) decode(bs []byte) bool {
func (r *dhtReq) encode() []byte { func (r *dhtReq) encode() []byte {
coords := wire_encode_coords(r.coords) coords := wire_encode_coords(r.coords)
bs := wire_encode_uint64(wire_DHTLookupRequest) bs := wire_encode_uint64(wire_DHTLookupRequest)
bs = append(bs, r.key[:]...) //bs = append(bs, r.key[:]...)
bs = append(bs, coords...) bs = append(bs, coords...)
bs = append(bs, r.dest[:]...) bs = append(bs, r.dest[:]...)
return bs return bs
@ -480,8 +481,8 @@ func (r *dhtReq) decode(bs []byte) bool {
return false return false
case pType != wire_DHTLookupRequest: case pType != wire_DHTLookupRequest:
return false return false
case !wire_chop_slice(r.key[:], &bs): //case !wire_chop_slice(r.key[:], &bs):
return false // return false
case !wire_chop_coords(&r.coords, &bs): case !wire_chop_coords(&r.coords, &bs):
return false return false
case !wire_chop_slice(r.dest[:], &bs): case !wire_chop_slice(r.dest[:], &bs):
@ -494,7 +495,7 @@ func (r *dhtReq) decode(bs []byte) bool {
func (r *dhtRes) encode() []byte { func (r *dhtRes) encode() []byte {
coords := wire_encode_coords(r.coords) coords := wire_encode_coords(r.coords)
bs := wire_encode_uint64(wire_DHTLookupResponse) bs := wire_encode_uint64(wire_DHTLookupResponse)
bs = append(bs, r.key[:]...) //bs = append(bs, r.key[:]...)
bs = append(bs, coords...) bs = append(bs, coords...)
bs = append(bs, r.dest[:]...) bs = append(bs, r.dest[:]...)
for _, info := range r.infos { for _, info := range r.infos {
@ -512,8 +513,8 @@ func (r *dhtRes) decode(bs []byte) bool {
return false return false
case pType != wire_DHTLookupResponse: case pType != wire_DHTLookupResponse:
return false return false
case !wire_chop_slice(r.key[:], &bs): //case !wire_chop_slice(r.key[:], &bs):
return false // return false
case !wire_chop_coords(&r.coords, &bs): case !wire_chop_coords(&r.coords, &bs):
return false return false
case !wire_chop_slice(r.dest[:], &bs): case !wire_chop_slice(r.dest[:], &bs):