From 160e01e84f82763d77e7bbfd5863e8192707da7f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 18 Apr 2019 23:38:23 +0100 Subject: [PATCH] Searches called from api.go, various other tweaks, searches now have a callback for success/failure, node ID now reported by admin socket --- cmd/yggdrasil/main.go | 13 +++++ cmd/yggdrasilctl/main.go | 3 ++ src/yggdrasil/admin.go | 2 + src/yggdrasil/api.go | 108 +++++++++++++++++++++++++++++++-------- src/yggdrasil/router.go | 8 ++- src/yggdrasil/search.go | 43 +++++++++------- src/yggdrasil/session.go | 81 ++++++++++++++--------------- 7 files changed, 177 insertions(+), 81 deletions(-) diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index fd8cd7b6..36866b3f 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -10,6 +10,7 @@ import ( "os/signal" "strings" "syscall" + "time" "golang.org/x/text/encoding/unicode" @@ -267,6 +268,18 @@ func main() { defer func() { n.core.Stop() }() + // Some stuff + go func() { + time.Sleep(time.Second * 2) + session, err := n.core.Dial("nodeid", "babd4e4bccb216f77bb723c1b034b63a652060aabfe9506b51f687183e9b0fd13f438876f5a3ab21cac9c8101eb88e2613fe2a8b0724add09d7ef5a72146c31f") + logger.Println(session, err) + b := []byte{1, 2, 3, 4, 5} + for { + logger.Println(session.Write(b)) + logger.Println(session.Read(b)) + time.Sleep(time.Second) + } + }() // Make some nice output that tells us what our IPv6 address and subnet are. // This is just logged to stdout for the user. address := n.core.Address() diff --git a/cmd/yggdrasilctl/main.go b/cmd/yggdrasilctl/main.go index b8864dc3..b70bbe9e 100644 --- a/cmd/yggdrasilctl/main.go +++ b/cmd/yggdrasilctl/main.go @@ -277,6 +277,9 @@ func main() { fmt.Println("Coords:", coords) } if *verbose { + if nodeID, ok := v.(map[string]interface{})["node_id"].(string); ok { + fmt.Println("Node ID:", nodeID) + } if boxPubKey, ok := v.(map[string]interface{})["box_pub_key"].(string); ok { fmt.Println("Public encryption key:", boxPubKey) } diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index a9595f8c..0f91cd15 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -640,7 +640,9 @@ func (a *admin) startTunWithMTU(ifname string, iftapmode bool, ifmtu int) error func (a *admin) getData_getSelf() *admin_nodeInfo { table := a.core.switchTable.table.Load().(lookupTable) coords := table.self.getCoords() + nodeid := *crypto.GetNodeID(&a.core.boxPub) self := admin_nodeInfo{ + {"node_id", hex.EncodeToString(nodeid[:])}, {"box_pub_key", hex.EncodeToString(a.core.boxPub[:])}, {"ip", a.core.Address().String()}, {"subnet", a.core.Subnet().String()}, diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 9181269d..0b38525b 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -6,11 +6,13 @@ import ( "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/util" ) func (c *Core) Dial(network, address string) (Conn, error) { - var nodeID *crypto.NodeID - var nodeMask *crypto.NodeID + conn := Conn{} + nodeID := crypto.NodeID{} + nodeMask := crypto.NodeID{} // Process switch network { case "nodeid": @@ -20,22 +22,51 @@ func (c *Core) Dial(network, address string) (Conn, error) { return Conn{}, err } copy(nodeID[:], dest) - var m crypto.NodeID - for i := range dest { - m[i] = 0xFF + for i := range nodeMask { + nodeMask[i] = 0xFF } - copy(nodeMask[:], m[:]) default: // An unexpected address type was given, so give up return Conn{}, errors.New("unexpected address type") } + conn.core = c + conn.nodeID = &nodeID + conn.nodeMask = &nodeMask + conn.core.router.doAdmin(func() { + conn.startSearch() + }) + return conn, nil +} + +type Conn struct { + core *Core + nodeID *crypto.NodeID + nodeMask *crypto.NodeID + session *sessionInfo + readDeadline time.Time + writeDeadline time.Time +} + +// This method should only be called from the router goroutine +func (c *Conn) startSearch() { + searchCompleted := func(sinfo *sessionInfo, err error) { + if err != nil { + c.core.log.Debugln("DHT search failed:", err) + return + } + if sinfo != nil { + c.session = sinfo + c.core.log.Println("Search from API found", hex.EncodeToString(sinfo.theirPermPub[:])) + } + } // Try and search for the node on the network doSearch := func() { - sinfo, isIn := c.searches.searches[*nodeID] + sinfo, isIn := c.core.searches.searches[*c.nodeID] if !isIn { - sinfo = c.searches.newIterSearch(nodeID, nodeMask) + c.core.log.Debugln("Starting search for", hex.EncodeToString(c.nodeID[:])) + sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) } - c.searches.continueSearch(sinfo) + c.core.searches.continueSearch(sinfo) } var sinfo *sessionInfo var isIn bool @@ -61,27 +92,62 @@ func (c *Core) Dial(network, address string) (Conn, error) { if time.Since(sinfo.pingSend) > time.Second { // Send at most 1 ping per second sinfo.pingSend = now - c.sessions.sendPingPong(sinfo, false) + c.core.sessions.sendPingPong(sinfo, false) } } } - return Conn{ - session: sinfo, - }, nil -} - -type Conn struct { - session *sessionInfo - readDeadline time.Time - writeDeadline time.Time } func (c *Conn) Read(b []byte) (int, error) { - return 0, nil + if c.session == nil { + return 0, errors.New("invalid session") + } + p := <-c.session.recv + defer util.PutBytes(p.Payload) + if !c.session.nonceIsOK(&p.Nonce) { + return 0, errors.New("invalid nonce") + } + bs, isOK := crypto.BoxOpen(&c.session.sharedSesKey, p.Payload, &p.Nonce) + if !isOK { + util.PutBytes(bs) + return 0, errors.New("failed to decrypt") + } + b = b[:0] + b = append(b, bs...) + c.session.updateNonce(&p.Nonce) + c.session.time = time.Now() + c.session.bytesRecvd += uint64(len(bs)) + return len(b), nil } func (c *Conn) Write(b []byte) (int, error) { - return 0, nil + if c.session == nil { + c.core.router.doAdmin(func() { + c.startSearch() + }) + return 0, errors.New("invalid session") + } + defer util.PutBytes(b) + if !c.session.init { + // To prevent using empty session keys + return 0, errors.New("session not initialised") + } + // code isn't multithreaded so appending to this is safe + coords := c.session.coords + // Prepare the payload + payload, nonce := crypto.BoxSeal(&c.session.sharedSesKey, b, &c.session.myNonce) + defer util.PutBytes(payload) + p := wire_trafficPacket{ + Coords: coords, + Handle: c.session.theirHandle, + Nonce: *nonce, + Payload: payload, + } + packet := p.encode() + c.session.bytesSent += uint64(len(b)) + c.session.send <- packet + //c.session.core.router.out(packet) + return len(b), nil } func (c *Conn) Close() error { diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index a3d3d68c..d7923f51 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -245,6 +245,12 @@ func (r *router) sendPacket(bs []byte) { return } } + searchCompleted := func(sinfo *sessionInfo, err error) { + if err != nil { + r.core.log.Debugln("DHT search failed:", err) + return + } + } doSearch := func(packet []byte) { var nodeID, mask *crypto.NodeID switch { @@ -270,7 +276,7 @@ func (r *router) sendPacket(bs []byte) { } sinfo, isIn := r.core.searches.searches[*nodeID] if !isIn { - sinfo = r.core.searches.newIterSearch(nodeID, mask) + sinfo = r.core.searches.newIterSearch(nodeID, mask, searchCompleted) } if packet != nil { sinfo.packet = packet diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index c391dda2..e81a9723 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -15,6 +15,7 @@ package yggdrasil // Some kind of max search steps, in case the node is offline, so we don't crawl through too much of the network looking for a destination that isn't there? import ( + "errors" "sort" "time" @@ -32,12 +33,13 @@ const search_RETRY_TIME = time.Second // Information about an ongoing search. // Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited. type searchInfo struct { - dest crypto.NodeID - mask crypto.NodeID - time time.Time - packet []byte - toVisit []*dhtInfo - visited map[crypto.NodeID]bool + dest crypto.NodeID + mask crypto.NodeID + time time.Time + packet []byte + toVisit []*dhtInfo + visited map[crypto.NodeID]bool + callback func(*sessionInfo, error) } // This stores a map of active searches. @@ -61,7 +63,7 @@ func (s *searches) init(core *Core) { } // Creates a new search info, adds it to the searches struct, and returns a pointer to the info. -func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID) *searchInfo { +func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { now := time.Now() for dest, sinfo := range s.searches { if now.Sub(sinfo.time) > time.Minute { @@ -69,9 +71,10 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID) *searc } } info := searchInfo{ - dest: *dest, - mask: *mask, - time: now.Add(-time.Second), + dest: *dest, + mask: *mask, + time: now.Add(-time.Second), + callback: callback, } s.searches[*dest] = &info return &info @@ -137,15 +140,15 @@ func (s *searches) doSearchStep(sinfo *searchInfo) { if len(sinfo.toVisit) == 0 { // Dead end, do cleanup delete(s.searches, sinfo.dest) + sinfo.callback(nil, errors.New("search reached dead end")) return - } else { - // Send to the next search target - var next *dhtInfo - next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] - rq := dhtReqKey{next.key, sinfo.dest} - s.core.dht.addCallback(&rq, s.handleDHTRes) - s.core.dht.ping(next, &sinfo.dest) } + // Send to the next search target + var next *dhtInfo + next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] + rq := dhtReqKey{next.key, sinfo.dest} + s.core.dht.addCallback(&rq, s.handleDHTRes) + s.core.dht.ping(next, &sinfo.dest) } // If we've recenty sent a ping for this search, do nothing. @@ -173,8 +176,8 @@ func (s *searches) continueSearch(sinfo *searchInfo) { } // Calls create search, and initializes the iterative search parts of the struct before returning it. -func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID) *searchInfo { - sinfo := s.createSearch(dest, mask) +func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { + sinfo := s.createSearch(dest, mask, callback) sinfo.toVisit = s.core.dht.lookup(dest, true) sinfo.visited = make(map[crypto.NodeID]bool) return sinfo @@ -200,6 +203,7 @@ func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool { sinfo = s.core.sessions.createSession(&res.Key) if sinfo == nil { // nil if the DHT search finished but the session wasn't allowed + info.callback(nil, errors.New("session not allowed")) return true } _, isIn := s.core.sessions.getByTheirPerm(&res.Key) @@ -211,6 +215,7 @@ func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool { sinfo.coords = res.Coords sinfo.packet = info.packet s.core.sessions.ping(sinfo) + info.callback(sinfo, nil) // Cleanup delete(s.searches, res.Dest) return true diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 74255c09..fd3a985b 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -7,6 +7,7 @@ package yggdrasil import ( "bytes" "encoding/hex" + "sync" "time" "github.com/yggdrasil-network/yggdrasil-go/src/address" @@ -17,35 +18,38 @@ import ( // All the information we know about an active session. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { - core *Core - reconfigure chan chan error - theirAddr address.Address - theirSubnet address.Subnet - theirPermPub crypto.BoxPubKey - theirSesPub crypto.BoxPubKey - mySesPub crypto.BoxPubKey - mySesPriv crypto.BoxPrivKey - sharedSesKey crypto.BoxSharedKey // derived from session keys - theirHandle crypto.Handle - myHandle crypto.Handle - theirNonce crypto.BoxNonce - myNonce crypto.BoxNonce - theirMTU uint16 - myMTU uint16 - wasMTUFixed bool // Was the MTU fixed by a receive error? - time time.Time // Time we last received a packet - coords []byte // coords of destination - packet []byte // a buffered packet, sent immediately on ping/pong - init bool // Reset if coords change - send chan []byte - recv chan *wire_trafficPacket - nonceMask uint64 - tstamp int64 // tstamp from their last session ping, replay attack mitigation - mtuTime time.Time // time myMTU was last changed - pingTime time.Time // time the first ping was sent since the last received packet - pingSend time.Time // time the last ping was sent - bytesSent uint64 // Bytes of real traffic sent in this session - bytesRecvd uint64 // Bytes of real traffic received in this session + core *Core + reconfigure chan chan error + theirAddr address.Address + theirSubnet address.Subnet + theirPermPub crypto.BoxPubKey + theirSesPub crypto.BoxPubKey + mySesPub crypto.BoxPubKey + mySesPriv crypto.BoxPrivKey + sharedSesKey crypto.BoxSharedKey // derived from session keys + theirHandle crypto.Handle + myHandle crypto.Handle + theirNonce crypto.BoxNonce + theirNonceMutex sync.RWMutex // protects the above + myNonce crypto.BoxNonce + myNonceMutex sync.RWMutex // protects the above + theirMTU uint16 + myMTU uint16 + wasMTUFixed bool // Was the MTU fixed by a receive error? + time time.Time // Time we last received a packet + coords []byte // coords of destination + packet []byte // a buffered packet, sent immediately on ping/pong + init bool // Reset if coords change + send chan []byte + recv chan *wire_trafficPacket + nonceMask uint64 + tstamp int64 // tstamp from their last session ping, replay attack mitigation + tstampMutex int64 // protects the above + mtuTime time.Time // time myMTU was last changed + pingTime time.Time // time the first ping was sent since the last received packet + pingSend time.Time // time the last ping was sent + bytesSent uint64 // Bytes of real traffic sent in this session + bytesRecvd uint64 // Bytes of real traffic received in this session } // Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. @@ -101,17 +105,14 @@ func (s *sessionInfo) timedout() bool { // Sessions are indexed by handle. // Additionally, stores maps of address/subnet onto keys, and keys onto handles. type sessions struct { - core *Core - reconfigure chan chan error - lastCleanup time.Time - // Maps known permanent keys to their shared key, used by DHT a lot - permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey - // Maps (secret) handle onto session info - sinfos map[crypto.Handle]*sessionInfo - // Maps mySesPub onto handle - byMySes map[crypto.BoxPubKey]*crypto.Handle - // Maps theirPermPub onto handle - byTheirPerm map[crypto.BoxPubKey]*crypto.Handle + core *Core + reconfigure chan chan error + lastCleanup time.Time + permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot + sinfos map[crypto.Handle]*sessionInfo // Maps (secret) handle onto session info + conns map[crypto.Handle]*Conn // Maps (secret) handle onto connections + byMySes map[crypto.BoxPubKey]*crypto.Handle // Maps mySesPub onto handle + byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle addrToPerm map[address.Address]*crypto.BoxPubKey subnetToPerm map[address.Subnet]*crypto.BoxPubKey }