diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index bbc31546..4466ffd8 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -28,7 +28,8 @@ type dhtInfo struct { recv time.Time // When we last received a message pings int // Time out if at least 3 consecutive maintenance pings drop throttle time.Duration - dirty bool // Set to true if we've used this node in ping responses (for queries about someone other than the person doing the asking, i.e. real searches) since the last time we heard from the node + path []byte // source route the destination, learned from response rpath + dirty bool // Set to true if we've used this node in ping responses (for queries about someone other than the person doing the asking, i.e. real searches) since the last time we heard from the node } // Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times. @@ -89,12 +90,17 @@ func (t *dht) reconfigure() { // Resets the DHT in response to coord changes. // This empties all info from the DHT and drops outstanding requests. func (t *dht) reset() { + t.reqs = make(map[dhtReqKey]time.Time) for _, info := range t.table { if t.isImportant(info) { - t.ping(info, nil) + t.ping(info, nil) // This will source route if a path is already known + if info.path != nil { + // In case the source route died, but the dest coords are still OK... + info.path = nil + t.ping(info, nil) + } } } - t.reqs = make(map[dhtReqKey]time.Time) t.table = make(map[crypto.NodeID]*dhtInfo) t.imp = nil } @@ -117,6 +123,9 @@ func (t *dht) lookup(nodeID *crypto.NodeID, everything bool) []*dhtInfo { results = newRes results = results[:dht_lookup_size] } + for _, info := range results { + info.dirty = true + } return results } @@ -185,7 +194,7 @@ func dht_ordered(first, second, third *crypto.NodeID) bool { // Reads a request, performs a lookup, and responds. // Update info about the node that sent the request. -func (t *dht) handleReq(req *dhtReq) { +func (t *dht) handleReq(req *dhtReq, rpath []byte) { // Send them what they asked for res := dhtRes{ Key: t.router.core.boxPub, @@ -193,7 +202,7 @@ func (t *dht) handleReq(req *dhtReq) { Dest: req.Dest, Infos: t.lookup(&req.Dest, false), } - t.sendRes(&res, req) + t.sendRes(&res, req, rpath) // Also add them to our DHT info := dhtInfo{ key: req.Key, @@ -213,13 +222,15 @@ func (t *dht) handleReq(req *dhtReq) { } // Sends a lookup response to the specified node. -func (t *dht) sendRes(res *dhtRes, req *dhtReq) { +func (t *dht) sendRes(res *dhtRes, req *dhtReq, rpath []byte) { // Send a reply for a dhtReq bs := res.encode() shared := t.router.sessions.getSharedKey(&t.router.core.boxPriv, &req.Key) payload, nonce := crypto.BoxSeal(shared, bs, nil) + path := append([]byte{0}, switch_reverseCoordBytes(rpath)...) p := wire_protoTrafficPacket{ - Coords: req.Coords, + Offset: 1, + Coords: path, ToKey: req.Key, FromKey: t.router.core.boxPub, Nonce: *nonce, @@ -242,7 +253,7 @@ func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) { // Reads a lookup response, checks that we had sent a matching request, and processes the response info. // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses -func (t *dht) handleRes(res *dhtRes) { +func (t *dht) handleRes(res *dhtRes, rpath []byte) { rq := dhtReqKey{res.Key, res.Dest} if callbacks, isIn := t.callbacks[rq]; isIn { for _, callback := range callbacks { @@ -258,6 +269,7 @@ func (t *dht) handleRes(res *dhtRes) { rinfo := dhtInfo{ key: res.Key, coords: res.Coords, + path: switch_reverseCoordBytes(rpath), } if t.isImportant(&rinfo) { t.insert(&rinfo) @@ -289,6 +301,10 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { Nonce: *nonce, Payload: payload, } + if dest.path != nil { + p.Coords = append([]byte{0}, dest.path...) + p.Offset += 1 + } packet := p.encode() t.router.out(packet) rq := dhtReqKey{dest.key, req.Dest} diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index 6273e6c8..61f3cfd2 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -54,7 +54,8 @@ func (q *packetQueue) drop() bool { } func (q *packetQueue) push(packet []byte) { - id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now + _, coords := wire_getTrafficOffsetAndCoords(packet) + id := pqStreamID(coords) // just coords for now info := pqPacketInfo{packet: packet, time: time.Now()} for idx := range q.streams { if q.streams[idx].id == id { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index f04ab280..74e1023f 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -236,13 +236,6 @@ func (p *peer) _handlePacket(packet []byte) { } } -// Get the coords of a packet without decoding -func peer_getPacketCoords(packet []byte) []byte { - _, pTypeLen := wire_decode_uint64(packet) - coords, _ := wire_decode_coords(packet[pTypeLen:]) - return coords -} - // Called to handle traffic or protocolTraffic packets. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. func (p *peer) _handleTraffic(packet []byte) { @@ -250,8 +243,26 @@ func (p *peer) _handleTraffic(packet []byte) { // Drop traffic if the peer isn't in the switch return } - coords := peer_getPacketCoords(packet) - next := p.table.lookup(coords) + obs, coords := wire_getTrafficOffsetAndCoords(packet) + offset, _ := wire_decode_uint64(obs) + ports := switch_getPorts(coords) + if offset == 0 { + offset = p.table.getOffset(ports) + } + var next switchPort + if offset == 0 { + // Greedy routing, find the best next hop + next = p.table.lookup(ports) + } else { + // Source routing, read next hop from coords and update offset/obs + if int(offset) < len(ports) { + next = ports[offset] + offset += 1 + // FIXME this breaks if offset is > 127, it's just for testing + wire_put_uint64(offset, obs[:0]) + } + } + packet = wire_put_uint64(uint64(p.port), packet) if nPeer, isIn := p.ports[next]; isIn { nPeer.sendPacketFrom(p, packet) } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index d387346e..db81068a 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -196,54 +196,54 @@ func (r *router) _handleProto(packet []byte) { } switch bsType { case wire_SessionPing: - r._handlePing(bs, &p.FromKey) + r._handlePing(bs, &p.FromKey, p.RPath) case wire_SessionPong: - r._handlePong(bs, &p.FromKey) + r._handlePong(bs, &p.FromKey, p.RPath) case wire_NodeInfoRequest: fallthrough case wire_NodeInfoResponse: r._handleNodeInfo(bs, &p.FromKey) case wire_DHTLookupRequest: - r._handleDHTReq(bs, &p.FromKey) + r._handleDHTReq(bs, &p.FromKey, p.RPath) case wire_DHTLookupResponse: - r._handleDHTRes(bs, &p.FromKey) + r._handleDHTRes(bs, &p.FromKey, p.RPath) default: } } // Decodes session pings from wire format and passes them to sessions.handlePing where they either create or update a session. -func (r *router) _handlePing(bs []byte, fromKey *crypto.BoxPubKey) { +func (r *router) _handlePing(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) { ping := sessionPing{} if !ping.decode(bs) { return } ping.SendPermPub = *fromKey - r.sessions.handlePing(&ping) + r.sessions.handlePing(&ping, rpath) } // Handles session pongs (which are really pings with an extra flag to prevent acknowledgement). -func (r *router) _handlePong(bs []byte, fromKey *crypto.BoxPubKey) { - r._handlePing(bs, fromKey) +func (r *router) _handlePong(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) { + r._handlePing(bs, fromKey, rpath) } // Decodes dht requests and passes them to dht.handleReq to trigger a lookup/response. -func (r *router) _handleDHTReq(bs []byte, fromKey *crypto.BoxPubKey) { +func (r *router) _handleDHTReq(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) { req := dhtReq{} if !req.decode(bs) { return } req.Key = *fromKey - r.dht.handleReq(&req) + r.dht.handleReq(&req, rpath) } // Decodes dht responses and passes them to dht.handleRes to update the DHT table and further pass them to the search code (if applicable). -func (r *router) _handleDHTRes(bs []byte, fromKey *crypto.BoxPubKey) { +func (r *router) _handleDHTRes(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) { res := dhtRes{} if !res.decode(bs) { return } res.Key = *fromKey - r.dht.handleRes(&res) + r.dht.handleRes(&res, rpath) } // Decodes nodeinfo request diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 360f2a1b..8fd7cd5b 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -50,6 +50,7 @@ type sessionInfo struct { conn *Conn // The associated Conn object callbacks []chan func() // Finished work from crypto workers table *lookupTable // table.self is a locator where we get our coords + path []byte // Path from self to destination } // Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. @@ -65,41 +66,48 @@ type sessionPing struct { // Updates session info in response to a ping, after checking that the ping is OK. // Returns true if the session was updated, or false otherwise. -func (s *sessionInfo) _update(p *sessionPing) bool { - if !(p.Tstamp > s.tstamp) { +func (sinfo *sessionInfo) _update(p *sessionPing, rpath []byte) bool { + if !(p.Tstamp > sinfo.tstamp) { // To protect against replay attacks return false } - if p.SendPermPub != s.theirPermPub { + if p.SendPermPub != sinfo.theirPermPub { // Should only happen if two sessions got the same handle // That shouldn't be allowed anyway, but if it happens then let one time out return false } - if p.SendSesPub != s.theirSesPub { - s.theirSesPub = p.SendSesPub - s.theirHandle = p.Handle - s.sharedSesKey = *crypto.GetSharedKey(&s.mySesPriv, &s.theirSesPub) - s.theirNonce = crypto.BoxNonce{} + if p.SendSesPub != sinfo.theirSesPub { + sinfo.path = nil + sinfo.theirSesPub = p.SendSesPub + sinfo.theirHandle = p.Handle + sinfo.sharedSesKey = *crypto.GetSharedKey(&sinfo.mySesPriv, &sinfo.theirSesPub) + sinfo.theirNonce = crypto.BoxNonce{} } if p.MTU >= 1280 || p.MTU == 0 { - s.theirMTU = p.MTU - if s.conn != nil { - s.conn.setMTU(s, s._getMTU()) + sinfo.theirMTU = p.MTU + if sinfo.conn != nil { + sinfo.conn.setMTU(sinfo, sinfo._getMTU()) } } - if !bytes.Equal(s.coords, p.Coords) { + if !bytes.Equal(sinfo.coords, p.Coords) { // allocate enough space for additional coords - s.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...) + sinfo.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...) + path := switch_reverseCoordBytes(rpath) + sinfo.path = append(sinfo.path[:0], path...) + defer sinfo._sendPingPong(false, nil) + } else if p.IsPong { + path := switch_reverseCoordBytes(rpath) + sinfo.path = append(sinfo.path[:0], path...) } - s.time = time.Now() - s.tstamp = p.Tstamp - s.reset = false + sinfo.time = time.Now() + sinfo.tstamp = p.Tstamp + sinfo.reset = false defer func() { recover() }() // Recover if the below panics select { - case <-s.init: + case <-sinfo.init: default: // Unblock anything waiting for the session to initialize - close(s.init) + close(sinfo.init) } return true } @@ -304,13 +312,13 @@ func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey, // Sends a session ping by calling sendPingPong in ping mode. func (sinfo *sessionInfo) ping(from phony.Actor) { sinfo.Act(from, func() { - sinfo._sendPingPong(false) + sinfo._sendPingPong(false, nil) }) } // Calls getPing, sets the appropriate ping/pong flag, encodes to wire format, and send it. // Updates the time the last ping was sent in the session info. -func (sinfo *sessionInfo) _sendPingPong(isPong bool) { +func (sinfo *sessionInfo) _sendPingPong(isPong bool, path []byte) { ping := sinfo._getPing() ping.IsPong = isPong bs := ping.encode() @@ -322,10 +330,14 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) { Nonce: *nonce, Payload: payload, } + if path != nil { + p.Coords = append([]byte{0}, path...) + p.Offset += 1 + } packet := p.encode() // TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) }) - if sinfo.pingTime.Before(sinfo.time) { + if !isPong { sinfo.pingTime = time.Now() } } @@ -339,7 +351,7 @@ func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) { // Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful. // If the session has a packet cached (common when first setting up a session), it will be sent. -func (ss *sessions) handlePing(ping *sessionPing) { +func (ss *sessions) handlePing(ping *sessionPing, rpath []byte) { // Get the corresponding session (or create a new session) sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub) switch { @@ -368,11 +380,11 @@ func (ss *sessions) handlePing(ping *sessionPing) { if sinfo != nil { sinfo.Act(ss.router, func() { // Update the session - if !sinfo._update(ping) { /*panic("Should not happen in testing")*/ + if !sinfo._update(ping, rpath) { /*panic("Should not happen in testing")*/ return } if !ping.IsPong { - sinfo._sendPingPong(true) + sinfo._sendPingPong(true, switch_reverseCoordBytes(rpath)) } }) } @@ -413,6 +425,8 @@ func (ss *sessions) reset() { sinfo := _sinfo // So we can safely put it in a closure sinfo.Act(ss.router, func() { sinfo.reset = true + sinfo._sendPingPong(false, sinfo.path) + sinfo._sendPingPong(false, nil) }) } } @@ -483,12 +497,20 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { return } sinfo.bytesSent += uint64(len(msg.Message)) - coords := append([]byte(nil), sinfo.coords...) + var coords []byte + var offset uint64 + if len(sinfo.path) > 0 { + coords = append([]byte{0}, sinfo.path...) + offset += 1 + } else { + coords = append([]byte(nil), sinfo.coords...) + } if msg.FlowKey != 0 { coords = append(coords, 0) coords = append(coords, wire_encode_uint64(msg.FlowKey)...) } p := wire_trafficPacket{ + Offset: offset, Coords: coords, Handle: sinfo.theirHandle, Nonce: sinfo.myNonce, @@ -503,6 +525,9 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) }) + if time.Since(sinfo.pingTime) > 3*time.Second { + sinfo._sendPingPong(false, nil) + } } ch <- callback sinfo.checkCallbacks() diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index afa97c76..ef09c8e7 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -113,7 +113,8 @@ func (s *stream) readMsgFromBuffer() ([]byte, error) { if msgLen > streamMsgSize { return nil, errors.New("oversized message") } - msg := pool_getBytes(int(msgLen)) + msg := pool_getBytes(int(msgLen + 10)) // Extra padding for up to 1 more switchPort + msg = msg[:msgLen] _, err = io.ReadFull(s.inputBuffer, msg) return msg, err } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 2f4f3194..6cab5bca 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -20,10 +20,9 @@ import ( ) const ( - switch_timeout = time.Minute - switch_updateInterval = switch_timeout / 2 - switch_throttle = switch_updateInterval / 2 - switch_faster_threshold = 240 //Number of switch updates before switching to a faster parent + switch_timeout = time.Minute + switch_updateInterval = switch_timeout / 2 + switch_throttle = switch_updateInterval / 2 ) // The switch locator represents the topology and network state dependent info about a node, minus the signatures that go with it. @@ -136,15 +135,14 @@ func (x *switchLocator) isAncestorOf(y *switchLocator) bool { // Information about a peer, used by the switch to build the tree and eventually make routing decisions. type peerInfo struct { - key crypto.SigPubKey // ID of this peer - locator switchLocator // Should be able to respond with signatures upon request - degree uint64 // Self-reported degree - time time.Time // Time this node was last seen - faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower - port switchPort // Interface number of this peer - msg switchMsg // The wire switchMsg used - readBlock bool // True if the link notified us of a read that blocked too long - writeBlock bool // True of the link notified us of a write that blocked too long + key crypto.SigPubKey // ID of this peer + locator switchLocator // Should be able to respond with signatures upon request + degree uint64 // Self-reported degree + time time.Time // Time this node was last seen + port switchPort // Interface number of this peer + msg switchMsg // The wire switchMsg used + readBlock bool // True if the link notified us of a read that blocked too long + writeBlock bool // True of the link notified us of a write that blocked too long } func (pinfo *peerInfo) blocked() bool { @@ -427,37 +425,12 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi doUpdate := false oldSender := t.data.peers[fromPort] if !equiv(&sender.locator, &oldSender.locator) { - // Reset faster info, we'll start refilling it right after this - sender.faster = nil doUpdate = true } - // Update the matrix of peer "faster" thresholds if reprocessing { - sender.faster = oldSender.faster sender.time = oldSender.time sender.readBlock = oldSender.readBlock sender.writeBlock = oldSender.writeBlock - } else { - sender.faster = make(map[switchPort]uint64, len(oldSender.faster)) - for port, peer := range t.data.peers { - if port == fromPort { - continue - } else if sender.locator.root != peer.locator.root || sender.locator.tstamp > peer.locator.tstamp { - // We were faster than this node, so increment, as long as we don't overflow because of it - if oldSender.faster[peer.port] < switch_faster_threshold { - sender.faster[port] = oldSender.faster[peer.port] + 1 - } else { - sender.faster[port] = switch_faster_threshold - } - } else { - // Slower than this node, penalize (more than the reward amount) - if oldSender.faster[port] > 1 { - sender.faster[port] = oldSender.faster[peer.port] - 2 - } else { - sender.faster[port] = 0 - } - } - } } if sender.blocked() != oldSender.blocked() { doUpdate = true @@ -496,35 +469,11 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi case noParent: // We currently have no working parent, and at this point in the switch statement, anything is better than nothing. updateRoot = true - case sender.faster[t.parent] >= switch_faster_threshold: - // The is reliably faster than the current parent. - updateRoot = true case !sender.blocked() && oldParent.blocked(): // Replace a blocked parent updateRoot = true case reprocessing && sender.blocked() && !oldParent.blocked(): // Don't replace an unblocked parent when reprocessing - case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]: - // The sender seems to be reliably faster than the current parent, so switch to them instead. - updateRoot = true - case sender.port != t.parent: - // Ignore further cases if the sender isn't our parent. - case !reprocessing && !equiv(&sender.locator, &t.data.locator): - // Special case: - // If coords changed, then we need to penalize this node somehow, to prevent flapping. - // First, reset all faster-related info to 0. - // Then, de-parent the node and reprocess all messages to find a new parent. - t.parent = 0 - for _, peer := range t.data.peers { - if peer.port == sender.port { - continue - } - t._handleMsg(&peer.msg, peer.port, true) - } - // Process the sender last, to avoid keeping them as a parent if at all possible. - t._handleMsg(&sender.msg, sender.port, true) - case now.Sub(t.time) < switch_throttle: - // We've already gotten an update from this root recently, so ignore this one to avoid flooding. case sender.locator.tstamp > t.data.locator.tstamp: // The timestamp was updated, so we need to update locally and send to our peers. updateRoot = true @@ -631,13 +580,11 @@ func (t *switchTable) start() error { return nil } -func (t *lookupTable) lookup(coords []byte) switchPort { - var offset int +func (t *lookupTable) lookup(ports []switchPort) switchPort { here := &t._start - for offset < len(coords) { - port, l := wire_decode_uint64(coords[offset:]) - offset += l - if next, ok := here.next[switchPort(port)]; ok { + for idx := range ports { + port := ports[idx] + if next, ok := here.next[port]; ok { here = next } else { break @@ -645,3 +592,56 @@ func (t *lookupTable) lookup(coords []byte) switchPort { } return here.port } + +func switch_getPorts(coords []byte) []switchPort { + var ports []switchPort + var offset int + for offset < len(coords) { + port, l := wire_decode_uint64(coords[offset:]) + offset += l + ports = append(ports, switchPort(port)) + } + return ports +} + +func switch_reverseCoordBytes(coords []byte) []byte { + a := switch_getPorts(coords) + for i := len(a)/2 - 1; i >= 0; i-- { + opp := len(a) - 1 - i + a[i], a[opp] = a[opp], a[i] + } + var reversed []byte + for _, sPort := range a { + reversed = wire_put_uint64(uint64(sPort), reversed) + } + return reversed +} + +func (t *lookupTable) isDescendant(ports []switchPort) bool { + // Note that this returns true for anyone in the subtree that starts at us + // That includes ourself, so we are our own descendant by this logic... + if len(t.self.coords) >= len(ports) { + // Our coords are longer, so they can't be our descendant + return false + } + for idx := range t.self.coords { + if ports[idx] != t.self.coords[idx] { + return false + } + } + return true +} + +func (t *lookupTable) getOffset(ports []switchPort) uint64 { + // If they're our descendant, this returns the length of our coords, used as an offset for source routing + // If they're not our descendant, this returns 0 + var offset uint64 + for idx := range t.self.coords { + if idx < len(ports) && ports[idx] == t.self.coords[idx] { + offset += 1 + } else { + return 0 + } + } + return offset +} diff --git a/src/yggdrasil/version.go b/src/yggdrasil/version.go index e0cb38e3..f8ce85b8 100644 --- a/src/yggdrasil/version.go +++ b/src/yggdrasil/version.go @@ -24,7 +24,7 @@ func version_getBaseMetadata() version_metadata { return version_metadata{ meta: [4]byte{'m', 'e', 't', 'a'}, ver: 0, - minorVer: 2, + minorVer: 0, } } diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 9746d46e..e8e9bf09 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -96,9 +96,9 @@ func wire_encode_coords(coords []byte) []byte { // Puts a length prefix and the coords into bs, returns the wire formatted coords. // Useful in hot loops where we don't want to allocate and we know the rest of the later parts of the slice are safe to overwrite. -func wire_put_coords(coords []byte, bs []byte) []byte { - bs = wire_put_uint64(uint64(len(coords)), bs) - bs = append(bs, coords...) +func wire_put_vslice(slice []byte, bs []byte) []byte { + bs = wire_put_uint64(uint64(len(slice)), bs) + bs = append(bs, slice...) return bs } @@ -194,14 +194,14 @@ func wire_chop_slice(toSlice []byte, fromSlice *[]byte) bool { return true } -// A utility function to extract coords from a slice and advance the source slices, returning true if successful. -func wire_chop_coords(toCoords *[]byte, fromSlice *[]byte) bool { - coords, coordLen := wire_decode_coords(*fromSlice) - if coordLen == 0 { +// A utility function to extract a length-prefixed slice (such as coords) from a slice and advance the source slices, returning true if successful. +func wire_chop_vslice(toSlice *[]byte, fromSlice *[]byte) bool { + slice, sliceLen := wire_decode_coords(*fromSlice) + if sliceLen == 0 { // sliceLen is length-prefix size + slice size, in bytes return false } - *toCoords = append((*toCoords)[:0], coords...) - *fromSlice = (*fromSlice)[coordLen:] + *toSlice = append((*toSlice)[:0], slice...) + *fromSlice = (*fromSlice)[sliceLen:] return true } @@ -222,10 +222,12 @@ func wire_chop_uint64(toUInt64 *uint64, fromSlice *[]byte) bool { // The wire format for ordinary IPv6 traffic encapsulated by the network. type wire_trafficPacket struct { + Offset uint64 Coords []byte Handle crypto.Handle Nonce crypto.BoxNonce Payload []byte + RPath []byte } // Encodes a wire_trafficPacket into its wire format. @@ -233,10 +235,12 @@ type wire_trafficPacket struct { func (p *wire_trafficPacket) encode() []byte { bs := pool_getBytes(0) bs = wire_put_uint64(wire_Traffic, bs) - bs = wire_put_coords(p.Coords, bs) + bs = wire_put_uint64(p.Offset, bs) + bs = wire_put_vslice(p.Coords, bs) bs = append(bs, p.Handle[:]...) bs = append(bs, p.Nonce[:]...) - bs = append(bs, p.Payload...) + bs = wire_put_vslice(p.Payload, bs) + bs = append(bs, p.RPath...) return bs } @@ -250,35 +254,42 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { return false case pType != wire_Traffic: return false - case !wire_chop_coords(&p.Coords, &bs): + case !wire_chop_uint64(&p.Offset, &bs): + return false + case !wire_chop_vslice(&p.Coords, &bs): return false case !wire_chop_slice(p.Handle[:], &bs): return false case !wire_chop_slice(p.Nonce[:], &bs): return false + case !wire_chop_vslice(&p.Payload, &bs): + return false } - p.Payload = append(p.Payload, bs...) + p.RPath = append(p.RPath[:0], bs...) return true } // The wire format for protocol traffic, such as dht req/res or session ping/pong packets. type wire_protoTrafficPacket struct { + Offset uint64 Coords []byte ToKey crypto.BoxPubKey FromKey crypto.BoxPubKey Nonce crypto.BoxNonce Payload []byte + RPath []byte } // Encodes a wire_protoTrafficPacket into its wire format. func (p *wire_protoTrafficPacket) encode() []byte { - coords := wire_encode_coords(p.Coords) bs := wire_encode_uint64(wire_ProtocolTraffic) - bs = append(bs, coords...) + bs = wire_put_uint64(p.Offset, bs) + bs = wire_put_vslice(p.Coords, bs) bs = append(bs, p.ToKey[:]...) bs = append(bs, p.FromKey[:]...) bs = append(bs, p.Nonce[:]...) - bs = append(bs, p.Payload...) + bs = wire_put_vslice(p.Payload, bs) + bs = append(bs, p.RPath...) return bs } @@ -290,7 +301,9 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { return false case pType != wire_ProtocolTraffic: return false - case !wire_chop_coords(&p.Coords, &bs): + case !wire_chop_uint64(&p.Offset, &bs): + return false + case !wire_chop_vslice(&p.Coords, &bs): return false case !wire_chop_slice(p.ToKey[:], &bs): return false @@ -298,11 +311,23 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { return false case !wire_chop_slice(p.Nonce[:], &bs): return false + case !wire_chop_vslice(&p.Payload, &bs): + return false } - p.Payload = bs + p.RPath = append(p.RPath[:0], bs...) return true } +// Get the offset and coord slices of a (protocol) traffic packet without decoding +func wire_getTrafficOffsetAndCoords(packet []byte) ([]byte, []byte) { + _, offsetBegin := wire_decode_uint64(packet) + _, offsetLen := wire_decode_uint64(packet[offsetBegin:]) + offsetEnd := offsetBegin + offsetLen + offset := packet[offsetBegin:offsetEnd] + coords, _ := wire_decode_coords(packet[offsetEnd:]) + return offset, coords +} + // The wire format for link protocol traffic, namely switchMsg. // There's really two layers of this, with the outer layer using permanent keys, and the inner layer using ephemeral keys. // The keys themselves are exchanged as part of the connection setup, and then omitted from the packets. @@ -373,7 +398,7 @@ func (p *sessionPing) decode(bs []byte) bool { return false case !wire_chop_uint64(&tstamp, &bs): return false - case !wire_chop_coords(&p.Coords, &bs): + case !wire_chop_vslice(&p.Coords, &bs): return false case !wire_chop_uint64(&mtu, &bs): mtu = 1280 @@ -397,7 +422,7 @@ func (p *nodeinfoReqRes) encode() []byte { pTypeVal = wire_NodeInfoRequest } bs := wire_encode_uint64(pTypeVal) - bs = wire_put_coords(p.SendCoords, bs) + bs = wire_put_vslice(p.SendCoords, bs) if pTypeVal == wire_NodeInfoResponse { bs = append(bs, p.NodeInfo...) } @@ -412,7 +437,7 @@ func (p *nodeinfoReqRes) decode(bs []byte) bool { return false case pType != wire_NodeInfoRequest && pType != wire_NodeInfoResponse: return false - case !wire_chop_coords(&p.SendCoords, &bs): + case !wire_chop_vslice(&p.SendCoords, &bs): return false } if p.IsResponse = pType == wire_NodeInfoResponse; p.IsResponse { @@ -446,7 +471,7 @@ func (r *dhtReq) decode(bs []byte) bool { return false case pType != wire_DHTLookupRequest: return false - case !wire_chop_coords(&r.Coords, &bs): + case !wire_chop_vslice(&r.Coords, &bs): return false case !wire_chop_slice(r.Dest[:], &bs): return false @@ -477,7 +502,7 @@ func (r *dhtRes) decode(bs []byte) bool { return false case pType != wire_DHTLookupResponse: return false - case !wire_chop_coords(&r.Coords, &bs): + case !wire_chop_vslice(&r.Coords, &bs): return false case !wire_chop_slice(r.Dest[:], &bs): return false @@ -487,7 +512,7 @@ func (r *dhtRes) decode(bs []byte) bool { switch { case !wire_chop_slice(info.key[:], &bs): return false - case !wire_chop_coords(&info.coords, &bs): + case !wire_chop_vslice(&info.coords, &bs): return false } r.Infos = append(r.Infos, &info)