From 92dbb48eda4f412831d34f206e9b36d5c0f4ddb5 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 7 Nov 2020 06:18:09 -0600 Subject: [PATCH 01/11] add (but don't use) offset field for (protocol) traffic packets --- src/yggdrasil/packetqueue.go | 3 ++- src/yggdrasil/peer.go | 9 +-------- src/yggdrasil/version.go | 2 +- src/yggdrasil/wire.go | 18 ++++++++++++++++++ 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index 6273e6c8..61f3cfd2 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -54,7 +54,8 @@ func (q *packetQueue) drop() bool { } func (q *packetQueue) push(packet []byte) { - id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now + _, coords := wire_getTrafficOffsetAndCoords(packet) + id := pqStreamID(coords) // just coords for now info := pqPacketInfo{packet: packet, time: time.Now()} for idx := range q.streams { if q.streams[idx].id == id { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index f04ab280..1d1b8054 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -236,13 +236,6 @@ func (p *peer) _handlePacket(packet []byte) { } } -// Get the coords of a packet without decoding -func peer_getPacketCoords(packet []byte) []byte { - _, pTypeLen := wire_decode_uint64(packet) - coords, _ := wire_decode_coords(packet[pTypeLen:]) - return coords -} - // Called to handle traffic or protocolTraffic packets. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. func (p *peer) _handleTraffic(packet []byte) { @@ -250,7 +243,7 @@ func (p *peer) _handleTraffic(packet []byte) { // Drop traffic if the peer isn't in the switch return } - coords := peer_getPacketCoords(packet) + _, coords := wire_getTrafficOffsetAndCoords(packet) next := p.table.lookup(coords) if nPeer, isIn := p.ports[next]; isIn { nPeer.sendPacketFrom(p, packet) diff --git a/src/yggdrasil/version.go b/src/yggdrasil/version.go index e0cb38e3..f8ce85b8 100644 --- a/src/yggdrasil/version.go +++ b/src/yggdrasil/version.go @@ -24,7 +24,7 @@ func version_getBaseMetadata() version_metadata { return version_metadata{ meta: [4]byte{'m', 'e', 't', 'a'}, ver: 0, - minorVer: 2, + minorVer: 0, } } diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 9746d46e..9c871031 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -222,6 +222,7 @@ func wire_chop_uint64(toUInt64 *uint64, fromSlice *[]byte) bool { // The wire format for ordinary IPv6 traffic encapsulated by the network. type wire_trafficPacket struct { + Offset uint64 Coords []byte Handle crypto.Handle Nonce crypto.BoxNonce @@ -233,6 +234,7 @@ type wire_trafficPacket struct { func (p *wire_trafficPacket) encode() []byte { bs := pool_getBytes(0) bs = wire_put_uint64(wire_Traffic, bs) + bs = wire_put_uint64(p.Offset, bs) bs = wire_put_coords(p.Coords, bs) bs = append(bs, p.Handle[:]...) bs = append(bs, p.Nonce[:]...) @@ -250,6 +252,8 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { return false case pType != wire_Traffic: return false + case !wire_chop_uint64(&p.Offset, &bs): + return false case !wire_chop_coords(&p.Coords, &bs): return false case !wire_chop_slice(p.Handle[:], &bs): @@ -263,6 +267,7 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { // The wire format for protocol traffic, such as dht req/res or session ping/pong packets. type wire_protoTrafficPacket struct { + Offset uint64 Coords []byte ToKey crypto.BoxPubKey FromKey crypto.BoxPubKey @@ -274,6 +279,7 @@ type wire_protoTrafficPacket struct { func (p *wire_protoTrafficPacket) encode() []byte { coords := wire_encode_coords(p.Coords) bs := wire_encode_uint64(wire_ProtocolTraffic) + bs = wire_put_uint64(p.Offset, bs) bs = append(bs, coords...) bs = append(bs, p.ToKey[:]...) bs = append(bs, p.FromKey[:]...) @@ -290,6 +296,8 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { return false case pType != wire_ProtocolTraffic: return false + case !wire_chop_uint64(&p.Offset, &bs): + return false case !wire_chop_coords(&p.Coords, &bs): return false case !wire_chop_slice(p.ToKey[:], &bs): @@ -303,6 +311,16 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { return true } +// Get the offset and coord slices of a (protocol) traffic packet without decoding +func wire_getTrafficOffsetAndCoords(packet []byte) ([]byte, []byte) { + _, offsetBegin := wire_decode_uint64(packet) + _, offsetLen := wire_decode_uint64(packet[offsetBegin:]) + offsetEnd := offsetBegin + offsetLen + offset := packet[offsetBegin:offsetEnd] + coords, _ := wire_decode_coords(packet[offsetEnd:]) + return offset, coords +} + // The wire format for link protocol traffic, namely switchMsg. // There's really two layers of this, with the outer layer using permanent keys, and the inner layer using ephemeral keys. // The keys themselves are exchanged as part of the connection setup, and then omitted from the packets. From 36e4ce4b0b2748696b6d0e61d3baeb98ad9e64a8 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 7 Nov 2020 07:10:13 -0600 Subject: [PATCH 02/11] WIP rough implementation of the source routed part of hybrid routing, does not work if coord length is too long (>127 hops) --- src/yggdrasil/peer.go | 21 +++++++++++++++-- src/yggdrasil/switch.go | 50 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 1d1b8054..b3629311 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -243,8 +243,25 @@ func (p *peer) _handleTraffic(packet []byte) { // Drop traffic if the peer isn't in the switch return } - _, coords := wire_getTrafficOffsetAndCoords(packet) - next := p.table.lookup(coords) + obs, coords := wire_getTrafficOffsetAndCoords(packet) + offset, _ := wire_decode_uint64(obs) + ports := p.table.getPorts(coords) + if offset == 0 { + offset = p.table.getOffset(ports) + } + var next switchPort + if offset == 0 { + // Greedy routing, find the best next hop + next = p.table.lookup(ports) + } else { + // Source routing, read next hop from coords and update offset/obs + if int(offset) < len(ports) { + next = ports[offset] + offset += 1 + // FIXME this breaks if offset is > 127, it's just for testing + wire_put_uint64(offset, obs[:0]) + } + } if nPeer, isIn := p.ports[next]; isIn { nPeer.sendPacketFrom(p, packet) } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 2f4f3194..4873d8b4 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -631,13 +631,11 @@ func (t *switchTable) start() error { return nil } -func (t *lookupTable) lookup(coords []byte) switchPort { - var offset int +func (t *lookupTable) lookup(ports []switchPort) switchPort { here := &t._start - for offset < len(coords) { - port, l := wire_decode_uint64(coords[offset:]) - offset += l - if next, ok := here.next[switchPort(port)]; ok { + for idx := range ports { + port := ports[idx] + if next, ok := here.next[port]; ok { here = next } else { break @@ -645,3 +643,43 @@ func (t *lookupTable) lookup(coords []byte) switchPort { } return here.port } + +func (t *lookupTable) getPorts(coords []byte) []switchPort { + var ports []switchPort + var offset int + for offset < len(coords) { + port, l := wire_decode_uint64(coords[offset:]) + offset += l + ports = append(ports, switchPort(port)) + } + return ports +} + +func (t *lookupTable) isDescendant(ports []switchPort) bool { + // Note that this returns true for anyone in the subtree that starts at us + // That includes ourself, so we are our own descendant by this logic... + if len(t.self.coords) >= len(ports) { + // Our coords are longer, so they can't be our descendant + return false + } + for idx := range t.self.coords { + if ports[idx] != t.self.coords[idx] { + return false + } + } + return true +} + +func (t *lookupTable) getOffset(ports []switchPort) uint64 { + // If they're our descendant, this returns the length of our coords, used as an offset for source routing + // If they're not our descendant, this returns 0 + var offset uint64 + for idx := range t.self.coords { + if idx < len(ports) && ports[idx] == t.self.coords[idx] { + offset += 1 + } else { + return 0 + } + } + return offset +} From e2521de94d0743800ae53a45dcb666200b5ece21 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 7 Nov 2020 09:44:34 -0600 Subject: [PATCH 03/11] add path information to (protocol) traffic packets as they flow through the network, and a field for a reply path --- src/yggdrasil/peer.go | 1 + src/yggdrasil/stream.go | 3 +- src/yggdrasil/wire.go | 63 +++++++++++++++++++++++++---------------- 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index b3629311..9f367695 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -262,6 +262,7 @@ func (p *peer) _handleTraffic(packet []byte) { wire_put_uint64(offset, obs[:0]) } } + packet = wire_put_uint64(uint64(next), packet) if nPeer, isIn := p.ports[next]; isIn { nPeer.sendPacketFrom(p, packet) } diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index afa97c76..ab6336f1 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -113,7 +113,8 @@ func (s *stream) readMsgFromBuffer() ([]byte, error) { if msgLen > streamMsgSize { return nil, errors.New("oversized message") } - msg := pool_getBytes(int(msgLen)) + msg := pool_getBytes(int(msgLen + 10)) // Extra padding for up to 1 more switchPort + msg = msg[msgLen:] _, err = io.ReadFull(s.inputBuffer, msg) return msg, err } diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 9c871031..6746cd74 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -96,9 +96,9 @@ func wire_encode_coords(coords []byte) []byte { // Puts a length prefix and the coords into bs, returns the wire formatted coords. // Useful in hot loops where we don't want to allocate and we know the rest of the later parts of the slice are safe to overwrite. -func wire_put_coords(coords []byte, bs []byte) []byte { - bs = wire_put_uint64(uint64(len(coords)), bs) - bs = append(bs, coords...) +func wire_put_vslice(slice []byte, bs []byte) []byte { + bs = wire_put_uint64(uint64(len(slice)), bs) + bs = append(bs, slice...) return bs } @@ -194,14 +194,14 @@ func wire_chop_slice(toSlice []byte, fromSlice *[]byte) bool { return true } -// A utility function to extract coords from a slice and advance the source slices, returning true if successful. -func wire_chop_coords(toCoords *[]byte, fromSlice *[]byte) bool { - coords, coordLen := wire_decode_coords(*fromSlice) - if coordLen == 0 { +// A utility function to extract a length-prefixed slice (such as coords) from a slice and advance the source slices, returning true if successful. +func wire_chop_vslice(toSlice *[]byte, fromSlice *[]byte) bool { + slice, sliceLen := wire_decode_coords(*fromSlice) + if sliceLen == 0 { // sliceLen is length-prefix size + slice size, in bytes return false } - *toCoords = append((*toCoords)[:0], coords...) - *fromSlice = (*fromSlice)[coordLen:] + *toSlice = append((*toSlice)[:0], slice...) + *fromSlice = (*fromSlice)[sliceLen:] return true } @@ -227,6 +227,8 @@ type wire_trafficPacket struct { Handle crypto.Handle Nonce crypto.BoxNonce Payload []byte + RPath []byte + Path []byte } // Encodes a wire_trafficPacket into its wire format. @@ -235,10 +237,12 @@ func (p *wire_trafficPacket) encode() []byte { bs := pool_getBytes(0) bs = wire_put_uint64(wire_Traffic, bs) bs = wire_put_uint64(p.Offset, bs) - bs = wire_put_coords(p.Coords, bs) + bs = wire_put_vslice(p.Coords, bs) bs = append(bs, p.Handle[:]...) bs = append(bs, p.Nonce[:]...) - bs = append(bs, p.Payload...) + bs = wire_put_vslice(p.Payload, bs) + bs = wire_put_vslice(p.RPath, bs) + bs = append(bs, p.Path...) return bs } @@ -254,14 +258,18 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { return false case !wire_chop_uint64(&p.Offset, &bs): return false - case !wire_chop_coords(&p.Coords, &bs): + case !wire_chop_vslice(&p.Coords, &bs): return false case !wire_chop_slice(p.Handle[:], &bs): return false case !wire_chop_slice(p.Nonce[:], &bs): return false + case !wire_chop_vslice(&p.Payload, &bs): + return false + case !wire_chop_vslice(&p.RPath, &bs): + return false } - p.Payload = append(p.Payload, bs...) + p.Path = bs return true } @@ -273,18 +281,21 @@ type wire_protoTrafficPacket struct { FromKey crypto.BoxPubKey Nonce crypto.BoxNonce Payload []byte + RPath []byte + Path []byte } // Encodes a wire_protoTrafficPacket into its wire format. func (p *wire_protoTrafficPacket) encode() []byte { - coords := wire_encode_coords(p.Coords) bs := wire_encode_uint64(wire_ProtocolTraffic) bs = wire_put_uint64(p.Offset, bs) - bs = append(bs, coords...) + bs = wire_put_vslice(p.Coords, bs) bs = append(bs, p.ToKey[:]...) bs = append(bs, p.FromKey[:]...) bs = append(bs, p.Nonce[:]...) - bs = append(bs, p.Payload...) + bs = wire_put_vslice(p.Payload, bs) + bs = wire_put_vslice(p.RPath, bs) + bs = append(bs, p.Path...) return bs } @@ -298,7 +309,7 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { return false case !wire_chop_uint64(&p.Offset, &bs): return false - case !wire_chop_coords(&p.Coords, &bs): + case !wire_chop_vslice(&p.Coords, &bs): return false case !wire_chop_slice(p.ToKey[:], &bs): return false @@ -306,8 +317,12 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { return false case !wire_chop_slice(p.Nonce[:], &bs): return false + case !wire_chop_vslice(&p.Payload, &bs): + return false + case !wire_chop_vslice(&p.RPath, &bs): + return false } - p.Payload = bs + p.Path = bs return true } @@ -391,7 +406,7 @@ func (p *sessionPing) decode(bs []byte) bool { return false case !wire_chop_uint64(&tstamp, &bs): return false - case !wire_chop_coords(&p.Coords, &bs): + case !wire_chop_vslice(&p.Coords, &bs): return false case !wire_chop_uint64(&mtu, &bs): mtu = 1280 @@ -415,7 +430,7 @@ func (p *nodeinfoReqRes) encode() []byte { pTypeVal = wire_NodeInfoRequest } bs := wire_encode_uint64(pTypeVal) - bs = wire_put_coords(p.SendCoords, bs) + bs = wire_put_vslice(p.SendCoords, bs) if pTypeVal == wire_NodeInfoResponse { bs = append(bs, p.NodeInfo...) } @@ -430,7 +445,7 @@ func (p *nodeinfoReqRes) decode(bs []byte) bool { return false case pType != wire_NodeInfoRequest && pType != wire_NodeInfoResponse: return false - case !wire_chop_coords(&p.SendCoords, &bs): + case !wire_chop_vslice(&p.SendCoords, &bs): return false } if p.IsResponse = pType == wire_NodeInfoResponse; p.IsResponse { @@ -464,7 +479,7 @@ func (r *dhtReq) decode(bs []byte) bool { return false case pType != wire_DHTLookupRequest: return false - case !wire_chop_coords(&r.Coords, &bs): + case !wire_chop_vslice(&r.Coords, &bs): return false case !wire_chop_slice(r.Dest[:], &bs): return false @@ -495,7 +510,7 @@ func (r *dhtRes) decode(bs []byte) bool { return false case pType != wire_DHTLookupResponse: return false - case !wire_chop_coords(&r.Coords, &bs): + case !wire_chop_vslice(&r.Coords, &bs): return false case !wire_chop_slice(r.Dest[:], &bs): return false @@ -505,7 +520,7 @@ func (r *dhtRes) decode(bs []byte) bool { switch { case !wire_chop_slice(info.key[:], &bs): return false - case !wire_chop_coords(&info.coords, &bs): + case !wire_chop_vslice(&info.coords, &bs): return false } r.Infos = append(r.Infos, &info) From b5cd40b801a45baf9eedeba30c2c725014a1a8f9 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 7 Nov 2020 10:50:55 -0600 Subject: [PATCH 04/11] WIP very simple insecure proof-of-concept for pathfinding and source routing --- src/yggdrasil/session.go | 19 ++++++++++++++++++- src/yggdrasil/stream.go | 2 +- src/yggdrasil/wire.go | 4 ++-- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 360f2a1b..6a9589fa 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -50,6 +50,8 @@ type sessionInfo struct { conn *Conn // The associated Conn object callbacks []chan func() // Finished work from crypto workers table *lookupTable // table.self is a locator where we get our coords + path []byte // Path from self to destination + rpath []byte // Path from destination to self } // Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. @@ -328,6 +330,10 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) { if sinfo.pingTime.Before(sinfo.time) { sinfo.pingTime = time.Now() } + // Sending a ping may happen when we don't know if our path info is good anymore... + // Reset paths just to be safe... + sinfo.path = nil + sinfo.rpath = nil } func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) { @@ -468,6 +474,8 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { sinfo._updateNonce(&p.Nonce) sinfo.bytesRecvd += uint64(len(bs)) sinfo.conn.recvMsg(sinfo, bs) + sinfo.path = append(sinfo.path[:0], p.RPath...) + sinfo.rpath = append(sinfo.rpath[:0], p.Path...) } ch <- callback sinfo.checkCallbacks() @@ -483,15 +491,24 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { return } sinfo.bytesSent += uint64(len(msg.Message)) - coords := append([]byte(nil), sinfo.coords...) + var coords []byte + var offset uint64 + if len(sinfo.path) > 0 && len(sinfo.path) <= len(sinfo.rpath) { + coords = append([]byte{0}, sinfo.path...) + offset += 1 + } else { + coords = append([]byte(nil), sinfo.coords...) + } if msg.FlowKey != 0 { coords = append(coords, 0) coords = append(coords, wire_encode_uint64(msg.FlowKey)...) } p := wire_trafficPacket{ + Offset: offset, Coords: coords, Handle: sinfo.theirHandle, Nonce: sinfo.myNonce, + RPath: sinfo.rpath, } sinfo.myNonce.Increment() k := sinfo.sharedSesKey diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index ab6336f1..ef09c8e7 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -114,7 +114,7 @@ func (s *stream) readMsgFromBuffer() ([]byte, error) { return nil, errors.New("oversized message") } msg := pool_getBytes(int(msgLen + 10)) // Extra padding for up to 1 more switchPort - msg = msg[msgLen:] + msg = msg[:msgLen] _, err = io.ReadFull(s.inputBuffer, msg) return msg, err } diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 6746cd74..e0c2614a 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -269,7 +269,7 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { case !wire_chop_vslice(&p.RPath, &bs): return false } - p.Path = bs + p.Path = append(p.Path[:0], bs...) return true } @@ -322,7 +322,7 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { case !wire_chop_vslice(&p.RPath, &bs): return false } - p.Path = bs + p.Path = append(p.Path[:0], bs...) return true } From 994c26e5f71a605ab0b83c195abefce856859453 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 7 Nov 2020 12:08:01 -0600 Subject: [PATCH 05/11] simplify pathfinder --- src/yggdrasil/peer.go | 4 ++-- src/yggdrasil/session.go | 14 +++++++++++--- src/yggdrasil/switch.go | 2 +- src/yggdrasil/wire.go | 16 ++++------------ 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 9f367695..74e1023f 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -245,7 +245,7 @@ func (p *peer) _handleTraffic(packet []byte) { } obs, coords := wire_getTrafficOffsetAndCoords(packet) offset, _ := wire_decode_uint64(obs) - ports := p.table.getPorts(coords) + ports := switch_getPorts(coords) if offset == 0 { offset = p.table.getOffset(ports) } @@ -262,7 +262,7 @@ func (p *peer) _handleTraffic(packet []byte) { wire_put_uint64(offset, obs[:0]) } } - packet = wire_put_uint64(uint64(next), packet) + packet = wire_put_uint64(uint64(p.port), packet) if nPeer, isIn := p.ports[next]; isIn { nPeer.sendPacketFrom(p, packet) } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 6a9589fa..4a06beed 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -474,8 +474,16 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { sinfo._updateNonce(&p.Nonce) sinfo.bytesRecvd += uint64(len(bs)) sinfo.conn.recvMsg(sinfo, bs) - sinfo.path = append(sinfo.path[:0], p.RPath...) - sinfo.rpath = append(sinfo.rpath[:0], p.Path...) + a := switch_getPorts(p.RPath) + for i := len(a)/2 - 1; i >= 0; i-- { + opp := len(a) - 1 - i + a[i], a[opp] = a[opp], a[i] + } + sinfo.path = sinfo.path[:0] + for _, sPort := range a { + sinfo.path = wire_put_uint64(uint64(sPort), sinfo.path) + } + //sinfo.rpath = append(sinfo.rpath[:0], p.Path...) } ch <- callback sinfo.checkCallbacks() @@ -493,7 +501,7 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { sinfo.bytesSent += uint64(len(msg.Message)) var coords []byte var offset uint64 - if len(sinfo.path) > 0 && len(sinfo.path) <= len(sinfo.rpath) { + if len(sinfo.path) > 0 { coords = append([]byte{0}, sinfo.path...) offset += 1 } else { diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 4873d8b4..c291d82d 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -644,7 +644,7 @@ func (t *lookupTable) lookup(ports []switchPort) switchPort { return here.port } -func (t *lookupTable) getPorts(coords []byte) []switchPort { +func switch_getPorts(coords []byte) []switchPort { var ports []switchPort var offset int for offset < len(coords) { diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index e0c2614a..e8e9bf09 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -228,7 +228,6 @@ type wire_trafficPacket struct { Nonce crypto.BoxNonce Payload []byte RPath []byte - Path []byte } // Encodes a wire_trafficPacket into its wire format. @@ -241,8 +240,7 @@ func (p *wire_trafficPacket) encode() []byte { bs = append(bs, p.Handle[:]...) bs = append(bs, p.Nonce[:]...) bs = wire_put_vslice(p.Payload, bs) - bs = wire_put_vslice(p.RPath, bs) - bs = append(bs, p.Path...) + bs = append(bs, p.RPath...) return bs } @@ -266,10 +264,8 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { return false case !wire_chop_vslice(&p.Payload, &bs): return false - case !wire_chop_vslice(&p.RPath, &bs): - return false } - p.Path = append(p.Path[:0], bs...) + p.RPath = append(p.RPath[:0], bs...) return true } @@ -282,7 +278,6 @@ type wire_protoTrafficPacket struct { Nonce crypto.BoxNonce Payload []byte RPath []byte - Path []byte } // Encodes a wire_protoTrafficPacket into its wire format. @@ -294,8 +289,7 @@ func (p *wire_protoTrafficPacket) encode() []byte { bs = append(bs, p.FromKey[:]...) bs = append(bs, p.Nonce[:]...) bs = wire_put_vslice(p.Payload, bs) - bs = wire_put_vslice(p.RPath, bs) - bs = append(bs, p.Path...) + bs = append(bs, p.RPath...) return bs } @@ -319,10 +313,8 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { return false case !wire_chop_vslice(&p.Payload, &bs): return false - case !wire_chop_vslice(&p.RPath, &bs): - return false } - p.Path = append(p.Path[:0], bs...) + p.RPath = append(p.RPath[:0], bs...) return true } From e19e938f642ae530e0d94119d590a403ae3cb0b6 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 7 Nov 2020 15:19:09 -0600 Subject: [PATCH 06/11] safer pathfinding behavior --- src/yggdrasil/router.go | 12 +++--- src/yggdrasil/session.go | 79 ++++++++++++++++++++-------------------- src/yggdrasil/switch.go | 13 +++++++ 3 files changed, 59 insertions(+), 45 deletions(-) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index d387346e..f89b26f9 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -196,9 +196,9 @@ func (r *router) _handleProto(packet []byte) { } switch bsType { case wire_SessionPing: - r._handlePing(bs, &p.FromKey) + r._handlePing(bs, &p.FromKey, p.RPath) case wire_SessionPong: - r._handlePong(bs, &p.FromKey) + r._handlePong(bs, &p.FromKey, p.RPath) case wire_NodeInfoRequest: fallthrough case wire_NodeInfoResponse: @@ -212,18 +212,18 @@ func (r *router) _handleProto(packet []byte) { } // Decodes session pings from wire format and passes them to sessions.handlePing where they either create or update a session. -func (r *router) _handlePing(bs []byte, fromKey *crypto.BoxPubKey) { +func (r *router) _handlePing(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) { ping := sessionPing{} if !ping.decode(bs) { return } ping.SendPermPub = *fromKey - r.sessions.handlePing(&ping) + r.sessions.handlePing(&ping, rpath) } // Handles session pongs (which are really pings with an extra flag to prevent acknowledgement). -func (r *router) _handlePong(bs []byte, fromKey *crypto.BoxPubKey) { - r._handlePing(bs, fromKey) +func (r *router) _handlePong(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) { + r._handlePing(bs, fromKey, rpath) } // Decodes dht requests and passes them to dht.handleReq to trigger a lookup/response. diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 4a06beed..181a199a 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -51,7 +51,6 @@ type sessionInfo struct { callbacks []chan func() // Finished work from crypto workers table *lookupTable // table.self is a locator where we get our coords path []byte // Path from self to destination - rpath []byte // Path from destination to self } // Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. @@ -67,41 +66,46 @@ type sessionPing struct { // Updates session info in response to a ping, after checking that the ping is OK. // Returns true if the session was updated, or false otherwise. -func (s *sessionInfo) _update(p *sessionPing) bool { - if !(p.Tstamp > s.tstamp) { +func (sinfo *sessionInfo) _update(p *sessionPing, rpath []byte) bool { + if !(p.Tstamp > sinfo.tstamp) { // To protect against replay attacks return false } - if p.SendPermPub != s.theirPermPub { + if p.SendPermPub != sinfo.theirPermPub { // Should only happen if two sessions got the same handle // That shouldn't be allowed anyway, but if it happens then let one time out return false } - if p.SendSesPub != s.theirSesPub { - s.theirSesPub = p.SendSesPub - s.theirHandle = p.Handle - s.sharedSesKey = *crypto.GetSharedKey(&s.mySesPriv, &s.theirSesPub) - s.theirNonce = crypto.BoxNonce{} + if p.SendSesPub != sinfo.theirSesPub { + sinfo.path = nil + sinfo.theirSesPub = p.SendSesPub + sinfo.theirHandle = p.Handle + sinfo.sharedSesKey = *crypto.GetSharedKey(&sinfo.mySesPriv, &sinfo.theirSesPub) + sinfo.theirNonce = crypto.BoxNonce{} } if p.MTU >= 1280 || p.MTU == 0 { - s.theirMTU = p.MTU - if s.conn != nil { - s.conn.setMTU(s, s._getMTU()) + sinfo.theirMTU = p.MTU + if sinfo.conn != nil { + sinfo.conn.setMTU(sinfo, sinfo._getMTU()) } } - if !bytes.Equal(s.coords, p.Coords) { + if !bytes.Equal(sinfo.coords, p.Coords) { // allocate enough space for additional coords - s.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...) + sinfo.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...) } - s.time = time.Now() - s.tstamp = p.Tstamp - s.reset = false + sinfo.time = time.Now() + sinfo.tstamp = p.Tstamp + if p.IsPong && sinfo.path == nil { + path := switch_reverseCoordBytes(rpath) + sinfo.path = append(sinfo.path[:0], path...) + } + sinfo.reset = false defer func() { recover() }() // Recover if the below panics select { - case <-s.init: + case <-sinfo.init: default: // Unblock anything waiting for the session to initialize - close(s.init) + close(sinfo.init) } return true } @@ -306,13 +310,13 @@ func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey, // Sends a session ping by calling sendPingPong in ping mode. func (sinfo *sessionInfo) ping(from phony.Actor) { sinfo.Act(from, func() { - sinfo._sendPingPong(false) + sinfo._sendPingPong(false, nil) }) } // Calls getPing, sets the appropriate ping/pong flag, encodes to wire format, and send it. // Updates the time the last ping was sent in the session info. -func (sinfo *sessionInfo) _sendPingPong(isPong bool) { +func (sinfo *sessionInfo) _sendPingPong(isPong bool, path []byte) { ping := sinfo._getPing() ping.IsPong = isPong bs := ping.encode() @@ -324,16 +328,21 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) { Nonce: *nonce, Payload: payload, } + if path != nil { + p.Coords = append([]byte{0}, path...) + p.Offset += 1 + } packet := p.encode() // TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) }) - if sinfo.pingTime.Before(sinfo.time) { + if !isPong && sinfo.pingTime.Before(sinfo.time) { sinfo.pingTime = time.Now() } - // Sending a ping may happen when we don't know if our path info is good anymore... - // Reset paths just to be safe... - sinfo.path = nil - sinfo.rpath = nil + if !isPong { + // Sending a ping may happen when we don't know if our path info is good anymore... + // Reset paths just to be safe... + sinfo.path = nil + } } func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) { @@ -345,7 +354,7 @@ func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) { // Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful. // If the session has a packet cached (common when first setting up a session), it will be sent. -func (ss *sessions) handlePing(ping *sessionPing) { +func (ss *sessions) handlePing(ping *sessionPing, rpath []byte) { // Get the corresponding session (or create a new session) sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub) switch { @@ -374,11 +383,11 @@ func (ss *sessions) handlePing(ping *sessionPing) { if sinfo != nil { sinfo.Act(ss.router, func() { // Update the session - if !sinfo._update(ping) { /*panic("Should not happen in testing")*/ + if !sinfo._update(ping, rpath) { /*panic("Should not happen in testing")*/ return } if !ping.IsPong { - sinfo._sendPingPong(true) + sinfo._sendPingPong(true, switch_reverseCoordBytes(rpath)) } }) } @@ -474,16 +483,9 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { sinfo._updateNonce(&p.Nonce) sinfo.bytesRecvd += uint64(len(bs)) sinfo.conn.recvMsg(sinfo, bs) - a := switch_getPorts(p.RPath) - for i := len(a)/2 - 1; i >= 0; i-- { - opp := len(a) - 1 - i - a[i], a[opp] = a[opp], a[i] + if sinfo.path == nil { + sinfo._sendPingPong(false, nil) } - sinfo.path = sinfo.path[:0] - for _, sPort := range a { - sinfo.path = wire_put_uint64(uint64(sPort), sinfo.path) - } - //sinfo.rpath = append(sinfo.rpath[:0], p.Path...) } ch <- callback sinfo.checkCallbacks() @@ -516,7 +518,6 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { Coords: coords, Handle: sinfo.theirHandle, Nonce: sinfo.myNonce, - RPath: sinfo.rpath, } sinfo.myNonce.Increment() k := sinfo.sharedSesKey diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index c291d82d..a245560e 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -655,6 +655,19 @@ func switch_getPorts(coords []byte) []switchPort { return ports } +func switch_reverseCoordBytes(coords []byte) []byte { + a := switch_getPorts(coords) + for i := len(a)/2 - 1; i >= 0; i-- { + opp := len(a) - 1 - i + a[i], a[opp] = a[opp], a[i] + } + var reversed []byte + for _, sPort := range a { + reversed = wire_put_uint64(uint64(sPort), reversed) + } + return reversed +} + func (t *lookupTable) isDescendant(ports []switchPort) bool { // Note that this returns true for anyone in the subtree that starts at us // That includes ourself, so we are our own descendant by this logic... From 0ac203b007f0427cba34be40aa5bc79ccfb1c304 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 8 Nov 2020 05:39:30 -0600 Subject: [PATCH 07/11] adjust how sessions learn source routes, try to recover faster if coords change (but assume the old path still works until we get a ping through that gives us a new path) --- src/yggdrasil/session.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 181a199a..9a69db94 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -95,7 +95,7 @@ func (sinfo *sessionInfo) _update(p *sessionPing, rpath []byte) bool { } sinfo.time = time.Now() sinfo.tstamp = p.Tstamp - if p.IsPong && sinfo.path == nil { + if p.IsPong { path := switch_reverseCoordBytes(rpath) sinfo.path = append(sinfo.path[:0], path...) } @@ -335,13 +335,8 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool, path []byte) { packet := p.encode() // TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) }) - if !isPong && sinfo.pingTime.Before(sinfo.time) { - sinfo.pingTime = time.Now() - } if !isPong { - // Sending a ping may happen when we don't know if our path info is good anymore... - // Reset paths just to be safe... - sinfo.path = nil + sinfo.pingTime = time.Now() } } @@ -483,9 +478,6 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { sinfo._updateNonce(&p.Nonce) sinfo.bytesRecvd += uint64(len(bs)) sinfo.conn.recvMsg(sinfo, bs) - if sinfo.path == nil { - sinfo._sendPingPong(false, nil) - } } ch <- callback sinfo.checkCallbacks() @@ -529,6 +521,9 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) }) + if time.Since(sinfo.pingTime) > 3*time.Second { + sinfo._sendPingPong(false, nil) + } } ch <- callback sinfo.checkCallbacks() From 144d42c773ad072e6d2311e76ad86c0fa98d7d85 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 8 Nov 2020 06:09:55 -0600 Subject: [PATCH 08/11] send dht responses via reverse path (fixes some possible DDoS issues with the old coord approach) --- src/yggdrasil/dht.go | 10 ++++++---- src/yggdrasil/router.go | 6 +++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index bbc31546..a68ba2bd 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -185,7 +185,7 @@ func dht_ordered(first, second, third *crypto.NodeID) bool { // Reads a request, performs a lookup, and responds. // Update info about the node that sent the request. -func (t *dht) handleReq(req *dhtReq) { +func (t *dht) handleReq(req *dhtReq, rpath []byte) { // Send them what they asked for res := dhtRes{ Key: t.router.core.boxPub, @@ -193,7 +193,7 @@ func (t *dht) handleReq(req *dhtReq) { Dest: req.Dest, Infos: t.lookup(&req.Dest, false), } - t.sendRes(&res, req) + t.sendRes(&res, req, rpath) // Also add them to our DHT info := dhtInfo{ key: req.Key, @@ -213,13 +213,15 @@ func (t *dht) handleReq(req *dhtReq) { } // Sends a lookup response to the specified node. -func (t *dht) sendRes(res *dhtRes, req *dhtReq) { +func (t *dht) sendRes(res *dhtRes, req *dhtReq, rpath []byte) { // Send a reply for a dhtReq bs := res.encode() shared := t.router.sessions.getSharedKey(&t.router.core.boxPriv, &req.Key) payload, nonce := crypto.BoxSeal(shared, bs, nil) + path := append([]byte{0}, switch_reverseCoordBytes(rpath)...) p := wire_protoTrafficPacket{ - Coords: req.Coords, + Offset: 1, + Coords: path, ToKey: req.Key, FromKey: t.router.core.boxPub, Nonce: *nonce, diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index f89b26f9..089c49e7 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -204,7 +204,7 @@ func (r *router) _handleProto(packet []byte) { case wire_NodeInfoResponse: r._handleNodeInfo(bs, &p.FromKey) case wire_DHTLookupRequest: - r._handleDHTReq(bs, &p.FromKey) + r._handleDHTReq(bs, &p.FromKey, p.RPath) case wire_DHTLookupResponse: r._handleDHTRes(bs, &p.FromKey) default: @@ -227,13 +227,13 @@ func (r *router) _handlePong(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) } // Decodes dht requests and passes them to dht.handleReq to trigger a lookup/response. -func (r *router) _handleDHTReq(bs []byte, fromKey *crypto.BoxPubKey) { +func (r *router) _handleDHTReq(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) { req := dhtReq{} if !req.decode(bs) { return } req.Key = *fromKey - r.dht.handleReq(&req) + r.dht.handleReq(&req, rpath) } // Decodes dht responses and passes them to dht.handleRes to update the DHT table and further pass them to the search code (if applicable). From 428789f24c62ac5a936e363a9f4b7488b5ebcd41 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Mon, 9 Nov 2020 19:01:11 -0600 Subject: [PATCH 09/11] simplify switch parent selection and minor source routing improvements --- src/yggdrasil/dht.go | 1 - src/yggdrasil/session.go | 12 ++++--- src/yggdrasil/switch.go | 73 ++++++---------------------------------- 3 files changed, 19 insertions(+), 67 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index a68ba2bd..21ff8472 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -94,7 +94,6 @@ func (t *dht) reset() { t.ping(info, nil) } } - t.reqs = make(map[dhtReqKey]time.Time) t.table = make(map[crypto.NodeID]*dhtInfo) t.imp = nil } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 9a69db94..8fd7cd5b 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -92,13 +92,15 @@ func (sinfo *sessionInfo) _update(p *sessionPing, rpath []byte) bool { if !bytes.Equal(sinfo.coords, p.Coords) { // allocate enough space for additional coords sinfo.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...) - } - sinfo.time = time.Now() - sinfo.tstamp = p.Tstamp - if p.IsPong { + path := switch_reverseCoordBytes(rpath) + sinfo.path = append(sinfo.path[:0], path...) + defer sinfo._sendPingPong(false, nil) + } else if p.IsPong { path := switch_reverseCoordBytes(rpath) sinfo.path = append(sinfo.path[:0], path...) } + sinfo.time = time.Now() + sinfo.tstamp = p.Tstamp sinfo.reset = false defer func() { recover() }() // Recover if the below panics select { @@ -423,6 +425,8 @@ func (ss *sessions) reset() { sinfo := _sinfo // So we can safely put it in a closure sinfo.Act(ss.router, func() { sinfo.reset = true + sinfo._sendPingPong(false, sinfo.path) + sinfo._sendPingPong(false, nil) }) } } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index a245560e..6cab5bca 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -20,10 +20,9 @@ import ( ) const ( - switch_timeout = time.Minute - switch_updateInterval = switch_timeout / 2 - switch_throttle = switch_updateInterval / 2 - switch_faster_threshold = 240 //Number of switch updates before switching to a faster parent + switch_timeout = time.Minute + switch_updateInterval = switch_timeout / 2 + switch_throttle = switch_updateInterval / 2 ) // The switch locator represents the topology and network state dependent info about a node, minus the signatures that go with it. @@ -136,15 +135,14 @@ func (x *switchLocator) isAncestorOf(y *switchLocator) bool { // Information about a peer, used by the switch to build the tree and eventually make routing decisions. type peerInfo struct { - key crypto.SigPubKey // ID of this peer - locator switchLocator // Should be able to respond with signatures upon request - degree uint64 // Self-reported degree - time time.Time // Time this node was last seen - faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower - port switchPort // Interface number of this peer - msg switchMsg // The wire switchMsg used - readBlock bool // True if the link notified us of a read that blocked too long - writeBlock bool // True of the link notified us of a write that blocked too long + key crypto.SigPubKey // ID of this peer + locator switchLocator // Should be able to respond with signatures upon request + degree uint64 // Self-reported degree + time time.Time // Time this node was last seen + port switchPort // Interface number of this peer + msg switchMsg // The wire switchMsg used + readBlock bool // True if the link notified us of a read that blocked too long + writeBlock bool // True of the link notified us of a write that blocked too long } func (pinfo *peerInfo) blocked() bool { @@ -427,37 +425,12 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi doUpdate := false oldSender := t.data.peers[fromPort] if !equiv(&sender.locator, &oldSender.locator) { - // Reset faster info, we'll start refilling it right after this - sender.faster = nil doUpdate = true } - // Update the matrix of peer "faster" thresholds if reprocessing { - sender.faster = oldSender.faster sender.time = oldSender.time sender.readBlock = oldSender.readBlock sender.writeBlock = oldSender.writeBlock - } else { - sender.faster = make(map[switchPort]uint64, len(oldSender.faster)) - for port, peer := range t.data.peers { - if port == fromPort { - continue - } else if sender.locator.root != peer.locator.root || sender.locator.tstamp > peer.locator.tstamp { - // We were faster than this node, so increment, as long as we don't overflow because of it - if oldSender.faster[peer.port] < switch_faster_threshold { - sender.faster[port] = oldSender.faster[peer.port] + 1 - } else { - sender.faster[port] = switch_faster_threshold - } - } else { - // Slower than this node, penalize (more than the reward amount) - if oldSender.faster[port] > 1 { - sender.faster[port] = oldSender.faster[peer.port] - 2 - } else { - sender.faster[port] = 0 - } - } - } } if sender.blocked() != oldSender.blocked() { doUpdate = true @@ -496,35 +469,11 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi case noParent: // We currently have no working parent, and at this point in the switch statement, anything is better than nothing. updateRoot = true - case sender.faster[t.parent] >= switch_faster_threshold: - // The is reliably faster than the current parent. - updateRoot = true case !sender.blocked() && oldParent.blocked(): // Replace a blocked parent updateRoot = true case reprocessing && sender.blocked() && !oldParent.blocked(): // Don't replace an unblocked parent when reprocessing - case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]: - // The sender seems to be reliably faster than the current parent, so switch to them instead. - updateRoot = true - case sender.port != t.parent: - // Ignore further cases if the sender isn't our parent. - case !reprocessing && !equiv(&sender.locator, &t.data.locator): - // Special case: - // If coords changed, then we need to penalize this node somehow, to prevent flapping. - // First, reset all faster-related info to 0. - // Then, de-parent the node and reprocess all messages to find a new parent. - t.parent = 0 - for _, peer := range t.data.peers { - if peer.port == sender.port { - continue - } - t._handleMsg(&peer.msg, peer.port, true) - } - // Process the sender last, to avoid keeping them as a parent if at all possible. - t._handleMsg(&sender.msg, sender.port, true) - case now.Sub(t.time) < switch_throttle: - // We've already gotten an update from this root recently, so ignore this one to avoid flooding. case sender.locator.tstamp > t.data.locator.tstamp: // The timestamp was updated, so we need to update locally and send to our peers. updateRoot = true From 939ffb02f8dfcfe3ba27cf11be52eee9ababfb1a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 14 Nov 2020 15:05:02 -0600 Subject: [PATCH 10/11] adjust when dht reqs are reset --- src/yggdrasil/dht.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 21ff8472..1f74f4d5 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -89,6 +89,7 @@ func (t *dht) reconfigure() { // Resets the DHT in response to coord changes. // This empties all info from the DHT and drops outstanding requests. func (t *dht) reset() { + t.reqs = make(map[dhtReqKey]time.Time) for _, info := range t.table { if t.isImportant(info) { t.ping(info, nil) From 0ba2ad74fec7d23fcc4aee0788ac8f4de95700be Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 19 Dec 2020 06:03:28 -0600 Subject: [PATCH 11/11] use source routes in the dht (when available) --- src/yggdrasil/dht.go | 20 +++++++++++++++++--- src/yggdrasil/router.go | 6 +++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 21ff8472..2ce18576 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -28,7 +28,8 @@ type dhtInfo struct { recv time.Time // When we last received a message pings int // Time out if at least 3 consecutive maintenance pings drop throttle time.Duration - dirty bool // Set to true if we've used this node in ping responses (for queries about someone other than the person doing the asking, i.e. real searches) since the last time we heard from the node + path []byte // source route the destination, learned from response rpath + dirty bool // Set to true if we've used this node in ping responses (for queries about someone other than the person doing the asking, i.e. real searches) since the last time we heard from the node } // Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times. @@ -91,7 +92,12 @@ func (t *dht) reconfigure() { func (t *dht) reset() { for _, info := range t.table { if t.isImportant(info) { - t.ping(info, nil) + t.ping(info, nil) // This will source route if a path is already known + if info.path != nil { + // In case the source route died, but the dest coords are still OK... + info.path = nil + t.ping(info, nil) + } } } t.table = make(map[crypto.NodeID]*dhtInfo) @@ -116,6 +122,9 @@ func (t *dht) lookup(nodeID *crypto.NodeID, everything bool) []*dhtInfo { results = newRes results = results[:dht_lookup_size] } + for _, info := range results { + info.dirty = true + } return results } @@ -243,7 +252,7 @@ func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) { // Reads a lookup response, checks that we had sent a matching request, and processes the response info. // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses -func (t *dht) handleRes(res *dhtRes) { +func (t *dht) handleRes(res *dhtRes, rpath []byte) { rq := dhtReqKey{res.Key, res.Dest} if callbacks, isIn := t.callbacks[rq]; isIn { for _, callback := range callbacks { @@ -259,6 +268,7 @@ func (t *dht) handleRes(res *dhtRes) { rinfo := dhtInfo{ key: res.Key, coords: res.Coords, + path: switch_reverseCoordBytes(rpath), } if t.isImportant(&rinfo) { t.insert(&rinfo) @@ -290,6 +300,10 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { Nonce: *nonce, Payload: payload, } + if dest.path != nil { + p.Coords = append([]byte{0}, dest.path...) + p.Offset += 1 + } packet := p.encode() t.router.out(packet) rq := dhtReqKey{dest.key, req.Dest} diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 089c49e7..db81068a 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -206,7 +206,7 @@ func (r *router) _handleProto(packet []byte) { case wire_DHTLookupRequest: r._handleDHTReq(bs, &p.FromKey, p.RPath) case wire_DHTLookupResponse: - r._handleDHTRes(bs, &p.FromKey) + r._handleDHTRes(bs, &p.FromKey, p.RPath) default: } } @@ -237,13 +237,13 @@ func (r *router) _handleDHTReq(bs []byte, fromKey *crypto.BoxPubKey, rpath []byt } // Decodes dht responses and passes them to dht.handleRes to update the DHT table and further pass them to the search code (if applicable). -func (r *router) _handleDHTRes(bs []byte, fromKey *crypto.BoxPubKey) { +func (r *router) _handleDHTRes(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) { res := dhtRes{} if !res.decode(bs) { return } res.Key = *fromKey - r.dht.handleRes(&res) + r.dht.handleRes(&res, rpath) } // Decodes nodeinfo request