diff --git a/misc/sim/treesim.go b/misc/sim/treesim.go index d8859261..0316b8fd 100644 --- a/misc/sim/treesim.go +++ b/misc/sim/treesim.go @@ -160,17 +160,13 @@ func testPaths(store map[[32]byte]*Node) bool { ttl := ^uint64(0) oldTTL := ttl for here := source; here != dest; { - if ttl == 0 { - fmt.Println("Drop:", source.index, here.index, dest.index, oldTTL) - return false - } temp++ if temp > 4096 { - panic("Loop?") + fmt.Println("Loop?") + time.Sleep(time.Second) + return false } - oldTTL = ttl - nextPort, newTTL := here.core.DEBUG_switchLookup(coords, ttl) - ttl = newTTL + nextPort := here.core.DEBUG_switchLookup(coords) // First check if "here" is accepting packets from the previous node // TODO explain how this works ports := here.core.DEBUG_getPeers().DEBUG_getPorts() @@ -201,12 +197,16 @@ func testPaths(store map[[32]byte]*Node) bool { source.index, source.core.DEBUG_getLocator(), here.index, here.core.DEBUG_getLocator(), dest.index, dest.core.DEBUG_getLocator()) - here.core.DEBUG_getSwitchTable().DEBUG_dumpTable() + //here.core.DEBUG_getSwitchTable().DEBUG_dumpTable() } if here != source { // This is sufficient to check for routing loops or blackholes //break } + if here == next { + fmt.Println("Drop:", source.index, here.index, dest.index, oldTTL) + return false + } here = next } } @@ -227,7 +227,7 @@ func stressTest(store map[[32]byte]*Node) { start := time.Now() for _, source := range store { for _, coords := range dests { - source.core.DEBUG_switchLookup(coords, ^uint64(0)) + source.core.DEBUG_switchLookup(coords) lookups++ } } diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index b94d1544..3b5fcc1a 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -103,11 +103,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - if err := c.switchTable.start(); err != nil { - c.log.Println("Failed to start switch table ticker") - return err - } - if err := c.admin.start(); err != nil { c.log.Println("Failed to start admin socket") return err diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index cd42560c..6b8211cb 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -36,7 +36,6 @@ func (c *Core) Init() { spub, spriv := newSigKeys() c.init(bpub, bpriv, spub, spriv) c.router.start() - c.switchTable.start() } //////////////////////////////////////////////////////////////////////////////// @@ -127,8 +126,8 @@ func (l *switchLocator) DEBUG_getCoords() []byte { return l.getCoords() } -func (c *Core) DEBUG_switchLookup(dest []byte, ttl uint64) (switchPort, uint64) { - return c.switchTable.lookup(dest, ttl) +func (c *Core) DEBUG_switchLookup(dest []byte) switchPort { + return c.switchTable.lookup(dest) } /* @@ -310,9 +309,6 @@ func (c *Core) DEBUG_init(bpub []byte, panic(err) } - if err := c.switchTable.start(); err != nil { - panic(err) - } } //////////////////////////////////////////////////////////////////////////////// @@ -453,16 +449,25 @@ func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) { func DEBUG_simLinkPeers(p, q *peer) { // Sets q.out() to point to p and starts p.linkLoop() - plinkIn := make(chan []byte, 1) - qlinkIn := make(chan []byte, 1) + p.linkOut, q.linkOut = make(chan []byte, 1), make(chan []byte, 1) + go func() { + for bs := range p.linkOut { + q.handlePacket(bs) + } + }() + go func() { + for bs := range q.linkOut { + p.handlePacket(bs) + } + }() p.out = func(bs []byte) { - go q.handlePacket(bs, qlinkIn) + go q.handlePacket(bs) } q.out = func(bs []byte) { - go p.handlePacket(bs, plinkIn) + go p.handlePacket(bs) } - go p.linkLoop(plinkIn) - go q.linkLoop(qlinkIn) + go p.linkLoop() + go q.linkLoop() } func (c *Core) DEBUG_simFixMTU() { diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index e59017a4..33933914 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -36,6 +36,7 @@ type dhtInfo struct { send time.Time // When we last sent a message recv time.Time // When we last received a message pings int // Decide when to drop + throttle uint8 // Number of seconds to wait before pinging a node to bootstrap buckets, gradually increases up to 1 minute } func (info *dhtInfo) getNodeID() *NodeID { @@ -81,7 +82,7 @@ type dht struct { func (t *dht) init(c *Core) { t.core = c t.nodeID = *t.core.GetNodeID() - t.peers = make(chan *dhtInfo, 1) + t.peers = make(chan *dhtInfo, 1024) t.reqs = make(map[boxPubKey]map[NodeID]time.Time) } @@ -116,10 +117,11 @@ func (t *dht) handleRes(res *dhtRes) { return } rinfo := dhtInfo{ - key: res.Key, - coords: res.Coords, - send: time.Now(), // Technically wrong but should be OK... - recv: time.Now(), + key: res.Key, + coords: res.Coords, + send: time.Now(), // Technically wrong but should be OK... + recv: time.Now(), + throttle: 1, } // If they're already in the table, then keep the correct send time bidx, isOK := t.getBucketIndex(rinfo.getNodeID()) @@ -130,11 +132,13 @@ func (t *dht) handleRes(res *dhtRes) { for _, oldinfo := range b.peers { if oldinfo.key == rinfo.key { rinfo.send = oldinfo.send + rinfo.throttle += oldinfo.throttle } } for _, oldinfo := range b.other { if oldinfo.key == rinfo.key { rinfo.send = oldinfo.send + rinfo.throttle += oldinfo.throttle } } // Insert into table @@ -231,6 +235,9 @@ func (t *dht) insert(info *dhtInfo, isPeer bool) { // This speeds up bootstrapping info.recv = info.recv.Add(-time.Hour) } + if isPeer || info.throttle > 60 { + info.throttle = 60 + } // First drop any existing entry from the bucket b.drop(&info.key) // Now add to the *end* of the bucket @@ -319,7 +326,6 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &dest.key) payload, nonce := boxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ - TTL: ^uint64(0), Coords: dest.coords, ToKey: dest.key, FromKey: t.core.boxPub, @@ -345,7 +351,6 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) { shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.Key) payload, nonce := boxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ - TTL: ^uint64(0), Coords: req.Coords, ToKey: req.Key, FromKey: t.core.boxPub, @@ -460,7 +465,7 @@ func (t *dht) doMaintenance() { } target := t.getTarget(t.offset) for _, info := range t.lookup(target, true) { - if time.Since(info.recv) > time.Minute { + if time.Since(info.recv) > time.Duration(info.throttle)*time.Second { t.addToMill(info, target) t.offset++ break @@ -520,9 +525,15 @@ func dht_firstCloserThanThird(first *NodeID, func (t *dht) reset() { // This is mostly so bootstrapping will reset to resend coords into the network + t.offset = 0 + t.rumorMill = nil // reset mill for _, b := range t.buckets_hidden { b.peers = b.peers[:0] + for _, info := range b.other { + // Add other nodes to the rumor mill so they'll be pinged soon + // This will hopefully tell them our coords and re-learn theirs quickly if they haven't changed + t.addToMill(info, info.getNodeID()) + } b.other = b.other[:0] } - t.offset = 0 } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index fa1a2789..51175ef3 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -11,17 +11,6 @@ package yggdrasil // It needs to ignore messages with a lower seq // Probably best to start setting seq to a timestamp in that case... -// FIXME (!?) if it takes too long to communicate all the msgHops, then things hit a horizon -// That could happen with a peer over a high-latency link, with many msgHops -// Possible workarounds: -// 1. Pre-emptively send all hops when one is requested, or after any change -// Maybe requires changing how the throttle works and msgHops are saved -// In case some arrive out of order or are dropped -// This is relatively easy to implement, but could be wasteful -// 2. Save your old locator, sigs, etc, so you can respond to older ancs -// And finish requesting an old anc before updating to a new one -// But that may lead to other issues if not done carefully... - import "time" import "sync" import "sync/atomic" @@ -83,38 +72,23 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) { } type peer struct { - // Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends - // use get/update methods only! (atomic accessors as float64) - queueSize int64 + queueSize int64 // used to track local backpressure bytesSent uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers // BUG: sync/atomic, 32 bit platforms need the above to be the first element - firstSeen time.Time // To track uptime for getPeers + core *Core + port switchPort box boxPubKey sig sigPubKey shared boxSharedKey - //in <-chan []byte - //out chan<- []byte - //in func([]byte) - out func([]byte) - core *Core - port switchPort - msgAnc *msgAnnounce - msgHops []*msgHop - myMsg *switchMessage - mySigs []sigInfo - // This is used to limit how often we perform expensive operations - // Specifically, processing switch messages, signing, and verifying sigs - // Resets at the start of each tick - throttle uint8 - // Called when a peer is removed, to close the underlying connection, or via admin api - close func() - // To allow the peer to call close if idle for too long - lastAnc time.Time + firstSeen time.Time // To track uptime for getPeers + linkOut (chan []byte) // used for protocol traffic (to bypass queues) + doSend (chan struct{}) // tell the linkLoop to send a switchMsg + dinfo *dhtInfo // used to keep the DHT working + out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes + close func() // Called when a peer is removed, to close the underlying connection, or via admin api } -const peer_Throttle = 1 - func (p *peer) getQueueSize() int64 { return atomic.LoadInt64(&p.queueSize) } @@ -123,14 +97,13 @@ func (p *peer) updateQueueSize(delta int64) { atomic.AddInt64(&p.queueSize, delta) } -func (ps *peers) newPeer(box *boxPubKey, - sig *sigPubKey) *peer { +func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey) *peer { now := time.Now() p := peer{box: *box, sig: *sig, shared: *getSharedKey(&ps.core.boxPriv, box), - lastAnc: now, firstSeen: now, + doSend: make(chan struct{}, 1), core: ps.core} ps.mutex.Lock() defer ps.mutex.Unlock() @@ -151,10 +124,12 @@ func (ps *peers) newPeer(box *boxPubKey, } func (ps *peers) removePeer(port switchPort) { - // TODO? store linkIn in the peer struct, close it here? (once) if port == 0 { return } // Can't remove self peer + ps.core.router.doAdmin(func() { + ps.core.switchTable.removePeer(port) + }) ps.mutex.Lock() oldPorts := ps.getPorts() p, isIn := oldPorts[port] @@ -165,56 +140,47 @@ func (ps *peers) removePeer(port switchPort) { delete(newPorts, port) ps.putPorts(newPorts) ps.mutex.Unlock() - if isIn && p.close != nil { - p.close() + if isIn { + if p.close != nil { + p.close() + } + close(p.doSend) } } -func (p *peer) linkLoop(in <-chan []byte) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - var counter uint8 - var lastRSeq uint64 - for { +func (ps *peers) sendSwitchMsgs() { + ports := ps.getPorts() + for _, p := range ports { + if p.port == 0 { + continue + } select { - case packet, ok := <-in: - if !ok { - return - } - p.handleLinkTraffic(packet) - case <-ticker.C: - if time.Since(p.lastAnc) > 16*time.Second && p.close != nil { - // Seems to have timed out, try to trigger a close - p.close() - } - p.throttle = 0 - if p.port == 0 { - continue - } // Don't send announces on selfInterface - p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port) - var update bool - switch { - case p.msgAnc == nil: - update = true - case lastRSeq != p.msgAnc.Seq: - update = true - case p.msgAnc.Rseq != p.myMsg.seq: - update = true - case counter%4 == 0: - update = true - } - if update { - if p.msgAnc != nil { - lastRSeq = p.msgAnc.Seq - } - p.sendSwitchAnnounce() - } - counter = (counter + 1) % 4 + case p.doSend <- struct{}{}: + default: } } } -func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) { +func (p *peer) linkLoop() { + go func() { p.doSend <- struct{}{} }() + tick := time.NewTicker(time.Second) + defer tick.Stop() + for { + select { + case _, ok := <-p.doSend: + if !ok { + return + } + p.sendSwitchMsg() + case _ = <-tick.C: + if p.dinfo != nil { + p.core.dht.peers <- p.dinfo + } + } + } +} + +func (p *peer) handlePacket(packet []byte) { // TODO See comment in sendPacket about atomics technically being done wrong atomic.AddUint64(&p.bytesRecvd, uint64(len(packet))) pType, pTypeLen := wire_decode_uint64(packet) @@ -227,31 +193,22 @@ func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) { case wire_ProtocolTraffic: p.handleTraffic(packet, pTypeLen) case wire_LinkProtocolTraffic: - { - select { - case linkIn <- packet: - default: - } - } - default: /*panic(pType) ;*/ + p.handleLinkTraffic(packet) + default: return } } func (p *peer) handleTraffic(packet []byte, pTypeLen int) { - if p.port != 0 && p.msgAnc == nil { - // Drop traffic until the peer manages to send us at least one anc + if p.port != 0 && p.dinfo == nil { + // Drop traffic until the peer manages to send us at least one good switchMsg return } - ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:]) - ttlBegin := pTypeLen - ttlEnd := pTypeLen + ttlLen - coords, coordLen := wire_decode_coords(packet[ttlEnd:]) - coordEnd := ttlEnd + coordLen - if coordEnd == len(packet) { + coords, coordLen := wire_decode_coords(packet[pTypeLen:]) + if coordLen >= len(packet) { return } // No payload - toPort, newTTL := p.core.switchTable.lookup(coords, ttl) + toPort := p.core.switchTable.lookup(coords) if toPort == p.port { return } @@ -259,13 +216,6 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { if to == nil { return } - // This mutates the packet in-place if the length of the TTL changes! - ttlSlice := wire_encode_uint64(newTTL) - newTTLLen := len(ttlSlice) - shift := ttlLen - newTTLLen - copy(packet[shift:], packet[:pTypeLen]) - copy(packet[ttlBegin+shift:], ttlSlice) - packet = packet[shift:] to.sendPacket(packet) } @@ -284,7 +234,7 @@ func (p *peer) sendLinkPacket(packet []byte) { Payload: bs, } packet = linkPacket.encode() - p.sendPacket(packet) + p.linkOut <- packet } func (p *peer) handleLinkTraffic(bs []byte) { @@ -301,219 +251,70 @@ func (p *peer) handleLinkTraffic(bs []byte) { return } switch pType { - case wire_SwitchAnnounce: - p.handleSwitchAnnounce(payload) - case wire_SwitchHopRequest: - p.handleSwitchHopRequest(payload) - case wire_SwitchHop: - p.handleSwitchHop(payload) + case wire_SwitchMsg: + p.handleSwitchMsg(payload) + default: // TODO?... } } -func (p *peer) handleSwitchAnnounce(packet []byte) { - //p.core.log.Println("DEBUG: handleSwitchAnnounce") - anc := msgAnnounce{} - //err := wire_decode_struct(packet, &anc) - //if err != nil { return } - if !anc.decode(packet) { +func (p *peer) sendSwitchMsg() { + msg := p.core.switchTable.getMsg() + if msg == nil { return } - //if p.msgAnc != nil && anc.Seq != p.msgAnc.Seq { p.msgHops = nil } - if p.msgAnc == nil || - anc.Root != p.msgAnc.Root || - anc.Tstamp != p.msgAnc.Tstamp || - anc.Seq != p.msgAnc.Seq { - p.msgHops = nil - } - p.msgAnc = &anc - p.processSwitchMessage() - p.lastAnc = time.Now() -} - -func (p *peer) requestHop(hop uint64) { - //p.core.log.Println("DEBUG requestHop") - req := msgHopReq{} - req.Root = p.msgAnc.Root - req.Tstamp = p.msgAnc.Tstamp - req.Seq = p.msgAnc.Seq - req.Hop = hop - packet := req.encode() + bs := getBytesForSig(&p.sig, msg) + msg.Hops = append(msg.Hops, switchMsgHop{ + Port: p.port, + Next: p.sig, + Sig: *sign(&p.core.sigPriv, bs), + }) + packet := msg.encode() + //p.core.log.Println("Encoded msg:", msg, "; bytes:", packet) + //fmt.Println("Encoded msg:", msg, "; bytes:", packet) p.sendLinkPacket(packet) } -func (p *peer) handleSwitchHopRequest(packet []byte) { - //p.core.log.Println("DEBUG: handleSwitchHopRequest") - if p.throttle > peer_Throttle { +func (p *peer) handleSwitchMsg(packet []byte) { + var msg switchMsg + if !msg.decode(packet) { return } - if p.myMsg == nil { - return + //p.core.log.Println("Decoded msg:", msg, "; bytes:", packet) + if len(msg.Hops) < 1 { + p.core.peers.removePeer(p.port) } - req := msgHopReq{} - if !req.decode(packet) { - return - } - if req.Root != p.myMsg.locator.root { - return - } - if req.Tstamp != p.myMsg.locator.tstamp { - return - } - if req.Seq != p.myMsg.seq { - return - } - if uint64(len(p.myMsg.locator.coords)) <= req.Hop { - return - } - res := msgHop{} - res.Root = p.myMsg.locator.root - res.Tstamp = p.myMsg.locator.tstamp - res.Seq = p.myMsg.seq - res.Hop = req.Hop - res.Port = p.myMsg.locator.coords[res.Hop] - sinfo := p.getSig(res.Hop) - //p.core.log.Println("DEBUG sig:", sinfo) - res.Next = sinfo.next - res.Sig = sinfo.sig - packet = res.encode() - p.sendLinkPacket(packet) -} - -func (p *peer) handleSwitchHop(packet []byte) { - //p.core.log.Println("DEBUG: handleSwitchHop") - if p.throttle > peer_Throttle { - return - } - if p.msgAnc == nil { - return - } - res := msgHop{} - if !res.decode(packet) { - return - } - if res.Root != p.msgAnc.Root { - return - } - if res.Tstamp != p.msgAnc.Tstamp { - return - } - if res.Seq != p.msgAnc.Seq { - return - } - if res.Hop != uint64(len(p.msgHops)) { - return - } // always process in order - loc := switchLocator{coords: make([]switchPort, 0, len(p.msgHops)+1)} - loc.root = res.Root - loc.tstamp = res.Tstamp - for _, hop := range p.msgHops { + var loc switchLocator + prevKey := msg.Root + for idx, hop := range msg.Hops { + // Check signatures and collect coords for dht + sigMsg := msg + sigMsg.Hops = msg.Hops[:idx] loc.coords = append(loc.coords, hop.Port) - } - loc.coords = append(loc.coords, res.Port) - thisHopKey := &res.Root - if res.Hop != 0 { - thisHopKey = &p.msgHops[res.Hop-1].Next - } - bs := getBytesForSig(&res.Next, &loc) - if p.core.sigs.check(thisHopKey, &res.Sig, bs) { - p.msgHops = append(p.msgHops, &res) - p.processSwitchMessage() - } else { - p.throttle++ - } -} - -func (p *peer) processSwitchMessage() { - //p.core.log.Println("DEBUG: processSwitchMessage") - if p.throttle > peer_Throttle { - return - } - if p.msgAnc == nil { - return - } - if uint64(len(p.msgHops)) < p.msgAnc.Len { - p.requestHop(uint64(len(p.msgHops))) - return - } - p.throttle++ - if p.msgAnc.Len != uint64(len(p.msgHops)) { - return - } - msg := switchMessage{} - coords := make([]switchPort, 0, len(p.msgHops)) - sigs := make([]sigInfo, 0, len(p.msgHops)) - for idx, hop := range p.msgHops { - // Consistency checks, should be redundant (already checked these...) - if hop.Root != p.msgAnc.Root { - return + bs := getBytesForSig(&hop.Next, &sigMsg) + if !p.core.sigs.check(&prevKey, &hop.Sig, bs) { + p.core.peers.removePeer(p.port) } - if hop.Tstamp != p.msgAnc.Tstamp { - return - } - if hop.Seq != p.msgAnc.Seq { - return - } - if hop.Hop != uint64(idx) { - return - } - coords = append(coords, hop.Port) - sigs = append(sigs, sigInfo{next: hop.Next, sig: hop.Sig}) + prevKey = hop.Next } - msg.from = p.sig - msg.locator.root = p.msgAnc.Root - msg.locator.tstamp = p.msgAnc.Tstamp - msg.locator.coords = coords - msg.seq = p.msgAnc.Seq - //msg.RSeq = p.msgAnc.RSeq - //msg.Degree = p.msgAnc.Deg - p.core.switchTable.handleMessage(&msg, p.port, sigs) - if len(coords) == 0 { - return - } - // Reuse locator, set the coords to the peer's coords, to use in dht - msg.locator.coords = coords[:len(coords)-1] + p.core.switchTable.handleMsg(&msg, p.port) // Pass a mesage to the dht informing it that this peer (still) exists + loc.coords = loc.coords[:len(loc.coords)-1] dinfo := dhtInfo{ key: p.box, - coords: msg.locator.getCoords(), + coords: loc.getCoords(), } p.core.dht.peers <- &dinfo + p.dinfo = &dinfo } -func (p *peer) sendSwitchAnnounce() { - anc := msgAnnounce{} - anc.Root = p.myMsg.locator.root - anc.Tstamp = p.myMsg.locator.tstamp - anc.Seq = p.myMsg.seq - anc.Len = uint64(len(p.myMsg.locator.coords)) - //anc.Deg = p.myMsg.Degree - if p.msgAnc != nil { - anc.Rseq = p.msgAnc.Seq +func getBytesForSig(next *sigPubKey, msg *switchMsg) []byte { + var loc switchLocator + for _, hop := range msg.Hops { + loc.coords = append(loc.coords, hop.Port) } - packet := anc.encode() - p.sendLinkPacket(packet) -} - -func (p *peer) getSig(hop uint64) sigInfo { - //p.core.log.Println("DEBUG getSig:", len(p.mySigs), hop) - if hop < uint64(len(p.mySigs)) { - return p.mySigs[hop] - } - bs := getBytesForSig(&p.sig, &p.myMsg.locator) - sig := sigInfo{} - sig.next = p.sig - sig.sig = *sign(&p.core.sigPriv, bs) - p.mySigs = append(p.mySigs, sig) - //p.core.log.Println("DEBUG sig bs:", bs) - return sig -} - -func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte { - //bs, err := wire_encode_locator(loc) - //if err != nil { panic(err) } bs := append([]byte(nil), next[:]...) - bs = append(bs, wire_encode_locator(loc)...) - //bs := wire_encode_locator(loc) - //bs = append(next[:], bs...) + bs = append(bs, msg.Root[:]...) + bs = append(bs, wire_encode_uint64(wire_intToUint(msg.TStamp))...) + bs = append(bs, wire_encode_coords(loc.getCoords())...) return bs } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index b848a792..ddb48486 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -55,7 +55,7 @@ func (r *router) init(core *Core) { } } r.in = in - r.out = func(packet []byte) { p.handlePacket(packet, nil) } // The caller is responsible for go-ing if it needs to not block + r.out = func(packet []byte) { p.handlePacket(packet) } // The caller is responsible for go-ing if it needs to not block recv := make(chan []byte, 32) send := make(chan []byte, 32) r.recv = recv @@ -91,7 +91,9 @@ func (r *router) mainLoop() { case <-ticker.C: { // Any periodic maintenance stuff goes here + r.core.switchTable.doMaintenance() r.core.dht.doMaintenance() + //r.core.peers.sendSwitchMsgs() // FIXME debugging util_getBytes() // To slowly drain things } case f := <-r.admin: diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 4b9cd03a..d9dc76e3 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -107,7 +107,10 @@ func (s *searches) doSearchStep(sinfo *searchInfo) { // Send to the next search target var next *dhtInfo next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] + var oldPings int + oldPings, next.pings = next.pings, 0 s.core.dht.ping(next, &sinfo.dest) + next.pings = oldPings // Don't evict a node for searching with it too much sinfo.visited[*next.getNodeID()] = true } } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 8bbbc33c..33111424 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -255,7 +255,6 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) { shared := ss.getSharedKey(&ss.core.boxPriv, &sinfo.theirPermPub) payload, nonce := boxSeal(shared, bs, nil) p := wire_protoTrafficPacket{ - TTL: ^uint64(0), Coords: sinfo.coords, ToKey: sinfo.theirPermPub, FromKey: ss.core.boxPub, @@ -383,7 +382,6 @@ func (sinfo *sessionInfo) doSend(bs []byte) { payload, nonce := boxSeal(&sinfo.sharedSesKey, bs, &sinfo.myNonce) defer util_putBytes(payload) p := wire_trafficPacket{ - TTL: ^uint64(0), Coords: sinfo.coords, Handle: sinfo.theirHandle, Nonce: *nonce, diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 8731e504..4db4c67f 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -9,7 +9,7 @@ package yggdrasil // TODO document/comment everything in a lot more detail // TODO? use a pre-computed lookup table (python version had this) -// A little annoying to do with constant changes from bandwidth estimates +// A little annoying to do with constant changes from backpressure import "time" import "sort" @@ -113,17 +113,10 @@ type peerInfo struct { key sigPubKey // ID of this peer locator switchLocator // Should be able to respond with signatures upon request degree uint64 // Self-reported degree - coords []switchPort // Coords of this peer (taken from coords of the sent locator) time time.Time // Time this node was last seen firstSeen time.Time port switchPort // Interface number of this peer - seq uint64 // Seq number we last saw this peer advertise -} - -type switchMessage struct { - from sigPubKey // key of the sender - locator switchLocator // Locator advertised for the receiver, not the sender's loc! - seq uint64 + msg switchMsg // The wire switchMsg used } type switchPort uint64 @@ -143,7 +136,7 @@ type switchData struct { locator switchLocator seq uint64 // Sequence number, reported to peers, so they know about changes peers map[switchPort]peerInfo - sigs []sigInfo + msg *switchMsg } type switchTable struct { @@ -170,31 +163,17 @@ func (t *switchTable) init(core *Core, key sigPubKey) { t.drop = make(map[sigPubKey]int64) } -func (t *switchTable) start() error { - doTicker := func() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for { - <-ticker.C - t.Tick() - } - } - go doTicker() - return nil -} - func (t *switchTable) getLocator() switchLocator { t.mutex.RLock() defer t.mutex.RUnlock() return t.data.locator.clone() } -func (t *switchTable) Tick() { +func (t *switchTable) doMaintenance() { // Periodic maintenance work to keep things internally consistent t.mutex.Lock() // Write lock defer t.mutex.Unlock() // Release lock when we're done t.cleanRoot() - t.cleanPeers() t.cleanDropped() } @@ -236,22 +215,16 @@ func (t *switchTable) cleanRoot() { } } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} - t.data.sigs = nil + t.core.peers.sendSwitchMsgs() } } -func (t *switchTable) cleanPeers() { - now := time.Now() - changed := false - for idx, info := range t.data.peers { - if info.port != switchPort(0) && now.Sub(info.time) > 6*time.Second /*switch_timeout*/ { - //fmt.Println("peer timed out", t.key, info.locator) - delete(t.data.peers, idx) - changed = true - } - } - if changed { - t.updater.Store(&sync.Once{}) +func (t *switchTable) removePeer(port switchPort) { + delete(t.data.peers, port) + t.updater.Store(&sync.Once{}) + // TODO if parent, find a new peer to use as parent instead + for _, info := range t.data.peers { + t.unlockedHandleMsg(&info.msg, info.port) } } @@ -264,33 +237,64 @@ func (t *switchTable) cleanDropped() { } } -func (t *switchTable) createMessage(port switchPort) (*switchMessage, []sigInfo) { - t.mutex.RLock() - defer t.mutex.RUnlock() - msg := switchMessage{from: t.key, locator: t.data.locator.clone()} - msg.locator.coords = append(msg.locator.coords, port) - msg.seq = t.data.seq - return &msg, t.data.sigs +type switchMsg struct { + Root sigPubKey + TStamp int64 + Hops []switchMsgHop } -func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sigs []sigInfo) { +type switchMsgHop struct { + Port switchPort + Next sigPubKey + Sig sigBytes +} + +func (t *switchTable) getMsg() *switchMsg { + t.mutex.RLock() + defer t.mutex.RUnlock() + if t.parent == 0 { + return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp} + } else if parent, isIn := t.data.peers[t.parent]; isIn { + msg := parent.msg + msg.Hops = append([]switchMsgHop(nil), msg.Hops...) + return &msg + } else { + return nil + } +} + +func (t *switchTable) handleMsg(msg *switchMsg, fromPort switchPort) { t.mutex.Lock() defer t.mutex.Unlock() + t.unlockedHandleMsg(msg, fromPort) +} + +func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) { + // TODO directly use a switchMsg instead of switchMessage + sigs now := time.Now() - if len(msg.locator.coords) == 0 { - return - } // Should always have >=1 links + // Set up the sender peerInfo + var sender peerInfo + sender.locator.root = msg.Root + sender.locator.tstamp = msg.TStamp + prevKey := msg.Root + for _, hop := range msg.Hops { + // Build locator and signatures + var sig sigInfo + sig.next = hop.Next + sig.sig = hop.Sig + sender.locator.coords = append(sender.locator.coords, hop.Port) + sender.key = prevKey + prevKey = hop.Next + } + sender.msg = *msg oldSender, isIn := t.data.peers[fromPort] if !isIn { oldSender.firstSeen = now } - sender := peerInfo{key: msg.from, - locator: msg.locator, - coords: msg.locator.coords[:len(msg.locator.coords)-1], - time: now, - firstSeen: oldSender.firstSeen, - port: fromPort, - seq: msg.seq} + sender.firstSeen = oldSender.firstSeen + sender.port = fromPort + sender.time = now + // Decide what to do equiv := func(x *switchLocator, y *switchLocator) bool { if x.root != y.root { return false @@ -306,20 +310,21 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig return true } doUpdate := false - if !equiv(&msg.locator, &oldSender.locator) { + if !equiv(&sender.locator, &oldSender.locator) { doUpdate = true + //sender.firstSeen = now // TODO? uncomment to prevent flapping? } t.data.peers[fromPort] = sender updateRoot := false oldParent, isIn := t.data.peers[t.parent] noParent := !isIn noLoop := func() bool { - for idx := 0; idx < len(sigs)-1; idx++ { - if sigs[idx].next == t.core.sigPub { + for idx := 0; idx < len(msg.Hops)-1; idx++ { + if msg.Hops[idx].Next == t.core.sigPub { return false } } - if msg.locator.root == t.core.sigPub { + if sender.locator.root == t.core.sigPub { return false } return true @@ -328,30 +333,30 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig pTime := oldParent.time.Sub(oldParent.firstSeen) + switch_timeout // Really want to compare sLen/sTime and pLen/pTime // Cross multiplied to avoid divide-by-zero - cost := len(msg.locator.coords) * int(pTime.Seconds()) + cost := len(sender.locator.coords) * int(pTime.Seconds()) pCost := len(t.data.locator.coords) * int(sTime.Seconds()) - dropTstamp, isIn := t.drop[msg.locator.root] + dropTstamp, isIn := t.drop[sender.locator.root] // Here be dragons switch { case !noLoop: // do nothing - case isIn && dropTstamp >= msg.locator.tstamp: // do nothing - case firstIsBetter(&msg.locator.root, &t.data.locator.root): + case isIn && dropTstamp >= sender.locator.tstamp: // do nothing + case firstIsBetter(&sender.locator.root, &t.data.locator.root): updateRoot = true - case t.data.locator.root != msg.locator.root: // do nothing - case t.data.locator.tstamp > msg.locator.tstamp: // do nothing + case t.data.locator.root != sender.locator.root: // do nothing + case t.data.locator.tstamp > sender.locator.tstamp: // do nothing case noParent: updateRoot = true case cost < pCost: updateRoot = true case sender.port != t.parent: // do nothing - case !equiv(&msg.locator, &t.data.locator): + case !equiv(&sender.locator, &t.data.locator): updateRoot = true case now.Sub(t.time) < switch_throttle: // do nothing - case msg.locator.tstamp > t.data.locator.tstamp: + case sender.locator.tstamp > t.data.locator.tstamp: updateRoot = true } if updateRoot { - if !equiv(&msg.locator, &t.data.locator) { + if !equiv(&sender.locator, &t.data.locator) { doUpdate = true t.data.seq++ select { @@ -361,13 +366,13 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig //t.core.log.Println("Switch update:", msg.locator.root, msg.locator.tstamp, msg.locator.coords) //fmt.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords) } - if t.data.locator.tstamp != msg.locator.tstamp { + if t.data.locator.tstamp != sender.locator.tstamp { t.time = now } - t.data.locator = msg.locator + t.data.locator = sender.locator t.parent = sender.port - t.data.sigs = sigs //t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords) + t.core.peers.sendSwitchMsgs() } if doUpdate { t.updater.Store(&sync.Once{}) @@ -408,19 +413,19 @@ func (t *switchTable) updateTable() { t.table.Store(newTable) } -func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) { +func (t *switchTable) lookup(dest []byte) switchPort { t.updater.Load().(*sync.Once).Do(t.updateTable) table := t.table.Load().(lookupTable) - myDist := table.self.dist(dest) //getDist(table.self.coords) - if !(uint64(myDist) < ttl) { - return 0, 0 + myDist := table.self.dist(dest) + if myDist == 0 { + return 0 } // cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow ports := t.core.peers.getPorts() var best switchPort bestCost := int64(^uint64(0) >> 1) for _, info := range table.elems { - dist := info.locator.dist(dest) //getDist(info.locator.coords) + dist := info.locator.dist(dest) if !(dist < myDist) { continue } @@ -434,8 +439,8 @@ func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) { bestCost = cost } } - //t.core.log.Println("DEBUG: sending to", best, "bandwidth", getBandwidth(best)) - return best, uint64(myDist) + //t.core.log.Println("DEBUG: sending to", best, "cost", bestCost) + return best } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index acde0344..e7b682c9 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -15,7 +15,6 @@ import "time" import "errors" import "sync" import "fmt" -import "bufio" import "golang.org/x/net/proxy" const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense @@ -208,40 +207,61 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { }() // Note that multiple connections to the same node are allowed // E.g. over different interfaces - linkIn := make(chan []byte, 1) - p := iface.core.peers.newPeer(&info.box, &info.sig) //, in, out) + p := iface.core.peers.newPeer(&info.box, &info.sig) + p.linkOut = make(chan []byte, 1) in := func(bs []byte) { - p.handlePacket(bs, linkIn) + p.handlePacket(bs) } out := make(chan []byte, 32) // TODO? what size makes sense defer close(out) - buf := bufio.NewWriterSize(sock, tcp_msgSize) - send := func(msg []byte) { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf.Write(tcp_msg[:]) - buf.Write(msgLen) - buf.Write(msg) - p.updateQueueSize(-1) - util_putBytes(msg) - } go func() { + var shadow uint64 var stack [][]byte put := func(msg []byte) { stack = append(stack, msg) for len(stack) > 32 { util_putBytes(stack[0]) stack = stack[1:] - p.updateQueueSize(-1) + shadow++ } } - for msg := range out { - put(msg) + send := func(msg []byte) { + msgLen := wire_encode_uint64(uint64(len(msg))) + buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf.WriteTo(sock) + util_putBytes(msg) + } + timerInterval := 4 * time.Second + timer := time.NewTimer(timerInterval) + defer timer.Stop() + for { + for ; shadow > 0; shadow-- { + p.updateQueueSize(-1) + } + timer.Stop() + select { + case <-timer.C: + default: + } + timer.Reset(timerInterval) + select { + case _ = <-timer.C: + //iface.core.log.Println("DEBUG: sending keep-alive:", sock.RemoteAddr().String()) + send(nil) // TCP keep-alive traffic + case msg := <-p.linkOut: + send(msg) + case msg, ok := <-out: + if !ok { + return + } + put(msg) + } for len(stack) > 0 { - // Keep trying to fill the stack (LIFO order) while sending select { + case msg := <-p.linkOut: + send(msg) case msg, ok := <-out: if !ok { - buf.Flush() return } put(msg) @@ -249,9 +269,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { msg := stack[len(stack)-1] stack = stack[:len(stack)-1] send(msg) + p.updateQueueSize(-1) } } - buf.Flush() } }() p.out = func(msg []byte) { @@ -265,11 +285,10 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { } p.close = func() { sock.Close() } setNoDelay(sock, true) - go p.linkLoop(linkIn) + go p.linkLoop() defer func() { // Put all of our cleanup here... p.core.peers.removePeer(p.port) - close(linkIn) }() them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) themNodeID := getNodeID(&info.box) diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 6b592e5b..3b43143b 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -12,9 +12,7 @@ const ( wire_Traffic = iota // data being routed somewhere, handle for crypto wire_ProtocolTraffic // protocol traffic, pub keys for crypto wire_LinkProtocolTraffic // link proto traffic, pub keys for crypto - wire_SwitchAnnounce // inside protocol traffic header - wire_SwitchHopRequest // inside protocol traffic header - wire_SwitchHop // inside protocol traffic header + wire_SwitchMsg // inside link protocol traffic header wire_SessionPing // inside protocol traffic header wire_SessionPong // inside protocol traffic header wire_DHTLookupRequest // inside protocol traffic header @@ -117,144 +115,48 @@ func wire_decode_coords(packet []byte) ([]byte, int) { //////////////////////////////////////////////////////////////////////////////// -// Announces that we can send parts of a Message with a particular seq -type msgAnnounce struct { - Root sigPubKey - Tstamp int64 - Seq uint64 - Len uint64 - //Deg uint64 - Rseq uint64 -} - -func (m *msgAnnounce) encode() []byte { - bs := wire_encode_uint64(wire_SwitchAnnounce) +func (m *switchMsg) encode() []byte { + bs := wire_encode_uint64(wire_SwitchMsg) bs = append(bs, m.Root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...) - bs = append(bs, wire_encode_uint64(m.Seq)...) - bs = append(bs, wire_encode_uint64(m.Len)...) - bs = append(bs, wire_encode_uint64(m.Rseq)...) + bs = append(bs, wire_encode_uint64(wire_intToUint(m.TStamp))...) + for _, hop := range m.Hops { + bs = append(bs, wire_encode_uint64(uint64(hop.Port))...) + bs = append(bs, hop.Next[:]...) + bs = append(bs, hop.Sig[:]...) + } return bs } -func (m *msgAnnounce) decode(bs []byte) bool { +func (m *switchMsg) decode(bs []byte) bool { var pType uint64 var tstamp uint64 switch { case !wire_chop_uint64(&pType, &bs): return false - case pType != wire_SwitchAnnounce: + case pType != wire_SwitchMsg: return false case !wire_chop_slice(m.Root[:], &bs): return false case !wire_chop_uint64(&tstamp, &bs): return false - case !wire_chop_uint64(&m.Seq, &bs): - return false - case !wire_chop_uint64(&m.Len, &bs): - return false - case !wire_chop_uint64(&m.Rseq, &bs): - return false } - m.Tstamp = wire_intFromUint(tstamp) + m.TStamp = wire_intFromUint(tstamp) + for len(bs) > 0 { + var hop switchMsgHop + switch { + case !wire_chop_uint64((*uint64)(&hop.Port), &bs): + return false + case !wire_chop_slice(hop.Next[:], &bs): + return false + case !wire_chop_slice(hop.Sig[:], &bs): + return false + } + m.Hops = append(m.Hops, hop) + } return true } -type msgHopReq struct { - Root sigPubKey - Tstamp int64 - Seq uint64 - Hop uint64 -} - -func (m *msgHopReq) encode() []byte { - bs := wire_encode_uint64(wire_SwitchHopRequest) - bs = append(bs, m.Root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...) - bs = append(bs, wire_encode_uint64(m.Seq)...) - bs = append(bs, wire_encode_uint64(m.Hop)...) - return bs -} - -func (m *msgHopReq) decode(bs []byte) bool { - var pType uint64 - var tstamp uint64 - switch { - case !wire_chop_uint64(&pType, &bs): - return false - case pType != wire_SwitchHopRequest: - return false - case !wire_chop_slice(m.Root[:], &bs): - return false - case !wire_chop_uint64(&tstamp, &bs): - return false - case !wire_chop_uint64(&m.Seq, &bs): - return false - case !wire_chop_uint64(&m.Hop, &bs): - return false - } - m.Tstamp = wire_intFromUint(tstamp) - return true -} - -type msgHop struct { - Root sigPubKey - Tstamp int64 - Seq uint64 - Hop uint64 - Port switchPort - Next sigPubKey - Sig sigBytes -} - -func (m *msgHop) encode() []byte { - bs := wire_encode_uint64(wire_SwitchHop) - bs = append(bs, m.Root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...) - bs = append(bs, wire_encode_uint64(m.Seq)...) - bs = append(bs, wire_encode_uint64(m.Hop)...) - bs = append(bs, wire_encode_uint64(uint64(m.Port))...) - bs = append(bs, m.Next[:]...) - bs = append(bs, m.Sig[:]...) - return bs -} - -func (m *msgHop) decode(bs []byte) bool { - var pType uint64 - var tstamp uint64 - switch { - case !wire_chop_uint64(&pType, &bs): - return false - case pType != wire_SwitchHop: - return false - case !wire_chop_slice(m.Root[:], &bs): - return false - case !wire_chop_uint64(&tstamp, &bs): - return false - case !wire_chop_uint64(&m.Seq, &bs): - return false - case !wire_chop_uint64(&m.Hop, &bs): - return false - case !wire_chop_uint64((*uint64)(&m.Port), &bs): - return false - case !wire_chop_slice(m.Next[:], &bs): - return false - case !wire_chop_slice(m.Sig[:], &bs): - return false - } - m.Tstamp = wire_intFromUint(tstamp) - return true -} - -// Format used to check signatures only, so no need to also support decoding -func wire_encode_locator(loc *switchLocator) []byte { - coords := wire_encode_coords(loc.getCoords()) - var bs []byte - bs = append(bs, loc.root[:]...) - bs = append(bs, wire_encode_uint64(wire_intToUint(loc.tstamp))...) - bs = append(bs, coords...) - return bs -} +//////////////////////////////////////////////////////////////////////////////// func wire_chop_slice(toSlice []byte, fromSlice *[]byte) bool { if len(*fromSlice) < len(toSlice) { @@ -290,7 +192,6 @@ func wire_chop_uint64(toUInt64 *uint64, fromSlice *[]byte) bool { // Wire traffic packets type wire_trafficPacket struct { - TTL uint64 Coords []byte Handle handle Nonce boxNonce @@ -301,7 +202,6 @@ type wire_trafficPacket struct { func (p *wire_trafficPacket) encode() []byte { bs := util_getBytes() bs = wire_put_uint64(wire_Traffic, bs) - bs = wire_put_uint64(p.TTL, bs) bs = wire_put_coords(p.Coords, bs) bs = append(bs, p.Handle[:]...) bs = append(bs, p.Nonce[:]...) @@ -317,8 +217,6 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { return false case pType != wire_Traffic: return false - case !wire_chop_uint64(&p.TTL, &bs): - return false case !wire_chop_coords(&p.Coords, &bs): return false case !wire_chop_slice(p.Handle[:], &bs): @@ -331,7 +229,6 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { } type wire_protoTrafficPacket struct { - TTL uint64 Coords []byte ToKey boxPubKey FromKey boxPubKey @@ -342,7 +239,6 @@ type wire_protoTrafficPacket struct { func (p *wire_protoTrafficPacket) encode() []byte { coords := wire_encode_coords(p.Coords) bs := wire_encode_uint64(wire_ProtocolTraffic) - bs = append(bs, wire_encode_uint64(p.TTL)...) bs = append(bs, coords...) bs = append(bs, p.ToKey[:]...) bs = append(bs, p.FromKey[:]...) @@ -358,8 +254,6 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { return false case pType != wire_ProtocolTraffic: return false - case !wire_chop_uint64(&p.TTL, &bs): - return false case !wire_chop_coords(&p.Coords, &bs): return false case !wire_chop_slice(p.ToKey[:], &bs):