diff --git a/README.md b/README.md index e4f517a4..e44024eb 100644 --- a/README.md +++ b/README.md @@ -11,12 +11,12 @@ It's named Yggdrasil after the world tree from Norse mythology, because that see For a longer, rambling version of this readme with more information, see: [doc](doc/README.md). A very early incomplete draft of a [whitepaper](doc/Whitepaper.md) describing the protocol is also available. -This is a toy / proof-of-principle, so it's not even alpha quality software--any nontrivial update is likely to break backwards compatibility with no possibility for a clean upgrade path. +This is a toy / proof-of-principle, and considered alpha quality by the developers. It's not expected to be feature complete, and future updates may not be backwards compatible, though it should warn you if it sees a connection attempt with a node running a newer version. You're encouraged to play with it, but it is strongly advised not to use it for anything mission critical. ## Building -1. Install Go (tested on 1.9+, [godeb](https://github.com/niemeyer/godeb) is recommended). +1. Install Go (tested on 1.9+, [godeb](https://github.com/niemeyer/godeb) is recommended for debian-based linux distributions). 2. Clone this repository. 2. `./build` @@ -44,10 +44,9 @@ In practice, you probably want to run this instead: This keeps a persistent set of keys (and by extension, IP address) and gives you the option of editing the configuration file. If you want to use it as an overlay network on top of e.g. the internet, then you can do so by adding the remote devices domain/address and port (as a string, e.g. `"1.2.3.4:5678"`) to the list of `Peers` in the configuration file. -You can control whether or not it peers over TCP or UDP by adding `tcp://` or `udp://` to the start of the string, i.e. `"udp://1.2.3.4:5678"`. -It is also possible to route outgoing TCP connections through a socks proxy using the syntax: `"socks://socksHost:socksPort/destHost:destPort"`. -It is currently configured to accept incoming TCP and UDP connections. -In the interest of testing the TCP machinery, it's set to create TCP connections for auto-peering (over link-local IPv6), and to use TCP by default if no transport is specified for a manually configured peer. +By default, it peers over TCP (which can be forced with `"tcp://1.2.3.4:5678"` syntax), but it's also possible to connect over a socks proxy (`"socks://socksHost:socksPort/1.2.3.4:5678"`). +The socks proxy approach is useful for e.g. [peering over tor hidden services](https://github.com/yggdrasil-network/public-peers/blob/master/other/tor.md). +UDP support was removed as part of v0.2, and may be replaced by a better implementation at a later date. ### Platforms diff --git a/VERSION b/VERSION index 49d59571..3b04cfb6 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.1 +0.2 diff --git a/src/yggdrasil/address.go b/src/yggdrasil/address.go index b5ac73d2..e0baf9ca 100644 --- a/src/yggdrasil/address.go +++ b/src/yggdrasil/address.go @@ -1,10 +1,17 @@ package yggdrasil -type address [16]byte // IPv6 address within the network -type subnet [8]byte // It's a /64 +// address represents an IPv6 address in the yggdrasil address range. +type address [16]byte -var address_prefix = [...]byte{0xfd} // For node addresses + local subnets +// subnet represents an IPv6 /64 subnet in the yggdrasil subnet range. +type subnet [8]byte +// address_prefix is the prefix used for all addresses and subnets in the network. +// The current implementation requires this to be a multiple of 8 bits. +// Nodes that configure this differently will be unable to communicate with eachother, though routing and the DHT machinery *should* still work. +var address_prefix = [...]byte{0xfd} + +// isValid returns true if an address falls within the range used by nodes in the network. func (a *address) isValid() bool { for idx := range address_prefix { if (*a)[idx] != address_prefix[idx] { @@ -14,6 +21,7 @@ func (a *address) isValid() bool { return (*a)[len(address_prefix)]&0x80 == 0 } +// isValid returns true if a prefix falls within the range usable by the network. func (s *subnet) isValid() bool { for idx := range address_prefix { if (*s)[idx] != address_prefix[idx] { @@ -23,6 +31,11 @@ func (s *subnet) isValid() bool { return (*s)[len(address_prefix)]&0x80 != 0 } +// address_addrForNodeID takes a *NodeID as an argument and returns an *address. +// This address begins with the address prefix. +// The next bit is 0 for an address, and 1 for a subnet. +// The following 7 bits are set to the number of leading 1 bits in the NodeID. +// The NodeID, excluding the leading 1 bits and the first leading 1 bit, is truncated to the appropriate length and makes up the remainder of the address. func address_addrForNodeID(nid *NodeID) *address { // 128 bit address // Begins with prefix @@ -59,6 +72,11 @@ func address_addrForNodeID(nid *NodeID) *address { return &addr } +// address_subnetForNodeID takes a *NodeID as an argument and returns a *subnet. +// This subnet begins with the address prefix. +// The next bit is 0 for an address, and 1 for a subnet. +// The following 7 bits are set to the number of leading 1 bits in the NodeID. +// The NodeID, excluding the leading 1 bits and the first leading 1 bit, is truncated to the appropriate length and makes up the remainder of the subnet. func address_subnetForNodeID(nid *NodeID) *subnet { // Exactly as the address version, with two exceptions: // 1) The first bit after the fixed prefix is a 1 instead of a 0 @@ -70,6 +88,10 @@ func address_subnetForNodeID(nid *NodeID) *subnet { return &snet } +// getNodeIDandMask returns two *NodeID. +// The first is a NodeID with all the bits known from the address set to their correct values. +// The second is a bitmask with 1 bit set for each bit that was known from the address. +// This is used to look up NodeIDs in the DHT and tell if they match an address. func (a *address) getNodeIDandMask() (*NodeID, *NodeID) { // Mask is a bitmask to mark the bits visible from the address // This means truncated leading 1s, first leading 0, and visible part of addr @@ -95,6 +117,10 @@ func (a *address) getNodeIDandMask() (*NodeID, *NodeID) { return &nid, &mask } +// getNodeIDandMask returns two *NodeID. +// The first is a NodeID with all the bits known from the address set to their correct values. +// The second is a bitmask with 1 bit set for each bit that was known from the subnet. +// This is used to look up NodeIDs in the DHT and tell if they match a subnet. func (s *subnet) getNodeIDandMask() (*NodeID, *NodeID) { // As with the address version, but visible parts of the subnet prefix instead var nid NodeID diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 01ed93a5..b32e20de 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -29,17 +29,21 @@ type admin_handlerInfo struct { handler func(admin_info) (admin_info, error) // First is input map, second is output } -// Maps things like "IP", "port", "bucket", or "coords" onto strings +// admin_pair maps things like "IP", "port", "bucket", or "coords" onto values. type admin_pair struct { key string val interface{} } + +// admin_nodeInfo represents the information we know about a node for an admin response. type admin_nodeInfo []admin_pair +// addHandler is called for each admin function to add the handler and help documentation to the API. func (a *admin) addHandler(name string, args []string, handler func(admin_info) (admin_info, error)) { a.handlers = append(a.handlers, admin_handlerInfo{name, args, handler}) } +// init runs the initial admin setup. func (a *admin) init(c *Core, listenaddr string) { a.core = c a.listenaddr = listenaddr @@ -215,11 +219,13 @@ func (a *admin) init(c *Core, listenaddr string) { }) } +// start runs the admin API socket to listen for / respond to admin API calls. func (a *admin) start() error { go a.listen() return nil } +// listen is run by start and manages API connections. func (a *admin) listen() { l, err := net.Listen("tcp", a.listenaddr) if err != nil { @@ -236,6 +242,7 @@ func (a *admin) listen() { } } +// handleRequest calls the request handler for each request sent to the admin API. func (a *admin) handleRequest(conn net.Conn) { decoder := json.NewDecoder(conn) encoder := json.NewEncoder(conn) @@ -328,6 +335,7 @@ func (a *admin) handleRequest(conn net.Conn) { } } +// asMap converts an admin_nodeInfo into a map of key/value pairs. func (n *admin_nodeInfo) asMap() map[string]interface{} { m := make(map[string]interface{}, len(*n)) for _, p := range *n { @@ -336,6 +344,7 @@ func (n *admin_nodeInfo) asMap() map[string]interface{} { return m } +// toString creates a printable string representation of an admin_nodeInfo. func (n *admin_nodeInfo) toString() string { // TODO return something nicer looking than this var out []string @@ -346,6 +355,7 @@ func (n *admin_nodeInfo) toString() string { return fmt.Sprint(*n) } +// printInfos returns a newline separated list of strings from admin_nodeInfos, e.g. a printable string of info about all peers. func (a *admin) printInfos(infos []admin_nodeInfo) string { var out []string for _, info := range infos { @@ -355,6 +365,7 @@ func (a *admin) printInfos(infos []admin_nodeInfo) string { return strings.Join(out, "\n") } +// addPeer triggers a connection attempt to a node. func (a *admin) addPeer(addr string) error { u, err := url.Parse(addr) if err == nil { @@ -378,6 +389,7 @@ func (a *admin) addPeer(addr string) error { return nil } +// removePeer disconnects an existing node (given by the node's port number). func (a *admin) removePeer(p string) error { iport, err := strconv.Atoi(p) if err != nil { @@ -387,6 +399,7 @@ func (a *admin) removePeer(p string) error { return nil } +// startTunWithMTU creates the tun/tap device, sets its address, and sets the MTU to the provided value. func (a *admin) startTunWithMTU(ifname string, iftapmode bool, ifmtu int) error { // Close the TUN first if open _ = a.core.tun.close() @@ -415,6 +428,7 @@ func (a *admin) startTunWithMTU(ifname string, iftapmode bool, ifmtu int) error return nil } +// getData_getSelf returns the self node's info for admin responses. func (a *admin) getData_getSelf() *admin_nodeInfo { table := a.core.switchTable.table.Load().(lookupTable) coords := table.self.getCoords() @@ -426,6 +440,7 @@ func (a *admin) getData_getSelf() *admin_nodeInfo { return &self } +// getData_getPeers returns info from Core.peers for an admin response. func (a *admin) getData_getPeers() []admin_nodeInfo { ports := a.core.peers.ports.Load().(map[switchPort]*peer) var peerInfos []admin_nodeInfo @@ -449,6 +464,7 @@ func (a *admin) getData_getPeers() []admin_nodeInfo { return peerInfos } +// getData_getSwitchPeers returns info from Core.switchTable for an admin response. func (a *admin) getData_getSwitchPeers() []admin_nodeInfo { var peerInfos []admin_nodeInfo table := a.core.switchTable.table.Load().(lookupTable) @@ -470,6 +486,7 @@ func (a *admin) getData_getSwitchPeers() []admin_nodeInfo { return peerInfos } +// getData_getDHT returns info from Core.dht for an admin response. func (a *admin) getData_getDHT() []admin_nodeInfo { var infos []admin_nodeInfo now := time.Now() @@ -497,6 +514,7 @@ func (a *admin) getData_getDHT() []admin_nodeInfo { return infos } +// getData_getSessions returns info from Core.sessions for an admin response. func (a *admin) getData_getSessions() []admin_nodeInfo { var infos []admin_nodeInfo getSessions := func() { @@ -517,6 +535,7 @@ func (a *admin) getData_getSessions() []admin_nodeInfo { return infos } +// getAllowedEncryptionPublicKeys returns the public keys permitted for incoming peer connections. func (a *admin) getAllowedEncryptionPublicKeys() []string { pubs := a.core.peers.getAllowedEncryptionPublicKeys() var out []string @@ -526,6 +545,7 @@ func (a *admin) getAllowedEncryptionPublicKeys() []string { return out } +// addAllowedEncryptionPublicKey whitelists a key for incoming peer connections. func (a *admin) addAllowedEncryptionPublicKey(bstr string) (err error) { boxBytes, err := hex.DecodeString(bstr) if err == nil { @@ -536,6 +556,8 @@ func (a *admin) addAllowedEncryptionPublicKey(bstr string) (err error) { return } +// removeAllowedEncryptionPublicKey removes a key from the whitelist for incoming peer connections. +// If none are set, an empty list permits all incoming connections. func (a *admin) removeAllowedEncryptionPublicKey(bstr string) (err error) { boxBytes, err := hex.DecodeString(bstr) if err == nil { @@ -546,6 +568,9 @@ func (a *admin) removeAllowedEncryptionPublicKey(bstr string) (err error) { return } +// getResponse_dot returns a response for a graphviz dot formatted representation of the known parts of the network. +// This is color-coded and labeled, and includes the self node, switch peers, nodes known to the DHT, and nodes with open sessions. +// The graph is structured as a tree with directed links leading away from the root. func (a *admin) getResponse_dot() []byte { self := a.getData_getSelf() peers := a.getData_getSwitchPeers() diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index 33933914..3d341e71 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -23,12 +23,20 @@ import "time" //import "fmt" -// Maximum size for buckets and lookups -// Exception for buckets if the next one is non-full -const dht_bucket_number = 8 * NodeIDLen // This shouldn't be changed -const dht_bucket_size = 2 // This should be at least 2 -const dht_lookup_size = 16 // This should be at least 1, below 2 is impractical +// Number of DHT buckets, equal to the number of bits in a NodeID. +// Note that, in practice, nearly all of these will be empty. +const dht_bucket_number = 8 * NodeIDLen +// Number of nodes to keep in each DHT bucket. +// Additional entries may be kept for peers, for bootstrapping reasons, if they don't already have an entry in the bucket. +const dht_bucket_size = 2 + +// Number of responses to include in a lookup. +// If extras are given, they will be truncated from the response handler to prevent abuse. +const dht_lookup_size = 16 + +// dhtInfo represents everything we know about a node in the DHT. +// This includes its key, a cache of it's NodeID, coords, and timing/ping related info for deciding who/when to ping nodes for maintenance. type dhtInfo struct { nodeID_hidden *NodeID key boxPubKey @@ -39,6 +47,7 @@ type dhtInfo struct { throttle uint8 // Number of seconds to wait before pinging a node to bootstrap buckets, gradually increases up to 1 minute } +// Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times. func (info *dhtInfo) getNodeID() *NodeID { if info.nodeID_hidden == nil { info.nodeID_hidden = getNodeID(&info.key) @@ -46,17 +55,23 @@ func (info *dhtInfo) getNodeID() *NodeID { return info.nodeID_hidden } +// The nodes we known in a bucket (a region of keyspace with a matching prefix of some length). type bucket struct { peers []*dhtInfo other []*dhtInfo } +// Request for a node to do a lookup. +// Includes our key and coords so they can send a response back, and the destination NodeID we want to ask about. type dhtReq struct { Key boxPubKey // Key of whoever asked Coords []byte // Coords of whoever asked Dest NodeID // NodeID they're asking about } +// Response to a DHT lookup. +// Includes the key and coords of the node that's responding, and the destination they were asked about. +// The main part is Infos []*dhtInfo, the lookup response. type dhtRes struct { Key boxPubKey // key to respond to Coords []byte // coords to respond to @@ -64,11 +79,16 @@ type dhtRes struct { Infos []*dhtInfo // response } +// Information about a node, either taken from our table or from a lookup response. +// Used to schedule pings at a later time (they're throttled to 1/second for background maintenance traffic). type dht_rumor struct { info *dhtInfo target *NodeID } +// The main DHT struct. +// Includes a slice of buckets, to organize known nodes based on their region of keyspace. +// Also includes information about outstanding DHT requests and the rumor mill of nodes to ping at some point. type dht struct { core *Core nodeID NodeID @@ -79,6 +99,7 @@ type dht struct { rumorMill []dht_rumor } +// Initializes the DHT. func (t *dht) init(c *Core) { t.core = c t.nodeID = *t.core.GetNodeID() @@ -86,6 +107,8 @@ func (t *dht) init(c *Core) { t.reqs = make(map[boxPubKey]map[NodeID]time.Time) } +// Reads a request, performs a lookup, and responds. +// If the node that sent the request isn't in our DHT, but should be, then we add them. func (t *dht) handleReq(req *dhtReq) { // Send them what they asked for loc := t.core.switchTable.getLocator() @@ -106,6 +129,8 @@ func (t *dht) handleReq(req *dhtReq) { //if req.dest != t.nodeID { t.ping(&info, info.getNodeID()) } // Or spam... } +// Reads a lookup response, checks that we had sent a matching request, and processes the response info. +// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and adding the response info to the rumor mill. func (t *dht) handleRes(res *dhtRes) { t.core.searches.handleDHTRes(res) reqs, isIn := t.reqs[res.Key] @@ -157,6 +182,7 @@ func (t *dht) handleRes(res *dhtRes) { } } +// Does a DHT lookup and returns the results, sorted in ascending order of distance from the destination. func (t *dht) lookup(nodeID *NodeID, allowCloser bool) []*dhtInfo { // FIXME this allocates a bunch, sorts, and keeps the part it likes // It would be better to only track the part it likes to begin with @@ -192,14 +218,18 @@ func (t *dht) lookup(nodeID *NodeID, allowCloser bool) []*dhtInfo { return res } +// Gets the bucket for a specified matching prefix length. func (t *dht) getBucket(bidx int) *bucket { return &t.buckets_hidden[bidx] } +// Lists the number of buckets. func (t *dht) nBuckets() int { return len(t.buckets_hidden) } +// Inserts a node into the DHT if they meet certain requirements. +// In particular, they must either be a peer that's not already in the DHT, or else be someone we should insert into the DHT (see: shouldInsert). func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) { //fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords) // Insert if no "other" entry already exists @@ -219,6 +249,7 @@ func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) { } } +// Adds a node to the DHT, possibly removing another node in the process. func (t *dht) insert(info *dhtInfo, isPeer bool) { //fmt.Println("DEBUG: dht insert:", info.getNodeID(), info.coords) // First update the time on this info @@ -253,6 +284,7 @@ func (t *dht) insert(info *dhtInfo, isPeer bool) { } } +// Gets the bucket index for the bucket where we would put the given NodeID. func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) { for bidx := 0; bidx < t.nBuckets(); bidx++ { them := nodeID[bidx/8] & (0x80 >> byte(bidx%8)) @@ -264,6 +296,8 @@ func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) { return t.nBuckets(), false } +// Helper called by containsPeer, containsOther, and contains. +// Returns true if a node with the same ID *and coords* is already in the given part of the bucket. func dht_bucket_check(newInfo *dhtInfo, infos []*dhtInfo) bool { // Compares if key and coords match if newInfo == nil { @@ -293,18 +327,22 @@ func dht_bucket_check(newInfo *dhtInfo, infos []*dhtInfo) bool { return false } +// Calls bucket_check over the bucket's peers infos. func (b *bucket) containsPeer(info *dhtInfo) bool { return dht_bucket_check(info, b.peers) } +// Calls bucket_check over the bucket's other info. func (b *bucket) containsOther(info *dhtInfo) bool { return dht_bucket_check(info, b.other) } +// returns containsPeer || containsOther func (b *bucket) contains(info *dhtInfo) bool { return b.containsPeer(info) || b.containsOther(info) } +// Removes a node with the corresponding key, if any, from a bucket. func (b *bucket) drop(key *boxPubKey) { clean := func(infos []*dhtInfo) []*dhtInfo { cleaned := infos[:0] @@ -320,6 +358,7 @@ func (b *bucket) drop(key *boxPubKey) { b.other = clean(b.other) } +// Sends a lookup request to the specified node. func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { // Send a dhtReq to the node in dhtInfo bs := req.encode() @@ -345,6 +384,7 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) { reqsToDest[req.Dest] = time.Now() } +// Sends a lookup response to the specified node. func (t *dht) sendRes(res *dhtRes, req *dhtReq) { // Send a reply for a dhtReq bs := res.encode() @@ -361,10 +401,14 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) { t.core.router.out(packet) } +// Returns true of a bucket contains no peers and no other nodes. func (b *bucket) isEmpty() bool { return len(b.peers)+len(b.other) == 0 } +// Gets the next node that should be pinged from the bucket. +// There's a cooldown of 6 seconds between ping attempts for each node, to give them time to respond. +// It returns the least recently pinged node, subject to that send cooldown. func (b *bucket) nextToPing() *dhtInfo { // Check the nodes in the bucket // Return whichever one responded least recently @@ -387,12 +431,16 @@ func (b *bucket) nextToPing() *dhtInfo { return toPing } +// Returns a useful target address to ask about for pings. +// Equal to the our node's ID, except for exactly 1 bit at the bucket index. func (t *dht) getTarget(bidx int) *NodeID { targetID := t.nodeID targetID[bidx/8] ^= 0x80 >> byte(bidx%8) return &targetID } +// Sends a ping to a node, or removes the node if it has failed to respond to too many pings. +// If target is nil, we will ask the node about our own NodeID. func (t *dht) ping(info *dhtInfo, target *NodeID) { if info.pings > 2 { bidx, isOK := t.getBucketIndex(info.getNodeID()) @@ -418,6 +466,8 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) { t.sendReq(&req, info) } +// Adds a node info and target to the rumor mill. +// The node will be asked about the target at a later point, if doing so would still be useful at the time. func (t *dht) addToMill(info *dhtInfo, target *NodeID) { rumor := dht_rumor{ info: info, @@ -426,6 +476,11 @@ func (t *dht) addToMill(info *dhtInfo, target *NodeID) { t.rumorMill = append(t.rumorMill, rumor) } +// Regular periodic maintenance. +// If the mill is empty, it adds two pings to the rumor mill. +// The first is to the node that responded least recently, provided that it's been at least 1 minute, to make sure we eventually detect and remove unresponsive nodes. +// The second is used for bootstrapping, and attempts to fill some bucket, iterating over buckets and resetting after it hits the last non-empty one. +// If the mill is not empty, it pops nodes from the mill until it finds one that would be useful to ping (see: shouldInsert), and then pings it. func (t *dht) doMaintenance() { // First clean up reqs for key, reqs := range t.reqs { @@ -489,6 +544,8 @@ func (t *dht) doMaintenance() { } } +// Returns true if it would be worth pinging the specified node. +// This requires that the bucket doesn't already contain the node, and that either the bucket isn't full yet or the node is closer to us in keyspace than some other node in that bucket. func (t *dht) shouldInsert(info *dhtInfo) bool { bidx, isOK := t.getBucketIndex(info.getNodeID()) if !isOK { @@ -509,6 +566,7 @@ func (t *dht) shouldInsert(info *dhtInfo) bool { return false } +// Returns true if the keyspace distance between the first and second node is smaller than the keyspace distance between the second and third node. func dht_firstCloserThanThird(first *NodeID, second *NodeID, third *NodeID) bool { @@ -523,6 +581,10 @@ func dht_firstCloserThanThird(first *NodeID, return false } +// Resets the DHT in response to coord changes. +// This empties all buckets, resets the bootstrapping cycle to 0, and empties the rumor mill. +// It adds all old "other" node info to the rumor mill, so they'll be pinged quickly. +// If those nodes haven't also changed coords, then this is a relatively quick way to notify those nodes of our new coords and re-add them to our own DHT if they respond. func (t *dht) reset() { // This is mostly so bootstrapping will reset to resend coords into the network t.offset = 0 diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 2a322cf7..5e522f36 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -4,19 +4,16 @@ package yggdrasil // Commented code should be removed // Live code should be better commented -// FIXME (!) this part may be at least sligtly vulnerable to replay attacks -// The switch message part should catch / drop old tstamps -// So the damage is limited -// But you could still mess up msgAnc / msgHops and break some things there -// It needs to ignore messages with a lower seq -// Probably best to start setting seq to a timestamp in that case... - import "time" import "sync" import "sync/atomic" //import "fmt" +// The peers struct represents peers with an active connection. +// Incomping packets are passed to the corresponding peer, which handles them somehow. +// In most cases, this involves passing the packet to the handler for outgoing traffic to another peer. +// In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. type peers struct { core *Core mutex sync.Mutex // Synchronize writes to atomic @@ -26,6 +23,7 @@ type peers struct { allowedEncryptionPublicKeys map[boxPubKey]struct{} } +// Initializes the peers struct. func (ps *peers) init(c *Core) { ps.mutex.Lock() defer ps.mutex.Unlock() @@ -34,6 +32,7 @@ func (ps *peers) init(c *Core) { ps.allowedEncryptionPublicKeys = make(map[boxPubKey]struct{}) } +// Returns true if an incoming peer connection to a key is allowed, either because the key is in the whitelist or because the whitelist is empty. func (ps *peers) isAllowedEncryptionPublicKey(box *boxPubKey) bool { ps.authMutex.RLock() defer ps.authMutex.RUnlock() @@ -41,18 +40,21 @@ func (ps *peers) isAllowedEncryptionPublicKey(box *boxPubKey) bool { return isIn || len(ps.allowedEncryptionPublicKeys) == 0 } +// Adds a key to the whitelist. func (ps *peers) addAllowedEncryptionPublicKey(box *boxPubKey) { ps.authMutex.Lock() defer ps.authMutex.Unlock() ps.allowedEncryptionPublicKeys[*box] = struct{}{} } +// Removes a key from the whitelist. func (ps *peers) removeAllowedEncryptionPublicKey(box *boxPubKey) { ps.authMutex.Lock() defer ps.authMutex.Unlock() delete(ps.allowedEncryptionPublicKeys, *box) } +// Gets the whitelist of allowed keys for incoming connections. func (ps *peers) getAllowedEncryptionPublicKeys() []boxPubKey { ps.authMutex.RLock() defer ps.authMutex.RUnlock() @@ -63,14 +65,17 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []boxPubKey { return keys } +// Atomically gets a map[switchPort]*peer of known peers. func (ps *peers) getPorts() map[switchPort]*peer { return ps.ports.Load().(map[switchPort]*peer) } +// Stores a map[switchPort]*peer (note that you should take a mutex before store operations to avoid conflicts with other nodes attempting to read/change/store at the same time). func (ps *peers) putPorts(ports map[switchPort]*peer) { ps.ports.Store(ports) } +// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral), a handler for their outgoing traffic, and queue sizes for local backpressure. type peer struct { queueSize int64 // used to track local backpressure bytesSent uint64 // To track bandwidth usage for getPeers @@ -90,14 +95,17 @@ type peer struct { close func() // Called when a peer is removed, to close the underlying connection, or via admin api } +// Size of the queue of packets to be sent to the node. func (p *peer) getQueueSize() int64 { return atomic.LoadInt64(&p.queueSize) } +// Used to increment or decrement the queue. func (p *peer) updateQueueSize(delta int64) { atomic.AddInt64(&p.queueSize, delta) } +// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unocupied port number. func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKey) *peer { now := time.Now() p := peer{box: *box, @@ -125,12 +133,13 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKe return &p } +// Removes a peer for a given port, if one exists. func (ps *peers) removePeer(port switchPort) { if port == 0 { return } // Can't remove self peer ps.core.router.doAdmin(func() { - ps.core.switchTable.removePeer(port) + ps.core.switchTable.unlockedRemovePeer(port) }) ps.mutex.Lock() oldPorts := ps.getPorts() @@ -150,6 +159,8 @@ func (ps *peers) removePeer(port switchPort) { } } +// If called, sends a notification to each peer that they should send a new switch message. +// Mainly called by the switch after an update. func (ps *peers) sendSwitchMsgs() { ports := ps.getPorts() for _, p := range ports { @@ -163,6 +174,8 @@ func (ps *peers) sendSwitchMsgs() { } } +// This must be launched in a separate goroutine by whatever sets up the peer struct. +// It handles link protocol traffic. func (p *peer) linkLoop() { go func() { p.doSend <- struct{}{} }() tick := time.NewTicker(time.Second) @@ -182,6 +195,8 @@ func (p *peer) linkLoop() { } } +// Called to handle incoming packets. +// Passes the packet to a handler for that packet type. func (p *peer) handlePacket(packet []byte) { // TODO See comment in sendPacket about atomics technically being done wrong atomic.AddUint64(&p.bytesRecvd, uint64(len(packet))) @@ -197,10 +212,12 @@ func (p *peer) handlePacket(packet []byte) { case wire_LinkProtocolTraffic: p.handleLinkTraffic(packet) default: - return + util_putBytes(packet) } } +// Called to handle traffic or protocolTraffic packets. +// In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. func (p *peer) handleTraffic(packet []byte, pTypeLen int) { if p.port != 0 && p.dinfo == nil { // Drop traffic until the peer manages to send us at least one good switchMsg @@ -221,14 +238,15 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { to.sendPacket(packet) } +// This just calls p.out(packet) for now. func (p *peer) sendPacket(packet []byte) { // Is there ever a case where something more complicated is needed? // What if p.out blocks? p.out(packet) - // TODO this should really happen at the interface, to account for LIFO packet drops and additional per-packet/per-message overhead, but this should be pretty close... better to move it to the tcp/udp stuff *after* rewriting both to give a common interface - atomic.AddUint64(&p.bytesSent, uint64(len(packet))) } +// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. +// It sends it to p.linkOut, which bypasses the usual packet queues. func (p *peer) sendLinkPacket(packet []byte) { innerPayload, innerNonce := boxSeal(&p.linkShared, packet, nil) innerLinkPacket := wire_linkProtoTrafficPacket{ @@ -245,6 +263,8 @@ func (p *peer) sendLinkPacket(packet []byte) { p.linkOut <- packet } +// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic. +// Identifies the link traffic type and calls the appropriate handler. func (p *peer) handleLinkTraffic(bs []byte) { packet := wire_linkProtoTrafficPacket{} if !packet.decode(bs) { @@ -269,10 +289,12 @@ func (p *peer) handleLinkTraffic(bs []byte) { switch pType { case wire_SwitchMsg: p.handleSwitchMsg(payload) - default: // TODO?... + default: + util_putBytes(bs) } } +// Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them. func (p *peer) sendSwitchMsg() { msg := p.core.switchTable.getMsg() if msg == nil { @@ -290,6 +312,8 @@ func (p *peer) sendSwitchMsg() { p.sendLinkPacket(packet) } +// Handles a switchMsg from the peer, checking signatures and passing good messages to the switch. +// Also creates a dhtInfo struct and arranges for it to be added to the dht (this is how dht bootstrapping begins). func (p *peer) handleSwitchMsg(packet []byte) { var msg switchMsg if !msg.decode(packet) { @@ -330,6 +354,8 @@ func (p *peer) handleSwitchMsg(packet []byte) { p.dinfo = &dinfo } +// This generates the bytes that we sign or check the signature of for a switchMsg. +// It begins with the next node's key, followed by the root and the timetsamp, followed by coords being advertised to the next node. func getBytesForSig(next *sigPubKey, msg *switchMsg) []byte { var loc switchLocator for _, hop := range msg.Hops { diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 9e48668e..4f15dea3 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -29,6 +29,8 @@ import "golang.org/x/net/ipv6" //import "fmt" //import "net" +// The router struct has channels to/from the tun/tap device and a self peer (0), which is how messages are passed between this node and the peers/switch layer. +// The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions. type router struct { core *Core addr address @@ -40,6 +42,7 @@ type router struct { admin chan func() // pass a lambda for the admin socket to query stuff } +// Initializes the router struct, which includes setting up channels to/from the tun/tap. func (r *router) init(core *Core) { r.core = core r.addr = *address_addrForNodeID(&r.core.dht.nodeID) @@ -67,12 +70,17 @@ func (r *router) init(core *Core) { // go r.mainLoop() } +// Starts the mainLoop goroutine. func (r *router) start() error { r.core.log.Println("Starting router") go r.mainLoop() return nil } +// Takes traffic from the tun/tap and passes it to router.send, or from r.in and handles incoming traffic. +// Also adds new peer info to the DHT. +// Also resets the DHT and sesssions in the event of a coord change. +// Also does periodic maintenance stuff. func (r *router) mainLoop() { ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -102,6 +110,11 @@ func (r *router) mainLoop() { } } +// Checks a packet's to/from address to make sure it's in the allowed range. +// If a session to the destination exists, gets the session and passes the packet to it. +// If no session exists, it triggers (or continues) a search. +// If the session hasn't responded recently, it triggers a ping or search to keep things alive or deal with broken coords *relatively* quickly. +// It also deals with oversized packets if there are MTU issues by calling into icmpv6.go to spoof PacketTooBig traffic, or DestinationUnreachable if the other side has their tun/tap disabled. func (r *router) sendPacket(bs []byte) { if len(bs) < 40 { panic("Tried to send a packet shorter than a header...") @@ -226,6 +239,8 @@ func (r *router) sendPacket(bs []byte) { } } +// Called for incoming traffic by the session worker for that connection. +// Checks that the IP address is correct (matches the session) and passes the packet to the tun/tap. func (r *router) recvPacket(bs []byte, theirAddr *address, theirSubnet *subnet) { // Note: called directly by the session worker, not the router goroutine //fmt.Println("Recv packet") @@ -248,6 +263,7 @@ func (r *router) recvPacket(bs []byte, theirAddr *address, theirSubnet *subnet) r.recv <- bs } +// Checks incoming traffic type and passes it to the appropriate handler. func (r *router) handleIn(packet []byte) { pType, pTypeLen := wire_decode_uint64(packet) if pTypeLen == 0 { @@ -262,6 +278,8 @@ func (r *router) handleIn(packet []byte) { } } +// Handles incoming traffic, i.e. encapuslated ordinary IPv6 packets. +// Passes them to the crypto session worker to be decrypted and sent to the tun/tap. func (r *router) handleTraffic(packet []byte) { defer util_putBytes(packet) p := wire_trafficPacket{} @@ -276,6 +294,7 @@ func (r *router) handleTraffic(packet []byte) { sinfo.recv <- &p } +// Handles protocol traffic by decrypting it, checking its type, and passing it to the appropriate handler for that traffic type. func (r *router) handleProto(packet []byte) { // First parse the packet p := wire_protoTrafficPacket{} @@ -312,11 +331,12 @@ func (r *router) handleProto(packet []byte) { r.handleDHTReq(bs, &p.FromKey) case wire_DHTLookupResponse: r.handleDHTRes(bs, &p.FromKey) - default: /*panic("Should not happen in testing") ;*/ - return + default: + util_putBytes(packet) } } +// Decodes session pings from wire format and passes them to sessions.handlePing where they either create or update a session. func (r *router) handlePing(bs []byte, fromKey *boxPubKey) { ping := sessionPing{} if !ping.decode(bs) { @@ -326,10 +346,12 @@ func (r *router) handlePing(bs []byte, fromKey *boxPubKey) { r.core.sessions.handlePing(&ping) } +// Handles session pongs (which are really pings with an extra flag to prevent acknowledgement). func (r *router) handlePong(bs []byte, fromKey *boxPubKey) { r.handlePing(bs, fromKey) } +// Decodes dht requests and passes them to dht.handleReq to trigger a lookup/response. func (r *router) handleDHTReq(bs []byte, fromKey *boxPubKey) { req := dhtReq{} if !req.decode(bs) { @@ -339,6 +361,7 @@ func (r *router) handleDHTReq(bs []byte, fromKey *boxPubKey) { r.core.dht.handleReq(&req) } +// Decodes dht responses and passes them to dht.handleRes to update the DHT table and further pass them to the search code (if applicable). func (r *router) handleDHTRes(bs []byte, fromKey *boxPubKey) { res := dhtRes{} if !res.decode(bs) { @@ -348,6 +371,9 @@ func (r *router) handleDHTRes(bs []byte, fromKey *boxPubKey) { r.core.dht.handleRes(&res) } +// Passed a function to call. +// This will send the function to r.admin and block until it finishes. +// It's used by the admin socket to ask the router mainLoop goroutine about information in the session or dht structs, which cannot be read safely from outside that goroutine. func (r *router) doAdmin(f func()) { // Pass this a function that needs to be run by the router's main goroutine // It will pass the function to the router and wait for the router to finish diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index d9dc76e3..772b3848 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -16,9 +16,16 @@ import "time" //import "fmt" +// This defines the maximum number of dhtInfo that we keep track of for nodes to query in an ongoing search. const search_MAX_SEARCH_SIZE = 16 + +// This defines the time after which we send a new search packet. +// Search packets are sent automatically immediately after a response is received. +// So this allows for timeouts and for long searches to become increasingly parallel. const search_RETRY_TIME = time.Second +// Information about an ongoing search. +// Includes the targed NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited. type searchInfo struct { dest NodeID mask NodeID @@ -28,16 +35,19 @@ type searchInfo struct { visited map[NodeID]bool } +// This stores a map of active searches. type searches struct { core *Core searches map[NodeID]*searchInfo } +// Intializes the searches struct. func (s *searches) init(core *Core) { s.core = core s.searches = make(map[NodeID]*searchInfo) } +// Creates a new search info, adds it to the searches struct, and returns a pointer to the info. func (s *searches) createSearch(dest *NodeID, mask *NodeID) *searchInfo { now := time.Now() for dest, sinfo := range s.searches { @@ -56,6 +66,9 @@ func (s *searches) createSearch(dest *NodeID, mask *NodeID) *searchInfo { //////////////////////////////////////////////////////////////////////////////// +// Checks if there's an ongoing search relaed to a dhtRes. +// If there is, it adds the response info to the search and triggers a new search step. +// If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more. func (s *searches) handleDHTRes(res *dhtRes) { sinfo, isIn := s.searches[res.Dest] if !isIn || s.checkDHTRes(sinfo, res) { @@ -68,6 +81,10 @@ func (s *searches) handleDHTRes(res *dhtRes) { } } +// Adds the information from a dhtRes to an ongoing search. +// Info about a node that has already been visited is not re-added to the search. +// Duplicate information about nodes toVisit is deduplicated (the newest information is kept). +// The toVisit list is sorted in ascending order of keyspace distance from the destination. func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { // Add responses to toVisit if closer to dest than the res node from := dhtInfo{key: res.Key, coords: res.Coords} @@ -98,6 +115,8 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { } } +// If there are no nodes left toVisit, then this cleans up the search. +// Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. func (s *searches) doSearchStep(sinfo *searchInfo) { if len(sinfo.toVisit) == 0 { // Dead end, do cleanup @@ -115,6 +134,8 @@ func (s *searches) doSearchStep(sinfo *searchInfo) { } } +// If we've recenty sent a ping for this search, do nothing. +// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. func (s *searches) continueSearch(sinfo *searchInfo) { if time.Since(sinfo.time) < search_RETRY_TIME { return @@ -137,6 +158,7 @@ func (s *searches) continueSearch(sinfo *searchInfo) { }() } +// Calls create search, and initializes the iterative search parts of the struct before returning it. func (s *searches) newIterSearch(dest *NodeID, mask *NodeID) *searchInfo { sinfo := s.createSearch(dest, mask) sinfo.toVisit = s.core.dht.lookup(dest, true) @@ -144,6 +166,9 @@ func (s *searches) newIterSearch(dest *NodeID, mask *NodeID) *searchInfo { return sinfo } +// Checks if a dhtRes is good (called by handleDHTRes). +// If the response is from the target, get/create a session, trigger a session ping, and return true. +// Otherwise return false. func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool { them := getNodeID(&res.Key) var destMasked NodeID diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 33111424..01bfb959 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -6,6 +6,8 @@ package yggdrasil import "time" +// All the information we know about an active session. +// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { core *Core theirAddr address @@ -37,6 +39,7 @@ type sessionInfo struct { bytesRecvd uint64 // Bytes of real traffic received in this session } +// Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. type sessionPing struct { SendPermPub boxPubKey // Sender's permanent key Handle handle // Random number to ID session @@ -47,7 +50,8 @@ type sessionPing struct { MTU uint16 } -// Returns true if the session was updated, false otherwise +// Updates session info in response to a ping, after checking that the ping is OK. +// Returns true if the session was updated, or false otherwise. func (s *sessionInfo) update(p *sessionPing) bool { if !(p.Tstamp > s.tstamp) { // To protect against replay attacks @@ -76,10 +80,14 @@ func (s *sessionInfo) update(p *sessionPing) bool { return true } +// Returns true if the session has been idle for longer than the allowed timeout. func (s *sessionInfo) timedout() bool { return time.Since(s.time) > time.Minute } +// Struct of all active sessions. +// Sessions are indexed by handle. +// Additionally, stores maps of address/subnet onto keys, and keys onto handles. type sessions struct { core *Core // Maps known permanent keys to their shared key, used by DHT a lot @@ -94,6 +102,7 @@ type sessions struct { subnetToPerm map[subnet]*boxPubKey } +// Initializes the session struct. func (ss *sessions) init(core *Core) { ss.core = core ss.permShared = make(map[boxPubKey]*boxSharedKey) @@ -104,6 +113,7 @@ func (ss *sessions) init(core *Core) { ss.subnetToPerm = make(map[subnet]*boxPubKey) } +// Gets the session corresponding to a given handle. func (ss *sessions) getSessionForHandle(handle *handle) (*sessionInfo, bool) { sinfo, isIn := ss.sinfos[*handle] if isIn && sinfo.timedout() { @@ -113,6 +123,7 @@ func (ss *sessions) getSessionForHandle(handle *handle) (*sessionInfo, bool) { return sinfo, isIn } +// Gets a session corresponding to an ephemeral session key used by this node. func (ss *sessions) getByMySes(key *boxPubKey) (*sessionInfo, bool) { h, isIn := ss.byMySes[*key] if !isIn { @@ -122,6 +133,7 @@ func (ss *sessions) getByMySes(key *boxPubKey) (*sessionInfo, bool) { return sinfo, isIn } +// Gets a session corresponding to a permanent key used by the remote node. func (ss *sessions) getByTheirPerm(key *boxPubKey) (*sessionInfo, bool) { h, isIn := ss.byTheirPerm[*key] if !isIn { @@ -131,6 +143,7 @@ func (ss *sessions) getByTheirPerm(key *boxPubKey) (*sessionInfo, bool) { return sinfo, isIn } +// Gets a session corresponding to an IPv6 address used by the remote node. func (ss *sessions) getByTheirAddr(addr *address) (*sessionInfo, bool) { p, isIn := ss.addrToPerm[*addr] if !isIn { @@ -140,6 +153,7 @@ func (ss *sessions) getByTheirAddr(addr *address) (*sessionInfo, bool) { return sinfo, isIn } +// Gets a session corresponding to an IPv6 /64 subnet used by the remote node/network. func (ss *sessions) getByTheirSubnet(snet *subnet) (*sessionInfo, bool) { p, isIn := ss.subnetToPerm[*snet] if !isIn { @@ -149,6 +163,8 @@ func (ss *sessions) getByTheirSubnet(snet *subnet) (*sessionInfo, bool) { return sinfo, isIn } +// Creates a new session and lazily cleans up old/timedout existing sessions. +// This includse initializing session info to sane defaults (e.g. lowest supported MTU). func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo { sinfo := sessionInfo{} sinfo.core = ss.core @@ -201,6 +217,7 @@ func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo { return &sinfo } +// Closes a session, removing it from sessions maps and killing the worker goroutine. func (sinfo *sessionInfo) close() { delete(sinfo.core.sessions.sinfos, sinfo.myHandle) delete(sinfo.core.sessions.byMySes, sinfo.mySesPub) @@ -211,6 +228,7 @@ func (sinfo *sessionInfo) close() { close(sinfo.recv) } +// Returns a session ping appropriate for the given session info. func (ss *sessions) getPing(sinfo *sessionInfo) sessionPing { loc := ss.core.switchTable.getLocator() coords := loc.getCoords() @@ -226,6 +244,9 @@ func (ss *sessions) getPing(sinfo *sessionInfo) sessionPing { return ref } +// Gets the shared key for a pair of box keys. +// Used to cache recently used shared keys for protocol traffic. +// This comes up with dht req/res and session ping/pong traffic. func (ss *sessions) getSharedKey(myPriv *boxPrivKey, theirPub *boxPubKey) *boxSharedKey { if skey, isIn := ss.permShared[*theirPub]; isIn { @@ -244,10 +265,13 @@ func (ss *sessions) getSharedKey(myPriv *boxPrivKey, return ss.permShared[*theirPub] } +// Sends a session ping by calling sendPingPong in ping mode. func (ss *sessions) ping(sinfo *sessionInfo) { ss.sendPingPong(sinfo, false) } +// Calls getPing, sets the appropriate ping/pong flag, encodes to wire format, and send it. +// Updates the time the last ping was sent in the session info. func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) { ping := ss.getPing(sinfo) ping.IsPong = isPong @@ -268,6 +292,8 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) { } } +// Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful. +// If the session has a packet cached (common when first setting up a session), it will be sent. func (ss *sessions) handlePing(ping *sessionPing) { // Get the corresponding session (or create a new session) sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub) @@ -296,6 +322,9 @@ func (ss *sessions) handlePing(ping *sessionPing) { } } +// Used to subtract one nonce from another, staying in the range +- 64. +// This is used by the nonce progression machinery to advance the bitmask of recently received packets (indexed by nonce), or to check the appropriate bit of the bitmask. +// It's basically part of the machinery that prevents replays and duplicate packets. func (n *boxNonce) minus(m *boxNonce) int64 { diff := int64(0) for idx := range n { @@ -311,6 +340,9 @@ func (n *boxNonce) minus(m *boxNonce) int64 { return diff } +// Get the MTU of the session. +// Will be equal to the smaller of this node's MTU or the remote node's MTU. +// If sending over links with a maximum message size (this was a thing with the old UDP code), it could be further lowered, to a minimum of 1280. func (sinfo *sessionInfo) getMTU() uint16 { if sinfo.theirMTU == 0 || sinfo.myMTU == 0 { return 0 @@ -321,6 +353,7 @@ func (sinfo *sessionInfo) getMTU() uint16 { return sinfo.myMTU } +// Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received. func (sinfo *sessionInfo) nonceIsOK(theirNonce *boxNonce) bool { // The bitmask is to allow for some non-duplicate out-of-order packets diff := theirNonce.minus(&sinfo.theirNonce) @@ -330,19 +363,24 @@ func (sinfo *sessionInfo) nonceIsOK(theirNonce *boxNonce) bool { return ^sinfo.nonceMask&(0x01< 0 { + // This nonce is newer, so shift the window before setting the bit, and update theirNonce in the session info. sinfo.nonceMask <<= uint64(diff) sinfo.nonceMask &= 0x01 + sinfo.theirNonce = *theirNonce } else { + // This nonce is older, so set the bit but do not shift the window. sinfo.nonceMask &= 0x01 << uint64(-diff) } - sinfo.theirNonce = *theirNonce } +// Resets all sessions to an uninitialized state. +// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. func (ss *sessions) resetInits() { for _, sinfo := range ss.sinfos { sinfo.init = false @@ -351,10 +389,9 @@ func (ss *sessions) resetInits() { //////////////////////////////////////////////////////////////////////////////// -// This is for a per-session worker -// It handles calling the relatively expensive crypto operations -// It's also responsible for keeping nonces consistent - +// This is for a per-session worker. +// It handles calling the relatively expensive crypto operations. +// It's also responsible for checking nonces and dropping out-of-date/duplicate packets, or else calling the function to update nonces if the packet is OK. func (sinfo *sessionInfo) doWorker() { for { select { @@ -374,6 +411,7 @@ func (sinfo *sessionInfo) doWorker() { } } +// This encrypts a packet, creates a trafficPacket struct, encodes it, and sends it to router.out to pass it to the switch layer. func (sinfo *sessionInfo) doSend(bs []byte) { defer util_putBytes(bs) if !sinfo.init { @@ -392,6 +430,11 @@ func (sinfo *sessionInfo) doSend(bs []byte) { sinfo.core.router.out(packet) } +// This takes a trafficPacket and checks the nonce. +// If the nonce is OK, it decrypts the packet. +// If the decrypted packet is OK, it calls router.recvPacket to pass the packet to the tun/tap. +// If a packet does not decrypt successfully, it assumes the packet was truncated, and updates the MTU accordingly. +// TODO? remove the MTU updating part? That should never happen with TCP peers, and the old UDP code that caused it was removed (and if replaced, should be replaced with something that can reliably send messages with an arbitrary size). func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) { defer util_putBytes(p.Payload) payloadSize := uint16(len(p.Payload)) diff --git a/src/yggdrasil/signature.go b/src/yggdrasil/signature.go index b146b9f9..f7b88ae9 100644 --- a/src/yggdrasil/signature.go +++ b/src/yggdrasil/signature.go @@ -6,40 +6,52 @@ package yggdrasil import "sync" import "time" +// This keeps track of what signatures have already been checked. +// It's used to skip expensive crypto operations, given that many signatures are likely to be the same for the average node's peers. type sigManager struct { mutex sync.RWMutex checked map[sigBytes]knownSig lastCleaned time.Time } +// Represents a known signature. +// Includes the key, the signature bytes, the bytes that were signed, and the time it was last used. type knownSig struct { + key sigPubKey + sig sigBytes bs []byte time time.Time } +// Initializes the signature manager. func (m *sigManager) init() { m.checked = make(map[sigBytes]knownSig) } +// Checks if a key and signature match the supplied bytes. +// If the same key/sig/bytes have been checked before, it returns true from the cached results. +// If not, it checks the key, updates it in the cache if successful, and returns the checked results. func (m *sigManager) check(key *sigPubKey, sig *sigBytes, bs []byte) bool { - if m.isChecked(sig, bs) { + if m.isChecked(key, sig, bs) { return true } verified := verify(key, bs, sig) if verified { - m.putChecked(sig, bs) + m.putChecked(key, sig, bs) } return verified } -func (m *sigManager) isChecked(sig *sigBytes, bs []byte) bool { +// Checks the cache to see if this key/sig/bytes combination has already been verified. +// Returns true if it finds a match. +func (m *sigManager) isChecked(key *sigPubKey, sig *sigBytes, bs []byte) bool { m.mutex.RLock() defer m.mutex.RUnlock() k, isIn := m.checked[*sig] if !isIn { return false } - if len(bs) != len(k.bs) { + if k.key != *key || k.sig != *sig || len(bs) != len(k.bs) { return false } for idx := 0; idx < len(bs); idx++ { @@ -51,7 +63,10 @@ func (m *sigManager) isChecked(sig *sigBytes, bs []byte) bool { return true } -func (m *sigManager) putChecked(newsig *sigBytes, bs []byte) { +// Puts a new result into the cache. +// This result is then used by isChecked to skip the expensive crypto verification if it's needed again. +// This is useful because, for nodes with multiple peers, there is often a lot of overlap between the signatures provided by each peer. +func (m *sigManager) putChecked(key *sigPubKey, newsig *sigBytes, bs []byte) { m.mutex.Lock() defer m.mutex.Unlock() now := time.Now() @@ -64,6 +79,6 @@ func (m *sigManager) putChecked(newsig *sigBytes, bs []byte) { } m.lastCleaned = now } - k := knownSig{bs: bs, time: now} + k := knownSig{key: *key, sig: *newsig, bs: bs, time: now} m.checked[*newsig] = k } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 6c42f456..a819594b 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -22,15 +22,17 @@ const switch_timeout = time.Minute const switch_updateInterval = switch_timeout / 2 const switch_throttle = switch_updateInterval / 2 -// You should be able to provide crypto signatures for this -// 1 signature per coord, from the *sender* to that coord -// E.g. A->B->C has sigA(A->B) and sigB(A->B->C) +// The switch locator represents the topology and network state dependent info about a node, minus the signatures that go with it. +// Nodes will pick the best root they see, provided that the root continues to push out updates with new timestamps. +// The coords represent a path from the root to a node. +// This path is generally part of a spanning tree, except possibly the last hop (it can loop when sending coords to your parent, but they see this and know not to use a looping path). type switchLocator struct { root sigPubKey tstamp int64 coords []switchPort } +// Returns true if the first sigPubKey has a higher TreeID. func firstIsBetter(first, second *sigPubKey) bool { // Higher TreeID is better ftid := getTreeID(first) @@ -45,6 +47,7 @@ func firstIsBetter(first, second *sigPubKey) bool { return false } +// Returns a copy of the locator which can safely be mutated. func (l *switchLocator) clone() switchLocator { // Used to create a deep copy for use in messages // Copy required because we need to mutate coords before sending @@ -55,6 +58,7 @@ func (l *switchLocator) clone() switchLocator { return loc } +// Gets the distance a locator is from the provided destination coords, with the coords provided in []byte format (used to compress integers sent over the wire). func (l *switchLocator) dist(dest []byte) int { // Returns distance (on the tree) from these coords offset := 0 @@ -85,6 +89,7 @@ func (l *switchLocator) dist(dest []byte) int { return dist } +// Gets coords in wire encoded format, with *no* length prefix. func (l *switchLocator) getCoords() []byte { bs := make([]byte, 0, len(l.coords)) for _, coord := range l.coords { @@ -94,6 +99,8 @@ func (l *switchLocator) getCoords() []byte { return bs } +// Returns true if the this locator represents an ancestor of the locator given as an argument. +// Ancestor means that it's the parent node, or the parent of parent, and so on... func (x *switchLocator) isAncestorOf(y *switchLocator) bool { if x.root != y.root { return false @@ -109,6 +116,7 @@ func (x *switchLocator) isAncestorOf(y *switchLocator) bool { return true } +// Information about a peer, used by the switch to build the tree and eventually make routing decisions. type peerInfo struct { key sigPubKey // ID of this peer locator switchLocator // Should be able to respond with signatures upon request @@ -119,17 +127,23 @@ type peerInfo struct { msg switchMsg // The wire switchMsg used } +// This is just a uint64 with a named type for clarity reasons. type switchPort uint64 + +// This is the subset of the information about a peer needed to make routing decisions, and it stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go). type tableElem struct { port switchPort locator switchLocator } +// This is the subset of the information about all peers needed to make routing decisions, and it stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go). type lookupTable struct { self switchLocator elems []tableElem } +// This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically. +// Use the switchTable functions to access it safely using the RWMutex for synchronization. type switchData struct { // All data that's mutable and used by exported Table methods // To be read/written with atomic.Value Store/Load calls @@ -139,6 +153,7 @@ type switchData struct { msg *switchMsg } +// All the information stored by the switch. type switchTable struct { core *Core key sigPubKey // Our own key @@ -151,6 +166,7 @@ type switchTable struct { table atomic.Value //lookupTable } +// Initializes the switchTable struct. func (t *switchTable) init(core *Core, key sigPubKey) { now := time.Now() t.core = core @@ -163,12 +179,14 @@ func (t *switchTable) init(core *Core, key sigPubKey) { t.drop = make(map[sigPubKey]int64) } +// Safely gets a copy of this node's locator. func (t *switchTable) getLocator() switchLocator { t.mutex.RLock() defer t.mutex.RUnlock() return t.data.locator.clone() } +// Regular maintenance to possibly timeout/reset the root and similar. func (t *switchTable) doMaintenance() { // Periodic maintenance work to keep things internally consistent t.mutex.Lock() // Write lock @@ -177,6 +195,7 @@ func (t *switchTable) doMaintenance() { t.cleanDropped() } +// Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out. func (t *switchTable) cleanRoot() { // TODO rethink how this is done?... // Get rid of the root if it looks like its timed out @@ -219,15 +238,23 @@ func (t *switchTable) cleanRoot() { } } -func (t *switchTable) removePeer(port switchPort) { +// Removes a peer. +// Must be called by the router mainLoop goroutine, e.g. call router.doAdmin with a lambda that calls this. +// If the removed peer was this node's parent, it immediately tries to find a new parent. +func (t *switchTable) unlockedRemovePeer(port switchPort) { delete(t.data.peers, port) t.updater.Store(&sync.Once{}) - // TODO if parent, find a new peer to use as parent instead + if port != t.parent { + return + } for _, info := range t.data.peers { t.unlockedHandleMsg(&info.msg, info.port) } } +// Dropped is a list of roots that are better than the current root, but stopped sending new timestamps. +// If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos. +// This function is called periodically to do that cleanup. func (t *switchTable) cleanDropped() { // TODO? only call this after root changes, not periodically for root := range t.drop { @@ -237,18 +264,23 @@ func (t *switchTable) cleanDropped() { } } +// A switchMsg contains the root node's sig key, timestamp, and signed per-hop information about a path from the root node to some other node in the network. +// This is exchanged with peers to construct the spanning tree. +// A subset of this information, excluding the signatures, is used to construct locators that are used elsewhere in the code. type switchMsg struct { Root sigPubKey TStamp int64 Hops []switchMsgHop } +// This represents the signed information about the path leading from the root the Next node, via the Port specified here. type switchMsgHop struct { Port switchPort Next sigPubKey Sig sigBytes } +// This returns a *switchMsg to a copy of this node's current switchMsg, which can safely have additional information appended to Hops and sent to a peer. func (t *switchTable) getMsg() *switchMsg { t.mutex.RLock() defer t.mutex.RUnlock() @@ -263,6 +295,8 @@ func (t *switchTable) getMsg() *switchMsg { } } +// This function checks that the root information in a switchMsg is OK. +// In particular, that the root is better, or else the same as the current root but with a good timestamp, and that this root+timestamp haven't been dropped due to timeout. func (t *switchTable) checkRoot(msg *switchMsg) bool { // returns false if it's a dropped root, not a better root, or has an older timestamp // returns true otherwise @@ -284,12 +318,18 @@ func (t *switchTable) checkRoot(msg *switchMsg) bool { } } +// This is a mutexed wrapper to unlockedHandleMsg, and is called by the peer structs in peers.go to pass a switchMsg for that peer into the switch. func (t *switchTable) handleMsg(msg *switchMsg, fromPort switchPort) { t.mutex.Lock() defer t.mutex.Unlock() t.unlockedHandleMsg(msg, fromPort) } +// This updates the switch with information about a peer. +// Then the tricky part, it decides if it should update our own locator as a result. +// That happens if this node is already our parent, or is advertising a better root, or is advertising a better path to the same root, etc... +// There are a lot of very delicate order sensitive checks here, so its' best to just read the code if you need to understand what it's doing. +// It's very important to not change the order of the statements in the case function unless you're absolutely sure that it's safe, including safe if used along side nodes that used the previous order. func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) { // TODO directly use a switchMsg instead of switchMessage + sigs now := time.Now() @@ -299,10 +339,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) { sender.locator.tstamp = msg.TStamp prevKey := msg.Root for _, hop := range msg.Hops { - // Build locator and signatures - var sig sigInfo - sig.next = hop.Next - sig.sig = hop.Sig + // Build locator sender.locator.coords = append(sender.locator.coords, hop.Port) sender.key = prevKey prevKey = hop.Next @@ -401,6 +438,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) { return } +// This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions. func (t *switchTable) updateTable() { // WARNING this should only be called from within t.data.updater.Do() // It relies on the sync.Once for synchronization with messages and lookups @@ -434,6 +472,12 @@ func (t *switchTable) updateTable() { t.table.Store(newTable) } +// This does the switch layer lookups that decide how to route traffic. +// Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree. +// Traffic must be routed to a node that is closer to the destination via the metric space distance. +// In the event that two nodes are equally close, it gets routed to the one with the longest uptime (due to the order that things are iterated over). +// The size of the outgoing packet queue is added to a node's tree distance when the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself. +// Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists. func (t *switchTable) lookup(dest []byte) switchPort { t.updater.Load().(*sync.Once).Do(t.updateTable) table := t.table.Load().(lookupTable) @@ -463,14 +507,3 @@ func (t *switchTable) lookup(dest []byte) switchPort { //t.core.log.Println("DEBUG: sending to", best, "cost", bestCost) return best } - -//////////////////////////////////////////////////////////////////////////////// - -//Signature stuff - -type sigInfo struct { - next sigPubKey - sig sigBytes -} - -//////////////////////////////////////////////////////////////////////////////// diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index e325fcfe..75a5618e 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -18,12 +18,13 @@ import "net" import "time" import "errors" import "sync" +import "sync/atomic" import "fmt" import "golang.org/x/net/proxy" const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense -// wrapper function for non tcp/ip connections +// Wrapper function for non tcp/ip connections. func setNoDelay(c net.Conn, delay bool) { tcp, ok := c.(*net.TCPConn) if ok { @@ -31,6 +32,7 @@ func setNoDelay(c net.Conn, delay bool) { } } +// The TCP listener and information about active TCP connections, to avoid duplication. type tcpInterface struct { core *Core serv net.Listener @@ -39,6 +41,8 @@ type tcpInterface struct { conns map[tcpInfo](chan struct{}) } +// This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring. +// Different address combinations are allowed, so multi-homing is still technically possible (but not necessarily advisable). type tcpInfo struct { box boxPubKey sig sigPubKey @@ -46,15 +50,21 @@ type tcpInfo struct { remoteAddr string } +// Returns the address of the listener. func (iface *tcpInterface) getAddr() *net.TCPAddr { return iface.serv.Addr().(*net.TCPAddr) } +// Attempts to initiate a connection to the provided address. func (iface *tcpInterface) connect(addr string) { iface.call(addr) } +// Attempst to initiate a connection to the provided address, viathe provided socks proxy address. func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { + // TODO make sure this doesn't keep attempting/killing connections when one is already active. + // I think some of the interaction between this and callWithConn needs work, so the dial isn't even attempted if there's already an outgoing call to peeraddr. + // Or maybe only if there's already an outgoing call to peeraddr via this socksaddr? go func() { dialer, err := proxy.SOCKS5("tcp", socksaddr, nil, proxy.Direct) if err == nil { @@ -72,6 +82,7 @@ func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { }() } +// Initializes the struct. func (iface *tcpInterface) init(core *Core, addr string) (err error) { iface.core = core @@ -85,6 +96,7 @@ func (iface *tcpInterface) init(core *Core, addr string) (err error) { return err } +// Runs the listener, which spawns off goroutines for incoming connections. func (iface *tcpInterface) listener() { defer iface.serv.Close() iface.core.log.Println("Listening for TCP on:", iface.serv.Addr().String()) @@ -97,6 +109,7 @@ func (iface *tcpInterface) listener() { } } +// Called by connectSOCKS, it's like call but with the connection already established. func (iface *tcpInterface) callWithConn(conn net.Conn) { go func() { raddr := conn.RemoteAddr().String() @@ -117,6 +130,11 @@ func (iface *tcpInterface) callWithConn(conn net.Conn) { }() } +// Checks if a connection already exists. +// If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address. +// If the dial is successful, it launches the handler. +// When finished, it removes the outgoing call, so reconnection attempts can be made later. +// This all happens in a separate goroutine that it spawns. func (iface *tcpInterface) call(saddr string) { go func() { quit := false @@ -142,6 +160,8 @@ func (iface *tcpInterface) call(saddr string) { }() } +// This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine. +// It defers a bunch of cleanup stuff to tear down all of these things when the reader exists (e.g. due to a closed connection or a timeout). func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { defer sock.Close() // Get our keys @@ -233,7 +253,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { out := make(chan []byte, 32) // TODO? what size makes sense defer close(out) go func() { - var shadow uint64 + var shadow int64 var stack [][]byte put := func(msg []byte) { stack = append(stack, msg) @@ -247,14 +267,16 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { msgLen := wire_encode_uint64(uint64(len(msg))) buf := net.Buffers{tcp_msg[:], msgLen, msg} buf.WriteTo(sock) + atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) util_putBytes(msg) } timerInterval := 4 * time.Second timer := time.NewTimer(timerInterval) defer timer.Stop() for { - for ; shadow > 0; shadow-- { - p.updateQueueSize(-1) + if shadow != 0 { + p.updateQueueSize(-shadow) + shadow = 0 } timer.Stop() select { @@ -319,6 +341,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { return } +// This reads from the socket into a []byte buffer for incomping messages. +// It copies completed messages out of the cache into a new slice, and passes them to the peer struct via the provided `in func([]byte)` argument. +// Then it shifts the incomplete fragments of data forward so future reads won't overwrite it. func (iface *tcpInterface) reader(sock net.Conn, in func([]byte)) { bs := make([]byte, 2*tcp_msgSize) frag := bs[:0] @@ -350,9 +375,13 @@ func (iface *tcpInterface) reader(sock net.Conn, in func([]byte)) { //////////////////////////////////////////////////////////////////////////////// -// Magic bytes to check +// These are 4 bytes of padding used to catch if something went horribly wrong with the tcp connection. var tcp_msg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" +// This takes a pointer to a slice as an argument. +// It checks if there's a complete message and, if so, slices out those parts and returns the message, true, and nil. +// If there's no error, but also no complete message, it returns nil, false, and nil. +// If there's an error, it returns nil, false, and the error, which the reader then handles (currently, by returning from the reader, which causes the connection to close). func tcp_chop_msg(bs *[]byte) ([]byte, bool, error) { // Returns msg, ok, err if len(*bs) < len(tcp_msg) { diff --git a/src/yggdrasil/util.go b/src/yggdrasil/util.go index 908b1ae9..fec5747c 100644 --- a/src/yggdrasil/util.go +++ b/src/yggdrasil/util.go @@ -6,14 +6,17 @@ import "runtime" //import "sync" +// A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere. func util_yield() { runtime.Gosched() } +// A wrapper around runtime.LockOSThread() so it doesn't need to be imported elsewhere. func util_lockthread() { runtime.LockOSThread() } +// A wrapper around runtime.UnlockOSThread() so it doesn't need to be imported elsewhere. func util_unlockthread() { runtime.UnlockOSThread() } @@ -32,14 +35,18 @@ func util_putBytes(bs []byte) { } */ +// This is used to buffer recently used slices of bytes, to prevent allocations in the hot loops. +// It's used like a sync.Pool, but with a fixed size and typechecked without type casts to/from interface{} (which were making the profiles look ugly). var byteStore chan []byte +// Initializes the byteStore func util_initByteStore() { if byteStore == nil { byteStore = make(chan []byte, 32) } } +// Gets an empty slice from the byte store, if one is available, or else returns a new nil slice. func util_getBytes() []byte { select { case bs := <-byteStore: @@ -49,6 +56,7 @@ func util_getBytes() []byte { } } +// Puts a slice in the store, if there's room, or else returns and lets the slice get collected. func util_putBytes(bs []byte) { select { case byteStore <- bs: diff --git a/src/yggdrasil/version.go b/src/yggdrasil/version.go index 40a5b965..75b08a6f 100644 --- a/src/yggdrasil/version.go +++ b/src/yggdrasil/version.go @@ -4,6 +4,9 @@ package yggdrasil // Used in the inital connection setup and key exchange // Some of this could arguably go in wire.go instead +// This is the version-specific metadata exchanged at the start of a connection. +// It must always beign with the 4 bytes "meta" and a wire formatted uint64 major version number. +// The current version also includes a minor version number, and the box/sig/link keys that need to be exchanged to open an connection. type version_metadata struct { meta [4]byte ver uint64 // 1 byte in this version @@ -14,6 +17,7 @@ type version_metadata struct { link boxPubKey } +// Gets a base metadata with no keys set, but with the correct version numbers. func version_getBaseMetadata() version_metadata { return version_metadata{ meta: [4]byte{'m', 'e', 't', 'a'}, @@ -22,16 +26,18 @@ func version_getBaseMetadata() version_metadata { } } +// Gest the length of the metadata for this version, used to know how many bytes to read from the start of a connection. func version_getMetaLength() (mlen int) { mlen += 4 // meta - mlen += 1 // ver - mlen += 1 // minorVer + mlen += 1 // ver, as long as it's < 127, which it is in this version + mlen += 1 // minorVer, as long as it's < 127, which it is in this version mlen += boxPubKeyLen // box mlen += sigPubKeyLen // sig mlen += boxPubKeyLen // link return } +// Encodes version metadata into its wire format. func (m *version_metadata) encode() []byte { bs := make([]byte, 0, version_getMetaLength()) bs = append(bs, m.meta[:]...) @@ -46,6 +52,7 @@ func (m *version_metadata) encode() []byte { return bs } +// Decodes version metadata from its wire format into the struct. func (m *version_metadata) decode(bs []byte) bool { switch { case !wire_chop_slice(m.meta[:], &bs): @@ -64,6 +71,7 @@ func (m *version_metadata) decode(bs []byte) bool { return true } +// Checks that the "meta" bytes and the version numbers are the expected values. func (m *version_metadata) check() bool { base := version_getBaseMetadata() return base.meta == m.meta && base.ver == m.ver && base.minorVer == m.minorVer diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 23e9ab3d..e92b4fcf 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -5,9 +5,8 @@ package yggdrasil // TODO clean up unused/commented code, and add better comments to whatever is left -// Packet types, as an Encode_uint64 at the start of each packet -// TODO? make things still work after reordering (after things stabilize more?) -// Type safety would also be nice, `type wire_type uint64`, rewrite as needed? +// Packet types, as wire_encode_uint64(type) at the start of each packet + const ( wire_Traffic = iota // data being routed somewhere, handle for crypto wire_ProtocolTraffic // protocol traffic, pub keys for crypto @@ -19,13 +18,13 @@ const ( wire_DHTLookupResponse // inside protocol traffic header ) -// Encode uint64 using a variable length scheme -// Similar to binary.Uvarint, but big-endian +// Calls wire_put_uint64 on a nil slice. func wire_encode_uint64(elem uint64) []byte { return wire_put_uint64(elem, nil) } -// Occasionally useful for appending to an existing slice (if there's room) +// Encode uint64 using a variable length scheme. +// Similar to binary.Uvarint, but big-endian. func wire_put_uint64(elem uint64, out []byte) []byte { bs := make([]byte, 0, 10) bs = append(bs, byte(elem&0x7f)) @@ -41,6 +40,7 @@ func wire_put_uint64(elem uint64, out []byte) []byte { return append(out, bs...) } +// Returns the length of a wire encoded uint64 of this value. func wire_uint64_len(elem uint64) int { l := 1 for e := elem >> 7; e > 0; e >>= 7 { @@ -49,8 +49,8 @@ func wire_uint64_len(elem uint64) int { return l } -// Decode uint64 from a []byte slice -// Returns the decoded uint64 and the number of bytes used +// Decode uint64 from a []byte slice. +// Returns the decoded uint64 and the number of bytes used. func wire_decode_uint64(bs []byte) (uint64, int) { length := 0 elem := uint64(0) @@ -65,20 +65,22 @@ func wire_decode_uint64(bs []byte) (uint64, int) { return elem, length } +// Converts an int64 into uint64 so it can be written to the wire. +// Non-negative integers are mapped to even integers: 0 -> 0, 1 -> 2, etc. +// Negative integres are mapped to odd integes: -1 -> 1, -2 -> 3, etc. +// This means the least significant bit is a sign bit. func wire_intToUint(i int64) uint64 { - // Non-negative integers mapped to even integers: 0 -> 0, 1 -> 2, etc. - // Negative integres mapped to odd integes: -1 -> 1, -2 -> 3, etc. - // This means the least significant bit is a sign bit. return ((uint64(-(i+1))<<1)|0x01)*(uint64(i)>>63) + (uint64(i)<<1)*(^uint64(i)>>63) } +// Converts uint64 back to int64, genreally when being read from the wire. func wire_intFromUint(u uint64) int64 { return int64(u&0x01)*(-int64(u>>1)-1) + int64(^u&0x01)*int64(u>>1) } //////////////////////////////////////////////////////////////////////////////// -// Takes coords, returns coords prefixed with encoded coord length +// Takes coords, returns coords prefixed with encoded coord length. func wire_encode_coords(coords []byte) []byte { coordLen := wire_encode_uint64(uint64(len(coords))) bs := make([]byte, 0, len(coordLen)+len(coords)) @@ -87,14 +89,17 @@ func wire_encode_coords(coords []byte) []byte { return bs } +// Puts a length prefix and the coords into bs, returns the wire formatted coords. +// Useful in hot loops where we don't want to allocate and we know the rest of the later parts of the slice are safe to overwrite. func wire_put_coords(coords []byte, bs []byte) []byte { bs = wire_put_uint64(uint64(len(coords)), bs) bs = append(bs, coords...) return bs } -// Takes a packet that begins with coords (starting with coord length) -// Returns a slice of coords and the number of bytes read +// Takes a slice that begins with coords (starting with coord length). +// Returns a slice of coords and the number of bytes read. +// Used as part of various decode() functions for structs. func wire_decode_coords(packet []byte) ([]byte, int) { coordLen, coordBegin := wire_decode_uint64(packet) coordEnd := coordBegin + int(coordLen) @@ -106,6 +111,7 @@ func wire_decode_coords(packet []byte) ([]byte, int) { //////////////////////////////////////////////////////////////////////////////// +// Encodes a swtichMsg into its wire format. func (m *switchMsg) encode() []byte { bs := wire_encode_uint64(wire_SwitchMsg) bs = append(bs, m.Root[:]...) @@ -118,6 +124,7 @@ func (m *switchMsg) encode() []byte { return bs } +// Decodes a wire formatted switchMsg into the struct, returns true if successful. func (m *switchMsg) decode(bs []byte) bool { var pType uint64 var tstamp uint64 @@ -149,6 +156,7 @@ func (m *switchMsg) decode(bs []byte) bool { //////////////////////////////////////////////////////////////////////////////// +// A utility function used to copy bytes into a slice and advance the beginning of the source slice, returns true if successful. func wire_chop_slice(toSlice []byte, fromSlice *[]byte) bool { if len(*fromSlice) < len(toSlice) { return false @@ -158,6 +166,7 @@ func wire_chop_slice(toSlice []byte, fromSlice *[]byte) bool { return true } +// A utility function to extract coords from a slice and advance the source slices, returning true if successful. func wire_chop_coords(toCoords *[]byte, fromSlice *[]byte) bool { coords, coordLen := wire_decode_coords(*fromSlice) if coordLen == 0 { @@ -168,6 +177,7 @@ func wire_chop_coords(toCoords *[]byte, fromSlice *[]byte) bool { return true } +// A utility function to extract a wire encoded uint64 into the provided pointer while advancing the start of the source slice, returning true if successful. func wire_chop_uint64(toUInt64 *uint64, fromSlice *[]byte) bool { dec, decLen := wire_decode_uint64(*fromSlice) if decLen == 0 { @@ -182,6 +192,7 @@ func wire_chop_uint64(toUInt64 *uint64, fromSlice *[]byte) bool { // Wire traffic packets +// The wire format for ordinary IPv6 traffic encapsulated by the network. type wire_trafficPacket struct { Coords []byte Handle handle @@ -189,7 +200,7 @@ type wire_trafficPacket struct { Payload []byte } -// This is basically MarshalBinary, but decode doesn't allow that... +// Encodes a wire_trafficPacket into its wire format. func (p *wire_trafficPacket) encode() []byte { bs := util_getBytes() bs = wire_put_uint64(wire_Traffic, bs) @@ -200,7 +211,7 @@ func (p *wire_trafficPacket) encode() []byte { return bs } -// Not just UnmarshalBinary becuase the original slice isn't always copied from +// Decodes an encoded wire_trafficPacket into the struct, returning true if successful. func (p *wire_trafficPacket) decode(bs []byte) bool { var pType uint64 switch { @@ -219,6 +230,7 @@ func (p *wire_trafficPacket) decode(bs []byte) bool { return true } +// The wire format for protocol traffic, such as dht req/res or session ping/pong packets. type wire_protoTrafficPacket struct { Coords []byte ToKey boxPubKey @@ -227,6 +239,7 @@ type wire_protoTrafficPacket struct { Payload []byte } +// Encodes a wire_protoTrafficPacket into its wire format. func (p *wire_protoTrafficPacket) encode() []byte { coords := wire_encode_coords(p.Coords) bs := wire_encode_uint64(wire_ProtocolTraffic) @@ -238,6 +251,7 @@ func (p *wire_protoTrafficPacket) encode() []byte { return bs } +// Decodes an encoded wire_protoTrafficPacket into the struct, returning true if successful. func (p *wire_protoTrafficPacket) decode(bs []byte) bool { var pType uint64 switch { @@ -258,11 +272,16 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool { return true } +// The wire format for link protocol traffic, namely switchMsg. +// There's really two layers of this, with the outer layer using permanent keys, and the inner layer using ephemeral keys. +// The keys themselves are exchanged as part of the connection setup, and then omitted from the packets. +// The two layer logic is handled in peers.go, but it's kind of ugly. type wire_linkProtoTrafficPacket struct { Nonce boxNonce Payload []byte } +// Encodes a wire_linkProtoTrafficPacket into its wire format. func (p *wire_linkProtoTrafficPacket) encode() []byte { bs := wire_encode_uint64(wire_LinkProtocolTraffic) bs = append(bs, p.Nonce[:]...) @@ -270,6 +289,7 @@ func (p *wire_linkProtoTrafficPacket) encode() []byte { return bs } +// Decodes an encoded wire_linkProtoTrafficPacket into the struct, returning true if successful. func (p *wire_linkProtoTrafficPacket) decode(bs []byte) bool { var pType uint64 switch { @@ -286,6 +306,7 @@ func (p *wire_linkProtoTrafficPacket) decode(bs []byte) bool { //////////////////////////////////////////////////////////////////////////////// +// Encodes a sessionPing into its wire format. func (p *sessionPing) encode() []byte { var pTypeVal uint64 if p.IsPong { @@ -304,6 +325,7 @@ func (p *sessionPing) encode() []byte { return bs } +// Decodes an encoded sessionPing into the struct, returning true if successful. func (p *sessionPing) decode(bs []byte) bool { var pType uint64 var tstamp uint64 @@ -335,6 +357,7 @@ func (p *sessionPing) decode(bs []byte) bool { //////////////////////////////////////////////////////////////////////////////// +// Encodes a dhtReq into its wire format. func (r *dhtReq) encode() []byte { coords := wire_encode_coords(r.Coords) bs := wire_encode_uint64(wire_DHTLookupRequest) @@ -343,6 +366,7 @@ func (r *dhtReq) encode() []byte { return bs } +// Decodes an encoded dhtReq into the struct, returning true if successful. func (r *dhtReq) decode(bs []byte) bool { var pType uint64 switch { @@ -359,6 +383,7 @@ func (r *dhtReq) decode(bs []byte) bool { } } +// Encodes a dhtRes into its wire format. func (r *dhtRes) encode() []byte { coords := wire_encode_coords(r.Coords) bs := wire_encode_uint64(wire_DHTLookupResponse) @@ -372,6 +397,7 @@ func (r *dhtRes) encode() []byte { return bs } +// Decodes an encoded dhtRes into the struct, returning true if successful. func (r *dhtRes) decode(bs []byte) bool { var pType uint64 switch {