Merge branch 'develop' into circlecirpm

This commit is contained in:
Neil Alexander 2019-07-16 11:28:46 +01:00
commit 0c4e2cc41e
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
30 changed files with 636 additions and 562 deletions

View File

@ -2,6 +2,7 @@ package main
import ( import (
"bytes" "bytes"
"encoding/hex"
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
@ -14,28 +15,28 @@ import (
"golang.org/x/text/encoding/unicode" "golang.org/x/text/encoding/unicode"
"github.com/gologme/log" "github.com/gologme/log"
gsyslog "github.com/hashicorp/go-syslog"
"github.com/hjson/hjson-go" "github.com/hjson/hjson-go"
"github.com/kardianos/minwinsvc" "github.com/kardianos/minwinsvc"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
"github.com/yggdrasil-network/yggdrasil-go/src/admin" "github.com/yggdrasil-network/yggdrasil-go/src/admin"
"github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/multicast" "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
"github.com/yggdrasil-network/yggdrasil-go/src/tuntap" "github.com/yggdrasil-network/yggdrasil-go/src/tuntap"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
) )
type nodeConfig = config.NodeConfig
type Core = yggdrasil.Core
type node struct { type node struct {
core Core core yggdrasil.Core
state *config.NodeState
tuntap tuntap.TunAdapter tuntap tuntap.TunAdapter
multicast multicast.Multicast multicast multicast.Multicast
admin admin.AdminSocket admin admin.AdminSocket
} }
func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeConfig { func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *config.NodeConfig {
// Use a configuration file. If -useconf, the configuration will be read // Use a configuration file. If -useconf, the configuration will be read
// from stdin. If -useconffile, the configuration will be read from the // from stdin. If -useconffile, the configuration will be read from the
// filesystem. // filesystem.
@ -114,9 +115,10 @@ func main() {
autoconf := flag.Bool("autoconf", false, "automatic mode (dynamic IP, peer with IPv6 neighbors)") autoconf := flag.Bool("autoconf", false, "automatic mode (dynamic IP, peer with IPv6 neighbors)")
version := flag.Bool("version", false, "prints the version of this build") version := flag.Bool("version", false, "prints the version of this build")
logging := flag.String("logging", "info,warn,error", "comma-separated list of logging levels to enable") logging := flag.String("logging", "info,warn,error", "comma-separated list of logging levels to enable")
logto := flag.String("logto", "stdout", "file path to log to, \"syslog\" or \"stdout\"")
flag.Parse() flag.Parse()
var cfg *nodeConfig var cfg *config.NodeConfig
var err error var err error
switch { switch {
case *version: case *version:
@ -161,7 +163,23 @@ func main() {
return return
} }
// Create a new logger that logs output to stdout. // Create a new logger that logs output to stdout.
logger := log.New(os.Stdout, "", log.Flags()) var logger *log.Logger
switch *logto {
case "stdout":
logger = log.New(os.Stdout, "", log.Flags())
case "syslog":
if syslogger, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, "DAEMON", yggdrasil.BuildName()); err == nil {
logger = log.New(syslogger, "", log.Flags())
}
default:
if logfd, err := os.OpenFile(*logto, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644); err == nil {
logger = log.New(logfd, "", log.Flags())
}
}
if logger == nil {
logger = log.New(os.Stdout, "", log.Flags())
logger.Warnln("Logging defaulting to stdout")
}
//logger.EnableLevel("error") //logger.EnableLevel("error")
//logger.EnableLevel("warn") //logger.EnableLevel("warn")
//logger.EnableLevel("info") //logger.EnableLevel("info")
@ -181,18 +199,20 @@ func main() {
n := node{} n := node{}
// Now start Yggdrasil - this starts the DHT, router, switch and other core // Now start Yggdrasil - this starts the DHT, router, switch and other core
// components needed for Yggdrasil to operate // components needed for Yggdrasil to operate
state, err := n.core.Start(cfg, logger) n.state, err = n.core.Start(cfg, logger)
if err != nil { if err != nil {
logger.Errorln("An error occurred during startup") logger.Errorln("An error occurred during startup")
panic(err) panic(err)
} }
// Register the session firewall gatekeeper function
n.core.SetSessionGatekeeper(n.sessionFirewall)
// Start the admin socket // Start the admin socket
n.admin.Init(&n.core, state, logger, nil) n.admin.Init(&n.core, n.state, logger, nil)
if err := n.admin.Start(); err != nil { if err := n.admin.Start(); err != nil {
logger.Errorln("An error occurred starting admin socket:", err) logger.Errorln("An error occurred starting admin socket:", err)
} }
// Start the multicast interface // Start the multicast interface
n.multicast.Init(&n.core, state, logger, nil) n.multicast.Init(&n.core, n.state, logger, nil)
if err := n.multicast.Start(); err != nil { if err := n.multicast.Start(); err != nil {
logger.Errorln("An error occurred starting multicast:", err) logger.Errorln("An error occurred starting multicast:", err)
} }
@ -200,7 +220,7 @@ func main() {
// Start the TUN/TAP interface // Start the TUN/TAP interface
if listener, err := n.core.ConnListen(); err == nil { if listener, err := n.core.ConnListen(); err == nil {
if dialer, err := n.core.ConnDialer(); err == nil { if dialer, err := n.core.ConnDialer(); err == nil {
n.tuntap.Init(state, logger, listener, dialer) n.tuntap.Init(n.state, logger, listener, dialer)
if err := n.tuntap.Start(); err != nil { if err := n.tuntap.Start(); err != nil {
logger.Errorln("An error occurred starting TUN/TAP:", err) logger.Errorln("An error occurred starting TUN/TAP:", err)
} }
@ -211,11 +231,6 @@ func main() {
} else { } else {
logger.Errorln("Unable to get Listener:", err) logger.Errorln("Unable to get Listener:", err)
} }
// The Stop function ensures that the TUN/TAP adapter is correctly shut down
// before the program exits.
defer func() {
n.core.Stop()
}()
// Make some nice output that tells us what our IPv6 address and subnet are. // Make some nice output that tells us what our IPv6 address and subnet are.
// This is just logged to stdout for the user. // This is just logged to stdout for the user.
address := n.core.Address() address := n.core.Address()
@ -227,15 +242,15 @@ func main() {
r := make(chan os.Signal, 1) r := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM) signal.Notify(c, os.Interrupt, syscall.SIGTERM)
signal.Notify(r, os.Interrupt, syscall.SIGHUP) signal.Notify(r, os.Interrupt, syscall.SIGHUP)
// Create a function to capture the service being stopped on Windows. // Capture the service being stopped on Windows.
winTerminate := func() { minwinsvc.SetOnExit(n.shutdown)
c <- os.Interrupt defer n.shutdown()
}
minwinsvc.SetOnExit(winTerminate)
// Wait for the terminate/interrupt signal. Once a signal is received, the // Wait for the terminate/interrupt signal. Once a signal is received, the
// deferred Stop function above will run which will shut down TUN/TAP. // deferred Stop function above will run which will shut down TUN/TAP.
for { for {
select { select {
case _ = <-c:
goto exit
case _ = <-r: case _ = <-r:
if *useconffile != "" { if *useconffile != "" {
cfg = readConfig(useconf, useconffile, normaliseconf) cfg = readConfig(useconf, useconffile, normaliseconf)
@ -245,9 +260,78 @@ func main() {
} else { } else {
logger.Errorln("Reloading config at runtime is only possible with -useconffile") logger.Errorln("Reloading config at runtime is only possible with -useconffile")
} }
case _ = <-c:
goto exit
} }
} }
exit: exit:
} }
func (n *node) shutdown() {
n.core.Stop()
n.admin.Stop()
n.multicast.Stop()
n.tuntap.Stop()
os.Exit(0)
}
func (n *node) sessionFirewall(pubkey *crypto.BoxPubKey, initiator bool) bool {
n.state.Mutex.RLock()
defer n.state.Mutex.RUnlock()
// Allow by default if the session firewall is disabled
if !n.state.Current.SessionFirewall.Enable {
return true
}
// Prepare for checking whitelist/blacklist
var box crypto.BoxPubKey
// Reject blacklisted nodes
for _, b := range n.state.Current.SessionFirewall.BlacklistEncryptionPublicKeys {
key, err := hex.DecodeString(b)
if err == nil {
copy(box[:crypto.BoxPubKeyLen], key)
if box == *pubkey {
return false
}
}
}
// Allow whitelisted nodes
for _, b := range n.state.Current.SessionFirewall.WhitelistEncryptionPublicKeys {
key, err := hex.DecodeString(b)
if err == nil {
copy(box[:crypto.BoxPubKeyLen], key)
if box == *pubkey {
return true
}
}
}
// Allow outbound sessions if appropriate
if n.state.Current.SessionFirewall.AlwaysAllowOutbound {
if initiator {
return true
}
}
// Look and see if the pubkey is that of a direct peer
var isDirectPeer bool
for _, peer := range n.core.GetPeers() {
if peer.PublicKey == *pubkey {
isDirectPeer = true
break
}
}
// Allow direct peers if appropriate
if n.state.Current.SessionFirewall.AllowFromDirect && isDirectPeer {
return true
}
// Allow remote nodes if appropriate
if n.state.Current.SessionFirewall.AllowFromRemote && !isDirectPeer {
return true
}
// Finally, default-deny if not matching any of the above rules
return false
}

View File

@ -1,4 +1,4 @@
# Yggdasil # Yggdrasil
Note: This is a very rough early draft. Note: This is a very rough early draft.

1
go.mod
View File

@ -3,6 +3,7 @@ module github.com/yggdrasil-network/yggdrasil-go
require ( require (
github.com/docker/libcontainer v2.2.1+incompatible github.com/docker/libcontainer v2.2.1+incompatible
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8
github.com/hashicorp/go-syslog v1.0.0
github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222
github.com/kardianos/minwinsvc v0.0.0-20151122163309-cad6b2b879b0 github.com/kardianos/minwinsvc v0.0.0-20151122163309-cad6b2b879b0
github.com/mitchellh/mapstructure v1.1.2 github.com/mitchellh/mapstructure v1.1.2

2
go.sum
View File

@ -2,6 +2,8 @@ github.com/docker/libcontainer v2.2.1+incompatible h1:++SbbkCw+X8vAd4j2gOCzZ2Nn7
github.com/docker/libcontainer v2.2.1+incompatible/go.mod h1:osvj61pYsqhNCMLGX31xr7klUBhHb/ZBuXS0o1Fvwbw= github.com/docker/libcontainer v2.2.1+incompatible/go.mod h1:osvj61pYsqhNCMLGX31xr7klUBhHb/ZBuXS0o1Fvwbw=
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY=
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U=
github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 h1:xmvkbxXDeN1ffWq8kvrhyqVYAO2aXuRBsbpxVTR+JyU= github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 h1:xmvkbxXDeN1ffWq8kvrhyqVYAO2aXuRBsbpxVTR+JyU=
github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222/go.mod h1:qsetwF8NlsTsOTwZTApNlTCerV+b2GjYRRcIk4JMFio= github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222/go.mod h1:qsetwF8NlsTsOTwZTApNlTCerV+b2GjYRRcIk4JMFio=
github.com/kardianos/minwinsvc v0.0.0-20151122163309-cad6b2b879b0 h1:YnZmFjg0Nvk8851WTVWlqMC1ecJH07Ctz+Ezxx4u54g= github.com/kardianos/minwinsvc v0.0.0-20151122163309-cad6b2b879b0 h1:YnZmFjg0Nvk8851WTVWlqMC1ecJH07Ctz+Ezxx4u54g=

View File

@ -381,11 +381,11 @@ func (a *AdminSocket) handleRequest(conn net.Conn) {
if r != nil { if r != nil {
send = Info{ send = Info{
"status": "error", "status": "error",
"error": "Unrecoverable error, possibly as a result of invalid input types or malformed syntax", "error": "Check your syntax and input types",
} }
a.log.Errorln("Admin socket error:", r) a.log.Debugln("Admin socket error:", r)
if err := encoder.Encode(&send); err != nil { if err := encoder.Encode(&send); err != nil {
a.log.Errorln("Admin socket JSON encode error:", err) a.log.Debugln("Admin socket JSON encode error:", err)
} }
conn.Close() conn.Close()
} }
@ -407,13 +407,14 @@ func (a *AdminSocket) handleRequest(conn net.Conn) {
send["request"] = recv send["request"] = recv
send["status"] = "error" send["status"] = "error"
n := strings.ToLower(recv["request"].(string))
if _, ok := recv["request"]; !ok { if _, ok := recv["request"]; !ok {
send["error"] = "No request sent" send["error"] = "No request sent"
break goto respond
} }
n := strings.ToLower(recv["request"].(string)) if h, ok := a.handlers[n]; ok {
if h, ok := a.handlers[strings.ToLower(n)]; ok {
// Check that we have all the required arguments // Check that we have all the required arguments
for _, arg := range h.args { for _, arg := range h.args {
// An argument in [square brackets] is optional and not required, // An argument in [square brackets] is optional and not required,
@ -428,7 +429,7 @@ func (a *AdminSocket) handleRequest(conn net.Conn) {
"error": "Expected field missing: " + arg, "error": "Expected field missing: " + arg,
"expecting": arg, "expecting": arg,
} }
break goto respond
} }
} }
@ -439,16 +440,28 @@ func (a *AdminSocket) handleRequest(conn net.Conn) {
send["error"] = err.Error() send["error"] = err.Error()
if response != nil { if response != nil {
send["response"] = response send["response"] = response
goto respond
} }
} else { } else {
send["status"] = "success" send["status"] = "success"
if response != nil { if response != nil {
send["response"] = response send["response"] = response
goto respond
} }
} }
} else {
// Start with a clean response on each request, which defaults to an error
// state. If a handler is found below then this will be overwritten
send = Info{
"request": recv,
"status": "error",
"error": fmt.Sprintf("Unknown action '%s', try 'list' for help", recv["request"].(string)),
}
goto respond
} }
// Send the response back // Send the response back
respond:
if err := encoder.Encode(&send); err != nil { if err := encoder.Encode(&send); err != nil {
return return
} }

View File

@ -102,7 +102,7 @@ func GenerateConfig() *NodeConfig {
cfg.Peers = []string{} cfg.Peers = []string{}
cfg.InterfacePeers = map[string][]string{} cfg.InterfacePeers = map[string][]string{}
cfg.AllowedEncryptionPublicKeys = []string{} cfg.AllowedEncryptionPublicKeys = []string{}
cfg.MulticastInterfaces = []string{".*"} cfg.MulticastInterfaces = defaults.GetDefaults().DefaultMulticastInterfaces
cfg.IfName = defaults.GetDefaults().DefaultIfName cfg.IfName = defaults.GetDefaults().DefaultIfName
cfg.IfMTU = defaults.GetDefaults().DefaultIfMTU cfg.IfMTU = defaults.GetDefaults().DefaultIfMTU
cfg.IfTAPMode = defaults.GetDefaults().DefaultIfTAPMode cfg.IfTAPMode = defaults.GetDefaults().DefaultIfTAPMode

View File

@ -10,6 +10,9 @@ type platformDefaultParameters struct {
// Configuration (used for yggdrasilctl) // Configuration (used for yggdrasilctl)
DefaultConfigFile string DefaultConfigFile string
// Multicast interfaces
DefaultMulticastInterfaces []string
// TUN/TAP // TUN/TAP
MaximumIfMTU int MaximumIfMTU int
DefaultIfMTU int DefaultIfMTU int

View File

@ -12,6 +12,12 @@ func GetDefaults() platformDefaultParameters {
// Configuration (used for yggdrasilctl) // Configuration (used for yggdrasilctl)
DefaultConfigFile: "/etc/yggdrasil.conf", DefaultConfigFile: "/etc/yggdrasil.conf",
// Multicast interfaces
DefaultMulticastInterfaces: []string{
"en.*",
"bridge.*",
},
// TUN/TAP // TUN/TAP
MaximumIfMTU: 65535, MaximumIfMTU: 65535,
DefaultIfMTU: 65535, DefaultIfMTU: 65535,

View File

@ -12,6 +12,11 @@ func GetDefaults() platformDefaultParameters {
// Configuration (used for yggdrasilctl) // Configuration (used for yggdrasilctl)
DefaultConfigFile: "/etc/yggdrasil.conf", DefaultConfigFile: "/etc/yggdrasil.conf",
// Multicast interfaces
DefaultMulticastInterfaces: []string{
".*",
},
// TUN/TAP // TUN/TAP
MaximumIfMTU: 32767, MaximumIfMTU: 32767,
DefaultIfMTU: 32767, DefaultIfMTU: 32767,

View File

@ -12,6 +12,11 @@ func GetDefaults() platformDefaultParameters {
// Configuration (used for yggdrasilctl) // Configuration (used for yggdrasilctl)
DefaultConfigFile: "/etc/yggdrasil.conf", DefaultConfigFile: "/etc/yggdrasil.conf",
// Multicast interfaces
DefaultMulticastInterfaces: []string{
".*",
},
// TUN/TAP // TUN/TAP
MaximumIfMTU: 65535, MaximumIfMTU: 65535,
DefaultIfMTU: 65535, DefaultIfMTU: 65535,

View File

@ -12,6 +12,11 @@ func GetDefaults() platformDefaultParameters {
// Configuration (used for yggdrasilctl) // Configuration (used for yggdrasilctl)
DefaultConfigFile: "/etc/yggdrasil.conf", DefaultConfigFile: "/etc/yggdrasil.conf",
// Multicast interfaces
DefaultMulticastInterfaces: []string{
".*",
},
// TUN/TAP // TUN/TAP
MaximumIfMTU: 9000, MaximumIfMTU: 9000,
DefaultIfMTU: 9000, DefaultIfMTU: 9000,

View File

@ -12,6 +12,11 @@ func GetDefaults() platformDefaultParameters {
// Configuration (used for yggdrasilctl) // Configuration (used for yggdrasilctl)
DefaultConfigFile: "/etc/yggdrasil.conf", DefaultConfigFile: "/etc/yggdrasil.conf",
// Multicast interfaces
DefaultMulticastInterfaces: []string{
".*",
},
// TUN/TAP // TUN/TAP
MaximumIfMTU: 16384, MaximumIfMTU: 16384,
DefaultIfMTU: 16384, DefaultIfMTU: 16384,

View File

@ -12,6 +12,11 @@ func GetDefaults() platformDefaultParameters {
// Configuration (used for yggdrasilctl) // Configuration (used for yggdrasilctl)
DefaultConfigFile: "/etc/yggdrasil.conf", DefaultConfigFile: "/etc/yggdrasil.conf",
// Multicast interfaces
DefaultMulticastInterfaces: []string{
".*",
},
// TUN/TAP // TUN/TAP
MaximumIfMTU: 65535, MaximumIfMTU: 65535,
DefaultIfMTU: 65535, DefaultIfMTU: 65535,

View File

@ -12,6 +12,11 @@ func GetDefaults() platformDefaultParameters {
// Configuration (used for yggdrasilctl) // Configuration (used for yggdrasilctl)
DefaultConfigFile: "C:\\Program Files\\Yggdrasil\\yggdrasil.conf", DefaultConfigFile: "C:\\Program Files\\Yggdrasil\\yggdrasil.conf",
// Multicast interfaces
DefaultMulticastInterfaces: []string{
".*",
},
// TUN/TAP // TUN/TAP
MaximumIfMTU: 65535, MaximumIfMTU: 65535,
DefaultIfMTU: 65535, DefaultIfMTU: 65535,

View File

@ -5,7 +5,7 @@ import "github.com/yggdrasil-network/yggdrasil-go/src/admin"
func (m *Multicast) SetupAdminHandlers(a *admin.AdminSocket) { func (m *Multicast) SetupAdminHandlers(a *admin.AdminSocket) {
a.AddHandler("getMulticastInterfaces", []string{}, func(in admin.Info) (admin.Info, error) { a.AddHandler("getMulticastInterfaces", []string{}, func(in admin.Info) (admin.Info, error) {
var intfs []string var intfs []string
for _, v := range m.interfaces() { for _, v := range m.Interfaces() {
intfs = append(intfs, v.Name) intfs = append(intfs, v.Name)
} }
return admin.Info{"multicast_interfaces": intfs}, nil return admin.Info{"multicast_interfaces": intfs}, nil

View File

@ -19,14 +19,14 @@ import (
// configured multicast interface, Yggdrasil will attempt to peer with that node // configured multicast interface, Yggdrasil will attempt to peer with that node
// automatically. // automatically.
type Multicast struct { type Multicast struct {
core *yggdrasil.Core core *yggdrasil.Core
config *config.NodeState config *config.NodeState
log *log.Logger log *log.Logger
reconfigure chan chan error sock *ipv6.PacketConn
sock *ipv6.PacketConn groupAddr string
groupAddr string listeners map[string]*yggdrasil.TcpListener
listeners map[string]*yggdrasil.TcpListener listenPort uint16
listenPort uint16 isOpen bool
} }
// Init prepares the multicast interface for use. // Init prepares the multicast interface for use.
@ -34,25 +34,10 @@ func (m *Multicast) Init(core *yggdrasil.Core, state *config.NodeState, log *log
m.core = core m.core = core
m.config = state m.config = state
m.log = log m.log = log
m.reconfigure = make(chan chan error, 1)
m.listeners = make(map[string]*yggdrasil.TcpListener) m.listeners = make(map[string]*yggdrasil.TcpListener)
current, _ := m.config.Get() current, _ := m.config.Get()
m.listenPort = current.LinkLocalTCPPort m.listenPort = current.LinkLocalTCPPort
go func() {
for {
e := <-m.reconfigure
// There's nothing particularly to do here because the multicast module
// already consults the config.NodeState when enumerating multicast
// interfaces on each pass. We just need to return nil so that the
// reconfiguration doesn't block indefinitely
e <- nil
}
}()
m.groupAddr = "[ff02::114]:9001" m.groupAddr = "[ff02::114]:9001"
// Check if we've been given any expressions
if count := len(m.interfaces()); count != 0 {
m.log.Infoln("Found", count, "multicast interface(s)")
}
return nil return nil
} }
@ -60,37 +45,35 @@ func (m *Multicast) Init(core *yggdrasil.Core, state *config.NodeState, log *log
// listen for multicast beacons from other hosts and will advertise multicast // listen for multicast beacons from other hosts and will advertise multicast
// beacons out to the network. // beacons out to the network.
func (m *Multicast) Start() error { func (m *Multicast) Start() error {
current, _ := m.config.Get() addr, err := net.ResolveUDPAddr("udp", m.groupAddr)
if len(current.MulticastInterfaces) == 0 { if err != nil {
m.log.Infoln("Multicast discovery is disabled") return err
} else {
m.log.Infoln("Multicast discovery is enabled")
addr, err := net.ResolveUDPAddr("udp", m.groupAddr)
if err != nil {
return err
}
listenString := fmt.Sprintf("[::]:%v", addr.Port)
lc := net.ListenConfig{
Control: m.multicastReuse,
}
conn, err := lc.ListenPacket(context.Background(), "udp6", listenString)
if err != nil {
return err
}
m.sock = ipv6.NewPacketConn(conn)
if err = m.sock.SetControlMessage(ipv6.FlagDst, true); err != nil {
// Windows can't set this flag, so we need to handle it in other ways
}
go m.multicastStarted()
go m.listen()
go m.announce()
} }
listenString := fmt.Sprintf("[::]:%v", addr.Port)
lc := net.ListenConfig{
Control: m.multicastReuse,
}
conn, err := lc.ListenPacket(context.Background(), "udp6", listenString)
if err != nil {
return err
}
m.sock = ipv6.NewPacketConn(conn)
if err = m.sock.SetControlMessage(ipv6.FlagDst, true); err != nil {
// Windows can't set this flag, so we need to handle it in other ways
}
m.isOpen = true
go m.multicastStarted()
go m.listen()
go m.announce()
return nil return nil
} }
// Stop is not implemented for multicast yet. // Stop is not implemented for multicast yet.
func (m *Multicast) Stop() error { func (m *Multicast) Stop() error {
m.isOpen = false
m.sock.Close()
return nil return nil
} }
@ -99,37 +82,19 @@ func (m *Multicast) Stop() error {
// needed. // needed.
func (m *Multicast) UpdateConfig(config *config.NodeConfig) { func (m *Multicast) UpdateConfig(config *config.NodeConfig) {
m.log.Debugln("Reloading multicast configuration...") m.log.Debugln("Reloading multicast configuration...")
m.config.Replace(*config) m.config.Replace(*config)
m.log.Infoln("Multicast configuration reloaded successfully")
errors := 0
components := []chan chan error{
m.reconfigure,
}
for _, component := range components {
response := make(chan error)
component <- response
if err := <-response; err != nil {
m.log.Errorln(err)
errors++
}
}
if errors > 0 {
m.log.Warnln(errors, "multicast module(s) reported errors during configuration reload")
} else {
m.log.Infoln("Multicast configuration reloaded successfully")
}
} }
func (m *Multicast) interfaces() map[string]net.Interface { // GetInterfaces returns the currently known/enabled multicast interfaces. It is
// expected that UpdateInterfaces has been called at least once before calling
// this method.
func (m *Multicast) Interfaces() map[string]net.Interface {
interfaces := make(map[string]net.Interface)
// Get interface expressions from config // Get interface expressions from config
current, _ := m.config.Get() current, _ := m.config.Get()
exprs := current.MulticastInterfaces exprs := current.MulticastInterfaces
// Ask the system for network interfaces // Ask the system for network interfaces
interfaces := make(map[string]net.Interface)
allifaces, err := net.Interfaces() allifaces, err := net.Interfaces()
if err != nil { if err != nil {
panic(err) panic(err)
@ -173,7 +138,7 @@ func (m *Multicast) announce() {
panic(err) panic(err)
} }
for { for {
interfaces := m.interfaces() interfaces := m.Interfaces()
// There might be interfaces that we configured listeners for but are no // There might be interfaces that we configured listeners for but are no
// longer up - if that's the case then we should stop the listeners // longer up - if that's the case then we should stop the listeners
for name, listener := range m.listeners { for name, listener := range m.listeners {
@ -285,6 +250,9 @@ func (m *Multicast) listen() {
for { for {
nBytes, rcm, fromAddr, err := m.sock.ReadFrom(bs) nBytes, rcm, fromAddr, err := m.sock.ReadFrom(bs)
if err != nil { if err != nil {
if !m.isOpen {
return
}
panic(err) panic(err)
} }
if rcm != nil { if rcm != nil {
@ -307,9 +275,11 @@ func (m *Multicast) listen() {
if addr.IP.String() != from.IP.String() { if addr.IP.String() != from.IP.String() {
continue continue
} }
addr.Zone = "" if _, ok := m.Interfaces()[from.Zone]; ok {
if err := m.core.CallPeer("tcp://"+addr.String(), from.Zone); err != nil { addr.Zone = ""
m.log.Debugln("Call from multicast failed:", err) if err := m.core.CallPeer("tcp://"+addr.String(), from.Zone); err != nil {
m.log.Debugln("Call from multicast failed:", err)
}
} }
} }
} }

View File

@ -35,12 +35,12 @@ func (m *Multicast) multicastStarted() {
if awdlGoroutineStarted { if awdlGoroutineStarted {
return return
} }
m.log.Infoln("Multicast discovery will wake up AWDL if required")
awdlGoroutineStarted = true awdlGoroutineStarted = true
for { for {
C.StopAWDLBrowsing() C.StopAWDLBrowsing()
for _, intf := range m.interfaces() { for intf := range m.Interfaces() {
if intf.Name == "awdl0" { if intf == "awdl0" {
m.log.Infoln("Multicast discovery is using AWDL discovery")
C.StartAWDLBrowsing() C.StartAWDLBrowsing()
break break
} }

View File

@ -48,19 +48,21 @@ func (c *cryptokey) init(tun *TunAdapter) {
} }
}() }()
c.tun.log.Debugln("Configuring CKR...")
if err := c.configure(); err != nil { if err := c.configure(); err != nil {
c.tun.log.Errorln("CKR configuration failed:", err) c.tun.log.Errorln("CKR configuration failed:", err)
} else {
c.tun.log.Debugln("CKR configured")
} }
} }
// Configure the CKR routes - this must only ever be called from the router // Configure the CKR routes - this must only ever be called from the router
// goroutine, e.g. through router.doAdmin // goroutine, e.g. through router.doAdmin
func (c *cryptokey) configure() error { func (c *cryptokey) configure() error {
c.tun.config.Mutex.RLock() current, _ := c.tun.config.Get()
defer c.tun.config.Mutex.RUnlock()
// Set enabled/disabled state // Set enabled/disabled state
c.setEnabled(c.tun.config.Current.TunnelRouting.Enable) c.setEnabled(current.TunnelRouting.Enable)
// Clear out existing routes // Clear out existing routes
c.mutexroutes.Lock() c.mutexroutes.Lock()
@ -69,14 +71,14 @@ func (c *cryptokey) configure() error {
c.mutexroutes.Unlock() c.mutexroutes.Unlock()
// Add IPv6 routes // Add IPv6 routes
for ipv6, pubkey := range c.tun.config.Current.TunnelRouting.IPv6Destinations { for ipv6, pubkey := range current.TunnelRouting.IPv6Destinations {
if err := c.addRoute(ipv6, pubkey); err != nil { if err := c.addRoute(ipv6, pubkey); err != nil {
return err return err
} }
} }
// Add IPv4 routes // Add IPv4 routes
for ipv4, pubkey := range c.tun.config.Current.TunnelRouting.IPv4Destinations { for ipv4, pubkey := range current.TunnelRouting.IPv4Destinations {
if err := c.addRoute(ipv4, pubkey); err != nil { if err := c.addRoute(ipv4, pubkey); err != nil {
return err return err
} }
@ -90,7 +92,7 @@ func (c *cryptokey) configure() error {
// Add IPv6 sources // Add IPv6 sources
c.ipv6sources = make([]net.IPNet, 0) c.ipv6sources = make([]net.IPNet, 0)
for _, source := range c.tun.config.Current.TunnelRouting.IPv6Sources { for _, source := range current.TunnelRouting.IPv6Sources {
if err := c.addSourceSubnet(source); err != nil { if err := c.addSourceSubnet(source); err != nil {
return err return err
} }
@ -98,7 +100,7 @@ func (c *cryptokey) configure() error {
// Add IPv4 sources // Add IPv4 sources
c.ipv4sources = make([]net.IPNet, 0) c.ipv4sources = make([]net.IPNet, 0)
for _, source := range c.tun.config.Current.TunnelRouting.IPv4Sources { for _, source := range current.TunnelRouting.IPv4Sources {
if err := c.addSourceSubnet(source); err != nil { if err := c.addSourceSubnet(source); err != nil {
return err return err
} }

View File

@ -13,6 +13,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"net" "net"
"sync"
"time" "time"
"golang.org/x/net/icmp" "golang.org/x/net/icmp"
@ -21,19 +22,18 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
) )
type macAddress [6]byte
const len_ETHER = 14 const len_ETHER = 14
type ICMPv6 struct { type ICMPv6 struct {
tun *TunAdapter tun *TunAdapter
mylladdr net.IP mylladdr net.IP
mymac macAddress mymac net.HardwareAddr
peermacs map[address.Address]neighbor peermacs map[address.Address]neighbor
peermacsmutex sync.RWMutex
} }
type neighbor struct { type neighbor struct {
mac macAddress mac net.HardwareAddr
learned bool learned bool
lastadvertisement time.Time lastadvertisement time.Time
lastsolicitation time.Time lastsolicitation time.Time
@ -61,10 +61,12 @@ func ipv6Header_Marshal(h *ipv6.Header) ([]byte, error) {
// addresses. // addresses.
func (i *ICMPv6) Init(t *TunAdapter) { func (i *ICMPv6) Init(t *TunAdapter) {
i.tun = t i.tun = t
i.peermacsmutex.Lock()
i.peermacs = make(map[address.Address]neighbor) i.peermacs = make(map[address.Address]neighbor)
i.peermacsmutex.Unlock()
// Our MAC address and link-local address // Our MAC address and link-local address
i.mymac = macAddress{ i.mymac = net.HardwareAddr{
0x02, 0x00, 0x00, 0x00, 0x00, 0x02} 0x02, 0x00, 0x00, 0x00, 0x00, 0x02}
i.mylladdr = net.IP{ i.mylladdr = net.IP{
0xFE, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFE, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
@ -181,16 +183,30 @@ func (i *ICMPv6) UnmarshalPacket(datain []byte, datamac *[]byte) ([]byte, error)
if datamac != nil { if datamac != nil {
var addr address.Address var addr address.Address
var target address.Address var target address.Address
var mac macAddress mac := net.HardwareAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
copy(addr[:], ipv6Header.Src[:]) copy(addr[:], ipv6Header.Src[:])
copy(target[:], datain[48:64]) copy(target[:], datain[48:64])
copy(mac[:], (*datamac)[:]) copy(mac[:], (*datamac)[:])
// fmt.Printf("Learning peer MAC %x for %x\n", mac, target) i.peermacsmutex.Lock()
neighbor := i.peermacs[target] neighbor := i.peermacs[target]
neighbor.mac = mac neighbor.mac = mac
neighbor.learned = true neighbor.learned = true
neighbor.lastadvertisement = time.Now() neighbor.lastadvertisement = time.Now()
i.peermacs[target] = neighbor i.peermacs[target] = neighbor
i.peermacsmutex.Unlock()
i.tun.log.Debugln("Learned peer MAC", mac.String(), "for", net.IP(target[:]).String())
/*
i.tun.log.Debugln("Peer MAC table:")
i.peermacsmutex.RLock()
for t, n := range i.peermacs {
if n.learned {
i.tun.log.Debugln("- Target", net.IP(t[:]).String(), "has MAC", n.mac.String())
} else {
i.tun.log.Debugln("- Target", net.IP(t[:]).String(), "is not learned yet")
}
}
i.peermacsmutex.RUnlock()
*/
} }
return nil, errors.New("No response needed") return nil, errors.New("No response needed")
} }
@ -201,7 +217,7 @@ func (i *ICMPv6) UnmarshalPacket(datain []byte, datamac *[]byte) ([]byte, error)
// Creates an ICMPv6 packet based on the given icmp.MessageBody and other // Creates an ICMPv6 packet based on the given icmp.MessageBody and other
// parameters, complete with ethernet and IP headers, which can be written // parameters, complete with ethernet and IP headers, which can be written
// directly to a TAP adapter. // directly to a TAP adapter.
func (i *ICMPv6) CreateICMPv6L2(dstmac macAddress, dst net.IP, src net.IP, mtype ipv6.ICMPType, mcode int, mbody icmp.MessageBody) ([]byte, error) { func (i *ICMPv6) CreateICMPv6L2(dstmac net.HardwareAddr, dst net.IP, src net.IP, mtype ipv6.ICMPType, mcode int, mbody icmp.MessageBody) ([]byte, error) {
// Pass through to CreateICMPv6 // Pass through to CreateICMPv6
ipv6packet, err := CreateICMPv6(dst, src, mtype, mcode, mbody) ipv6packet, err := CreateICMPv6(dst, src, mtype, mcode, mbody)
if err != nil { if err != nil {
@ -264,13 +280,54 @@ func CreateICMPv6(dst net.IP, src net.IP, mtype ipv6.ICMPType, mcode int, mbody
return responsePacket, nil return responsePacket, nil
} }
func (i *ICMPv6) CreateNDPL2(dst address.Address) ([]byte, error) { func (i *ICMPv6) Solicit(addr address.Address) {
retries := 5
for retries > 0 {
retries--
i.peermacsmutex.RLock()
if n, ok := i.peermacs[addr]; ok && n.learned {
i.tun.log.Debugln("MAC learned for", net.IP(addr[:]).String())
i.peermacsmutex.RUnlock()
return
}
i.peermacsmutex.RUnlock()
i.tun.log.Debugln("Sending neighbor solicitation for", net.IP(addr[:]).String())
i.peermacsmutex.Lock()
if n, ok := i.peermacs[addr]; !ok {
i.peermacs[addr] = neighbor{
lastsolicitation: time.Now(),
}
} else {
n.lastsolicitation = time.Now()
}
i.peermacsmutex.Unlock()
request, err := i.createNDPL2(addr)
if err != nil {
panic(err)
}
if _, err := i.tun.iface.Write(request); err != nil {
panic(err)
}
i.tun.log.Debugln("Sent neighbor solicitation for", net.IP(addr[:]).String())
time.Sleep(time.Second)
}
}
func (i *ICMPv6) getNeighbor(addr address.Address) (neighbor, bool) {
i.peermacsmutex.RLock()
defer i.peermacsmutex.RUnlock()
n, ok := i.peermacs[addr]
return n, ok
}
func (i *ICMPv6) createNDPL2(dst address.Address) ([]byte, error) {
// Create the ND payload // Create the ND payload
var payload [28]byte var payload [28]byte
copy(payload[:4], []byte{0x00, 0x00, 0x00, 0x00}) copy(payload[:4], []byte{0x00, 0x00, 0x00, 0x00}) // Flags
copy(payload[4:20], dst[:]) copy(payload[4:20], dst[:]) // Destination
copy(payload[20:22], []byte{0x01, 0x01}) copy(payload[20:22], []byte{0x01, 0x01}) // Type & length
copy(payload[22:28], i.mymac[:6]) copy(payload[22:28], i.mymac[:6]) // Link layer address
// Create the ICMPv6 solicited-node address // Create the ICMPv6 solicited-node address
var dstaddr address.Address var dstaddr address.Address
@ -281,7 +338,7 @@ func (i *ICMPv6) CreateNDPL2(dst address.Address) ([]byte, error) {
copy(dstaddr[13:], dst[13:16]) copy(dstaddr[13:], dst[13:16])
// Create the multicast MAC // Create the multicast MAC
var dstmac macAddress dstmac := net.HardwareAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
copy(dstmac[:2], []byte{0x33, 0x33}) copy(dstmac[:2], []byte{0x33, 0x33})
copy(dstmac[2:6], dstaddr[12:16]) copy(dstmac[2:6], dstaddr[12:16])
@ -293,9 +350,6 @@ func (i *ICMPv6) CreateNDPL2(dst address.Address) ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
neighbor := i.peermacs[dstaddr]
neighbor.lastsolicitation = time.Now()
i.peermacs[dstaddr] = neighbor
return requestPacket, nil return requestPacket, nil
} }
@ -319,10 +373,10 @@ func (i *ICMPv6) HandleNDP(in []byte) ([]byte, error) {
// Create our NDP message body response // Create our NDP message body response
body := make([]byte, 28) body := make([]byte, 28)
binary.BigEndian.PutUint32(body[:4], uint32(0x20000000)) binary.BigEndian.PutUint32(body[:4], uint32(0x40000000)) // Flags
copy(body[4:20], in[8:24]) // Target address copy(body[4:20], in[8:24]) // Target address
body[20] = uint8(2) body[20] = uint8(2) // Type: Target link-layer address
body[21] = uint8(1) body[21] = uint8(1) // Length: 1x address (8 bytes)
copy(body[22:28], i.mymac[:6]) copy(body[22:28], i.mymac[:6])
// Send it back // Send it back

View File

@ -3,6 +3,7 @@ package tuntap
import ( import (
"bytes" "bytes"
"errors" "errors"
"net"
"time" "time"
"github.com/songgao/packets/ethernet" "github.com/songgao/packets/ethernet"
@ -40,22 +41,13 @@ func (tun *TunAdapter) writer() error {
return errors.New("Invalid address family") return errors.New("Invalid address family")
} }
sendndp := func(dstAddr address.Address) { sendndp := func(dstAddr address.Address) {
neigh, known := tun.icmpv6.peermacs[dstAddr] neigh, known := tun.icmpv6.getNeighbor(dstAddr)
known = known && (time.Since(neigh.lastsolicitation).Seconds() < 30) known = known && (time.Since(neigh.lastsolicitation).Seconds() < 30)
if !known { if !known {
request, err := tun.icmpv6.CreateNDPL2(dstAddr) tun.icmpv6.Solicit(dstAddr)
if err != nil {
panic(err)
}
if _, err := tun.iface.Write(request); err != nil {
panic(err)
}
tun.icmpv6.peermacs[dstAddr] = neighbor{
lastsolicitation: time.Now(),
}
} }
} }
var peermac macAddress peermac := net.HardwareAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
var peerknown bool var peerknown bool
if b[0]&0xf0 == 0x40 { if b[0]&0xf0 == 0x40 {
dstAddr = tun.addr dstAddr = tun.addr
@ -64,15 +56,20 @@ func (tun *TunAdapter) writer() error {
dstAddr = tun.addr dstAddr = tun.addr
} }
} }
if neighbor, ok := tun.icmpv6.peermacs[dstAddr]; ok && neighbor.learned { if neighbor, ok := tun.icmpv6.getNeighbor(dstAddr); ok && neighbor.learned {
// If we've learned the MAC of a 300::/7 address, for example, or a CKR
// address, use the MAC address of that
peermac = neighbor.mac peermac = neighbor.mac
peerknown = true peerknown = true
} else if neighbor, ok := tun.icmpv6.peermacs[tun.addr]; ok && neighbor.learned { } else if neighbor, ok := tun.icmpv6.getNeighbor(tun.addr); ok && neighbor.learned {
// Otherwise send directly to the MAC address of the host if that's
// known instead
peermac = neighbor.mac peermac = neighbor.mac
peerknown = true peerknown = true
sendndp(dstAddr)
} else { } else {
// Nothing has been discovered, try to discover the destination
sendndp(tun.addr) sendndp(tun.addr)
} }
if peerknown { if peerknown {
var proto ethernet.Ethertype var proto ethernet.Ethertype
@ -92,12 +89,17 @@ func (tun *TunAdapter) writer() error {
copy(frame[tun_ETHER_HEADER_LENGTH:], b[:n]) copy(frame[tun_ETHER_HEADER_LENGTH:], b[:n])
n += tun_ETHER_HEADER_LENGTH n += tun_ETHER_HEADER_LENGTH
w, err = tun.iface.Write(frame[:n]) w, err = tun.iface.Write(frame[:n])
} else {
tun.log.Errorln("TUN/TAP iface write error: no peer MAC known for", net.IP(dstAddr[:]).String(), "- dropping packet")
} }
} else { } else {
w, err = tun.iface.Write(b[:n]) w, err = tun.iface.Write(b[:n])
util.PutBytes(b) util.PutBytes(b)
} }
if err != nil { if err != nil {
if !tun.isOpen {
return err
}
tun.log.Errorln("TUN/TAP iface write error:", err) tun.log.Errorln("TUN/TAP iface write error:", err)
continue continue
} }
@ -114,6 +116,9 @@ func (tun *TunAdapter) reader() error {
// Wait for a packet to be delivered to us through the TUN/TAP adapter // Wait for a packet to be delivered to us through the TUN/TAP adapter
n, err := tun.iface.Read(bs) n, err := tun.iface.Read(bs)
if err != nil { if err != nil {
if !tun.isOpen {
return err
}
panic(err) panic(err)
} }
if n == 0 { if n == 0 {
@ -134,7 +139,7 @@ func (tun *TunAdapter) reader() error {
} }
// Then offset the buffer so that we can now just treat it as an IP // Then offset the buffer so that we can now just treat it as an IP
// packet from now on // packet from now on
bs = bs[offset:] bs = bs[offset:] // FIXME this breaks bs for the next read and means n is the wrong value
} }
// From the IP header, work out what our source and destination addresses // From the IP header, work out what our source and destination addresses
// and node IDs are. We will need these in order to work out where to send // and node IDs are. We will need these in order to work out where to send
@ -178,7 +183,7 @@ func (tun *TunAdapter) reader() error {
// Unknown address length or protocol, so drop the packet and ignore it // Unknown address length or protocol, so drop the packet and ignore it
continue continue
} }
if !tun.ckr.isValidSource(srcAddr, addrlen) { if tun.ckr.isEnabled() && !tun.ckr.isValidSource(srcAddr, addrlen) {
// The packet had a source address that doesn't belong to us or our // The packet had a source address that doesn't belong to us or our
// configured crypto-key routing source subnets // configured crypto-key routing source subnets
continue continue
@ -225,21 +230,46 @@ func (tun *TunAdapter) reader() error {
panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen") panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen")
} }
// Dial to the remote node // Dial to the remote node
if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { packet := append(util.GetBytes(), bs[:n]...)
// We've been given a connection so prepare the session wrapper go func() {
if s, err := tun.wrap(conn); err != nil { // FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes
// Something went wrong when storing the connection, typically that tun.mutex.Lock()
// something already exists for this address or subnet _, known := tun.dials[*dstNodeID]
tun.log.Debugln("TUN/TAP iface wrap:", err) tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], packet)
} else { for len(tun.dials[*dstNodeID]) > 32 {
// Update our reference to the connection util.PutBytes(tun.dials[*dstNodeID][0])
session, isIn = s, true tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:]
} }
} else { tun.mutex.Unlock()
// We weren't able to dial for some reason so there's no point in if known {
// continuing this iteration - skip to the next one return
continue }
} var tc *tunConn
if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil {
// We've been given a connection so prepare the session wrapper
if tc, err = tun.wrap(conn); err != nil {
// Something went wrong when storing the connection, typically that
// something already exists for this address or subnet
tun.log.Debugln("TUN/TAP iface wrap:", err)
}
}
tun.mutex.Lock()
packets := tun.dials[*dstNodeID]
delete(tun.dials, *dstNodeID)
tun.mutex.Unlock()
if tc != nil {
for _, packet := range packets {
select {
case tc.send <- packet:
default:
util.PutBytes(packet)
}
}
}
}()
// While the dial is going on we can't do much else
// continuing this iteration - skip to the next one
continue
} }
// If we have a connection now, try writing to it // If we have a connection now, try writing to it
if isIn && session != nil { if isIn && session != nil {

View File

@ -14,7 +14,6 @@ import (
"fmt" "fmt"
"net" "net"
"sync" "sync"
"time"
"github.com/gologme/log" "github.com/gologme/log"
"github.com/yggdrasil-network/water" "github.com/yggdrasil-network/water"
@ -49,6 +48,7 @@ type TunAdapter struct {
mutex sync.RWMutex // Protects the below mutex sync.RWMutex // Protects the below
addrToConn map[address.Address]*tunConn addrToConn map[address.Address]*tunConn
subnetToConn map[address.Subnet]*tunConn subnetToConn map[address.Subnet]*tunConn
dials map[crypto.NodeID][][]byte // Buffer of packets to send after dialing finishes
isOpen bool isOpen bool
} }
@ -113,18 +113,18 @@ func (tun *TunAdapter) Init(config *config.NodeState, log *log.Logger, listener
tun.dialer = dialer tun.dialer = dialer
tun.addrToConn = make(map[address.Address]*tunConn) tun.addrToConn = make(map[address.Address]*tunConn)
tun.subnetToConn = make(map[address.Subnet]*tunConn) tun.subnetToConn = make(map[address.Subnet]*tunConn)
tun.dials = make(map[crypto.NodeID][][]byte)
} }
// Start the setup process for the TUN/TAP adapter. If successful, starts the // Start the setup process for the TUN/TAP adapter. If successful, starts the
// read/write goroutines to handle packets on that interface. // read/write goroutines to handle packets on that interface.
func (tun *TunAdapter) Start() error { func (tun *TunAdapter) Start() error {
tun.config.Mutex.RLock() current, _ := tun.config.Get()
defer tun.config.Mutex.RUnlock()
if tun.config == nil || tun.listener == nil || tun.dialer == nil { if tun.config == nil || tun.listener == nil || tun.dialer == nil {
return errors.New("No configuration available to TUN/TAP") return errors.New("No configuration available to TUN/TAP")
} }
var boxPub crypto.BoxPubKey var boxPub crypto.BoxPubKey
boxPubHex, err := hex.DecodeString(tun.config.Current.EncryptionPublicKey) boxPubHex, err := hex.DecodeString(current.EncryptionPublicKey)
if err != nil { if err != nil {
return err return err
} }
@ -132,9 +132,9 @@ func (tun *TunAdapter) Start() error {
nodeID := crypto.GetNodeID(&boxPub) nodeID := crypto.GetNodeID(&boxPub)
tun.addr = *address.AddrForNodeID(nodeID) tun.addr = *address.AddrForNodeID(nodeID)
tun.subnet = *address.SubnetForNodeID(nodeID) tun.subnet = *address.SubnetForNodeID(nodeID)
tun.mtu = tun.config.Current.IfMTU tun.mtu = current.IfMTU
ifname := tun.config.Current.IfName ifname := current.IfName
iftapmode := tun.config.Current.IfTAPMode iftapmode := current.IfTAPMode
addr := fmt.Sprintf("%s/%d", net.IP(tun.addr[:]).String(), 8*len(address.GetPrefix())-1) addr := fmt.Sprintf("%s/%d", net.IP(tun.addr[:]).String(), 8*len(address.GetPrefix())-1)
if ifname != "none" { if ifname != "none" {
if err := tun.setup(ifname, iftapmode, addr, tun.mtu); err != nil { if err := tun.setup(ifname, iftapmode, addr, tun.mtu); err != nil {
@ -150,21 +150,6 @@ func (tun *TunAdapter) Start() error {
tun.send = make(chan []byte, 32) // TODO: is this a sensible value? tun.send = make(chan []byte, 32) // TODO: is this a sensible value?
tun.reconfigure = make(chan chan error) tun.reconfigure = make(chan chan error)
tun.mutex.Unlock() tun.mutex.Unlock()
if iftapmode {
go func() {
for {
if _, ok := tun.icmpv6.peermacs[tun.addr]; ok {
break
}
request, err := tun.icmpv6.CreateNDPL2(tun.addr)
if err != nil {
panic(err)
}
tun.send <- request
time.Sleep(time.Second)
}
}()
}
go func() { go func() {
for { for {
e := <-tun.reconfigure e := <-tun.reconfigure
@ -175,10 +160,23 @@ func (tun *TunAdapter) Start() error {
go tun.reader() go tun.reader()
go tun.writer() go tun.writer()
tun.icmpv6.Init(tun) tun.icmpv6.Init(tun)
if iftapmode {
go tun.icmpv6.Solicit(tun.addr)
}
tun.ckr.init(tun) tun.ckr.init(tun)
return nil return nil
} }
// Start the setup process for the TUN/TAP adapter. If successful, starts the
// read/write goroutines to handle packets on that interface.
func (tun *TunAdapter) Stop() error {
tun.isOpen = false
// TODO: we have nothing that cleanly stops all the various goroutines opened
// by TUN/TAP, e.g. readers/writers, sessions
tun.iface.Close()
return nil
}
// UpdateConfig updates the TUN/TAP module with the provided config.NodeConfig // UpdateConfig updates the TUN/TAP module with the provided config.NodeConfig
// and then signals the various module goroutines to reconfigure themselves if // and then signals the various module goroutines to reconfigure themselves if
// needed. // needed.
@ -235,6 +233,7 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) {
stop: make(chan struct{}), stop: make(chan struct{}),
alive: make(chan struct{}, 1), alive: make(chan struct{}, 1),
} }
c = &s
// Get the remote address and subnet of the other side // Get the remote address and subnet of the other side
remoteNodeID := conn.RemoteAddr() remoteNodeID := conn.RemoteAddr()
s.addr = *address.AddrForNodeID(&remoteNodeID) s.addr = *address.AddrForNodeID(&remoteNodeID)

View File

@ -31,18 +31,18 @@ func (tun *TunAdapter) setup(ifname string, iftapmode bool, addr string, mtu int
} }
// Disable/enable the interface to resets its configuration (invalidating iface) // Disable/enable the interface to resets its configuration (invalidating iface)
cmd := exec.Command("netsh", "interface", "set", "interface", iface.Name(), "admin=DISABLED") cmd := exec.Command("netsh", "interface", "set", "interface", iface.Name(), "admin=DISABLED")
tun.log.Printf("netsh command: %v", strings.Join(cmd.Args, " ")) tun.log.Debugln("netsh command:", strings.Join(cmd.Args, " "))
output, err := cmd.CombinedOutput() output, err := cmd.CombinedOutput()
if err != nil { if err != nil {
tun.log.Errorf("Windows netsh failed: %v.", err) tun.log.Errorln("Windows netsh failed:", err)
tun.log.Traceln(string(output)) tun.log.Traceln(string(output))
return err return err
} }
cmd = exec.Command("netsh", "interface", "set", "interface", iface.Name(), "admin=ENABLED") cmd = exec.Command("netsh", "interface", "set", "interface", iface.Name(), "admin=ENABLED")
tun.log.Printf("netsh command: %v", strings.Join(cmd.Args, " ")) tun.log.Debugln("netsh command:", strings.Join(cmd.Args, " "))
output, err = cmd.CombinedOutput() output, err = cmd.CombinedOutput()
if err != nil { if err != nil {
tun.log.Errorf("Windows netsh failed: %v.", err) tun.log.Errorln("Windows netsh failed:", err)
tun.log.Traceln(string(output)) tun.log.Traceln(string(output))
return err return err
} }
@ -71,10 +71,10 @@ func (tun *TunAdapter) setupMTU(mtu int) error {
fmt.Sprintf("interface=%s", tun.iface.Name()), fmt.Sprintf("interface=%s", tun.iface.Name()),
fmt.Sprintf("mtu=%d", mtu), fmt.Sprintf("mtu=%d", mtu),
"store=active") "store=active")
tun.log.Debugln("netsh command: %v", strings.Join(cmd.Args, " ")) tun.log.Debugln("netsh command:", strings.Join(cmd.Args, " "))
output, err := cmd.CombinedOutput() output, err := cmd.CombinedOutput()
if err != nil { if err != nil {
tun.log.Errorf("Windows netsh failed: %v.", err) tun.log.Errorln("Windows netsh failed:", err)
tun.log.Traceln(string(output)) tun.log.Traceln(string(output))
return err return err
} }
@ -88,10 +88,10 @@ func (tun *TunAdapter) setupAddress(addr string) error {
fmt.Sprintf("interface=%s", tun.iface.Name()), fmt.Sprintf("interface=%s", tun.iface.Name()),
fmt.Sprintf("addr=%s", addr), fmt.Sprintf("addr=%s", addr),
"store=active") "store=active")
tun.log.Debugln("netsh command: %v", strings.Join(cmd.Args, " ")) tun.log.Debugln("netsh command:", strings.Join(cmd.Args, " "))
output, err := cmd.CombinedOutput() output, err := cmd.CombinedOutput()
if err != nil { if err != nil {
tun.log.Errorf("Windows netsh failed: %v.", err) tun.log.Errorln("Windows netsh failed:", err)
tun.log.Traceln(string(output)) tun.log.Traceln(string(output))
return err return err
} }

View File

@ -211,16 +211,31 @@ func (c *Core) GetSessions() []Session {
var sessions []Session var sessions []Session
getSessions := func() { getSessions := func() {
for _, sinfo := range c.sessions.sinfos { for _, sinfo := range c.sessions.sinfos {
// TODO? skipped known but timed out sessions? var session Session
session := Session{ workerFunc := func() {
Coords: append([]byte{}, sinfo.coords...), session = Session{
MTU: sinfo.getMTU(), Coords: append([]byte{}, sinfo.coords...),
BytesSent: sinfo.bytesSent, MTU: sinfo.getMTU(),
BytesRecvd: sinfo.bytesRecvd, BytesSent: sinfo.bytesSent,
Uptime: time.Now().Sub(sinfo.timeOpened), BytesRecvd: sinfo.bytesRecvd,
WasMTUFixed: sinfo.wasMTUFixed, Uptime: time.Now().Sub(sinfo.timeOpened),
WasMTUFixed: sinfo.wasMTUFixed,
}
copy(session.PublicKey[:], sinfo.theirPermPub[:])
} }
copy(session.PublicKey[:], sinfo.theirPermPub[:]) var skip bool
func() {
defer func() {
if recover() != nil {
skip = true
}
}()
sinfo.doWorker(workerFunc)
}()
if skip {
continue
}
// TODO? skipped known but timed out sessions?
sessions = append(sessions, session) sessions = append(sessions, session)
} }
} }
@ -232,7 +247,7 @@ func (c *Core) GetSessions() []Session {
// from git, or returns "unknown" otherwise. // from git, or returns "unknown" otherwise.
func BuildName() string { func BuildName() string {
if buildName == "" { if buildName == "" {
return "unknown" return "yggdrasil"
} }
return buildName return buildName
} }
@ -395,6 +410,19 @@ func (c *Core) GetNodeInfo(keyString, coordString string, nocache bool) (NodeInf
return NodeInfoPayload{}, errors.New(fmt.Sprintf("getNodeInfo timeout: %s", keyString)) return NodeInfoPayload{}, errors.New(fmt.Sprintf("getNodeInfo timeout: %s", keyString))
} }
// SetSessionGatekeeper allows you to configure a handler function for deciding
// whether a session should be allowed or not. The default session firewall is
// implemented in this way. The function receives the public key of the remote
// side and a boolean which is true if we initiated the session or false if we
// received an incoming session request. The function should return true to
// allow the session or false to reject it.
func (c *Core) SetSessionGatekeeper(f func(pubkey *crypto.BoxPubKey, initiator bool) bool) {
c.sessions.isAllowedMutex.Lock()
defer c.sessions.isAllowedMutex.Unlock()
c.sessions.isAllowedHandler = f
}
// SetLogger sets the output logger of the Yggdrasil node after startup. This // SetLogger sets the output logger of the Yggdrasil node after startup. This
// may be useful if you want to redirect the output later. // may be useful if you want to redirect the output later.
func (c *Core) SetLogger(log *log.Logger) { func (c *Core) SetLogger(log *log.Logger) {
@ -504,21 +532,14 @@ func (c *Core) DHTPing(keyString, coordString, targetString string) (DHTRes, err
rq := dhtReqKey{info.key, target} rq := dhtReqKey{info.key, target}
sendPing := func() { sendPing := func() {
c.dht.addCallback(&rq, func(res *dhtRes) { c.dht.addCallback(&rq, func(res *dhtRes) {
defer func() { recover() }() resCh <- res
select {
case resCh <- res:
default:
}
}) })
c.dht.ping(&info, &target) c.dht.ping(&info, &target)
} }
c.router.doAdmin(sendPing) c.router.doAdmin(sendPing)
go func() {
time.Sleep(6 * time.Second)
close(resCh)
}()
// TODO: do something better than the below... // TODO: do something better than the below...
for res := range resCh { res := <-resCh
if res != nil {
r := DHTRes{ r := DHTRes{
Coords: append([]byte{}, res.Coords...), Coords: append([]byte{}, res.Coords...),
} }

View File

@ -45,23 +45,18 @@ type Conn struct {
mutex sync.RWMutex mutex sync.RWMutex
closed bool closed bool
session *sessionInfo session *sessionInfo
readDeadline atomic.Value // time.Time // TODO timer readDeadline atomic.Value // time.Time // TODO timer
writeDeadline atomic.Value // time.Time // TODO timer writeDeadline atomic.Value // time.Time // TODO timer
searching atomic.Value // bool
searchwait chan struct{} // Never reset this, it's only used for the initial search
writebuf [][]byte // Packets to be sent if/when the search finishes
} }
// TODO func NewConn() that initializes additional fields as needed // TODO func NewConn() that initializes additional fields as needed
func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn { func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn {
conn := Conn{ conn := Conn{
core: core, core: core,
nodeID: nodeID, nodeID: nodeID,
nodeMask: nodeMask, nodeMask: nodeMask,
session: session, session: session,
searchwait: make(chan struct{}),
} }
conn.searching.Store(false)
return &conn return &conn
} }
@ -69,91 +64,44 @@ func (c *Conn) String() string {
return fmt.Sprintf("conn=%p", c) return fmt.Sprintf("conn=%p", c)
} }
// This method should only be called from the router goroutine // This should never be called from the router goroutine
func (c *Conn) startSearch() { func (c *Conn) search() error {
// The searchCompleted callback is given to the search var sinfo *searchInfo
searchCompleted := func(sinfo *sessionInfo, err error) { var isIn bool
defer c.searching.Store(false) c.core.router.doAdmin(func() { sinfo, isIn = c.core.searches.searches[*c.nodeID] })
// If the search failed for some reason, e.g. it hit a dead end or timed if !isIn {
// out, then do nothing done := make(chan struct{}, 1)
if err != nil { var sess *sessionInfo
c.core.log.Debugln(c.String(), "DHT search failed:", err) var err error
return searchCompleted := func(sinfo *sessionInfo, e error) {
sess = sinfo
err = e
// FIXME close can be called multiple times, do a non-blocking send instead
select {
case done <- struct{}{}:
default:
}
} }
// Take the connection mutex c.core.router.doAdmin(func() {
c.mutex.Lock() sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
defer c.mutex.Unlock() sinfo.continueSearch()
// Were we successfully given a sessionInfo pointer? })
if sinfo != nil { <-done
// Store it, and update the nodeID and nodeMask (which may have been c.session = sess
// wildcarded before now) with their complete counterparts if c.session == nil && err == nil {
c.core.log.Debugln(c.String(), "DHT search completed") panic("search failed but returned no error")
c.session = sinfo }
c.nodeID = crypto.GetNodeID(&sinfo.theirPermPub) if c.session != nil {
c.nodeID = crypto.GetNodeID(&c.session.theirPermPub)
for i := range c.nodeMask { for i := range c.nodeMask {
c.nodeMask[i] = 0xFF c.nodeMask[i] = 0xFF
} }
// Make sure that any blocks on read/write operations are lifted
defer func() { recover() }() // So duplicate searches don't panic
close(c.searchwait)
} else {
// No session was returned - this shouldn't really happen because we
// should always return an error reason if we don't return a session
panic("DHT search didn't return an error or a sessionInfo")
} }
if c.closed { return err
// Things were closed before the search returned
// Go ahead and close it again to make sure the session is cleaned up
go c.Close()
} else {
// Send any messages we may have buffered
var msgs [][]byte
msgs, c.writebuf = c.writebuf, nil
go func() {
for _, msg := range msgs {
c.Write(msg)
util.PutBytes(msg)
}
}()
}
}
// doSearch will be called below in response to one or more conditions
doSearch := func() {
c.searching.Store(true)
// Check to see if there is a search already matching the destination
sinfo, isIn := c.core.searches.searches[*c.nodeID]
if !isIn {
// Nothing was found, so create a new search
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
}
// Continue the search
c.core.searches.continueSearch(sinfo)
}
// Take a copy of the session object, in case it changes later
c.mutex.RLock()
sinfo := c.session
c.mutex.RUnlock()
if c.session == nil {
// No session object is present so previous searches, if we ran any, have
// not yielded a useful result (dead end, remote host not found)
doSearch()
} else { } else {
sinfo.worker <- func() { return errors.New("search already exists")
switch {
case !sinfo.init:
doSearch()
case time.Since(sinfo.time) > 6*time.Second:
if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second {
// TODO double check that the above condition is correct
doSearch()
} else {
c.core.sessions.ping(sinfo)
}
default: // Don't do anything, to keep traffic throttled
}
}
} }
return nil
} }
func getDeadlineTimer(value *atomic.Value) *time.Timer { func getDeadlineTimer(value *atomic.Value) *time.Timer {
@ -167,30 +115,9 @@ func getDeadlineTimer(value *atomic.Value) *time.Timer {
func (c *Conn) Read(b []byte) (int, error) { func (c *Conn) Read(b []byte) (int, error) {
// Take a copy of the session object // Take a copy of the session object
c.mutex.RLock()
sinfo := c.session sinfo := c.session
c.mutex.RUnlock()
timer := getDeadlineTimer(&c.readDeadline) timer := getDeadlineTimer(&c.readDeadline)
defer util.TimerStop(timer) defer util.TimerStop(timer)
// If there is a search in progress then wait for the result
if sinfo == nil {
// Wait for the search to complete
select {
case <-c.searchwait:
case <-timer.C:
return 0, ConnError{errors.New("timeout"), true, false, 0}
}
// Retrieve our session info again
c.mutex.RLock()
sinfo = c.session
c.mutex.RUnlock()
// If sinfo is still nil at this point then the search failed and the
// searchwait channel has been recreated, so might as well give up and
// return an error code
if sinfo == nil {
return 0, errors.New("search failed")
}
}
for { for {
// Wait for some traffic to come through from the session // Wait for some traffic to come through from the session
select { select {
@ -232,6 +159,12 @@ func (c *Conn) Read(b []byte) (int, error) {
sinfo.bytesRecvd += uint64(len(b)) sinfo.bytesRecvd += uint64(len(b))
} }
// Hand over to the session worker // Hand over to the session worker
defer func() {
if recover() != nil {
err = errors.New("read failed, session already closed")
close(done)
}
}() // In case we're racing with a close
select { // Send to worker select { // Send to worker
case sinfo.worker <- workerFunc: case sinfo.worker <- workerFunc:
case <-timer.C: case <-timer.C:
@ -253,32 +186,7 @@ func (c *Conn) Read(b []byte) (int, error) {
} }
func (c *Conn) Write(b []byte) (bytesWritten int, err error) { func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
c.mutex.RLock()
sinfo := c.session sinfo := c.session
c.mutex.RUnlock()
// If the session doesn't exist, or isn't initialised (which probably means
// that the search didn't complete successfully) then we may need to wait for
// the search to complete or start the search again
if sinfo == nil || !sinfo.init {
// Is a search already taking place?
if searching, sok := c.searching.Load().(bool); !sok || (sok && !searching) {
// No search was already taking place so start a new one
c.core.router.doAdmin(c.startSearch)
}
// Buffer the packet to be sent if/when the search is finished
c.mutex.Lock()
defer c.mutex.Unlock()
c.writebuf = append(c.writebuf, append(util.GetBytes(), b...))
for len(c.writebuf) > 32 {
util.PutBytes(c.writebuf[0])
c.writebuf = c.writebuf[1:]
}
return len(b), nil
} else {
// This triggers some session keepalive traffic
// FIXME this desparately needs to be refactored, since the ping case needlessly goes through the router goroutine just to have it pass a function to the session worker when it determines that a session already exists.
c.core.router.doAdmin(c.startSearch)
}
var packet []byte var packet []byte
done := make(chan struct{}) done := make(chan struct{})
written := len(b) written := len(b)
@ -301,11 +209,45 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
} }
packet = p.encode() packet = p.encode()
sinfo.bytesSent += uint64(len(b)) sinfo.bytesSent += uint64(len(b))
// The rest of this work is session keep-alive traffic
doSearch := func() {
routerWork := func() {
// Check to see if there is a search already matching the destination
sinfo, isIn := c.core.searches.searches[*c.nodeID]
if !isIn {
// Nothing was found, so create a new search
searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
}
// Continue the search
sinfo.continueSearch()
}
go func() { c.core.router.admin <- routerWork }()
}
switch {
case time.Since(sinfo.time) > 6*time.Second:
if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second {
// TODO double check that the above condition is correct
doSearch()
} else {
sinfo.core.sessions.ping(sinfo)
}
case sinfo.reset && sinfo.pingTime.Before(sinfo.time):
sinfo.core.sessions.ping(sinfo)
default: // Don't do anything, to keep traffic throttled
}
} }
// Set up a timer so this doesn't block forever // Set up a timer so this doesn't block forever
timer := getDeadlineTimer(&c.writeDeadline) timer := getDeadlineTimer(&c.writeDeadline)
defer util.TimerStop(timer) defer util.TimerStop(timer)
// Hand over to the session worker // Hand over to the session worker
defer func() {
if recover() != nil {
err = errors.New("write failed, session already closed")
close(done)
}
}() // In case we're racing with a close
select { // Send to worker select { // Send to worker
case sinfo.worker <- workerFunc: case sinfo.worker <- workerFunc:
case <-timer.C: case <-timer.C:
@ -326,8 +268,7 @@ func (c *Conn) Close() error {
defer c.mutex.Unlock() defer c.mutex.Unlock()
if c.session != nil { if c.session != nil {
// Close the session, if it hasn't been closed already // Close the session, if it hasn't been closed already
c.session.close() c.core.router.doAdmin(c.session.close)
c.session = nil
} }
// This can't fail yet - TODO? // This can't fail yet - TODO?
c.closed = true c.closed = true

View File

@ -88,14 +88,14 @@ func (c *Core) addPeerLoop() {
// Add peers from the Peers section // Add peers from the Peers section
for _, peer := range current.Peers { for _, peer := range current.Peers {
c.AddPeer(peer, "") go c.AddPeer(peer, "")
time.Sleep(time.Second) time.Sleep(time.Second)
} }
// Add peers from the InterfacePeers section // Add peers from the InterfacePeers section
for intf, intfpeers := range current.InterfacePeers { for intf, intfpeers := range current.InterfacePeers {
for _, peer := range intfpeers { for _, peer := range intfpeers {
c.AddPeer(peer, intf) go c.AddPeer(peer, intf)
time.Sleep(time.Second) time.Sleep(time.Second)
} }
} }

View File

@ -68,9 +68,9 @@ type dht struct {
core *Core core *Core
reconfigure chan chan error reconfigure chan chan error
nodeID crypto.NodeID nodeID crypto.NodeID
peers chan *dhtInfo // other goroutines put incoming dht updates here peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks callbacks map[dhtReqKey][]dht_callbackInfo // Search and admin lookup callbacks
// These next two could be replaced by a single linked list or similar... // These next two could be replaced by a single linked list or similar...
table map[crypto.NodeID]*dhtInfo table map[crypto.NodeID]*dhtInfo
imp []*dhtInfo imp []*dhtInfo
@ -88,7 +88,7 @@ func (t *dht) init(c *Core) {
}() }()
t.nodeID = *t.core.NodeID() t.nodeID = *t.core.NodeID()
t.peers = make(chan *dhtInfo, 1024) t.peers = make(chan *dhtInfo, 1024)
t.callbacks = make(map[dhtReqKey]dht_callbackInfo) t.callbacks = make(map[dhtReqKey][]dht_callbackInfo)
t.reset() t.reset()
} }
@ -244,15 +244,17 @@ type dht_callbackInfo struct {
// Adds a callback and removes it after some timeout. // Adds a callback and removes it after some timeout.
func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) { func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) {
info := dht_callbackInfo{callback, time.Now().Add(6 * time.Second)} info := dht_callbackInfo{callback, time.Now().Add(6 * time.Second)}
t.callbacks[*rq] = info t.callbacks[*rq] = append(t.callbacks[*rq], info)
} }
// Reads a lookup response, checks that we had sent a matching request, and processes the response info. // Reads a lookup response, checks that we had sent a matching request, and processes the response info.
// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses
func (t *dht) handleRes(res *dhtRes) { func (t *dht) handleRes(res *dhtRes) {
rq := dhtReqKey{res.Key, res.Dest} rq := dhtReqKey{res.Key, res.Dest}
if callback, isIn := t.callbacks[rq]; isIn { if callbacks, isIn := t.callbacks[rq]; isIn {
callback.f(res) for _, callback := range callbacks {
callback.f(res)
}
delete(t.callbacks, rq) delete(t.callbacks, rq)
} }
_, isIn := t.reqs[rq] _, isIn := t.reqs[rq]
@ -326,10 +328,15 @@ func (t *dht) doMaintenance() {
} }
} }
t.reqs = newReqs t.reqs = newReqs
newCallbacks := make(map[dhtReqKey]dht_callbackInfo, len(t.callbacks)) newCallbacks := make(map[dhtReqKey][]dht_callbackInfo, len(t.callbacks))
for key, callback := range t.callbacks { for key, cs := range t.callbacks {
if now.Before(callback.time) { for _, c := range cs {
newCallbacks[key] = callback if now.Before(c.time) {
newCallbacks[key] = append(newCallbacks[key], c)
} else {
// Signal failure
c.f(nil)
}
} }
} }
t.callbacks = newCallbacks t.callbacks = newCallbacks

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
) )
@ -14,6 +15,8 @@ type Dialer struct {
core *Core core *Core
} }
// TODO DialContext that allows timeouts/cancellation, Dial should just call this with no timeout set in the context
// Dial opens a session to the given node. The first paramter should be "nodeid" // Dial opens a session to the given node. The first paramter should be "nodeid"
// and the second parameter should contain a hexadecimal representation of the // and the second parameter should contain a hexadecimal representation of the
// target node ID. // target node ID.
@ -58,5 +61,17 @@ func (d *Dialer) Dial(network, address string) (*Conn, error) {
// NodeID parameters. // NodeID parameters.
func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, error) { func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, error) {
conn := newConn(d.core, nodeID, nodeMask, nil) conn := newConn(d.core, nodeID, nodeMask, nil)
return conn, nil if err := conn.search(); err != nil {
conn.Close()
return nil, err
}
t := time.NewTimer(6 * time.Second) // TODO use a context instead
defer t.Stop()
select {
case <-conn.session.init:
return conn, nil
case <-t.C:
conn.Close()
return nil, errors.New("session handshake timeout")
}
} }

View File

@ -119,7 +119,7 @@ func (r *router) mainLoop() {
case info := <-r.core.dht.peers: case info := <-r.core.dht.peers:
r.core.dht.insertPeer(info) r.core.dht.insertPeer(info)
case <-r.reset: case <-r.reset:
r.core.sessions.resetInits() r.core.sessions.reset()
r.core.dht.reset() r.core.dht.reset()
case <-ticker.C: case <-ticker.C:
{ {

View File

@ -33,13 +33,14 @@ const search_RETRY_TIME = time.Second
// Information about an ongoing search. // Information about an ongoing search.
// Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited. // Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited.
type searchInfo struct { type searchInfo struct {
core *Core
dest crypto.NodeID dest crypto.NodeID
mask crypto.NodeID mask crypto.NodeID
time time.Time time time.Time
packet []byte
toVisit []*dhtInfo toVisit []*dhtInfo
visited map[crypto.NodeID]bool visited map[crypto.NodeID]bool
callback func(*sessionInfo, error) callback func(*sessionInfo, error)
// TODO context.Context for timeout and cancellation
} }
// This stores a map of active searches. // This stores a map of active searches.
@ -49,7 +50,7 @@ type searches struct {
searches map[crypto.NodeID]*searchInfo searches map[crypto.NodeID]*searchInfo
} }
// Intializes the searches struct. // Initializes the searches struct.
func (s *searches) init(core *Core) { func (s *searches) init(core *Core) {
s.core = core s.core = core
s.reconfigure = make(chan chan error, 1) s.reconfigure = make(chan chan error, 1)
@ -65,12 +66,13 @@ func (s *searches) init(core *Core) {
// Creates a new search info, adds it to the searches struct, and returns a pointer to the info. // Creates a new search info, adds it to the searches struct, and returns a pointer to the info.
func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo {
now := time.Now() now := time.Now()
for dest, sinfo := range s.searches { //for dest, sinfo := range s.searches {
if now.Sub(sinfo.time) > time.Minute { // if now.Sub(sinfo.time) > time.Minute {
delete(s.searches, dest) // delete(s.searches, dest)
} // }
} //}
info := searchInfo{ info := searchInfo{
core: s.core,
dest: *dest, dest: *dest,
mask: *mask, mask: *mask,
time: now.Add(-time.Second), time: now.Add(-time.Second),
@ -82,30 +84,29 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callba
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Checks if there's an ongoing search relaed to a dhtRes. // Checks if there's an ongoing search related to a dhtRes.
// If there is, it adds the response info to the search and triggers a new search step. // If there is, it adds the response info to the search and triggers a new search step.
// If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more. // If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more.
func (s *searches) handleDHTRes(res *dhtRes) { func (sinfo *searchInfo) handleDHTRes(res *dhtRes) {
sinfo, isIn := s.searches[res.Dest] if res == nil || sinfo.checkDHTRes(res) {
if !isIn || s.checkDHTRes(sinfo, res) {
// Either we don't recognize this search, or we just finished it // Either we don't recognize this search, or we just finished it
return return
} }
// Add to the search and continue // Add to the search and continue
s.addToSearch(sinfo, res) sinfo.addToSearch(res)
s.doSearchStep(sinfo) sinfo.doSearchStep()
} }
// Adds the information from a dhtRes to an ongoing search. // Adds the information from a dhtRes to an ongoing search.
// Info about a node that has already been visited is not re-added to the search. // Info about a node that has already been visited is not re-added to the search.
// Duplicate information about nodes toVisit is deduplicated (the newest information is kept). // Duplicate information about nodes toVisit is deduplicated (the newest information is kept).
// The toVisit list is sorted in ascending order of keyspace distance from the destination. // The toVisit list is sorted in ascending order of keyspace distance from the destination.
func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { func (sinfo *searchInfo) addToSearch(res *dhtRes) {
// Add responses to toVisit if closer to dest than the res node // Add responses to toVisit if closer to dest than the res node
from := dhtInfo{key: res.Key, coords: res.Coords} from := dhtInfo{key: res.Key, coords: res.Coords}
sinfo.visited[*from.getNodeID()] = true sinfo.visited[*from.getNodeID()] = true
for _, info := range res.Infos { for _, info := range res.Infos {
if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { if *info.getNodeID() == sinfo.core.dht.nodeID || sinfo.visited[*info.getNodeID()] {
continue continue
} }
if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) { if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) {
@ -135,42 +136,43 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
// If there are no nodes left toVisit, then this cleans up the search. // If there are no nodes left toVisit, then this cleans up the search.
// Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. // Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping.
func (s *searches) doSearchStep(sinfo *searchInfo) { func (sinfo *searchInfo) doSearchStep() {
if len(sinfo.toVisit) == 0 { if len(sinfo.toVisit) == 0 {
// Dead end, do cleanup // Dead end, do cleanup
delete(s.searches, sinfo.dest) delete(sinfo.core.searches.searches, sinfo.dest)
go sinfo.callback(nil, errors.New("search reached dead end")) sinfo.callback(nil, errors.New("search reached dead end"))
return return
} }
// Send to the next search target // Send to the next search target
var next *dhtInfo var next *dhtInfo
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
rq := dhtReqKey{next.key, sinfo.dest} rq := dhtReqKey{next.key, sinfo.dest}
s.core.dht.addCallback(&rq, s.handleDHTRes) sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes)
s.core.dht.ping(next, &sinfo.dest) sinfo.core.dht.ping(next, &sinfo.dest)
} }
// If we've recenty sent a ping for this search, do nothing. // If we've recenty sent a ping for this search, do nothing.
// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. // Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME.
func (s *searches) continueSearch(sinfo *searchInfo) { func (sinfo *searchInfo) continueSearch() {
if time.Since(sinfo.time) < search_RETRY_TIME { if time.Since(sinfo.time) < search_RETRY_TIME {
return return
} }
sinfo.time = time.Now() sinfo.time = time.Now()
s.doSearchStep(sinfo) sinfo.doSearchStep()
// In case the search dies, try to spawn another thread later // In case the search dies, try to spawn another thread later
// Note that this will spawn multiple parallel searches as time passes // Note that this will spawn multiple parallel searches as time passes
// Any that die aren't restarted, but a new one will start later // Any that die aren't restarted, but a new one will start later
retryLater := func() { retryLater := func() {
newSearchInfo := s.searches[sinfo.dest] // FIXME this keeps the search alive forever if not for the searches map, fix that
newSearchInfo := sinfo.core.searches.searches[sinfo.dest]
if newSearchInfo != sinfo { if newSearchInfo != sinfo {
return return
} }
s.continueSearch(sinfo) sinfo.continueSearch()
} }
go func() { go func() {
time.Sleep(search_RETRY_TIME) time.Sleep(search_RETRY_TIME)
s.core.router.admin <- retryLater sinfo.core.router.admin <- retryLater
}() }()
} }
@ -185,37 +187,36 @@ func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callb
// Checks if a dhtRes is good (called by handleDHTRes). // Checks if a dhtRes is good (called by handleDHTRes).
// If the response is from the target, get/create a session, trigger a session ping, and return true. // If the response is from the target, get/create a session, trigger a session ping, and return true.
// Otherwise return false. // Otherwise return false.
func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool { func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool {
them := crypto.GetNodeID(&res.Key) them := crypto.GetNodeID(&res.Key)
var destMasked crypto.NodeID var destMasked crypto.NodeID
var themMasked crypto.NodeID var themMasked crypto.NodeID
for idx := 0; idx < crypto.NodeIDLen; idx++ { for idx := 0; idx < crypto.NodeIDLen; idx++ {
destMasked[idx] = info.dest[idx] & info.mask[idx] destMasked[idx] = sinfo.dest[idx] & sinfo.mask[idx]
themMasked[idx] = them[idx] & info.mask[idx] themMasked[idx] = them[idx] & sinfo.mask[idx]
} }
if themMasked != destMasked { if themMasked != destMasked {
return false return false
} }
// They match, so create a session and send a sessionRequest // They match, so create a session and send a sessionRequest
sinfo, isIn := s.core.sessions.getByTheirPerm(&res.Key) sess, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key)
if !isIn { if !isIn {
sinfo = s.core.sessions.createSession(&res.Key) sess = sinfo.core.sessions.createSession(&res.Key)
if sinfo == nil { if sess == nil {
// nil if the DHT search finished but the session wasn't allowed // nil if the DHT search finished but the session wasn't allowed
go info.callback(nil, errors.New("session not allowed")) sinfo.callback(nil, errors.New("session not allowed"))
return true return true
} }
_, isIn := s.core.sessions.getByTheirPerm(&res.Key) _, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key)
if !isIn { if !isIn {
panic("This should never happen") panic("This should never happen")
} }
} }
// FIXME (!) replay attacks could mess with coords? Give it a handle (tstamp)? // FIXME (!) replay attacks could mess with coords? Give it a handle (tstamp)?
sinfo.coords = res.Coords sess.coords = res.Coords
sinfo.packet = info.packet sinfo.core.sessions.ping(sess)
s.core.sessions.ping(sinfo) sinfo.callback(sess, nil)
go info.callback(sinfo, nil)
// Cleanup // Cleanup
delete(s.searches, res.Dest) delete(sinfo.core.searches.searches, res.Dest)
return true return true
} }

View File

@ -6,7 +6,6 @@ package yggdrasil
import ( import (
"bytes" "bytes"
"encoding/hex"
"sync" "sync"
"time" "time"
@ -40,13 +39,13 @@ type sessionInfo struct {
pingTime time.Time // time the first ping was sent since the last received packet pingTime time.Time // time the first ping was sent since the last received packet
pingSend time.Time // time the last ping was sent pingSend time.Time // time the last ping was sent
coords []byte // coords of destination coords []byte // coords of destination
packet []byte // a buffered packet, sent immediately on ping/pong reset bool // reset if coords change
init bool // Reset if coords change
tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation
bytesSent uint64 // Bytes of real traffic sent in this session bytesSent uint64 // Bytes of real traffic sent in this session
bytesRecvd uint64 // Bytes of real traffic received in this session bytesRecvd uint64 // Bytes of real traffic received in this session
worker chan func() // Channel to send work to the session worker worker chan func() // Channel to send work to the session worker
recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
} }
func (sinfo *sessionInfo) doWorker(f func()) { func (sinfo *sessionInfo) doWorker(f func()) {
@ -103,7 +102,14 @@ func (s *sessionInfo) update(p *sessionPing) bool {
} }
s.time = time.Now() s.time = time.Now()
s.tstamp = p.Tstamp s.tstamp = p.Tstamp
s.init = true s.reset = false
defer func() { recover() }() // Recover if the below panics
select {
case <-s.init:
default:
// Unblock anything waiting for the session to initialize
close(s.init)
}
return true return true
} }
@ -111,18 +117,16 @@ func (s *sessionInfo) update(p *sessionPing) bool {
// Sessions are indexed by handle. // Sessions are indexed by handle.
// Additionally, stores maps of address/subnet onto keys, and keys onto handles. // Additionally, stores maps of address/subnet onto keys, and keys onto handles.
type sessions struct { type sessions struct {
core *Core core *Core
listener *Listener listener *Listener
listenerMutex sync.Mutex listenerMutex sync.Mutex
reconfigure chan chan error reconfigure chan chan error
lastCleanup time.Time lastCleanup time.Time
permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot isAllowedHandler func(pubkey *crypto.BoxPubKey, initiator bool) bool // Returns true or false if session setup is allowed
sinfos map[crypto.Handle]*sessionInfo // Maps (secret) handle onto session info isAllowedMutex sync.RWMutex // Protects the above
conns map[crypto.Handle]*Conn // Maps (secret) handle onto connections permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot
byMySes map[crypto.BoxPubKey]*crypto.Handle // Maps mySesPub onto handle sinfos map[crypto.Handle]*sessionInfo // Maps handle onto session info
byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle
addrToPerm map[address.Address]*crypto.BoxPubKey
subnetToPerm map[address.Subnet]*crypto.BoxPubKey
} }
// Initializes the session struct. // Initializes the session struct.
@ -148,77 +152,21 @@ func (ss *sessions) init(core *Core) {
}() }()
ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey) ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey)
ss.sinfos = make(map[crypto.Handle]*sessionInfo) ss.sinfos = make(map[crypto.Handle]*sessionInfo)
ss.byMySes = make(map[crypto.BoxPubKey]*crypto.Handle)
ss.byTheirPerm = make(map[crypto.BoxPubKey]*crypto.Handle) ss.byTheirPerm = make(map[crypto.BoxPubKey]*crypto.Handle)
ss.addrToPerm = make(map[address.Address]*crypto.BoxPubKey)
ss.subnetToPerm = make(map[address.Subnet]*crypto.BoxPubKey)
ss.lastCleanup = time.Now() ss.lastCleanup = time.Now()
} }
// Determines whether the session firewall is enabled.
func (ss *sessions) isSessionFirewallEnabled() bool {
ss.core.config.Mutex.RLock()
defer ss.core.config.Mutex.RUnlock()
return ss.core.config.Current.SessionFirewall.Enable
}
// Determines whether the session with a given publickey is allowed based on // Determines whether the session with a given publickey is allowed based on
// session firewall rules. // session firewall rules.
func (ss *sessions) isSessionAllowed(pubkey *crypto.BoxPubKey, initiator bool) bool { func (ss *sessions) isSessionAllowed(pubkey *crypto.BoxPubKey, initiator bool) bool {
ss.core.config.Mutex.RLock() ss.isAllowedMutex.RLock()
defer ss.core.config.Mutex.RUnlock() defer ss.isAllowedMutex.RUnlock()
// Allow by default if the session firewall is disabled if ss.isAllowedHandler == nil {
if !ss.isSessionFirewallEnabled() {
return true return true
} }
// Prepare for checking whitelist/blacklist
var box crypto.BoxPubKey return ss.isAllowedHandler(pubkey, initiator)
// Reject blacklisted nodes
for _, b := range ss.core.config.Current.SessionFirewall.BlacklistEncryptionPublicKeys {
key, err := hex.DecodeString(b)
if err == nil {
copy(box[:crypto.BoxPubKeyLen], key)
if box == *pubkey {
return false
}
}
}
// Allow whitelisted nodes
for _, b := range ss.core.config.Current.SessionFirewall.WhitelistEncryptionPublicKeys {
key, err := hex.DecodeString(b)
if err == nil {
copy(box[:crypto.BoxPubKeyLen], key)
if box == *pubkey {
return true
}
}
}
// Allow outbound sessions if appropriate
if ss.core.config.Current.SessionFirewall.AlwaysAllowOutbound {
if initiator {
return true
}
}
// Look and see if the pubkey is that of a direct peer
var isDirectPeer bool
for _, peer := range ss.core.peers.ports.Load().(map[switchPort]*peer) {
if peer.box == *pubkey {
isDirectPeer = true
break
}
}
// Allow direct peers if appropriate
if ss.core.config.Current.SessionFirewall.AllowFromDirect && isDirectPeer {
return true
}
// Allow remote nodes if appropriate
if ss.core.config.Current.SessionFirewall.AllowFromRemote && !isDirectPeer {
return true
}
// Finally, default-deny if not matching any of the above rules
return false
} }
// Gets the session corresponding to a given handle. // Gets the session corresponding to a given handle.
@ -227,16 +175,6 @@ func (ss *sessions) getSessionForHandle(handle *crypto.Handle) (*sessionInfo, bo
return sinfo, isIn return sinfo, isIn
} }
// Gets a session corresponding to an ephemeral session key used by this node.
func (ss *sessions) getByMySes(key *crypto.BoxPubKey) (*sessionInfo, bool) {
h, isIn := ss.byMySes[*key]
if !isIn {
return nil, false
}
sinfo, isIn := ss.getSessionForHandle(h)
return sinfo, isIn
}
// Gets a session corresponding to a permanent key used by the remote node. // Gets a session corresponding to a permanent key used by the remote node.
func (ss *sessions) getByTheirPerm(key *crypto.BoxPubKey) (*sessionInfo, bool) { func (ss *sessions) getByTheirPerm(key *crypto.BoxPubKey) (*sessionInfo, bool) {
h, isIn := ss.byTheirPerm[*key] h, isIn := ss.byTheirPerm[*key]
@ -247,30 +185,11 @@ func (ss *sessions) getByTheirPerm(key *crypto.BoxPubKey) (*sessionInfo, bool) {
return sinfo, isIn return sinfo, isIn
} }
// Gets a session corresponding to an IPv6 address used by the remote node.
func (ss *sessions) getByTheirAddr(addr *address.Address) (*sessionInfo, bool) {
p, isIn := ss.addrToPerm[*addr]
if !isIn {
return nil, false
}
sinfo, isIn := ss.getByTheirPerm(p)
return sinfo, isIn
}
// Gets a session corresponding to an IPv6 /64 subnet used by the remote node/network.
func (ss *sessions) getByTheirSubnet(snet *address.Subnet) (*sessionInfo, bool) {
p, isIn := ss.subnetToPerm[*snet]
if !isIn {
return nil, false
}
sinfo, isIn := ss.getByTheirPerm(p)
return sinfo, isIn
}
// Creates a new session and lazily cleans up old existing sessions. This // Creates a new session and lazily cleans up old existing sessions. This
// includse initializing session info to sane defaults (e.g. lowest supported // includse initializing session info to sane defaults (e.g. lowest supported
// MTU). // MTU).
func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
// TODO: this check definitely needs to be moved
if !ss.isSessionAllowed(theirPermKey, true) { if !ss.isSessionAllowed(theirPermKey, true) {
return nil return nil
} }
@ -292,6 +211,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.mtuTime = now sinfo.mtuTime = now
sinfo.pingTime = now sinfo.pingTime = now
sinfo.pingSend = now sinfo.pingSend = now
sinfo.init = make(chan struct{})
higher := false higher := false
for idx := range ss.core.boxPub { for idx := range ss.core.boxPub {
if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] { if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] {
@ -314,10 +234,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.worker = make(chan func(), 1) sinfo.worker = make(chan func(), 1)
sinfo.recv = make(chan *wire_trafficPacket, 32) sinfo.recv = make(chan *wire_trafficPacket, 32)
ss.sinfos[sinfo.myHandle] = &sinfo ss.sinfos[sinfo.myHandle] = &sinfo
ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
ss.addrToPerm[sinfo.theirAddr] = &sinfo.theirPermPub
ss.subnetToPerm[sinfo.theirSubnet] = &sinfo.theirPermPub
go sinfo.workerMain() go sinfo.workerMain()
return &sinfo return &sinfo
} }
@ -342,36 +259,21 @@ func (ss *sessions) cleanup() {
sinfos[k] = v sinfos[k] = v
} }
ss.sinfos = sinfos ss.sinfos = sinfos
byMySes := make(map[crypto.BoxPubKey]*crypto.Handle, len(ss.byMySes))
for k, v := range ss.byMySes {
byMySes[k] = v
}
ss.byMySes = byMySes
byTheirPerm := make(map[crypto.BoxPubKey]*crypto.Handle, len(ss.byTheirPerm)) byTheirPerm := make(map[crypto.BoxPubKey]*crypto.Handle, len(ss.byTheirPerm))
for k, v := range ss.byTheirPerm { for k, v := range ss.byTheirPerm {
byTheirPerm[k] = v byTheirPerm[k] = v
} }
ss.byTheirPerm = byTheirPerm ss.byTheirPerm = byTheirPerm
addrToPerm := make(map[address.Address]*crypto.BoxPubKey, len(ss.addrToPerm))
for k, v := range ss.addrToPerm {
addrToPerm[k] = v
}
ss.addrToPerm = addrToPerm
subnetToPerm := make(map[address.Subnet]*crypto.BoxPubKey, len(ss.subnetToPerm))
for k, v := range ss.subnetToPerm {
subnetToPerm[k] = v
}
ss.subnetToPerm = subnetToPerm
ss.lastCleanup = time.Now() ss.lastCleanup = time.Now()
} }
// Closes a session, removing it from sessions maps and killing the worker goroutine. // Closes a session, removing it from sessions maps and killing the worker goroutine.
func (sinfo *sessionInfo) close() { func (sinfo *sessionInfo) close() {
delete(sinfo.core.sessions.sinfos, sinfo.myHandle) if s := sinfo.core.sessions.sinfos[sinfo.myHandle]; s == sinfo {
delete(sinfo.core.sessions.byMySes, sinfo.mySesPub) delete(sinfo.core.sessions.sinfos, sinfo.myHandle)
delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub) delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub)
delete(sinfo.core.sessions.addrToPerm, sinfo.theirAddr) }
delete(sinfo.core.sessions.subnetToPerm, sinfo.theirSubnet) defer func() { recover() }()
close(sinfo.worker) close(sinfo.worker)
} }
@ -396,6 +298,8 @@ func (ss *sessions) getPing(sinfo *sessionInfo) sessionPing {
// This comes up with dht req/res and session ping/pong traffic. // This comes up with dht req/res and session ping/pong traffic.
func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey, func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey,
theirPub *crypto.BoxPubKey) *crypto.BoxSharedKey { theirPub *crypto.BoxPubKey) *crypto.BoxSharedKey {
return crypto.GetSharedKey(myPriv, theirPub)
// FIXME concurrency issues with the below, so for now we just burn the CPU every time
if skey, isIn := ss.permShared[*theirPub]; isIn { if skey, isIn := ss.permShared[*theirPub]; isIn {
return skey return skey
} }
@ -434,8 +338,8 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) {
} }
packet := p.encode() packet := p.encode()
ss.core.router.out(packet) ss.core.router.out(packet)
if !isPong { if sinfo.pingTime.Before(sinfo.time) {
sinfo.pingSend = time.Now() sinfo.pingTime = time.Now()
} }
} }
@ -444,12 +348,12 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) {
func (ss *sessions) handlePing(ping *sessionPing) { func (ss *sessions) handlePing(ping *sessionPing) {
// Get the corresponding session (or create a new session) // Get the corresponding session (or create a new session)
sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub) sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub)
// Check the session firewall // Check if the session is allowed
if !isIn && ss.isSessionFirewallEnabled() { // TODO: this check may need to be moved
if !ss.isSessionAllowed(&ping.SendPermPub, false) { if !isIn && !ss.isSessionAllowed(&ping.SendPermPub, false) {
return return
}
} }
// Create the session if it doesn't already exist
if !isIn { if !isIn {
ss.createSession(&ping.SendPermPub) ss.createSession(&ping.SendPermPub)
sinfo, isIn = ss.getByTheirPerm(&ping.SendPermPub) sinfo, isIn = ss.getByTheirPerm(&ping.SendPermPub)
@ -476,15 +380,6 @@ func (ss *sessions) handlePing(ping *sessionPing) {
if !ping.IsPong { if !ping.IsPong {
ss.sendPingPong(sinfo, true) ss.sendPingPong(sinfo, true)
} }
if sinfo.packet != nil {
/* FIXME this needs to live in the net.Conn or something, needs work in Write
// send
var bs []byte
bs, sinfo.packet = sinfo.packet, nil
ss.core.router.sendPacket(bs) // FIXME this needs to live in the net.Conn or something, needs work in Write
*/
sinfo.packet = nil
}
}) })
} }
@ -529,10 +424,10 @@ func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) {
// Resets all sessions to an uninitialized state. // Resets all sessions to an uninitialized state.
// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
func (ss *sessions) resetInits() { func (ss *sessions) reset() {
for _, sinfo := range ss.sinfos { for _, sinfo := range ss.sinfos {
sinfo.doWorker(func() { sinfo.doWorker(func() {
sinfo.init = false sinfo.reset = true
}) })
} }
} }