Merge pull request #505 from yggdrasil-network/develop

Version 0.3.7
This commit is contained in:
Neil Alexander 2019-08-18 11:20:50 +01:00 committed by GitHub
commit 009d9c9ec0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 798 additions and 599 deletions

View File

@ -25,6 +25,25 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- in case of vulnerabilities. - in case of vulnerabilities.
--> -->
## [0.3.7] - 2019-08-14
### Changed
- The switch should now forward packets along a single path more consistently in cases where congestion is low and multiple equal-length paths exist, which should improve stability and result in fewer out-of-order packets
- Sessions should now be more tolerant of out-of-order packets, by replacing a bitmask with a variable sized heap+map structure to track recently received nonces, which should reduce the number of packets dropped due to reordering when multiple paths are used or multiple independent flows are transmitted through the same session
- The admin socket can no longer return a dotfile representation of the known parts of the network, this could be rebuilt by clients using information from `getSwitchPeers`,`getDHT` and `getSessions`
### Fixed
- A number of significant performance regressions introduced in version 0.3.6 have been fixed, resulting in better performance
- Flow labels are now used to prioritise traffic flows again correctly
- In low-traffic scenarios where there are multiple peerings between a pair of nodes, Yggdrasil now prefers the most active peering instead of the least active, helping to reduce packet reordering
- The `Listen` statement, when configured as a string rather than an array, will now be parsed correctly
- The admin socket now returns `coords` as a correct array of unsigned 64-bit integers, rather than the internal representation
- The admin socket now returns `box_pub_key` in string format again
- Sessions no longer leak/block when no listener (e.g. TUN/TAP) is configured
- Incoming session connections no longer block when a session already exists, which results in less leaked goroutines
- Flooded sessions will no longer block other sessions
- Searches are now cleaned up properly and a couple of edge-cases with duplicate searches have been fixed
- A number of minor allocation and pointer fixes
## [0.3.6] - 2019-08-03 ## [0.3.6] - 2019-08-03
### Added ### Added
- Yggdrasil now has a public API with interfaces such as `yggdrasil.ConnDialer`, `yggdrasil.ConnListener` and `yggdrasil.Conn` for using Yggdrasil as a transport directly within applications - Yggdrasil now has a public API with interfaces such as `yggdrasil.ConnDialer`, `yggdrasil.ConnListener` and `yggdrasil.Conn` for using Yggdrasil as a transport directly within applications
@ -53,7 +72,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Session MTUs are now always calculated correctly, in some cases they were incorrectly defaulting to 1280 before - Session MTUs are now always calculated correctly, in some cases they were incorrectly defaulting to 1280 before
- Multiple searches now don't take place for a single connection - Multiple searches now don't take place for a single connection
- Concurrency bugs fixed - Concurrency bugs fixed
- Fixed a number of bugs in the ICMPv6 neighbor solicitation in the TUN/TAP code - Fixed a number of bugs in the ICMPv6 neighbor solicitation in the TUN/TAP code
- A case where peers weren't always added correctly if one or more peers were unreachable has been fixed - A case where peers weren't always added correctly if one or more peers were unreachable has been fixed
- Searches which include the local node are now handled correctly - Searches which include the local node are now handled correctly
- Lots of small bug tweaks and clean-ups throughout the codebase - Lots of small bug tweaks and clean-ups throughout the codebase

View File

@ -49,7 +49,7 @@ You may also find other platform-specific wrappers, scripts or tools in the
If you want to build from source, as opposed to installing one of the pre-built If you want to build from source, as opposed to installing one of the pre-built
packages: packages:
1. Install [Go](https://golang.org) (requires Go 1.11 or later) 1. Install [Go](https://golang.org) (requires Go 1.12 or later)
2. Clone this repository 2. Clone this repository
2. Run `./build` 2. Run `./build`

2
build
View File

@ -2,7 +2,7 @@
set -ef set -ef
PKGSRC=${PKGSRC:-github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil} PKGSRC=${PKGSRC:-github.com/yggdrasil-network/yggdrasil-go/src/version}
PKGNAME=${PKGNAME:-$(sh contrib/semver/name.sh)} PKGNAME=${PKGNAME:-$(sh contrib/semver/name.sh)}
PKGVER=${PKGVER:-$(sh contrib/semver/version.sh --bare)} PKGVER=${PKGVER:-$(sh contrib/semver/version.sh --bare)}

View File

@ -25,6 +25,7 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/multicast" "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
"github.com/yggdrasil-network/yggdrasil-go/src/tuntap" "github.com/yggdrasil-network/yggdrasil-go/src/tuntap"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
) )
@ -74,6 +75,12 @@ func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *config
if err := hjson.Unmarshal(conf, &dat); err != nil { if err := hjson.Unmarshal(conf, &dat); err != nil {
panic(err) panic(err)
} }
// Check for fields that have changed type recently, e.g. the Listen config
// option is now a []string rather than a string
if listen, ok := dat["Listen"].(string); ok {
dat["Listen"] = []string{listen}
}
// Sanitise the config
confJson, err := json.Marshal(dat) confJson, err := json.Marshal(dat)
if err != nil { if err != nil {
panic(err) panic(err)
@ -113,7 +120,7 @@ func main() {
normaliseconf := flag.Bool("normaliseconf", false, "use in combination with either -useconf or -useconffile, outputs your configuration normalised") normaliseconf := flag.Bool("normaliseconf", false, "use in combination with either -useconf or -useconffile, outputs your configuration normalised")
confjson := flag.Bool("json", false, "print configuration from -genconf or -normaliseconf as JSON instead of HJSON") confjson := flag.Bool("json", false, "print configuration from -genconf or -normaliseconf as JSON instead of HJSON")
autoconf := flag.Bool("autoconf", false, "automatic mode (dynamic IP, peer with IPv6 neighbors)") autoconf := flag.Bool("autoconf", false, "automatic mode (dynamic IP, peer with IPv6 neighbors)")
version := flag.Bool("version", false, "prints the version of this build") ver := flag.Bool("version", false, "prints the version of this build")
logging := flag.String("logging", "info,warn,error", "comma-separated list of logging levels to enable") logging := flag.String("logging", "info,warn,error", "comma-separated list of logging levels to enable")
logto := flag.String("logto", "stdout", "file path to log to, \"syslog\" or \"stdout\"") logto := flag.String("logto", "stdout", "file path to log to, \"syslog\" or \"stdout\"")
flag.Parse() flag.Parse()
@ -121,10 +128,10 @@ func main() {
var cfg *config.NodeConfig var cfg *config.NodeConfig
var err error var err error
switch { switch {
case *version: case *ver:
fmt.Println("Build name:", yggdrasil.BuildName()) fmt.Println("Build name:", version.BuildName())
fmt.Println("Build version:", yggdrasil.BuildVersion()) fmt.Println("Build version:", version.BuildVersion())
os.Exit(0) return
case *autoconf: case *autoconf:
// Use an autoconf-generated config, this will give us random keys and // Use an autoconf-generated config, this will give us random keys and
// port numbers, and will use an automatically selected TUN/TAP interface. // port numbers, and will use an automatically selected TUN/TAP interface.
@ -168,7 +175,7 @@ func main() {
case "stdout": case "stdout":
logger = log.New(os.Stdout, "", log.Flags()) logger = log.New(os.Stdout, "", log.Flags())
case "syslog": case "syslog":
if syslogger, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, "DAEMON", yggdrasil.BuildName()); err == nil { if syslogger, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, "DAEMON", version.BuildName()); err == nil {
logger = log.New(syslogger, "", log.Flags()) logger = log.New(syslogger, "", log.Flags())
} }
default: default:

View File

@ -19,6 +19,7 @@ import (
"github.com/hjson/hjson-go" "github.com/hjson/hjson-go"
"github.com/yggdrasil-network/yggdrasil-go/src/defaults" "github.com/yggdrasil-network/yggdrasil-go/src/defaults"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
) )
type admin_info map[string]interface{} type admin_info map[string]interface{}
@ -53,9 +54,17 @@ func main() {
server := flag.String("endpoint", endpoint, "Admin socket endpoint") server := flag.String("endpoint", endpoint, "Admin socket endpoint")
injson := flag.Bool("json", false, "Output in JSON format (as opposed to pretty-print)") injson := flag.Bool("json", false, "Output in JSON format (as opposed to pretty-print)")
verbose := flag.Bool("v", false, "Verbose output (includes public keys)") verbose := flag.Bool("v", false, "Verbose output (includes public keys)")
ver := flag.Bool("version", false, "Prints the version of this build")
flag.Parse() flag.Parse()
args := flag.Args() args := flag.Args()
if *ver {
fmt.Println("Build name:", version.BuildName())
fmt.Println("Build version:", version.BuildVersion())
fmt.Println("To get the version number of the running Yggdrasil node, run", os.Args[0], "getSelf")
return
}
if len(args) == 0 { if len(args) == 0 {
flag.Usage() flag.Usage()
return return

10
go.mod
View File

@ -8,10 +8,12 @@ require (
github.com/kardianos/minwinsvc v0.0.0-20151122163309-cad6b2b879b0 github.com/kardianos/minwinsvc v0.0.0-20151122163309-cad6b2b879b0
github.com/mitchellh/mapstructure v1.1.2 github.com/mitchellh/mapstructure v1.1.2
github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091 github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091
github.com/yggdrasil-network/water v0.0.0-20190725123504-a16161896c34 github.com/vishvananda/netlink v1.0.0
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f // indirect
github.com/yggdrasil-network/water v0.0.0-20190812103929-c83fe40250f8
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a
golang.org/x/text v0.3.2 golang.org/x/text v0.3.2
golang.org/x/tools v0.0.0-20190802003818-e9bb7d36c060 // indirect golang.org/x/tools v0.0.0-20190814171936-5b18234b3ae0 // indirect
) )

15
go.sum
View File

@ -12,6 +12,10 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091 h1:1zN6ImoqhSJhN8hGXFaJlSC8msLmIbX8bFqOfWLKw0w= github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091 h1:1zN6ImoqhSJhN8hGXFaJlSC8msLmIbX8bFqOfWLKw0w=
github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091/go.mod h1:N20Z5Y8oye9a7HmytmZ+tr8Q2vlP0tAHP13kTHzwvQY= github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091/go.mod h1:N20Z5Y8oye9a7HmytmZ+tr8Q2vlP0tAHP13kTHzwvQY=
github.com/vishvananda/netlink v1.0.0 h1:bqNY2lgheFIu1meHUFSH3d7vG93AFyqg3oGbJCOJgSM=
github.com/vishvananda/netlink v1.0.0/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f h1:nBX3nTcmxEtHSERBJaIo1Qa26VwRaopnZmfDQUXsF4I=
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/yggdrasil-network/water v0.0.0-20180615095340-f732c88f34ae h1:MYCANF1kehCG6x6G+/9txLfq6n3lS5Vp0Mxn1hdiBAc= github.com/yggdrasil-network/water v0.0.0-20180615095340-f732c88f34ae h1:MYCANF1kehCG6x6G+/9txLfq6n3lS5Vp0Mxn1hdiBAc=
github.com/yggdrasil-network/water v0.0.0-20180615095340-f732c88f34ae/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= github.com/yggdrasil-network/water v0.0.0-20180615095340-f732c88f34ae/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190719211521-a76871ea954b/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= github.com/yggdrasil-network/water v0.0.0-20190719211521-a76871ea954b/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
@ -22,6 +26,8 @@ github.com/yggdrasil-network/water v0.0.0-20190725073841-250edb919f8a h1:mQ0mPD+
github.com/yggdrasil-network/water v0.0.0-20190725073841-250edb919f8a/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= github.com/yggdrasil-network/water v0.0.0-20190725073841-250edb919f8a/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190725123504-a16161896c34 h1:Qh5FE+Q5iGqpmR/FPMYHuoZLN921au/nxAlmKe+Hdbo= github.com/yggdrasil-network/water v0.0.0-20190725123504-a16161896c34 h1:Qh5FE+Q5iGqpmR/FPMYHuoZLN921au/nxAlmKe+Hdbo=
github.com/yggdrasil-network/water v0.0.0-20190725123504-a16161896c34/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= github.com/yggdrasil-network/water v0.0.0-20190725123504-a16161896c34/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190812103929-c83fe40250f8 h1:YY9Pg2BEp0jeUVU60svTOaDr+fs1ySC9RbdC1Qc6wOw=
github.com/yggdrasil-network/water v0.0.0-20190812103929-c83fe40250f8/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@ -34,6 +40,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 h1:fHDIZ2oxGnUZRN6WgWFCbYBjH9uqVPRCUVUDhs0wnbA=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e h1:njOxP/wVblhCLIUhjHXf6X+dzTt5OQ3vMQo9mkOIKIo= golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e h1:njOxP/wVblhCLIUhjHXf6X+dzTt5OQ3vMQo9mkOIKIo=
golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -44,6 +52,10 @@ golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190812073006-9eafafc0a87e h1:TsjK5I7fXk8f2FQrgu6NS7i5Qih3knl2FL1htyguLRE=
golang.org/x/sys v0.0.0-20190812073006-9eafafc0a87e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
@ -53,3 +65,6 @@ golang.org/x/tools v0.0.0-20190719005602-e377ae9d6386/go.mod h1:jcCCGcm9btYwXyDq
golang.org/x/tools v0.0.0-20190724185037-8aa4eac1a7c1/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= golang.org/x/tools v0.0.0-20190724185037-8aa4eac1a7c1/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
golang.org/x/tools v0.0.0-20190729092621-ff9f1409240a/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= golang.org/x/tools v0.0.0-20190729092621-ff9f1409240a/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
golang.org/x/tools v0.0.0-20190802003818-e9bb7d36c060/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= golang.org/x/tools v0.0.0-20190802003818-e9bb7d36c060/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
golang.org/x/tools v0.0.0-20190809145639-6d4652c779c4/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190814171936-5b18234b3ae0/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -17,6 +17,8 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
) )
@ -78,11 +80,6 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
} }
return Info{"list": handlers}, nil return Info{"list": handlers}, nil
}) })
/*
a.AddHandler("dot", []string{}, func(in Info) (Info, error) {
return Info{"dot": string(a.getResponse_dot())}, nil
})
*/
a.AddHandler("getSelf", []string{}, func(in Info) (Info, error) { a.AddHandler("getSelf", []string{}, func(in Info) (Info, error) {
ip := c.Address().String() ip := c.Address().String()
subnet := c.Subnet() subnet := c.Subnet()
@ -90,8 +87,8 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
"self": Info{ "self": Info{
ip: Info{ ip: Info{
"box_pub_key": c.EncryptionPublicKey(), "box_pub_key": c.EncryptionPublicKey(),
"build_name": yggdrasil.BuildName(), "build_name": version.BuildName(),
"build_version": yggdrasil.BuildVersion(), "build_version": version.BuildVersion(),
"coords": fmt.Sprintf("%v", c.Coords()), "coords": fmt.Sprintf("%v", c.Coords()),
"subnet": subnet.String(), "subnet": subnet.String(),
}, },
@ -110,7 +107,7 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
"bytes_recvd": p.BytesRecvd, "bytes_recvd": p.BytesRecvd,
"proto": p.Protocol, "proto": p.Protocol,
"endpoint": p.Endpoint, "endpoint": p.Endpoint,
"box_pub_key": p.PublicKey, "box_pub_key": hex.EncodeToString(p.PublicKey[:]),
} }
} }
return Info{"peers": peers}, nil return Info{"peers": peers}, nil
@ -128,7 +125,7 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
"bytes_recvd": s.BytesRecvd, "bytes_recvd": s.BytesRecvd,
"proto": s.Protocol, "proto": s.Protocol,
"endpoint": s.Endpoint, "endpoint": s.Endpoint,
"box_pub_key": s.PublicKey, "box_pub_key": hex.EncodeToString(s.PublicKey[:]),
} }
} }
return Info{"switchpeers": switchpeers}, nil return Info{"switchpeers": switchpeers}, nil
@ -147,7 +144,7 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
dht[so] = Info{ dht[so] = Info{
"coords": fmt.Sprintf("%v", d.Coords), "coords": fmt.Sprintf("%v", d.Coords),
"last_seen": d.LastSeen.Seconds(), "last_seen": d.LastSeen.Seconds(),
"box_pub_key": d.PublicKey, "box_pub_key": hex.EncodeToString(d.PublicKey[:]),
} }
} }
return Info{"dht": dht}, nil return Info{"dht": dht}, nil
@ -164,7 +161,7 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
"mtu": s.MTU, "mtu": s.MTU,
"uptime": s.Uptime.Seconds(), "uptime": s.Uptime.Seconds(),
"was_mtu_fixed": s.WasMTUFixed, "was_mtu_fixed": s.WasMTUFixed,
"box_pub_key": s.PublicKey, "box_pub_key": hex.EncodeToString(s.PublicKey[:]),
} }
} }
return Info{"sessions": sessions}, nil return Info{"sessions": sessions}, nil
@ -243,31 +240,46 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
} }
}) })
a.AddHandler("dhtPing", []string{"box_pub_key", "coords", "[target]"}, func(in Info) (Info, error) { a.AddHandler("dhtPing", []string{"box_pub_key", "coords", "[target]"}, func(in Info) (Info, error) {
var reserr error
var result yggdrasil.DHTRes
if in["target"] == nil { if in["target"] == nil {
in["target"] = "none" in["target"] = "none"
} }
result, err := a.core.DHTPing(in["box_pub_key"].(string), in["coords"].(string), in["target"].(string)) coords := util.DecodeCoordString(in["coords"].(string))
if err == nil { var boxPubKey crypto.BoxPubKey
infos := make(map[string]map[string]string, len(result.Infos)) if b, err := hex.DecodeString(in["box_pub_key"].(string)); err == nil {
for _, dinfo := range result.Infos { copy(boxPubKey[:], b[:])
info := map[string]string{ if n, err := hex.DecodeString(in["target"].(string)); err == nil {
"box_pub_key": hex.EncodeToString(dinfo.PublicKey[:]), var targetNodeID crypto.NodeID
"coords": fmt.Sprintf("%v", dinfo.Coords), copy(targetNodeID[:], n[:])
} result, reserr = a.core.DHTPing(boxPubKey, coords, &targetNodeID)
addr := net.IP(address.AddrForNodeID(crypto.GetNodeID(&dinfo.PublicKey))[:]).String() } else {
infos[addr] = info result, reserr = a.core.DHTPing(boxPubKey, coords, nil)
} }
return Info{"nodes": infos}, nil
} else { } else {
return Info{}, err return Info{}, err
} }
if reserr != nil {
return Info{}, reserr
}
infos := make(map[string]map[string]string, len(result.Infos))
for _, dinfo := range result.Infos {
info := map[string]string{
"box_pub_key": hex.EncodeToString(dinfo.PublicKey[:]),
"coords": fmt.Sprintf("%v", dinfo.Coords),
}
addr := net.IP(address.AddrForNodeID(crypto.GetNodeID(&dinfo.PublicKey))[:]).String()
infos[addr] = info
}
return Info{"nodes": infos}, nil
}) })
a.AddHandler("getNodeInfo", []string{"[box_pub_key]", "[coords]", "[nocache]"}, func(in Info) (Info, error) { a.AddHandler("getNodeInfo", []string{"[box_pub_key]", "[coords]", "[nocache]"}, func(in Info) (Info, error) {
var nocache bool var nocache bool
if in["nocache"] != nil { if in["nocache"] != nil {
nocache = in["nocache"].(string) == "true" nocache = in["nocache"].(string) == "true"
} }
var box_pub_key, coords string var boxPubKey crypto.BoxPubKey
var coords []uint64
if in["box_pub_key"] == nil && in["coords"] == nil { if in["box_pub_key"] == nil && in["coords"] == nil {
nodeinfo := a.core.MyNodeInfo() nodeinfo := a.core.MyNodeInfo()
var jsoninfo interface{} var jsoninfo interface{}
@ -279,10 +291,14 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
} else if in["box_pub_key"] == nil || in["coords"] == nil { } else if in["box_pub_key"] == nil || in["coords"] == nil {
return Info{}, errors.New("Expecting both box_pub_key and coords") return Info{}, errors.New("Expecting both box_pub_key and coords")
} else { } else {
box_pub_key = in["box_pub_key"].(string) if b, err := hex.DecodeString(in["box_pub_key"].(string)); err == nil {
coords = in["coords"].(string) copy(boxPubKey[:], b[:])
} else {
return Info{}, err
}
coords = util.DecodeCoordString(in["coords"].(string))
} }
result, err := a.core.GetNodeInfo(box_pub_key, coords, nocache) result, err := a.core.GetNodeInfo(boxPubKey, coords, nocache)
if err == nil { if err == nil {
var m map[string]interface{} var m map[string]interface{}
if err = json.Unmarshal(result, &m); err == nil { if err = json.Unmarshal(result, &m); err == nil {
@ -472,133 +488,3 @@ func (a *AdminSocket) handleRequest(conn net.Conn) {
} }
} }
} }
// getResponse_dot returns a response for a graphviz dot formatted
// representation of the known parts of the network. This is color-coded and
// labeled, and includes the self node, switch peers, nodes known to the DHT,
// and nodes with open sessions. The graph is structured as a tree with directed
// links leading away from the root.
/*
func (a *AdminSocket) getResponse_dot() []byte {
//self := a.getData_getSelf()
peers := a.core.GetSwitchPeers()
dht := a.core.GetDHT()
sessions := a.core.GetSessions()
// Start building a tree from all known nodes
type nodeInfo struct {
name string
key string
parent string
port uint64
options string
}
infos := make(map[string]nodeInfo)
// Get coords as a slice of strings, FIXME? this looks very fragile
coordSlice := func(coords string) []string {
tmp := strings.Replace(coords, "[", "", -1)
tmp = strings.Replace(tmp, "]", "", -1)
return strings.Split(tmp, " ")
}
// First fill the tree with all known nodes, no parents
addInfo := func(nodes []admin_nodeInfo, options string, tag string) {
for _, node := range nodes {
n := node.asMap()
info := nodeInfo{
key: n["coords"].(string),
options: options,
}
if len(tag) > 0 {
info.name = fmt.Sprintf("%s\n%s", n["ip"].(string), tag)
} else {
info.name = n["ip"].(string)
}
coordsSplit := coordSlice(info.key)
if len(coordsSplit) != 0 {
portStr := coordsSplit[len(coordsSplit)-1]
portUint, err := strconv.ParseUint(portStr, 10, 64)
if err == nil {
info.port = portUint
}
}
infos[info.key] = info
}
}
addInfo(dht, "fillcolor=\"#ffffff\" style=filled fontname=\"sans serif\"", "Known in DHT") // white
addInfo(sessions, "fillcolor=\"#acf3fd\" style=filled fontname=\"sans serif\"", "Open session") // blue
addInfo(peers, "fillcolor=\"#ffffb5\" style=filled fontname=\"sans serif\"", "Connected peer") // yellow
addInfo(append([]admin_nodeInfo(nil), *self), "fillcolor=\"#a5ff8a\" style=filled fontname=\"sans serif\"", "This node") // green
// Now go through and create placeholders for any missing nodes
for _, info := range infos {
// This is ugly string manipulation
coordsSplit := coordSlice(info.key)
for idx := range coordsSplit {
key := fmt.Sprintf("[%v]", strings.Join(coordsSplit[:idx], " "))
newInfo, isIn := infos[key]
if isIn {
continue
}
newInfo.name = "?"
newInfo.key = key
newInfo.options = "fontname=\"sans serif\" style=dashed color=\"#999999\" fontcolor=\"#999999\""
coordsSplit := coordSlice(newInfo.key)
if len(coordsSplit) != 0 {
portStr := coordsSplit[len(coordsSplit)-1]
portUint, err := strconv.ParseUint(portStr, 10, 64)
if err == nil {
newInfo.port = portUint
}
}
infos[key] = newInfo
}
}
// Now go through and attach parents
for _, info := range infos {
pSplit := coordSlice(info.key)
if len(pSplit) > 0 {
pSplit = pSplit[:len(pSplit)-1]
}
info.parent = fmt.Sprintf("[%v]", strings.Join(pSplit, " "))
infos[info.key] = info
}
// Finally, get a sorted list of keys, which we use to organize the output
var keys []string
for _, info := range infos {
keys = append(keys, info.key)
}
// sort
sort.SliceStable(keys, func(i, j int) bool {
return keys[i] < keys[j]
})
sort.SliceStable(keys, func(i, j int) bool {
return infos[keys[i]].port < infos[keys[j]].port
})
// Now print it all out
var out []byte
put := func(s string) {
out = append(out, []byte(s)...)
}
put("digraph {\n")
// First set the labels
for _, key := range keys {
info := infos[key]
put(fmt.Sprintf("\"%v\" [ label = \"%v\" %v ];\n", info.key, info.name, info.options))
}
// Then print the tree structure
for _, key := range keys {
info := infos[key]
if info.key == info.parent {
continue
} // happens for the root, skip it
port := fmt.Sprint(info.port)
style := "fontname=\"sans serif\""
if infos[info.parent].name == "?" || infos[info.key].name == "?" {
style = "fontname=\"sans serif\" style=dashed color=\"#999999\" fontcolor=\"#999999\""
}
put(fmt.Sprintf(" \"%+v\" -> \"%+v\" [ label = \"%v\" %s ];\n", info.parent, info.key, port, style))
}
put("}\n")
return out
}
*/

View File

@ -53,15 +53,14 @@ func (s *tunConn) reader() (err error) {
} }
s.tun.log.Debugln("Starting conn reader for", s.conn.String()) s.tun.log.Debugln("Starting conn reader for", s.conn.String())
defer s.tun.log.Debugln("Stopping conn reader for", s.conn.String()) defer s.tun.log.Debugln("Stopping conn reader for", s.conn.String())
var n int
b := make([]byte, 65535)
for { for {
select { select {
case <-s.stop: case <-s.stop:
return nil return nil
default: default:
} }
if n, err = s.conn.Read(b); err != nil { var bs []byte
if bs, err = s.conn.ReadNoCopy(); err != nil {
if e, eok := err.(yggdrasil.ConnError); eok && !e.Temporary() { if e, eok := err.(yggdrasil.ConnError); eok && !e.Temporary() {
if e.Closed() { if e.Closed() {
s.tun.log.Debugln(s.conn.String(), "TUN/TAP conn read debug:", err) s.tun.log.Debugln(s.conn.String(), "TUN/TAP conn read debug:", err)
@ -70,14 +69,11 @@ func (s *tunConn) reader() (err error) {
} }
return e return e
} }
} else if n > 0 { } else if len(bs) > 0 {
bs := append(util.GetBytes(), b[:n]...) s.tun.send <- bs
select {
case s.tun.send <- bs:
default:
util.PutBytes(bs)
}
s.stillAlive() s.stillAlive()
} else {
util.PutBytes(bs)
} }
} }
} }
@ -96,12 +92,15 @@ func (s *tunConn) writer() error {
select { select {
case <-s.stop: case <-s.stop:
return nil return nil
case b, ok := <-s.send: case bs, ok := <-s.send:
if !ok { if !ok {
return errors.New("send closed") return errors.New("send closed")
} }
// TODO write timeout and close msg := yggdrasil.FlowKeyMessage{
if _, err := s.conn.Write(b); err != nil { FlowKey: util.GetFlowKey(bs),
Message: bs,
}
if err := s.conn.WriteNoCopy(msg); err != nil {
if e, eok := err.(yggdrasil.ConnError); !eok { if e, eok := err.(yggdrasil.ConnError); !eok {
if e.Closed() { if e.Closed() {
s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err) s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err)
@ -112,9 +111,9 @@ func (s *tunConn) writer() error {
// TODO: This currently isn't aware of IPv4 for CKR // TODO: This currently isn't aware of IPv4 for CKR
ptb := &icmp.PacketTooBig{ ptb := &icmp.PacketTooBig{
MTU: int(e.PacketMaximumSize()), MTU: int(e.PacketMaximumSize()),
Data: b[:900], Data: bs[:900],
} }
if packet, err := CreateICMPv6(b[8:24], b[24:40], ipv6.ICMPTypePacketTooBig, 0, ptb); err == nil { if packet, err := CreateICMPv6(bs[8:24], bs[24:40], ipv6.ICMPTypePacketTooBig, 0, ptb); err == nil {
s.tun.send <- packet s.tun.send <- packet
} }
} else { } else {
@ -127,7 +126,6 @@ func (s *tunConn) writer() error {
} else { } else {
s.stillAlive() s.stillAlive()
} }
util.PutBytes(b)
} }
} }
} }

View File

@ -139,8 +139,10 @@ func (tun *TunAdapter) readerPacketHandler(ch chan []byte) {
continue continue
} }
} }
// Shift forward to avoid leaking bytes off the front of the slide when we eventually store it if offset != 0 {
bs = append(recvd[:0], bs...) // Shift forward to avoid leaking bytes off the front of the slice when we eventually store it
bs = append(recvd[:0], bs...)
}
// From the IP header, work out what our source and destination addresses // From the IP header, work out what our source and destination addresses
// and node IDs are. We will need these in order to work out where to send // and node IDs are. We will need these in order to work out where to send
// the packet // the packet
@ -260,11 +262,8 @@ func (tun *TunAdapter) readerPacketHandler(ch chan []byte) {
tun.mutex.Unlock() tun.mutex.Unlock()
if tc != nil { if tc != nil {
for _, packet := range packets { for _, packet := range packets {
select { p := packet // Possibly required because of how range
case tc.send <- packet: tc.send <- p
default:
util.PutBytes(packet)
}
} }
} }
}() }()
@ -274,21 +273,18 @@ func (tun *TunAdapter) readerPacketHandler(ch chan []byte) {
} }
// If we have a connection now, try writing to it // If we have a connection now, try writing to it
if isIn && session != nil { if isIn && session != nil {
select { session.send <- bs
case session.send <- bs:
default:
util.PutBytes(bs)
}
} }
} }
} }
func (tun *TunAdapter) reader() error { func (tun *TunAdapter) reader() error {
recvd := make([]byte, 65535+tun_ETHER_HEADER_LENGTH)
toWorker := make(chan []byte, 32) toWorker := make(chan []byte, 32)
defer close(toWorker) defer close(toWorker)
go tun.readerPacketHandler(toWorker) go tun.readerPacketHandler(toWorker)
for { for {
// Get a slice to store the packet in
recvd := util.ResizeBytes(util.GetBytes(), 65535+tun_ETHER_HEADER_LENGTH)
// Wait for a packet to be delivered to us through the TUN/TAP adapter // Wait for a packet to be delivered to us through the TUN/TAP adapter
n, err := tun.iface.Read(recvd) n, err := tun.iface.Read(recvd)
if err != nil { if err != nil {
@ -298,9 +294,10 @@ func (tun *TunAdapter) reader() error {
panic(err) panic(err)
} }
if n == 0 { if n == 0 {
util.PutBytes(recvd)
continue continue
} }
bs := append(util.GetBytes(), recvd[:n]...) // Send the packet to the worker
toWorker <- bs toWorker <- recvd[:n]
} }
} }

View File

@ -5,11 +5,7 @@ package tuntap
// The linux platform specific tun parts // The linux platform specific tun parts
import ( import (
"errors" "github.com/vishvananda/netlink"
"fmt"
"net"
"github.com/docker/libcontainer/netlink"
water "github.com/yggdrasil-network/water" water "github.com/yggdrasil-network/water"
) )
@ -51,35 +47,21 @@ func (tun *TunAdapter) setup(ifname string, iftapmode bool, addr string, mtu int
// to exist on the system, but this will fail if Netlink is not present in the // to exist on the system, but this will fail if Netlink is not present in the
// kernel (it nearly always is). // kernel (it nearly always is).
func (tun *TunAdapter) setupAddress(addr string) error { func (tun *TunAdapter) setupAddress(addr string) error {
// Set address nladdr, err := netlink.ParseAddr(addr)
var netIF *net.Interface
ifces, err := net.Interfaces()
if err != nil { if err != nil {
return err return err
} }
for _, ifce := range ifces { nlintf, err := netlink.LinkByName(tun.iface.Name())
if ifce.Name == tun.iface.Name() {
var newIF = ifce
netIF = &newIF // Don't point inside ifces, it's apparently unsafe?...
}
}
if netIF == nil {
return errors.New(fmt.Sprintf("Failed to find interface: %s", tun.iface.Name()))
}
ip, ipNet, err := net.ParseCIDR(addr)
if err != nil { if err != nil {
return err return err
} }
err = netlink.NetworkLinkAddIp(netIF, ip, ipNet) if err := netlink.AddrAdd(nlintf, nladdr); err != nil {
if err != nil {
return err return err
} }
err = netlink.NetworkSetMTU(netIF, tun.mtu) if err := netlink.LinkSetMTU(nlintf, tun.mtu); err != nil {
if err != nil {
return err return err
} }
netlink.NetworkLinkUp(netIF) if err := netlink.LinkSetUp(nlintf); err != nil {
if err != nil {
return err return err
} }
return nil return nil

View File

@ -2,9 +2,13 @@ package util
// These are misc. utility functions that didn't really fit anywhere else // These are misc. utility functions that didn't really fit anywhere else
import "runtime" import (
import "sync" "runtime"
import "time" "strconv"
"strings"
"sync"
"time"
)
// A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere. // A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere.
func Yield() { func Yield() {
@ -22,27 +26,25 @@ func UnlockThread() {
} }
// This is used to buffer recently used slices of bytes, to prevent allocations in the hot loops. // This is used to buffer recently used slices of bytes, to prevent allocations in the hot loops.
var byteStoreMutex sync.Mutex var byteStore = sync.Pool{New: func() interface{} { return []byte(nil) }}
var byteStore [][]byte
// Gets an empty slice from the byte store. // Gets an empty slice from the byte store.
func GetBytes() []byte { func GetBytes() []byte {
byteStoreMutex.Lock() return byteStore.Get().([]byte)[:0]
defer byteStoreMutex.Unlock()
if len(byteStore) > 0 {
var bs []byte
bs, byteStore = byteStore[len(byteStore)-1][:0], byteStore[:len(byteStore)-1]
return bs
} else {
return nil
}
} }
// Puts a slice in the store. // Puts a slice in the store.
func PutBytes(bs []byte) { func PutBytes(bs []byte) {
byteStoreMutex.Lock() byteStore.Put(bs)
defer byteStoreMutex.Unlock() }
byteStore = append(byteStore, bs)
// Gets a slice of the appropriate length, reusing existing slice capacity when possible
func ResizeBytes(bs []byte, length int) []byte {
if cap(bs) >= length {
return bs[:length]
} else {
return make([]byte, length)
}
} }
// This is a workaround to go's broken timer implementation // This is a workaround to go's broken timer implementation
@ -91,3 +93,54 @@ func Difference(a, b []string) []string {
} }
return ab return ab
} }
// DecodeCoordString decodes a string representing coordinates in [1 2 3] format
// and returns a []uint64.
func DecodeCoordString(in string) (out []uint64) {
s := strings.Trim(in, "[]")
t := strings.Split(s, " ")
for _, a := range t {
if u, err := strconv.ParseUint(a, 0, 64); err == nil {
out = append(out, u)
}
}
return out
}
// GetFlowLabel takes an IP packet as an argument and returns some information about the traffic flow.
// For IPv4 packets, this is derived from the source and destination protocol and port numbers.
// For IPv6 packets, this is derived from the FlowLabel field of the packet if this was set, otherwise it's handled like IPv4.
// The FlowKey is then used internally by Yggdrasil for congestion control.
func GetFlowKey(bs []byte) uint64 {
// Work out the flowkey - this is used to determine which switch queue
// traffic will be pushed to in the event of congestion
var flowkey uint64
// Get the IP protocol version from the packet
switch bs[0] & 0xf0 {
case 0x40: // IPv4 packet
// Check the packet meets minimum UDP packet length
if len(bs) >= 24 {
// Is the protocol TCP, UDP or SCTP?
if bs[9] == 0x06 || bs[9] == 0x11 || bs[9] == 0x84 {
ihl := bs[0] & 0x0f * 4 // Header length
flowkey = uint64(bs[9])<<32 /* proto */ |
uint64(bs[ihl+0])<<24 | uint64(bs[ihl+1])<<16 /* sport */ |
uint64(bs[ihl+2])<<8 | uint64(bs[ihl+3]) /* dport */
}
}
case 0x60: // IPv6 packet
// Check if the flowlabel was specified in the packet header
flowkey = uint64(bs[1]&0x0f)<<16 | uint64(bs[2])<<8 | uint64(bs[3])
// If the flowlabel isn't present, make protokey from proto | sport | dport
// if the packet meets minimum UDP packet length
if flowkey == 0 && len(bs) >= 48 {
// Is the protocol TCP, UDP or SCTP?
if bs[6] == 0x06 || bs[6] == 0x11 || bs[6] == 0x84 {
flowkey = uint64(bs[6])<<32 /* proto */ |
uint64(bs[40])<<24 | uint64(bs[41])<<16 /* sport */ |
uint64(bs[42])<<8 | uint64(bs[43]) /* dport */
}
}
}
return flowkey
}

29
src/util/workerpool.go Normal file
View File

@ -0,0 +1,29 @@
package util
import "runtime"
var workerPool chan func()
func init() {
maxProcs := runtime.GOMAXPROCS(0)
if maxProcs < 1 {
maxProcs = 1
}
workerPool = make(chan func(), maxProcs)
for idx := 0; idx < maxProcs; idx++ {
go func() {
for f := range workerPool {
f()
}
}()
}
}
// WorkerGo submits a job to a pool of GOMAXPROCS worker goroutines.
// This is meant for short non-blocking functions f() where you could just go f(),
// but you want some kind of backpressure to prevent spawning endless goroutines.
// WorkerGo returns as soon as the function is queued to run, not when it finishes.
// In Yggdrasil, these workers are used for certain cryptographic operations.
func WorkerGo(f func()) {
workerPool <- f
}

22
src/version/version.go Normal file
View File

@ -0,0 +1,22 @@
package version
var buildName string
var buildVersion string
// BuildName gets the current build name. This is usually injected if built
// from git, or returns "unknown" otherwise.
func BuildName() string {
if buildName == "" {
return "yggdrasilctl"
}
return buildName
}
// BuildVersion gets the current build version. This is usually injected if
// built from git, or returns "unknown" otherwise.
func BuildVersion() string {
if buildVersion == "" {
return "unknown"
}
return buildVersion
}

View File

@ -6,8 +6,6 @@ import (
"fmt" "fmt"
"net" "net"
"sort" "sort"
"strconv"
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
@ -34,7 +32,7 @@ type Peer struct {
// to a given node. // to a given node.
type SwitchPeer struct { type SwitchPeer struct {
PublicKey crypto.BoxPubKey PublicKey crypto.BoxPubKey
Coords []byte Coords []uint64
BytesSent uint64 BytesSent uint64
BytesRecvd uint64 BytesRecvd uint64
Port uint64 Port uint64
@ -46,14 +44,14 @@ type SwitchPeer struct {
// DHT searches. // DHT searches.
type DHTEntry struct { type DHTEntry struct {
PublicKey crypto.BoxPubKey PublicKey crypto.BoxPubKey
Coords []byte Coords []uint64
LastSeen time.Duration LastSeen time.Duration
} }
// DHTRes represents a DHT response, as returned by DHTPing. // DHTRes represents a DHT response, as returned by DHTPing.
type DHTRes struct { type DHTRes struct {
PublicKey crypto.BoxPubKey // key of the sender PublicKey crypto.BoxPubKey // key of the sender
Coords []byte // coords of the sender Coords []uint64 // coords of the sender
Dest crypto.NodeID // the destination node ID Dest crypto.NodeID // the destination node ID
Infos []DHTEntry // response Infos []DHTEntry // response
} }
@ -85,7 +83,7 @@ type SwitchQueue struct {
// Session represents an open session with another node. // Session represents an open session with another node.
type Session struct { type Session struct {
PublicKey crypto.BoxPubKey PublicKey crypto.BoxPubKey
Coords []byte Coords []uint64
BytesSent uint64 BytesSent uint64
BytesRecvd uint64 BytesRecvd uint64
MTU uint16 MTU uint16
@ -138,7 +136,7 @@ func (c *Core) GetSwitchPeers() []SwitchPeer {
} }
coords := elem.locator.getCoords() coords := elem.locator.getCoords()
info := SwitchPeer{ info := SwitchPeer{
Coords: append([]byte{}, coords...), Coords: append([]uint64{}, wire_coordsBytestoUint64s(coords)...),
BytesSent: atomic.LoadUint64(&peer.bytesSent), BytesSent: atomic.LoadUint64(&peer.bytesSent),
BytesRecvd: atomic.LoadUint64(&peer.bytesRecvd), BytesRecvd: atomic.LoadUint64(&peer.bytesRecvd),
Port: uint64(elem.port), Port: uint64(elem.port),
@ -166,7 +164,7 @@ func (c *Core) GetDHT() []DHTEntry {
}) })
for _, v := range dhtentry { for _, v := range dhtentry {
info := DHTEntry{ info := DHTEntry{
Coords: append([]byte{}, v.coords...), Coords: append([]uint64{}, wire_coordsBytestoUint64s(v.coords)...),
LastSeen: now.Sub(v.recv), LastSeen: now.Sub(v.recv),
} }
copy(info.PublicKey[:], v.key[:]) copy(info.PublicKey[:], v.key[:])
@ -214,7 +212,7 @@ func (c *Core) GetSessions() []Session {
var session Session var session Session
workerFunc := func() { workerFunc := func() {
session = Session{ session = Session{
Coords: append([]byte{}, sinfo.coords...), Coords: append([]uint64{}, wire_coordsBytestoUint64s(sinfo.coords)...),
MTU: sinfo.getMTU(), MTU: sinfo.getMTU(),
BytesSent: sinfo.bytesSent, BytesSent: sinfo.bytesSent,
BytesRecvd: sinfo.bytesRecvd, BytesRecvd: sinfo.bytesRecvd,
@ -243,24 +241,6 @@ func (c *Core) GetSessions() []Session {
return sessions return sessions
} }
// BuildName gets the current build name. This is usually injected if built
// from git, or returns "unknown" otherwise.
func BuildName() string {
if buildName == "" {
return "yggdrasil"
}
return buildName
}
// BuildVersion gets the current build version. This is usually injected if
// built from git, or returns "unknown" otherwise.
func BuildVersion() string {
if buildVersion == "" {
return "unknown"
}
return buildVersion
}
// ConnListen returns a listener for Yggdrasil session connections. // ConnListen returns a listener for Yggdrasil session connections.
func (c *Core) ConnListen() (*Listener, error) { func (c *Core) ConnListen() (*Listener, error) {
c.sessions.listenerMutex.Lock() c.sessions.listenerMutex.Lock()
@ -311,9 +291,9 @@ func (c *Core) EncryptionPublicKey() string {
} }
// Coords returns the current coordinates of the node. // Coords returns the current coordinates of the node.
func (c *Core) Coords() []byte { func (c *Core) Coords() []uint64 {
table := c.switchTable.table.Load().(lookupTable) table := c.switchTable.table.Load().(lookupTable)
return table.self.getCoords() return wire_coordsBytestoUint64s(table.self.getCoords())
} }
// Address gets the IPv6 address of the Yggdrasil node. This is always a /128 // Address gets the IPv6 address of the Yggdrasil node. This is always a /128
@ -336,8 +316,8 @@ func (c *Core) MyNodeInfo() NodeInfoPayload {
return c.router.nodeinfo.getNodeInfo() return c.router.nodeinfo.getNodeInfo()
} }
// SetNodeInfo the lcal nodeinfo. Note that nodeinfo can be any value or struct, // SetNodeInfo sets the local nodeinfo. Note that nodeinfo can be any value or
// it will be serialised into JSON automatically. // struct, it will be serialised into JSON automatically.
func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) { func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) {
c.router.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy) c.router.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy)
} }
@ -346,30 +326,7 @@ func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) {
// key and coordinates specified. The third parameter specifies whether a cached // key and coordinates specified. The third parameter specifies whether a cached
// result is acceptable - this results in less traffic being generated than is // result is acceptable - this results in less traffic being generated than is
// necessary when, e.g. crawling the network. // necessary when, e.g. crawling the network.
func (c *Core) GetNodeInfo(keyString, coordString string, nocache bool) (NodeInfoPayload, error) { func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool) (NodeInfoPayload, error) {
var key crypto.BoxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil {
return NodeInfoPayload{}, err
} else {
copy(key[:], keyBytes)
}
if !nocache {
if response, err := c.router.nodeinfo.getCachedNodeInfo(key); err == nil {
return response, nil
}
}
var coords []byte
for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") {
if cstr == "" {
// Special case, happens if trimmed is the empty string, e.g. this is the root
continue
}
if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil {
return NodeInfoPayload{}, err
} else {
coords = append(coords, uint8(u64))
}
}
response := make(chan *NodeInfoPayload, 1) response := make(chan *NodeInfoPayload, 1)
sendNodeInfoRequest := func() { sendNodeInfoRequest := func() {
c.router.nodeinfo.addCallback(key, func(nodeinfo *NodeInfoPayload) { c.router.nodeinfo.addCallback(key, func(nodeinfo *NodeInfoPayload) {
@ -379,7 +336,7 @@ func (c *Core) GetNodeInfo(keyString, coordString string, nocache bool) (NodeInf
default: default:
} }
}) })
c.router.nodeinfo.sendNodeInfo(key, coords, false) c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false)
} }
c.router.doAdmin(sendNodeInfoRequest) c.router.doAdmin(sendNodeInfoRequest)
go func() { go func() {
@ -389,7 +346,7 @@ func (c *Core) GetNodeInfo(keyString, coordString string, nocache bool) (NodeInf
for res := range response { for res := range response {
return *res, nil return *res, nil
} }
return NodeInfoPayload{}, fmt.Errorf("getNodeInfo timeout: %s", keyString) return NodeInfoPayload{}, fmt.Errorf("getNodeInfo timeout: %s", hex.EncodeToString(key[:]))
} }
// SetSessionGatekeeper allows you to configure a handler function for deciding // SetSessionGatekeeper allows you to configure a handler function for deciding
@ -477,64 +434,38 @@ func (c *Core) RemoveAllowedEncryptionPublicKey(bstr string) (err error) {
// DHTPing sends a DHT ping to the node with the provided key and coords, // DHTPing sends a DHT ping to the node with the provided key and coords,
// optionally looking up the specified target NodeID. // optionally looking up the specified target NodeID.
func (c *Core) DHTPing(keyString, coordString, targetString string) (DHTRes, error) { func (c *Core) DHTPing(key crypto.BoxPubKey, coords []uint64, target *crypto.NodeID) (DHTRes, error) {
var key crypto.BoxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil {
return DHTRes{}, err
} else {
copy(key[:], keyBytes)
}
var coords []byte
for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") {
if cstr == "" {
// Special case, happens if trimmed is the empty string, e.g. this is the root
continue
}
if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil {
return DHTRes{}, err
} else {
coords = append(coords, uint8(u64))
}
}
resCh := make(chan *dhtRes, 1) resCh := make(chan *dhtRes, 1)
info := dhtInfo{ info := dhtInfo{
key: key, key: key,
coords: coords, coords: wire_coordsUint64stoBytes(coords),
} }
target := *info.getNodeID() if target == nil {
if targetString == "none" { target = info.getNodeID()
// Leave the default target in place
} else if targetBytes, err := hex.DecodeString(targetString); err != nil {
return DHTRes{}, err
} else if len(targetBytes) != len(target) {
return DHTRes{}, errors.New("Incorrect target NodeID length")
} else {
var target crypto.NodeID
copy(target[:], targetBytes)
} }
rq := dhtReqKey{info.key, target} rq := dhtReqKey{info.key, *target}
sendPing := func() { sendPing := func() {
c.dht.addCallback(&rq, func(res *dhtRes) { c.dht.addCallback(&rq, func(res *dhtRes) {
resCh <- res resCh <- res
}) })
c.dht.ping(&info, &target) c.dht.ping(&info, &rq.dest)
} }
c.router.doAdmin(sendPing) c.router.doAdmin(sendPing)
// TODO: do something better than the below... // TODO: do something better than the below...
res := <-resCh res := <-resCh
if res != nil { if res != nil {
r := DHTRes{ r := DHTRes{
Coords: append([]byte{}, res.Coords...), Coords: append([]uint64{}, wire_coordsBytestoUint64s(res.Coords)...),
} }
copy(r.PublicKey[:], res.Key[:]) copy(r.PublicKey[:], res.Key[:])
for _, i := range res.Infos { for _, i := range res.Infos {
e := DHTEntry{ e := DHTEntry{
Coords: append([]byte{}, i.coords...), Coords: append([]uint64{}, wire_coordsBytestoUint64s(i.coords)...),
} }
copy(e.PublicKey[:], i.key[:]) copy(e.PublicKey[:], i.key[:])
r.Infos = append(r.Infos, e) r.Infos = append(r.Infos, e)
} }
return r, nil return r, nil
} }
return DHTRes{}, fmt.Errorf("DHT ping timeout: %s", keyString) return DHTRes{}, fmt.Errorf("DHT ping timeout: %s", hex.EncodeToString(key[:]))
} }

View File

@ -57,7 +57,6 @@ type Conn struct {
core *Core core *Core
readDeadline atomic.Value // time.Time // TODO timer readDeadline atomic.Value // time.Time // TODO timer
writeDeadline atomic.Value // time.Time // TODO timer writeDeadline atomic.Value // time.Time // TODO timer
cancel util.Cancellation
mutex sync.RWMutex // protects the below mutex sync.RWMutex // protects the below
nodeID *crypto.NodeID nodeID *crypto.NodeID
nodeMask *crypto.NodeID nodeMask *crypto.NodeID
@ -71,7 +70,6 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session
nodeID: nodeID, nodeID: nodeID,
nodeMask: nodeMask, nodeMask: nodeMask,
session: session, session: session,
cancel: util.NewCancellation(),
} }
return &conn return &conn
} }
@ -82,7 +80,7 @@ func (c *Conn) String() string {
return fmt.Sprintf("conn=%p", c) return fmt.Sprintf("conn=%p", c)
} }
// This should never be called from the router goroutine // This should never be called from the router goroutine, used in the dial functions
func (c *Conn) search() error { func (c *Conn) search() error {
var sinfo *searchInfo var sinfo *searchInfo
var isIn bool var isIn bool
@ -122,133 +120,117 @@ func (c *Conn) search() error {
return nil return nil
} }
// Used in session keep-alive traffic in Conn.Write
func (c *Conn) doSearch() {
routerWork := func() {
// Check to see if there is a search already matching the destination
sinfo, isIn := c.core.searches.searches[*c.nodeID]
if !isIn {
// Nothing was found, so create a new search
searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
// Start the search
sinfo.continueSearch()
}
}
go func() { c.core.router.admin <- routerWork }()
}
func (c *Conn) getDeadlineCancellation(value *atomic.Value) util.Cancellation { func (c *Conn) getDeadlineCancellation(value *atomic.Value) util.Cancellation {
if deadline, ok := value.Load().(time.Time); ok { if deadline, ok := value.Load().(time.Time); ok {
// A deadline is set, so return a Cancellation that uses it // A deadline is set, so return a Cancellation that uses it
return util.CancellationWithDeadline(c.cancel, deadline) return util.CancellationWithDeadline(c.session.cancel, deadline)
} else { } else {
// No cancellation was set, so return a child cancellation with no timeout // No cancellation was set, so return a child cancellation with no timeout
return util.CancellationChild(c.cancel) return util.CancellationChild(c.session.cancel)
} }
} }
func (c *Conn) Read(b []byte) (int, error) { // Used internally by Read, the caller is responsible for util.PutBytes when they're done.
// Take a copy of the session object func (c *Conn) ReadNoCopy() ([]byte, error) {
sinfo := c.session
cancel := c.getDeadlineCancellation(&c.readDeadline) cancel := c.getDeadlineCancellation(&c.readDeadline)
defer cancel.Cancel(nil) defer cancel.Cancel(nil)
var bs []byte // Wait for some traffic to come through from the session
for { select {
// Wait for some traffic to come through from the session case <-cancel.Finished():
select { if cancel.Error() == util.CancellationTimeoutError {
case <-cancel.Finished(): return nil, ConnError{errors.New("read timeout"), true, false, false, 0}
if cancel.Error() == util.CancellationTimeoutError { } else {
return 0, ConnError{errors.New("read timeout"), true, false, false, 0} return nil, ConnError{errors.New("session closed"), false, false, true, 0}
} else {
return 0, ConnError{errors.New("session closed"), false, false, true, 0}
}
case p, ok := <-sinfo.recv:
// If the session is closed then do nothing
if !ok {
return 0, ConnError{errors.New("session closed"), false, false, true, 0}
}
var err error
sessionFunc := func() {
defer util.PutBytes(p.Payload)
// If the nonce is bad then drop the packet and return an error
if !sinfo.nonceIsOK(&p.Nonce) {
err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0}
return
}
// Decrypt the packet
var isOK bool
bs, isOK = crypto.BoxOpen(&sinfo.sharedSesKey, p.Payload, &p.Nonce)
// Check if we were unable to decrypt the packet for some reason and
// return an error if we couldn't
if !isOK {
err = ConnError{errors.New("packet dropped due to decryption failure"), false, true, false, 0}
return
}
// Update the session
sinfo.updateNonce(&p.Nonce)
sinfo.time = time.Now()
sinfo.bytesRecvd += uint64(len(bs))
}
sinfo.doFunc(sessionFunc)
// Something went wrong in the session worker so abort
if err != nil {
if ce, ok := err.(*ConnError); ok && ce.Temporary() {
continue
}
return 0, err
}
// Copy results to the output slice and clean up
copy(b, bs)
util.PutBytes(bs)
// If we've reached this point then everything went to plan, return the
// number of bytes we populated back into the given slice
return len(bs), nil
} }
case bs := <-c.session.recv:
return bs, nil
} }
} }
func (c *Conn) Write(b []byte) (bytesWritten int, err error) { // Implements net.Conn.Read
sinfo := c.session func (c *Conn) Read(b []byte) (int, error) {
var packet []byte bs, err := c.ReadNoCopy()
written := len(b) if err != nil {
return 0, err
}
n := len(bs)
if len(bs) > len(b) {
n = len(b)
err = ConnError{errors.New("read buffer too small for entire packet"), false, true, false, 0}
}
// Copy results to the output slice and clean up
copy(b, bs)
util.PutBytes(bs)
// Return the number of bytes copied to the slice, along with any error
return n, err
}
// Used internally by Write, the caller must not reuse the argument bytes when no error occurs
func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error {
var err error
sessionFunc := func() { sessionFunc := func() {
// Does the packet exceed the permitted size for the session? // Does the packet exceed the permitted size for the session?
if uint16(len(b)) > sinfo.getMTU() { if uint16(len(msg.Message)) > c.session.getMTU() {
written, err = 0, ConnError{errors.New("packet too big"), true, false, false, int(sinfo.getMTU())} err = ConnError{errors.New("packet too big"), true, false, false, int(c.session.getMTU())}
return return
} }
// Encrypt the packet
payload, nonce := crypto.BoxSeal(&sinfo.sharedSesKey, b, &sinfo.myNonce)
defer util.PutBytes(payload)
// Construct the wire packet to send to the router
p := wire_trafficPacket{
Coords: sinfo.coords,
Handle: sinfo.theirHandle,
Nonce: *nonce,
Payload: payload,
}
packet = p.encode()
sinfo.bytesSent += uint64(len(b))
// The rest of this work is session keep-alive traffic // The rest of this work is session keep-alive traffic
doSearch := func() {
routerWork := func() {
// Check to see if there is a search already matching the destination
sinfo, isIn := c.core.searches.searches[*c.nodeID]
if !isIn {
// Nothing was found, so create a new search
searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
}
// Continue the search
sinfo.continueSearch()
}
go func() { c.core.router.admin <- routerWork }()
}
switch { switch {
case time.Since(sinfo.time) > 6*time.Second: case time.Since(c.session.time) > 6*time.Second:
if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second { if c.session.time.Before(c.session.pingTime) && time.Since(c.session.pingTime) > 6*time.Second {
// TODO double check that the above condition is correct // TODO double check that the above condition is correct
doSearch() c.doSearch()
} else { } else {
sinfo.core.sessions.ping(sinfo) c.core.sessions.ping(c.session)
} }
case sinfo.reset && sinfo.pingTime.Before(sinfo.time): case c.session.reset && c.session.pingTime.Before(c.session.time):
sinfo.core.sessions.ping(sinfo) c.core.sessions.ping(c.session)
default: // Don't do anything, to keep traffic throttled default: // Don't do anything, to keep traffic throttled
} }
} }
sinfo.doFunc(sessionFunc) c.session.doFunc(sessionFunc)
// Give the packet to the router if err == nil {
if written > 0 { cancel := c.getDeadlineCancellation(&c.writeDeadline)
sinfo.core.router.out(packet) defer cancel.Cancel(nil)
select {
case <-cancel.Finished():
if cancel.Error() == util.CancellationTimeoutError {
err = ConnError{errors.New("write timeout"), true, false, false, 0}
} else {
err = ConnError{errors.New("session closed"), false, false, true, 0}
}
case c.session.send <- msg:
}
}
return err
}
// Implements net.Conn.Write
func (c *Conn) Write(b []byte) (int, error) {
written := len(b)
msg := FlowKeyMessage{Message: append(util.GetBytes(), b...)}
err := c.WriteNoCopy(msg)
if err != nil {
util.PutBytes(msg.Message)
written = 0
} }
// Finally return the number of bytes we wrote
return written, err return written, err
} }
@ -257,10 +239,9 @@ func (c *Conn) Close() (err error) {
defer c.mutex.Unlock() defer c.mutex.Unlock()
if c.session != nil { if c.session != nil {
// Close the session, if it hasn't been closed already // Close the session, if it hasn't been closed already
c.core.router.doAdmin(c.session.close) if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil {
} err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
if e := c.cancel.Cancel(errors.New("connection closed")); e != nil { }
err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
} }
return return
} }

View File

@ -10,11 +10,9 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
) )
var buildName string
var buildVersion string
// The Core object represents the Yggdrasil node. You should create a Core // The Core object represents the Yggdrasil node. You should create a Core
// object for each Yggdrasil node you plan to run. // object for each Yggdrasil node you plan to run.
type Core struct { type Core struct {
@ -164,10 +162,10 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState,
Previous: *nc, Previous: *nc,
} }
if name := BuildName(); name != "unknown" { if name := version.BuildName(); name != "unknown" {
c.log.Infoln("Build name:", name) c.log.Infoln("Build name:", name)
} }
if version := BuildVersion(); version != "unknown" { if version := version.BuildVersion(); version != "unknown" {
c.log.Infoln("Build version:", version) c.log.Infoln("Build version:", version)
} }

View File

@ -69,6 +69,7 @@ func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, er
defer t.Stop() defer t.Stop()
select { select {
case <-conn.session.init: case <-conn.session.init:
conn.session.startWorkers()
return conn, nil return conn, nil
case <-t.C: case <-t.C:
conn.Close() conn.Close()

View File

@ -179,7 +179,10 @@ func (intf *linkInterface) handler() error {
// That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc.
intf.link.core.log.Debugln("DEBUG: found existing interface for", intf.name) intf.link.core.log.Debugln("DEBUG: found existing interface for", intf.name)
intf.msgIO.close() intf.msgIO.close()
<-oldIntf.closed if !intf.incoming {
// Block outgoing connection attempts until the existing connection closes
<-oldIntf.closed
}
return nil return nil
} else { } else {
intf.closed = make(chan struct{}) intf.closed = make(chan struct{})

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
) )
type nodeinfo struct { type nodeinfo struct {
@ -99,8 +100,8 @@ func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) error {
m.myNodeInfoMutex.Lock() m.myNodeInfoMutex.Lock()
defer m.myNodeInfoMutex.Unlock() defer m.myNodeInfoMutex.Unlock()
defaults := map[string]interface{}{ defaults := map[string]interface{}{
"buildname": BuildName(), "buildname": version.BuildName(),
"buildversion": BuildVersion(), "buildversion": version.BuildVersion(),
"buildplatform": runtime.GOOS, "buildplatform": runtime.GOOS,
"buildarch": runtime.GOARCH, "buildarch": runtime.GOARCH,
} }

View File

@ -127,7 +127,6 @@ func (r *router) mainLoop() {
r.core.switchTable.doMaintenance() r.core.switchTable.doMaintenance()
r.core.dht.doMaintenance() r.core.dht.doMaintenance()
r.core.sessions.cleanup() r.core.sessions.cleanup()
util.GetBytes() // To slowly drain things
} }
case f := <-r.admin: case f := <-r.admin:
f() f()
@ -163,11 +162,12 @@ func (r *router) handleTraffic(packet []byte) {
} }
sinfo, isIn := r.core.sessions.getSessionForHandle(&p.Handle) sinfo, isIn := r.core.sessions.getSessionForHandle(&p.Handle)
if !isIn { if !isIn {
util.PutBytes(p.Payload)
return return
} }
select { select {
case sinfo.recv <- &p: // FIXME ideally this should be front drop case sinfo.fromRouter <- p:
default: case <-sinfo.cancel.Finished():
util.PutBytes(p.Payload) util.PutBytes(p.Payload)
} }
} }

View File

@ -65,17 +65,11 @@ func (s *searches) init(core *Core) {
// Creates a new search info, adds it to the searches struct, and returns a pointer to the info. // Creates a new search info, adds it to the searches struct, and returns a pointer to the info.
func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo {
now := time.Now()
//for dest, sinfo := range s.searches {
// if now.Sub(sinfo.time) > time.Minute {
// delete(s.searches, dest)
// }
//}
info := searchInfo{ info := searchInfo{
core: s.core, core: s.core,
dest: *dest, dest: *dest,
mask: *mask, mask: *mask,
time: now.Add(-time.Second), time: time.Now(),
callback: callback, callback: callback,
} }
s.searches[*dest] = &info s.searches[*dest] = &info
@ -138,9 +132,11 @@ func (sinfo *searchInfo) addToSearch(res *dhtRes) {
// Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. // Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping.
func (sinfo *searchInfo) doSearchStep() { func (sinfo *searchInfo) doSearchStep() {
if len(sinfo.toVisit) == 0 { if len(sinfo.toVisit) == 0 {
// Dead end, do cleanup if time.Since(sinfo.time) > search_RETRY_TIME {
delete(sinfo.core.searches.searches, sinfo.dest) // Dead end and no response in too long, do cleanup
sinfo.callback(nil, errors.New("search reached dead end")) delete(sinfo.core.searches.searches, sinfo.dest)
sinfo.callback(nil, errors.New("search reached dead end"))
}
return return
} }
// Send to the next search target // Send to the next search target
@ -149,15 +145,12 @@ func (sinfo *searchInfo) doSearchStep() {
rq := dhtReqKey{next.key, sinfo.dest} rq := dhtReqKey{next.key, sinfo.dest}
sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes) sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes)
sinfo.core.dht.ping(next, &sinfo.dest) sinfo.core.dht.ping(next, &sinfo.dest)
sinfo.time = time.Now()
} }
// If we've recenty sent a ping for this search, do nothing. // If we've recenty sent a ping for this search, do nothing.
// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. // Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME.
func (sinfo *searchInfo) continueSearch() { func (sinfo *searchInfo) continueSearch() {
if time.Since(sinfo.time) < search_RETRY_TIME {
return
}
sinfo.time = time.Now()
sinfo.doSearchStep() sinfo.doSearchStep()
// In case the search dies, try to spawn another thread later // In case the search dies, try to spawn another thread later
// Note that this will spawn multiple parallel searches as time passes // Note that this will spawn multiple parallel searches as time passes
@ -209,6 +202,8 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool {
if sess == nil { if sess == nil {
// nil if the DHT search finished but the session wasn't allowed // nil if the DHT search finished but the session wasn't allowed
sinfo.callback(nil, errors.New("session not allowed")) sinfo.callback(nil, errors.New("session not allowed"))
// Cleanup
delete(sinfo.core.searches.searches, res.Dest)
return true return true
} }
_, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key) _, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key)

View File

@ -6,46 +6,71 @@ package yggdrasil
import ( import (
"bytes" "bytes"
"container/heap"
"errors"
"sync" "sync"
"time" "time"
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
// Duration that we keep track of old nonces per session, to allow some out-of-order packet delivery
const nonceWindow = time.Second
// A heap of nonces, used with a map[nonce]time to allow out-of-order packets a little time to arrive without rejecting them
type nonceHeap []crypto.BoxNonce
func (h nonceHeap) Len() int { return len(h) }
func (h nonceHeap) Less(i, j int) bool { return h[i].Minus(&h[j]) < 0 }
func (h nonceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *nonceHeap) Push(x interface{}) { *h = append(*h, x.(crypto.BoxNonce)) }
func (h *nonceHeap) Pop() interface{} {
l := len(*h)
var n crypto.BoxNonce
n, *h = (*h)[l-1], (*h)[:l-1]
return n
}
func (h nonceHeap) peek() *crypto.BoxNonce { return &h[len(h)-1] }
// All the information we know about an active session. // All the information we know about an active session.
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
type sessionInfo struct { type sessionInfo struct {
mutex sync.Mutex // Protects all of the below, use it any time you read/chance the contents of a session mutex sync.Mutex // Protects all of the below, use it any time you read/chance the contents of a session
core *Core // core *Core //
reconfigure chan chan error // reconfigure chan chan error //
theirAddr address.Address // theirAddr address.Address //
theirSubnet address.Subnet // theirSubnet address.Subnet //
theirPermPub crypto.BoxPubKey // theirPermPub crypto.BoxPubKey //
theirSesPub crypto.BoxPubKey // theirSesPub crypto.BoxPubKey //
mySesPub crypto.BoxPubKey // mySesPub crypto.BoxPubKey //
mySesPriv crypto.BoxPrivKey // mySesPriv crypto.BoxPrivKey //
sharedSesKey crypto.BoxSharedKey // derived from session keys sharedSesKey crypto.BoxSharedKey // derived from session keys
theirHandle crypto.Handle // theirHandle crypto.Handle //
myHandle crypto.Handle // myHandle crypto.Handle //
theirNonce crypto.BoxNonce // theirNonce crypto.BoxNonce //
theirNonceMask uint64 // theirNonceHeap nonceHeap // priority queue to keep track of the lowest nonce we recently accepted
myNonce crypto.BoxNonce // theirNonceMap map[crypto.BoxNonce]time.Time // time we added each nonce to the heap
theirMTU uint16 // myNonce crypto.BoxNonce //
myMTU uint16 // theirMTU uint16 //
wasMTUFixed bool // Was the MTU fixed by a receive error? myMTU uint16 //
timeOpened time.Time // Time the sessino was opened wasMTUFixed bool // Was the MTU fixed by a receive error?
time time.Time // Time we last received a packet timeOpened time.Time // Time the sessino was opened
mtuTime time.Time // time myMTU was last changed time time.Time // Time we last received a packet
pingTime time.Time // time the first ping was sent since the last received packet mtuTime time.Time // time myMTU was last changed
pingSend time.Time // time the last ping was sent pingTime time.Time // time the first ping was sent since the last received packet
coords []byte // coords of destination pingSend time.Time // time the last ping was sent
reset bool // reset if coords change coords []byte // coords of destination
tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation reset bool // reset if coords change
bytesSent uint64 // Bytes of real traffic sent in this session tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation
bytesRecvd uint64 // Bytes of real traffic received in this session bytesSent uint64 // Bytes of real traffic sent in this session
recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn bytesRecvd uint64 // Bytes of real traffic received in this session
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
cancel util.Cancellation // Used to terminate workers
fromRouter chan wire_trafficPacket // Received packets go here, to be decrypted by the session
recv chan []byte // Decrypted packets go here, picked up by the associated Conn
send chan FlowKeyMessage // Packets with optional flow key go here, to be encrypted and sent
} }
func (sinfo *sessionInfo) doFunc(f func()) { func (sinfo *sessionInfo) doFunc(f func()) {
@ -82,7 +107,8 @@ func (s *sessionInfo) update(p *sessionPing) bool {
s.theirHandle = p.Handle s.theirHandle = p.Handle
s.sharedSesKey = *crypto.GetSharedKey(&s.mySesPriv, &s.theirSesPub) s.sharedSesKey = *crypto.GetSharedKey(&s.mySesPriv, &s.theirSesPub)
s.theirNonce = crypto.BoxNonce{} s.theirNonce = crypto.BoxNonce{}
s.theirNonceMask = 0 s.theirNonceHeap = nil
s.theirNonceMap = make(map[crypto.BoxNonce]time.Time)
} }
if p.MTU >= 1280 || p.MTU == 0 { if p.MTU >= 1280 || p.MTU == 0 {
s.theirMTU = p.MTU s.theirMTU = p.MTU
@ -203,6 +229,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.pingTime = now sinfo.pingTime = now
sinfo.pingSend = now sinfo.pingSend = now
sinfo.init = make(chan struct{}) sinfo.init = make(chan struct{})
sinfo.cancel = util.NewCancellation()
higher := false higher := false
for idx := range ss.core.boxPub { for idx := range ss.core.boxPub {
if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] { if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] {
@ -222,9 +249,16 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.myHandle = *crypto.NewHandle() sinfo.myHandle = *crypto.NewHandle()
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.recv = make(chan *wire_trafficPacket, 32) sinfo.fromRouter = make(chan wire_trafficPacket, 1)
sinfo.recv = make(chan []byte, 32)
sinfo.send = make(chan FlowKeyMessage, 32)
ss.sinfos[sinfo.myHandle] = &sinfo ss.sinfos[sinfo.myHandle] = &sinfo
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
go func() {
// Run cleanup when the session is canceled
<-sinfo.cancel.Finished()
sinfo.core.router.doAdmin(sinfo.close)
}()
return &sinfo return &sinfo
} }
@ -335,39 +369,40 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) {
func (ss *sessions) handlePing(ping *sessionPing) { func (ss *sessions) handlePing(ping *sessionPing) {
// Get the corresponding session (or create a new session) // Get the corresponding session (or create a new session)
sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub) sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub)
// Check if the session is allowed switch {
// TODO: this check may need to be moved case isIn: // Session already exists
if !isIn && !ss.isSessionAllowed(&ping.SendPermPub, false) { case !ss.isSessionAllowed(&ping.SendPermPub, false): // Session is not allowed
return case ping.IsPong: // This is a response, not an initial ping, so ignore it.
} default:
// Create the session if it doesn't already exist
if !isIn {
ss.createSession(&ping.SendPermPub)
sinfo, isIn = ss.getByTheirPerm(&ping.SendPermPub)
if !isIn {
panic("This should not happen")
}
ss.listenerMutex.Lock() ss.listenerMutex.Lock()
// Check and see if there's a Listener waiting to accept connections if ss.listener != nil {
// TODO: this should not block if nothing is accepting // This is a ping from an allowed node for which no session exists, and we have a listener ready to handle sessions.
if !ping.IsPong && ss.listener != nil { // We need to create a session and pass it to the listener.
sinfo = ss.createSession(&ping.SendPermPub)
if s, _ := ss.getByTheirPerm(&ping.SendPermPub); s != sinfo {
panic("This should not happen")
}
conn := newConn(ss.core, crypto.GetNodeID(&sinfo.theirPermPub), &crypto.NodeID{}, sinfo) conn := newConn(ss.core, crypto.GetNodeID(&sinfo.theirPermPub), &crypto.NodeID{}, sinfo)
for i := range conn.nodeMask { for i := range conn.nodeMask {
conn.nodeMask[i] = 0xFF conn.nodeMask[i] = 0xFF
} }
ss.listener.conn <- conn conn.session.startWorkers()
c := ss.listener.conn
go func() { c <- conn }()
} }
ss.listenerMutex.Unlock() ss.listenerMutex.Unlock()
} }
sinfo.doFunc(func() { if sinfo != nil {
// Update the session sinfo.doFunc(func() {
if !sinfo.update(ping) { /*panic("Should not happen in testing")*/ // Update the session
return if !sinfo.update(ping) { /*panic("Should not happen in testing")*/
} return
if !ping.IsPong { }
ss.sendPingPong(sinfo, true) if !ping.IsPong {
} ss.sendPingPong(sinfo, true)
}) }
})
}
} }
// Get the MTU of the session. // Get the MTU of the session.
@ -386,27 +421,40 @@ func (sinfo *sessionInfo) getMTU() uint16 {
// Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received. // Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received.
func (sinfo *sessionInfo) nonceIsOK(theirNonce *crypto.BoxNonce) bool { func (sinfo *sessionInfo) nonceIsOK(theirNonce *crypto.BoxNonce) bool {
// The bitmask is to allow for some non-duplicate out-of-order packets // The bitmask is to allow for some non-duplicate out-of-order packets
diff := theirNonce.Minus(&sinfo.theirNonce) if theirNonce.Minus(&sinfo.theirNonce) > 0 {
if diff > 0 { // This is newer than the newest nonce we've seen
return true return true
} }
return ^sinfo.theirNonceMask&(0x01<<uint64(-diff)) != 0 if len(sinfo.theirNonceHeap) > 0 {
if theirNonce.Minus(sinfo.theirNonceHeap.peek()) > 0 {
if _, isIn := sinfo.theirNonceMap[*theirNonce]; !isIn {
// This nonce is recent enough that we keep track of older nonces, but it's not one we've seen yet
return true
}
}
}
return false
} }
// Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce // Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce
func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) { func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) {
// Shift nonce mask if needed // Start with some cleanup
// Set bit for len(sinfo.theirNonceHeap) > 64 {
diff := theirNonce.Minus(&sinfo.theirNonce) if time.Since(sinfo.theirNonceMap[*sinfo.theirNonceHeap.peek()]) < nonceWindow {
if diff > 0 { // This nonce is still fairly new, so keep it around
// This nonce is newer, so shift the window before setting the bit, and update theirNonce in the session info. break
sinfo.theirNonceMask <<= uint64(diff) }
sinfo.theirNonceMask &= 0x01 // TODO? reallocate the map in some cases, to free unused map space?
sinfo.theirNonce = *theirNonce delete(sinfo.theirNonceMap, *sinfo.theirNonceHeap.peek())
} else { heap.Pop(&sinfo.theirNonceHeap)
// This nonce is older, so set the bit but do not shift the window.
sinfo.theirNonceMask &= 0x01 << uint64(-diff)
} }
if theirNonce.Minus(&sinfo.theirNonce) > 0 {
// This nonce is the newest we've seen, so make a note of that
sinfo.theirNonce = *theirNonce
}
// Add it to the heap/map so we know not to allow it again
heap.Push(&sinfo.theirNonceHeap, *theirNonce)
sinfo.theirNonceMap[*theirNonce] = time.Now()
} }
// Resets all sessions to an uninitialized state. // Resets all sessions to an uninitialized state.
@ -418,3 +466,191 @@ func (ss *sessions) reset() {
}) })
} }
} }
////////////////////////////////////////////////////////////////////////////////
//////////////////////////// Worker Functions Below ////////////////////////////
////////////////////////////////////////////////////////////////////////////////
func (sinfo *sessionInfo) startWorkers() {
go sinfo.recvWorker()
go sinfo.sendWorker()
}
type FlowKeyMessage struct {
FlowKey uint64
Message []byte
}
func (sinfo *sessionInfo) recvWorker() {
// TODO move theirNonce etc into a struct that gets stored here, passed in over a channel
// Since there's no reason for anywhere else in the session code to need to *read* it...
// Only needs to be updated from the outside if a ping resets it...
// That would get rid of the need to take a mutex for the sessionFunc
var callbacks []chan func()
doRecv := func(p wire_trafficPacket) {
var bs []byte
var err error
var k crypto.BoxSharedKey
sessionFunc := func() {
if !sinfo.nonceIsOK(&p.Nonce) {
err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0}
return
}
k = sinfo.sharedSesKey
}
sinfo.doFunc(sessionFunc)
if err != nil {
util.PutBytes(p.Payload)
return
}
var isOK bool
ch := make(chan func(), 1)
poolFunc := func() {
bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
callback := func() {
util.PutBytes(p.Payload)
if !isOK {
util.PutBytes(bs)
return
}
sessionFunc = func() {
if k != sinfo.sharedSesKey || !sinfo.nonceIsOK(&p.Nonce) {
// The session updated in the mean time, so return an error
err = ConnError{errors.New("session updated during crypto operation"), false, true, false, 0}
return
}
sinfo.updateNonce(&p.Nonce)
sinfo.time = time.Now()
sinfo.bytesRecvd += uint64(len(bs))
}
sinfo.doFunc(sessionFunc)
if err != nil {
// Not sure what else to do with this packet, I guess just drop it
util.PutBytes(bs)
} else {
// Pass the packet to the buffer for Conn.Read
select {
case <-sinfo.cancel.Finished():
util.PutBytes(bs)
case sinfo.recv <- bs:
}
}
}
ch <- callback
}
// Send to the worker and wait for it to finish
util.WorkerGo(poolFunc)
callbacks = append(callbacks, ch)
}
fromHelper := make(chan wire_trafficPacket, 1)
go func() {
var buf []wire_trafficPacket
for {
for len(buf) > 0 {
select {
case <-sinfo.cancel.Finished():
return
case p := <-sinfo.fromRouter:
buf = append(buf, p)
for len(buf) > 64 { // Based on nonce window size
util.PutBytes(buf[0].Payload)
buf = buf[1:]
}
case fromHelper <- buf[0]:
buf = buf[1:]
}
}
select {
case <-sinfo.cancel.Finished():
return
case p := <-sinfo.fromRouter:
buf = append(buf, p)
}
}
}()
for {
for len(callbacks) > 0 {
select {
case f := <-callbacks[0]:
callbacks = callbacks[1:]
f()
case <-sinfo.cancel.Finished():
return
case p := <-fromHelper:
doRecv(p)
}
}
select {
case <-sinfo.cancel.Finished():
return
case p := <-fromHelper:
doRecv(p)
}
}
}
func (sinfo *sessionInfo) sendWorker() {
// TODO move info that this worker needs here, send updates via a channel
// Otherwise we need to take a mutex to avoid races with update()
var callbacks []chan func()
doSend := func(msg FlowKeyMessage) {
var p wire_trafficPacket
var k crypto.BoxSharedKey
sessionFunc := func() {
sinfo.bytesSent += uint64(len(msg.Message))
p = wire_trafficPacket{
Coords: append([]byte(nil), sinfo.coords...),
Handle: sinfo.theirHandle,
Nonce: sinfo.myNonce,
}
if msg.FlowKey != 0 {
// Helps ensure that traffic from this flow ends up in a separate queue from other flows
// The zero padding relies on the fact that the self-peer is always on port 0
p.Coords = append(p.Coords, 0)
p.Coords = wire_put_uint64(msg.FlowKey, p.Coords)
}
sinfo.myNonce.Increment()
k = sinfo.sharedSesKey
}
// Get the mutex-protected info needed to encrypt the packet
sinfo.doFunc(sessionFunc)
ch := make(chan func(), 1)
poolFunc := func() {
// Encrypt the packet
p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce)
// The callback will send the packet
callback := func() {
// Encoding may block on a util.GetBytes(), so kept out of the worker pool
packet := p.encode()
// Cleanup
util.PutBytes(msg.Message)
util.PutBytes(p.Payload)
// Send the packet
sinfo.core.router.out(packet)
}
ch <- callback
}
// Send to the worker and wait for it to finish
util.WorkerGo(poolFunc)
callbacks = append(callbacks, ch)
}
for {
for len(callbacks) > 0 {
select {
case f := <-callbacks[0]:
callbacks = callbacks[1:]
f()
case <-sinfo.cancel.Finished():
return
case msg := <-sinfo.send:
doSend(msg)
}
}
select {
case <-sinfo.cancel.Finished():
return
case bs := <-sinfo.send:
doSend(bs)
}
}
}

View File

@ -1,9 +1,11 @@
package yggdrasil package yggdrasil
import ( import (
"bufio"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net"
"github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
@ -13,9 +15,8 @@ var _ = linkInterfaceMsgIO(&stream{})
type stream struct { type stream struct {
rwc io.ReadWriteCloser rwc io.ReadWriteCloser
inputBuffer []byte // Incoming packet stream inputBuffer *bufio.Reader
frag [2 * streamMsgSize]byte // Temporary data read off the underlying rwc, on its way to the inputBuffer outputBuffer net.Buffers
outputBuffer [2 * streamMsgSize]byte // Temporary data about to be written to the rwc
} }
func (s *stream) close() error { func (s *stream) close() error {
@ -30,19 +31,24 @@ func (s *stream) init(rwc io.ReadWriteCloser) {
// TODO have this also do the metadata handshake and create the peer struct // TODO have this also do the metadata handshake and create the peer struct
s.rwc = rwc s.rwc = rwc
// TODO call something to do the metadata exchange // TODO call something to do the metadata exchange
s.inputBuffer = bufio.NewReaderSize(s.rwc, 2*streamMsgSize)
} }
// writeMsg writes a message with stream padding, and is *not* thread safe. // writeMsg writes a message with stream padding, and is *not* thread safe.
func (s *stream) writeMsg(bs []byte) (int, error) { func (s *stream) writeMsg(bs []byte) (int, error) {
buf := s.outputBuffer[:0] buf := s.outputBuffer[:0]
buf = append(buf, streamMsg[:]...) buf = append(buf, streamMsg[:])
buf = wire_put_uint64(uint64(len(bs)), buf) l := wire_put_uint64(uint64(len(bs)), util.GetBytes())
padLen := len(buf) defer util.PutBytes(l)
buf = append(buf, bs...) buf = append(buf, l)
padLen := len(buf[0]) + len(buf[1])
buf = append(buf, bs)
totalLen := padLen + len(bs)
s.outputBuffer = buf[:0] // So we can reuse the same underlying array later
var bn int var bn int
for bn < len(buf) { for bn < totalLen {
n, err := s.rwc.Write(buf[bn:]) n, err := buf.WriteTo(s.rwc)
bn += n bn += int(n)
if err != nil { if err != nil {
l := bn - padLen l := bn - padLen
if l < 0 { if l < 0 {
@ -57,26 +63,11 @@ func (s *stream) writeMsg(bs []byte) (int, error) {
// readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe. // readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe.
func (s *stream) readMsg() ([]byte, error) { func (s *stream) readMsg() ([]byte, error) {
for { for {
buf := s.inputBuffer bs, err := s.readMsgFromBuffer()
msg, ok, err := stream_chopMsg(&buf) if err != nil {
switch {
case err != nil:
// Something in the stream format is corrupt
return nil, fmt.Errorf("message error: %v", err) return nil, fmt.Errorf("message error: %v", err)
case ok:
// Copy the packet into bs, shift the buffer, and return
msg = append(util.GetBytes(), msg...)
s.inputBuffer = append(s.inputBuffer[:0], buf...)
return msg, nil
default:
// Wait for the underlying reader to return enough info for us to proceed
n, err := s.rwc.Read(s.frag[:])
if n > 0 {
s.inputBuffer = append(s.inputBuffer, s.frag[:n]...)
} else if err != nil {
return nil, err
}
} }
return bs, err
} }
} }
@ -108,34 +99,30 @@ func (s *stream) _recvMetaBytes() ([]byte, error) {
return metaBytes, nil return metaBytes, nil
} }
// This takes a pointer to a slice as an argument. It checks if there's a // Reads bytes from the underlying rwc and returns 1 full message
// complete message and, if so, slices out those parts and returns the message, func (s *stream) readMsgFromBuffer() ([]byte, error) {
// true, and nil. If there's no error, but also no complete message, it returns pad := streamMsg // Copy
// nil, false, and nil. If there's an error, it returns nil, false, and the _, err := io.ReadFull(s.inputBuffer, pad[:])
// error, which the reader then handles (currently, by returning from the if err != nil {
// reader, which causes the connection to close). return nil, err
func stream_chopMsg(bs *[]byte) ([]byte, bool, error) { } else if pad != streamMsg {
// Returns msg, ok, err return nil, errors.New("bad message")
if len(*bs) < len(streamMsg) {
return nil, false, nil
} }
for idx := range streamMsg { lenSlice := make([]byte, 0, 10)
if (*bs)[idx] != streamMsg[idx] { // FIXME this nextByte stuff depends on wire.go format, kind of ugly to have it here
return nil, false, errors.New("bad message") nextByte := byte(0xff)
for nextByte > 127 {
nextByte, err = s.inputBuffer.ReadByte()
if err != nil {
return nil, err
} }
lenSlice = append(lenSlice, nextByte)
} }
msgLen, msgLenLen := wire_decode_uint64((*bs)[len(streamMsg):]) msgLen, _ := wire_decode_uint64(lenSlice)
if msgLen > streamMsgSize { if msgLen > streamMsgSize {
return nil, false, errors.New("oversized message") return nil, errors.New("oversized message")
} }
msgBegin := len(streamMsg) + msgLenLen msg := util.ResizeBytes(util.GetBytes(), int(msgLen))
msgEnd := msgBegin + int(msgLen) _, err = io.ReadFull(s.inputBuffer, msg)
if msgLenLen == 0 || len(*bs) < msgEnd { return msg, err
// We don't have the full message
// Need to buffer this and wait for the rest to come in
return nil, false, nil
}
msg := (*bs)[msgBegin:msgEnd]
(*bs) = (*bs)[msgEnd:]
return msg, true, nil
} }

View File

@ -630,6 +630,16 @@ func switch_getPacketStreamID(packet []byte) string {
return string(switch_getPacketCoords(packet)) return string(switch_getPacketCoords(packet))
} }
// Returns the flowlabel from a given set of coords
func switch_getFlowLabelFromCoords(in []byte) []byte {
for i, v := range in {
if v == 0 {
return in[i+1:]
}
}
return []byte{}
}
// Find the best port for a given set of coords // Find the best port for a given set of coords
func (t *switchTable) bestPortForCoords(coords []byte) switchPort { func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
table := t.getTable() table := t.getTable()
@ -667,19 +677,28 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) boo
var update bool var update bool
switch { switch {
case to == nil: case to == nil:
//nothing // no port was found, ignore it
case !isIdle: case !isIdle:
//nothing // the port is busy, ignore it
case best == nil: case best == nil:
// this is the first idle port we've found, so select it until we find a
// better candidate port to use instead
update = true update = true
case cinfo.dist < bestDist: case cinfo.dist < bestDist:
// the port takes a shorter path/is more direct than our current
// candidate, so select that instead
update = true update = true
case cinfo.dist > bestDist: case cinfo.dist > bestDist:
//nothing // the port takes a longer path/is less direct than our current candidate,
case thisTime.Before(bestTime): // ignore it
case thisTime.After(bestTime):
// all else equal, this port was used more recently than our current
// candidate, so choose that instead. this should mean that, in low
// traffic scenarios, we consistently pick the same link which helps with
// packet ordering
update = true update = true
default: default:
//nothing // the search for a port has finished
} }
if update { if update {
best = to best = to
@ -692,10 +711,9 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) boo
delete(idle, best.port) delete(idle, best.port)
best.sendPacket(packet) best.sendPacket(packet)
return true return true
} else {
// Didn't find anyone idle to send it to
return false
} }
// Didn't find anyone idle to send it to
return false
} }
// Info about a buffered packet // Info about a buffered packet
@ -814,17 +832,23 @@ func (t *switchTable) doWorker() {
go func() { go func() {
// Keep taking packets from the idle worker and sending them to the above whenever it's idle, keeping anything extra in a (fifo, head-drop) buffer // Keep taking packets from the idle worker and sending them to the above whenever it's idle, keeping anything extra in a (fifo, head-drop) buffer
var buf [][]byte var buf [][]byte
var size int
for { for {
buf = append(buf, <-t.toRouter) bs := <-t.toRouter
size += len(bs)
buf = append(buf, bs)
for len(buf) > 0 { for len(buf) > 0 {
select { select {
case bs := <-t.toRouter: case bs := <-t.toRouter:
size += len(bs)
buf = append(buf, bs) buf = append(buf, bs)
for len(buf) > 32 { for size > int(t.queueTotalMaxSize) {
size -= len(buf[0])
util.PutBytes(buf[0]) util.PutBytes(buf[0])
buf = buf[1:] buf = buf[1:]
} }
case sendingToRouter <- buf[0]: case sendingToRouter <- buf[0]:
size -= len(buf[0])
buf = buf[1:] buf = buf[1:]
} }
} }

View File

@ -115,6 +115,29 @@ func wire_decode_coords(packet []byte) ([]byte, int) {
return packet[coordBegin:coordEnd], coordEnd return packet[coordBegin:coordEnd], coordEnd
} }
// Converts a []uint64 set of coords to a []byte set of coords.
func wire_coordsUint64stoBytes(in []uint64) (out []byte) {
for _, coord := range in {
c := wire_encode_uint64(coord)
out = append(out, c...)
}
return out
}
// Converts a []byte set of coords to a []uint64 set of coords.
func wire_coordsBytestoUint64s(in []byte) (out []uint64) {
offset := 0
for {
coord, length := wire_decode_uint64(in[offset:])
if length == 0 {
break
}
out = append(out, coord)
offset += length
}
return out
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Encodes a swtichMsg into its wire format. // Encodes a swtichMsg into its wire format.