Searches called from api.go, various other tweaks, searches now have a callback for success/failure, node ID now reported by admin socket

This commit is contained in:
Neil Alexander 2019-04-18 23:38:23 +01:00
parent eef2a02d0a
commit 160e01e84f
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
7 changed files with 177 additions and 81 deletions

View File

@ -10,6 +10,7 @@ import (
"os/signal" "os/signal"
"strings" "strings"
"syscall" "syscall"
"time"
"golang.org/x/text/encoding/unicode" "golang.org/x/text/encoding/unicode"
@ -267,6 +268,18 @@ func main() {
defer func() { defer func() {
n.core.Stop() n.core.Stop()
}() }()
// Some stuff
go func() {
time.Sleep(time.Second * 2)
session, err := n.core.Dial("nodeid", "babd4e4bccb216f77bb723c1b034b63a652060aabfe9506b51f687183e9b0fd13f438876f5a3ab21cac9c8101eb88e2613fe2a8b0724add09d7ef5a72146c31f")
logger.Println(session, err)
b := []byte{1, 2, 3, 4, 5}
for {
logger.Println(session.Write(b))
logger.Println(session.Read(b))
time.Sleep(time.Second)
}
}()
// Make some nice output that tells us what our IPv6 address and subnet are. // Make some nice output that tells us what our IPv6 address and subnet are.
// This is just logged to stdout for the user. // This is just logged to stdout for the user.
address := n.core.Address() address := n.core.Address()

View File

@ -277,6 +277,9 @@ func main() {
fmt.Println("Coords:", coords) fmt.Println("Coords:", coords)
} }
if *verbose { if *verbose {
if nodeID, ok := v.(map[string]interface{})["node_id"].(string); ok {
fmt.Println("Node ID:", nodeID)
}
if boxPubKey, ok := v.(map[string]interface{})["box_pub_key"].(string); ok { if boxPubKey, ok := v.(map[string]interface{})["box_pub_key"].(string); ok {
fmt.Println("Public encryption key:", boxPubKey) fmt.Println("Public encryption key:", boxPubKey)
} }

View File

@ -640,7 +640,9 @@ func (a *admin) startTunWithMTU(ifname string, iftapmode bool, ifmtu int) error
func (a *admin) getData_getSelf() *admin_nodeInfo { func (a *admin) getData_getSelf() *admin_nodeInfo {
table := a.core.switchTable.table.Load().(lookupTable) table := a.core.switchTable.table.Load().(lookupTable)
coords := table.self.getCoords() coords := table.self.getCoords()
nodeid := *crypto.GetNodeID(&a.core.boxPub)
self := admin_nodeInfo{ self := admin_nodeInfo{
{"node_id", hex.EncodeToString(nodeid[:])},
{"box_pub_key", hex.EncodeToString(a.core.boxPub[:])}, {"box_pub_key", hex.EncodeToString(a.core.boxPub[:])},
{"ip", a.core.Address().String()}, {"ip", a.core.Address().String()},
{"subnet", a.core.Subnet().String()}, {"subnet", a.core.Subnet().String()},

View File

@ -6,11 +6,13 @@ import (
"time" "time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
func (c *Core) Dial(network, address string) (Conn, error) { func (c *Core) Dial(network, address string) (Conn, error) {
var nodeID *crypto.NodeID conn := Conn{}
var nodeMask *crypto.NodeID nodeID := crypto.NodeID{}
nodeMask := crypto.NodeID{}
// Process // Process
switch network { switch network {
case "nodeid": case "nodeid":
@ -20,22 +22,51 @@ func (c *Core) Dial(network, address string) (Conn, error) {
return Conn{}, err return Conn{}, err
} }
copy(nodeID[:], dest) copy(nodeID[:], dest)
var m crypto.NodeID for i := range nodeMask {
for i := range dest { nodeMask[i] = 0xFF
m[i] = 0xFF
} }
copy(nodeMask[:], m[:])
default: default:
// An unexpected address type was given, so give up // An unexpected address type was given, so give up
return Conn{}, errors.New("unexpected address type") return Conn{}, errors.New("unexpected address type")
} }
conn.core = c
conn.nodeID = &nodeID
conn.nodeMask = &nodeMask
conn.core.router.doAdmin(func() {
conn.startSearch()
})
return conn, nil
}
type Conn struct {
core *Core
nodeID *crypto.NodeID
nodeMask *crypto.NodeID
session *sessionInfo
readDeadline time.Time
writeDeadline time.Time
}
// This method should only be called from the router goroutine
func (c *Conn) startSearch() {
searchCompleted := func(sinfo *sessionInfo, err error) {
if err != nil {
c.core.log.Debugln("DHT search failed:", err)
return
}
if sinfo != nil {
c.session = sinfo
c.core.log.Println("Search from API found", hex.EncodeToString(sinfo.theirPermPub[:]))
}
}
// Try and search for the node on the network // Try and search for the node on the network
doSearch := func() { doSearch := func() {
sinfo, isIn := c.searches.searches[*nodeID] sinfo, isIn := c.core.searches.searches[*c.nodeID]
if !isIn { if !isIn {
sinfo = c.searches.newIterSearch(nodeID, nodeMask) c.core.log.Debugln("Starting search for", hex.EncodeToString(c.nodeID[:]))
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
} }
c.searches.continueSearch(sinfo) c.core.searches.continueSearch(sinfo)
} }
var sinfo *sessionInfo var sinfo *sessionInfo
var isIn bool var isIn bool
@ -61,27 +92,62 @@ func (c *Core) Dial(network, address string) (Conn, error) {
if time.Since(sinfo.pingSend) > time.Second { if time.Since(sinfo.pingSend) > time.Second {
// Send at most 1 ping per second // Send at most 1 ping per second
sinfo.pingSend = now sinfo.pingSend = now
c.sessions.sendPingPong(sinfo, false) c.core.sessions.sendPingPong(sinfo, false)
} }
} }
} }
return Conn{
session: sinfo,
}, nil
}
type Conn struct {
session *sessionInfo
readDeadline time.Time
writeDeadline time.Time
} }
func (c *Conn) Read(b []byte) (int, error) { func (c *Conn) Read(b []byte) (int, error) {
return 0, nil if c.session == nil {
return 0, errors.New("invalid session")
}
p := <-c.session.recv
defer util.PutBytes(p.Payload)
if !c.session.nonceIsOK(&p.Nonce) {
return 0, errors.New("invalid nonce")
}
bs, isOK := crypto.BoxOpen(&c.session.sharedSesKey, p.Payload, &p.Nonce)
if !isOK {
util.PutBytes(bs)
return 0, errors.New("failed to decrypt")
}
b = b[:0]
b = append(b, bs...)
c.session.updateNonce(&p.Nonce)
c.session.time = time.Now()
c.session.bytesRecvd += uint64(len(bs))
return len(b), nil
} }
func (c *Conn) Write(b []byte) (int, error) { func (c *Conn) Write(b []byte) (int, error) {
return 0, nil if c.session == nil {
c.core.router.doAdmin(func() {
c.startSearch()
})
return 0, errors.New("invalid session")
}
defer util.PutBytes(b)
if !c.session.init {
// To prevent using empty session keys
return 0, errors.New("session not initialised")
}
// code isn't multithreaded so appending to this is safe
coords := c.session.coords
// Prepare the payload
payload, nonce := crypto.BoxSeal(&c.session.sharedSesKey, b, &c.session.myNonce)
defer util.PutBytes(payload)
p := wire_trafficPacket{
Coords: coords,
Handle: c.session.theirHandle,
Nonce: *nonce,
Payload: payload,
}
packet := p.encode()
c.session.bytesSent += uint64(len(b))
c.session.send <- packet
//c.session.core.router.out(packet)
return len(b), nil
} }
func (c *Conn) Close() error { func (c *Conn) Close() error {

View File

@ -245,6 +245,12 @@ func (r *router) sendPacket(bs []byte) {
return return
} }
} }
searchCompleted := func(sinfo *sessionInfo, err error) {
if err != nil {
r.core.log.Debugln("DHT search failed:", err)
return
}
}
doSearch := func(packet []byte) { doSearch := func(packet []byte) {
var nodeID, mask *crypto.NodeID var nodeID, mask *crypto.NodeID
switch { switch {
@ -270,7 +276,7 @@ func (r *router) sendPacket(bs []byte) {
} }
sinfo, isIn := r.core.searches.searches[*nodeID] sinfo, isIn := r.core.searches.searches[*nodeID]
if !isIn { if !isIn {
sinfo = r.core.searches.newIterSearch(nodeID, mask) sinfo = r.core.searches.newIterSearch(nodeID, mask, searchCompleted)
} }
if packet != nil { if packet != nil {
sinfo.packet = packet sinfo.packet = packet

View File

@ -15,6 +15,7 @@ package yggdrasil
// Some kind of max search steps, in case the node is offline, so we don't crawl through too much of the network looking for a destination that isn't there? // Some kind of max search steps, in case the node is offline, so we don't crawl through too much of the network looking for a destination that isn't there?
import ( import (
"errors"
"sort" "sort"
"time" "time"
@ -38,6 +39,7 @@ type searchInfo struct {
packet []byte packet []byte
toVisit []*dhtInfo toVisit []*dhtInfo
visited map[crypto.NodeID]bool visited map[crypto.NodeID]bool
callback func(*sessionInfo, error)
} }
// This stores a map of active searches. // This stores a map of active searches.
@ -61,7 +63,7 @@ func (s *searches) init(core *Core) {
} }
// Creates a new search info, adds it to the searches struct, and returns a pointer to the info. // Creates a new search info, adds it to the searches struct, and returns a pointer to the info.
func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID) *searchInfo { func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo {
now := time.Now() now := time.Now()
for dest, sinfo := range s.searches { for dest, sinfo := range s.searches {
if now.Sub(sinfo.time) > time.Minute { if now.Sub(sinfo.time) > time.Minute {
@ -72,6 +74,7 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID) *searc
dest: *dest, dest: *dest,
mask: *mask, mask: *mask,
time: now.Add(-time.Second), time: now.Add(-time.Second),
callback: callback,
} }
s.searches[*dest] = &info s.searches[*dest] = &info
return &info return &info
@ -137,8 +140,9 @@ func (s *searches) doSearchStep(sinfo *searchInfo) {
if len(sinfo.toVisit) == 0 { if len(sinfo.toVisit) == 0 {
// Dead end, do cleanup // Dead end, do cleanup
delete(s.searches, sinfo.dest) delete(s.searches, sinfo.dest)
sinfo.callback(nil, errors.New("search reached dead end"))
return return
} else { }
// Send to the next search target // Send to the next search target
var next *dhtInfo var next *dhtInfo
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
@ -146,7 +150,6 @@ func (s *searches) doSearchStep(sinfo *searchInfo) {
s.core.dht.addCallback(&rq, s.handleDHTRes) s.core.dht.addCallback(&rq, s.handleDHTRes)
s.core.dht.ping(next, &sinfo.dest) s.core.dht.ping(next, &sinfo.dest)
} }
}
// If we've recenty sent a ping for this search, do nothing. // If we've recenty sent a ping for this search, do nothing.
// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. // Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME.
@ -173,8 +176,8 @@ func (s *searches) continueSearch(sinfo *searchInfo) {
} }
// Calls create search, and initializes the iterative search parts of the struct before returning it. // Calls create search, and initializes the iterative search parts of the struct before returning it.
func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID) *searchInfo { func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo {
sinfo := s.createSearch(dest, mask) sinfo := s.createSearch(dest, mask, callback)
sinfo.toVisit = s.core.dht.lookup(dest, true) sinfo.toVisit = s.core.dht.lookup(dest, true)
sinfo.visited = make(map[crypto.NodeID]bool) sinfo.visited = make(map[crypto.NodeID]bool)
return sinfo return sinfo
@ -200,6 +203,7 @@ func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool {
sinfo = s.core.sessions.createSession(&res.Key) sinfo = s.core.sessions.createSession(&res.Key)
if sinfo == nil { if sinfo == nil {
// nil if the DHT search finished but the session wasn't allowed // nil if the DHT search finished but the session wasn't allowed
info.callback(nil, errors.New("session not allowed"))
return true return true
} }
_, isIn := s.core.sessions.getByTheirPerm(&res.Key) _, isIn := s.core.sessions.getByTheirPerm(&res.Key)
@ -211,6 +215,7 @@ func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool {
sinfo.coords = res.Coords sinfo.coords = res.Coords
sinfo.packet = info.packet sinfo.packet = info.packet
s.core.sessions.ping(sinfo) s.core.sessions.ping(sinfo)
info.callback(sinfo, nil)
// Cleanup // Cleanup
delete(s.searches, res.Dest) delete(s.searches, res.Dest)
return true return true

View File

@ -7,6 +7,7 @@ package yggdrasil
import ( import (
"bytes" "bytes"
"encoding/hex" "encoding/hex"
"sync"
"time" "time"
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
@ -29,7 +30,9 @@ type sessionInfo struct {
theirHandle crypto.Handle theirHandle crypto.Handle
myHandle crypto.Handle myHandle crypto.Handle
theirNonce crypto.BoxNonce theirNonce crypto.BoxNonce
theirNonceMutex sync.RWMutex // protects the above
myNonce crypto.BoxNonce myNonce crypto.BoxNonce
myNonceMutex sync.RWMutex // protects the above
theirMTU uint16 theirMTU uint16
myMTU uint16 myMTU uint16
wasMTUFixed bool // Was the MTU fixed by a receive error? wasMTUFixed bool // Was the MTU fixed by a receive error?
@ -41,6 +44,7 @@ type sessionInfo struct {
recv chan *wire_trafficPacket recv chan *wire_trafficPacket
nonceMask uint64 nonceMask uint64
tstamp int64 // tstamp from their last session ping, replay attack mitigation tstamp int64 // tstamp from their last session ping, replay attack mitigation
tstampMutex int64 // protects the above
mtuTime time.Time // time myMTU was last changed mtuTime time.Time // time myMTU was last changed
pingTime time.Time // time the first ping was sent since the last received packet pingTime time.Time // time the first ping was sent since the last received packet
pingSend time.Time // time the last ping was sent pingSend time.Time // time the last ping was sent
@ -104,14 +108,11 @@ type sessions struct {
core *Core core *Core
reconfigure chan chan error reconfigure chan chan error
lastCleanup time.Time lastCleanup time.Time
// Maps known permanent keys to their shared key, used by DHT a lot permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot
permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey sinfos map[crypto.Handle]*sessionInfo // Maps (secret) handle onto session info
// Maps (secret) handle onto session info conns map[crypto.Handle]*Conn // Maps (secret) handle onto connections
sinfos map[crypto.Handle]*sessionInfo byMySes map[crypto.BoxPubKey]*crypto.Handle // Maps mySesPub onto handle
// Maps mySesPub onto handle byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle
byMySes map[crypto.BoxPubKey]*crypto.Handle
// Maps theirPermPub onto handle
byTheirPerm map[crypto.BoxPubKey]*crypto.Handle
addrToPerm map[address.Address]*crypto.BoxPubKey addrToPerm map[address.Address]*crypto.BoxPubKey
subnetToPerm map[address.Subnet]*crypto.BoxPubKey subnetToPerm map[address.Subnet]*crypto.BoxPubKey
} }