Compare commits

..

84 Commits

Author SHA1 Message Date
Neil Alexander
562a7d1f19 Merge pull request #516 from yggdrasil-network/develop
Version 0.3.8
2019-08-21 18:19:56 +01:00
Neil Alexander
0cb99d522f Update changelog 2019-08-21 18:18:46 +01:00
Neil Alexander
1308cb37b9 Merge pull request #515 from Arceliar/tidy
Clean up go.mod / go.sum
2019-08-21 07:11:07 +01:00
Arceliar
0d5dd9c455 update crypto dependency and run go mod tidy 2019-08-20 23:44:20 -05:00
Arceliar
12ce8c6a0a Merge pull request #512 from neilalexander/cryptokey
Cryptokey routing changes
2019-08-20 20:23:00 -05:00
Arceliar
f9d28e80df Merge pull request #514 from Arceliar/bugfix
hopefully prevent a deadlock
2019-08-20 19:13:00 -05:00
Arceliar
226dd6170d hopefully prevent a deadlock 2019-08-20 18:49:53 -05:00
Arceliar
4156aa3003 move ckr checks into the tunConn code 2019-08-20 18:10:08 -05:00
Neil Alexander
b79829c43b Merge branch 'develop' into cryptokey 2019-08-20 09:43:17 +01:00
Neil Alexander
ca73cf9e98 Merge pull request #513 from Arceliar/speedup
More speedup
2019-08-20 09:43:00 +01:00
Neil Alexander
b6e67bc0ba Check CKR remotes when receiving traffic 2019-08-20 09:38:46 +01:00
Neil Alexander
2b6462c8a9 Strict checking of Yggdrasil source/destination addresses 2019-08-20 09:38:27 +01:00
Arceliar
834a6a6f1a don't allocate a new child cancellation in Conn read/write calls if no deadline is set 2019-08-19 18:06:05 -05:00
Neil Alexander
2a629880fd Rename crypto-key config options, improve control flow 2019-08-19 10:28:30 +01:00
Arceliar
c04816b4bd Merge pull request #510 from Arceliar/streamWrites
Send multiple packets from the switch at once
2019-08-18 18:19:06 -05:00
Arceliar
8af1a7086c when a link becomes idle and packet are buffered that the link could send, send at least 65535 bytes worth instead of 1 packet, this reduces syscall overhead when small packets are sent through the network 2019-08-18 12:29:07 -05:00
Arceliar
62337bcd64 allow links to send multiple packets at once, currently we still only bother to send 1 at a time from the switch level 2019-08-18 12:17:54 -05:00
Neil Alexander
009d9c9ec0 Merge pull request #505 from yggdrasil-network/develop
Version 0.3.7
2019-08-18 11:20:50 +01:00
Arceliar
039dd98f0d Update CHANGELOG.md 2019-08-17 12:46:34 -05:00
Arceliar
57e7acdda8 Update CHANGELOG.md 2019-08-17 12:42:17 -05:00
Neil Alexander
80535d402f Merge pull request #508 from Arceliar/nonce
New nonce tracking
2019-08-17 10:55:21 +01:00
Arceliar
fd5f3ca764 fix heap pop order 2019-08-16 23:07:40 -05:00
Arceliar
03b8af9f1a keep track of recent nonces with a heap and a map instead of a fixed-size bitmask 2019-08-16 18:37:16 -05:00
Neil Alexander
fdac8932a8 Update changelog 2019-08-15 13:11:54 +01:00
Neil Alexander
ae0fe93de5 Update changelog 2019-08-15 12:54:04 +01:00
Neil Alexander
adf69d0127 Merge pull request #506 from Arceliar/switchorder
Switch priority change
2019-08-15 11:00:12 +01:00
Neil Alexander
5b054766a2 Update comments in handleIn, add switch_getFlowLabelFromCoords helper (in case it is useful if we try to consider flowlabels in multi-link scenarios) 2019-08-15 10:54:04 +01:00
Arceliar
382c2e6546 even more go.sum 2019-08-14 18:14:24 -05:00
Arceliar
1a2b7a8b60 test a change to how switch hops are selected when multiple links are idle 2019-08-14 17:57:36 -05:00
Neil Alexander
2abb71682f Update changelog, readme, go.mod/go.sum 2019-08-14 22:21:30 +01:00
Neil Alexander
f26f071901 Merge pull request #497 from Slex/issues/488
Implement feature from https://github.com/yggdrasil-network/yggdrasil…
2019-08-14 20:11:15 +01:00
Neil Alexander
02bfe28399 Minor tweaks 2019-08-14 20:09:02 +01:00
Neil Alexander
2cec5bf108 Merge pull request #504 from neilalexander/netlink
Use new netlink library (fixes #493)
2019-08-14 19:59:30 +01:00
Neil Alexander
33cd10c463 Merge branch 'issues/488' of github.com:slex/yggdrasil-go into issues/488 2019-08-14 19:58:45 +01:00
Neil Alexander
4702da2bcb Use new netlink library (fixes #493) 2019-08-14 19:32:40 +01:00
Neil Alexander
d9fabad8bc Merge pull request #502 from Arceliar/linkleak
Try to fix leaks in #501
2019-08-14 07:17:39 +01:00
Arceliar
46c5df1c23 when we abandon a link because we already have a connection to that peer, only wait for the connection to close if it's an *outgoing* link, otherwise incomming connection attempts can cause us to leak links 2019-08-13 18:49:49 -05:00
Neil Alexander
5e7df5a1c4 Merge pull request #499 from yggdrasil-network/sessionfix
Prevent session leaks or blocking if the listener is busy
2019-08-13 08:27:30 +01:00
Arceliar
b2cb1d965c avoid leaking sessions when no listener exists, or blocking if it's busy 2019-08-12 18:22:30 -05:00
Arceliar
c15976e4dc go.sum 2019-08-12 18:08:02 -05:00
Neil Alexander
70a118ae98 Update go.mod/go.sum 2019-08-12 11:41:29 +01:00
Neil Alexander
16076b53b9 Merge pull request #498 from Arceliar/search
Search fixes
2019-08-11 21:13:49 +01:00
Arceliar
277da1fe60 make sure searches don't end if try to continue (in parallel) with nowhere left to send, but we just sent a search and are still waiting for a response 2019-08-11 13:11:14 -05:00
Arceliar
7a28eb787e try to fix a few edge cases with searches that could lead them to ending without the callback being run or without cleaning up the old search info 2019-08-11 13:00:19 -05:00
Slex
589ad638ea Implement feature from https://github.com/yggdrasil-network/yggdrasil-go/issues/488 2019-08-11 00:31:22 +03:00
Arceliar
ae05683c73 Merge pull request #494 from Arceliar/bufpersession
Per-session front-dropping buffers for incoming traffic
2019-08-07 18:15:20 -05:00
Arceliar
5e81a0c421 Use a separate buffer per session for incoming packets, so 1 session that floods won't block other sessions 2019-08-07 18:08:31 -05:00
Arceliar
9ab08446ff make sure the sessionInfo.recvWorker doesn't block if sinfo.recv somehow fills 2019-08-07 17:40:50 -05:00
Neil Alexander
71e9ca25f7 Merge pull request #492 from neilalexander/fixlisten
Transform Listen statement to new format if needed
2019-08-07 10:57:55 +01:00
Neil Alexander
bbb35d7209 Transform Listen statement to new format if needed 2019-08-07 10:52:19 +01:00
Neil Alexander
c99ed9fb60 Merge pull request #491 from Arceliar/flowkey
Fix the old flowkey stuff so congestion control actually works...
2019-08-07 10:33:17 +01:00
Arceliar
d795ab1b65 minor allocation fix 2019-08-06 20:51:38 -05:00
Arceliar
790524bd1c copy/paste old flowkey logic into a util function, add a struct of key and packet, make WriteNoCopy accept this instead of a slice 2019-08-06 19:25:55 -05:00
Arceliar
6cb0ed91ad Merge pull request #486 from Arceliar/bugfix
Conn/Session cleanup
2019-08-05 19:17:30 -05:00
Arceliar
679866d5ff have createSession fill the sessionInfo.cancel field, have Conn use Conn.session.cancel instead of storing its own cancellation, this should prevent any of these things from being both nil and reachable at the same time 2019-08-05 19:11:28 -05:00
Arceliar
032b86c9a3 Merge pull request #485 from Arceliar/bugfix
Bugfix
2019-08-05 18:57:14 -05:00
Arceliar
8a85149817 remove src/.DS_Store 2019-08-05 18:50:08 -05:00
Arceliar
84a4f54217 temporary fix to nil pointer, better to make sure it's never nil 2019-08-05 18:49:15 -05:00
Neil Alexander
bd3b42022b Merge pull request #480 from Arceliar/speedup
Speedup
2019-08-05 10:24:54 +01:00
Neil Alexander
f046249ac6 Merge pull request #484 from neilalexander/config
API changes
2019-08-05 10:24:43 +01:00
Neil Alexander
2ee00fcc09 Return box_pub_key as hex string in JSON (replaces #481) 2019-08-05 10:21:40 +01:00
Neil Alexander
3a2ae9d902 Update API to represent coords as []uint64 2019-08-05 10:17:19 +01:00
Neil Alexander
37533f157d Make some API changes (currently broken) 2019-08-05 00:30:12 +01:00
Arceliar
979c3d4c07 move some potentially blocking operations out of session pool workers, minor cleanup 2019-08-04 16:29:58 -05:00
Arceliar
c55d7b4705 have the switch queue drop packts to ourself when the total size of all packets is at least queueTotalMaxSize, instead of an arbitrary unconfigurable packet count 2019-08-04 16:16:49 -05:00
Arceliar
6803f209b0 have tuntap code use Conn.ReadNoCopy and Conn.WriteNoCopy to avoid copying between slices 2019-08-04 15:59:51 -05:00
Arceliar
5d5486049b add Conn.ReadNoCopy and Conn.WriteNoCopy that transfer ownership of a slice instead of copying, have Read and Write use the NoCopy versions under the hood and just manage copying as needed 2019-08-04 15:53:34 -05:00
Arceliar
07f14f92ed disable crypto and switch buffer changes from testing 2019-08-04 15:25:14 -05:00
Arceliar
0ba8c6a34f have the stream code use bufio instead of copying manually to an input buffer, slightly reduces total uses of memmove 2019-08-04 15:21:04 -05:00
Arceliar
75b931f37e eliminate some more copying between slices 2019-08-04 14:50:19 -05:00
Arceliar
f52955ee0f WARNING: CRYPTO DISABLED while speeding up stream writeMsg 2019-08-04 14:18:59 -05:00
Arceliar
1e6a6d2160 use session.cancel in the router to make blocking safe, reduce size of fromRouter buffer so the drops in the switch are closer to the intended front-drop behavior 2019-08-04 02:21:41 -05:00
Arceliar
7bf5884ac1 remove some lossy channel sends that should be safe to allow to block 2019-08-04 02:14:45 -05:00
Arceliar
6da5802ae5 don't block forever in Write if the session is cancelled, cleanup Conn.Read slightly 2019-08-04 02:08:47 -05:00
Arceliar
144c823bee just use a sync.Pool as the bytestore to not overcomplicate things, the allocations from interface{} casting don't seem to actually hurt in practice right now 2019-08-04 00:28:13 -05:00
Arceliar
cbbb61b019 fix another drain on the bytestore 2019-08-04 00:00:41 -05:00
Arceliar
00e9c3dbd9 do session crypto work using the worker pool 2019-08-03 23:27:52 -05:00
Arceliar
befd1b43a0 refactor session worker code slightly 2019-08-03 23:14:51 -05:00
Arceliar
7a9ad0c8cc add workerpool to util 2019-08-03 23:10:37 -05:00
Arceliar
b9987b4fdc reduce time spent with a mutex held in sessionInfo.recvWorker 2019-08-03 22:47:10 -05:00
Arceliar
099bd3ae1e reduce part of sendWorker that needs to keep a mutex 2019-08-03 22:35:10 -05:00
Arceliar
72ed541bf3 a little cleanup to Conn functions 2019-08-03 22:07:38 -05:00
Arceliar
5dfc71e1ee put bytes back when done 2019-08-03 22:00:47 -05:00
Arceliar
df0090e32a Add per-session read/write workers, work in progress, they still unfortunately need to take a mutex for safety 2019-08-03 21:46:18 -05:00
30 changed files with 1133 additions and 839 deletions

View File

@@ -25,6 +25,44 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- in case of vulnerabilities.
-->
## [0.3.8] - 2019-08-21
### Changed
- Yggdrasil can now send multiple packets from the switch at once, which results in improved throughput with smaller packets or lower MTUs
- Performance has been slightly improved by not allocating cancellations where not necessary
- Crypto-key routing options have been renamed for clarity
- `IPv4Sources` is now named `IPv4LocalSubnets`
- `IPv6Sources` is now named `IPv6LocalSubnets`
- `IPv4Destinations` is now named `IPv4RemoteSubnets`
- `IPv6Destinations` is now named `IPv6RemoteSubnets`
- The old option names will continue to be accepted by the configuration parser for now but may not be indefinitely
- When presented with multiple paths between two nodes, the switch now prefers the most recently used port when possible instead of the least recently used, helping to reduce packet reordering
- New nonce tracking should help to reduce the number of packets dropped as a result of multiple/aggregate paths or congestion control in the switch
### Fixed
- **Security vulnerability**: Address verification was not strict enough, which could result in a malicious session sending traffic with unexpected or spoofed source or destination addresses which Yggdrasil could fail to reject
- Versions `0.3.6` and `0.3.7` are vulnerable - users of these versions should upgrade as soon as possible
- Versions `0.3.5` and earlier are not affected
- A deadlock was fixed in the session code which could result in Yggdrasil failing to pass traffic after some time
## [0.3.7] - 2019-08-14
### Changed
- The switch should now forward packets along a single path more consistently in cases where congestion is low and multiple equal-length paths exist, which should improve stability and result in fewer out-of-order packets
- Sessions should now be more tolerant of out-of-order packets, by replacing a bitmask with a variable sized heap+map structure to track recently received nonces, which should reduce the number of packets dropped due to reordering when multiple paths are used or multiple independent flows are transmitted through the same session
- The admin socket can no longer return a dotfile representation of the known parts of the network, this could be rebuilt by clients using information from `getSwitchPeers`,`getDHT` and `getSessions`
### Fixed
- A number of significant performance regressions introduced in version 0.3.6 have been fixed, resulting in better performance
- Flow labels are now used to prioritise traffic flows again correctly
- In low-traffic scenarios where there are multiple peerings between a pair of nodes, Yggdrasil now prefers the most active peering instead of the least active, helping to reduce packet reordering
- The `Listen` statement, when configured as a string rather than an array, will now be parsed correctly
- The admin socket now returns `coords` as a correct array of unsigned 64-bit integers, rather than the internal representation
- The admin socket now returns `box_pub_key` in string format again
- Sessions no longer leak/block when no listener (e.g. TUN/TAP) is configured
- Incoming session connections no longer block when a session already exists, which results in less leaked goroutines
- Flooded sessions will no longer block other sessions
- Searches are now cleaned up properly and a couple of edge-cases with duplicate searches have been fixed
- A number of minor allocation and pointer fixes
## [0.3.6] - 2019-08-03
### Added
- Yggdrasil now has a public API with interfaces such as `yggdrasil.ConnDialer`, `yggdrasil.ConnListener` and `yggdrasil.Conn` for using Yggdrasil as a transport directly within applications
@@ -53,7 +91,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Session MTUs are now always calculated correctly, in some cases they were incorrectly defaulting to 1280 before
- Multiple searches now don't take place for a single connection
- Concurrency bugs fixed
- Fixed a number of bugs in the ICMPv6 neighbor solicitation in the TUN/TAP code
- Fixed a number of bugs in the ICMPv6 neighbor solicitation in the TUN/TAP code
- A case where peers weren't always added correctly if one or more peers were unreachable has been fixed
- Searches which include the local node are now handled correctly
- Lots of small bug tweaks and clean-ups throughout the codebase

View File

@@ -49,7 +49,7 @@ You may also find other platform-specific wrappers, scripts or tools in the
If you want to build from source, as opposed to installing one of the pre-built
packages:
1. Install [Go](https://golang.org) (requires Go 1.11 or later)
1. Install [Go](https://golang.org) (requires Go 1.12 or later)
2. Clone this repository
2. Run `./build`

2
build
View File

@@ -2,7 +2,7 @@
set -ef
PKGSRC=${PKGSRC:-github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil}
PKGSRC=${PKGSRC:-github.com/yggdrasil-network/yggdrasil-go/src/version}
PKGNAME=${PKGNAME:-$(sh contrib/semver/name.sh)}
PKGVER=${PKGVER:-$(sh contrib/semver/version.sh --bare)}

View File

@@ -25,6 +25,7 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/multicast"
"github.com/yggdrasil-network/yggdrasil-go/src/tuntap"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
)
@@ -74,6 +75,30 @@ func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *config
if err := hjson.Unmarshal(conf, &dat); err != nil {
panic(err)
}
// Check for fields that have changed type recently, e.g. the Listen config
// option is now a []string rather than a string
if listen, ok := dat["Listen"].(string); ok {
dat["Listen"] = []string{listen}
}
if tunnelrouting, ok := dat["TunnelRouting"].(map[string]interface{}); ok {
if c, ok := tunnelrouting["IPv4Sources"]; ok {
delete(tunnelrouting, "IPv4Sources")
tunnelrouting["IPv4LocalSubnets"] = c
}
if c, ok := tunnelrouting["IPv6Sources"]; ok {
delete(tunnelrouting, "IPv6Sources")
tunnelrouting["IPv6LocalSubnets"] = c
}
if c, ok := tunnelrouting["IPv4Destinations"]; ok {
delete(tunnelrouting, "IPv4Destinations")
tunnelrouting["IPv4RemoteSubnets"] = c
}
if c, ok := tunnelrouting["IPv6Destinations"]; ok {
delete(tunnelrouting, "IPv6Destinations")
tunnelrouting["IPv6RemoteSubnets"] = c
}
}
// Sanitise the config
confJson, err := json.Marshal(dat)
if err != nil {
panic(err)
@@ -113,7 +138,7 @@ func main() {
normaliseconf := flag.Bool("normaliseconf", false, "use in combination with either -useconf or -useconffile, outputs your configuration normalised")
confjson := flag.Bool("json", false, "print configuration from -genconf or -normaliseconf as JSON instead of HJSON")
autoconf := flag.Bool("autoconf", false, "automatic mode (dynamic IP, peer with IPv6 neighbors)")
version := flag.Bool("version", false, "prints the version of this build")
ver := flag.Bool("version", false, "prints the version of this build")
logging := flag.String("logging", "info,warn,error", "comma-separated list of logging levels to enable")
logto := flag.String("logto", "stdout", "file path to log to, \"syslog\" or \"stdout\"")
flag.Parse()
@@ -121,10 +146,10 @@ func main() {
var cfg *config.NodeConfig
var err error
switch {
case *version:
fmt.Println("Build name:", yggdrasil.BuildName())
fmt.Println("Build version:", yggdrasil.BuildVersion())
os.Exit(0)
case *ver:
fmt.Println("Build name:", version.BuildName())
fmt.Println("Build version:", version.BuildVersion())
return
case *autoconf:
// Use an autoconf-generated config, this will give us random keys and
// port numbers, and will use an automatically selected TUN/TAP interface.
@@ -168,7 +193,7 @@ func main() {
case "stdout":
logger = log.New(os.Stdout, "", log.Flags())
case "syslog":
if syslogger, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, "DAEMON", yggdrasil.BuildName()); err == nil {
if syslogger, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, "DAEMON", version.BuildName()); err == nil {
logger = log.New(syslogger, "", log.Flags())
}
default:

View File

@@ -19,6 +19,7 @@ import (
"github.com/hjson/hjson-go"
"github.com/yggdrasil-network/yggdrasil-go/src/defaults"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
)
type admin_info map[string]interface{}
@@ -53,9 +54,17 @@ func main() {
server := flag.String("endpoint", endpoint, "Admin socket endpoint")
injson := flag.Bool("json", false, "Output in JSON format (as opposed to pretty-print)")
verbose := flag.Bool("v", false, "Verbose output (includes public keys)")
ver := flag.Bool("version", false, "Prints the version of this build")
flag.Parse()
args := flag.Args()
if *ver {
fmt.Println("Build name:", version.BuildName())
fmt.Println("Build version:", version.BuildVersion())
fmt.Println("To get the version number of the running Yggdrasil node, run", os.Args[0], "getSelf")
return
}
if len(args) == 0 {
flag.Usage()
return

13
go.mod
View File

@@ -1,17 +1,18 @@
module github.com/yggdrasil-network/yggdrasil-go
require (
github.com/docker/libcontainer v2.2.1+incompatible
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8
github.com/hashicorp/go-syslog v1.0.0
github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222
github.com/kardianos/minwinsvc v0.0.0-20151122163309-cad6b2b879b0
github.com/mitchellh/mapstructure v1.1.2
github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091
github.com/yggdrasil-network/water v0.0.0-20190725123504-a16161896c34
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3
github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b // indirect
github.com/vishvananda/netlink v1.0.0
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f // indirect
github.com/yggdrasil-network/water v0.0.0-20190812103929-c83fe40250f8
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a
golang.org/x/text v0.3.2
golang.org/x/tools v0.0.0-20190802003818-e9bb7d36c060 // indirect
)

49
go.sum
View File

@@ -1,5 +1,3 @@
github.com/docker/libcontainer v2.2.1+incompatible h1:++SbbkCw+X8vAd4j2gOCzZ2Nn7s2xFALTf7LZKmM1/0=
github.com/docker/libcontainer v2.2.1+incompatible/go.mod h1:osvj61pYsqhNCMLGX31xr7klUBhHb/ZBuXS0o1Fvwbw=
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY=
github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U=
github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE=
@@ -12,44 +10,23 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091 h1:1zN6ImoqhSJhN8hGXFaJlSC8msLmIbX8bFqOfWLKw0w=
github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091/go.mod h1:N20Z5Y8oye9a7HmytmZ+tr8Q2vlP0tAHP13kTHzwvQY=
github.com/yggdrasil-network/water v0.0.0-20180615095340-f732c88f34ae h1:MYCANF1kehCG6x6G+/9txLfq6n3lS5Vp0Mxn1hdiBAc=
github.com/yggdrasil-network/water v0.0.0-20180615095340-f732c88f34ae/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190719211521-a76871ea954b/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190719213007-b160316e362e/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190720101301-5db94379a5eb/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190720145626-28ccb9101d55/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190725073841-250edb919f8a h1:mQ0mPD+dyB/vaDPyVkCBiXUQu9Or7/cRSTjPlV8tXvw=
github.com/yggdrasil-network/water v0.0.0-20190725073841-250edb919f8a/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190725123504-a16161896c34 h1:Qh5FE+Q5iGqpmR/FPMYHuoZLN921au/nxAlmKe+Hdbo=
github.com/yggdrasil-network/water v0.0.0-20190725123504-a16161896c34/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b h1:+y4hCMc/WKsDbAPsOQZgBSaSZ26uh2afyaWeVg/3s/c=
github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b/go.mod h1:P5HUIBuIWKbyjl083/loAegFkfbFNx5i2qEP4CNbm7E=
github.com/vishvananda/netlink v1.0.0 h1:bqNY2lgheFIu1meHUFSH3d7vG93AFyqg3oGbJCOJgSM=
github.com/vishvananda/netlink v1.0.0/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f h1:nBX3nTcmxEtHSERBJaIo1Qa26VwRaopnZmfDQUXsF4I=
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/yggdrasil-network/water v0.0.0-20190812103929-c83fe40250f8 h1:YY9Pg2BEp0jeUVU60svTOaDr+fs1ySC9RbdC1Qc6wOw=
github.com/yggdrasil-network/water v0.0.0-20190812103929-c83fe40250f8/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20181207154023-610586996380 h1:zPQexyRtNYBc7bcHmehl1dH6TB3qn8zytv8cBGLDNY0=
golang.org/x/net v0.0.0-20181207154023-610586996380/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e h1:njOxP/wVblhCLIUhjHXf6X+dzTt5OQ3vMQo9mkOIKIo=
golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 h1:fHDIZ2oxGnUZRN6WgWFCbYBjH9uqVPRCUVUDhs0wnbA=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190719005602-e377ae9d6386/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
golang.org/x/tools v0.0.0-20190724185037-8aa4eac1a7c1/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
golang.org/x/tools v0.0.0-20190729092621-ff9f1409240a/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
golang.org/x/tools v0.0.0-20190802003818-e9bb7d36c060/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=

View File

@@ -17,6 +17,8 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
)
@@ -78,11 +80,6 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
}
return Info{"list": handlers}, nil
})
/*
a.AddHandler("dot", []string{}, func(in Info) (Info, error) {
return Info{"dot": string(a.getResponse_dot())}, nil
})
*/
a.AddHandler("getSelf", []string{}, func(in Info) (Info, error) {
ip := c.Address().String()
subnet := c.Subnet()
@@ -90,8 +87,8 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
"self": Info{
ip: Info{
"box_pub_key": c.EncryptionPublicKey(),
"build_name": yggdrasil.BuildName(),
"build_version": yggdrasil.BuildVersion(),
"build_name": version.BuildName(),
"build_version": version.BuildVersion(),
"coords": fmt.Sprintf("%v", c.Coords()),
"subnet": subnet.String(),
},
@@ -110,7 +107,7 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
"bytes_recvd": p.BytesRecvd,
"proto": p.Protocol,
"endpoint": p.Endpoint,
"box_pub_key": p.PublicKey,
"box_pub_key": hex.EncodeToString(p.PublicKey[:]),
}
}
return Info{"peers": peers}, nil
@@ -128,7 +125,7 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
"bytes_recvd": s.BytesRecvd,
"proto": s.Protocol,
"endpoint": s.Endpoint,
"box_pub_key": s.PublicKey,
"box_pub_key": hex.EncodeToString(s.PublicKey[:]),
}
}
return Info{"switchpeers": switchpeers}, nil
@@ -147,7 +144,7 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
dht[so] = Info{
"coords": fmt.Sprintf("%v", d.Coords),
"last_seen": d.LastSeen.Seconds(),
"box_pub_key": d.PublicKey,
"box_pub_key": hex.EncodeToString(d.PublicKey[:]),
}
}
return Info{"dht": dht}, nil
@@ -164,7 +161,7 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
"mtu": s.MTU,
"uptime": s.Uptime.Seconds(),
"was_mtu_fixed": s.WasMTUFixed,
"box_pub_key": s.PublicKey,
"box_pub_key": hex.EncodeToString(s.PublicKey[:]),
}
}
return Info{"sessions": sessions}, nil
@@ -243,31 +240,46 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
}
})
a.AddHandler("dhtPing", []string{"box_pub_key", "coords", "[target]"}, func(in Info) (Info, error) {
var reserr error
var result yggdrasil.DHTRes
if in["target"] == nil {
in["target"] = "none"
}
result, err := a.core.DHTPing(in["box_pub_key"].(string), in["coords"].(string), in["target"].(string))
if err == nil {
infos := make(map[string]map[string]string, len(result.Infos))
for _, dinfo := range result.Infos {
info := map[string]string{
"box_pub_key": hex.EncodeToString(dinfo.PublicKey[:]),
"coords": fmt.Sprintf("%v", dinfo.Coords),
}
addr := net.IP(address.AddrForNodeID(crypto.GetNodeID(&dinfo.PublicKey))[:]).String()
infos[addr] = info
coords := util.DecodeCoordString(in["coords"].(string))
var boxPubKey crypto.BoxPubKey
if b, err := hex.DecodeString(in["box_pub_key"].(string)); err == nil {
copy(boxPubKey[:], b[:])
if n, err := hex.DecodeString(in["target"].(string)); err == nil {
var targetNodeID crypto.NodeID
copy(targetNodeID[:], n[:])
result, reserr = a.core.DHTPing(boxPubKey, coords, &targetNodeID)
} else {
result, reserr = a.core.DHTPing(boxPubKey, coords, nil)
}
return Info{"nodes": infos}, nil
} else {
return Info{}, err
}
if reserr != nil {
return Info{}, reserr
}
infos := make(map[string]map[string]string, len(result.Infos))
for _, dinfo := range result.Infos {
info := map[string]string{
"box_pub_key": hex.EncodeToString(dinfo.PublicKey[:]),
"coords": fmt.Sprintf("%v", dinfo.Coords),
}
addr := net.IP(address.AddrForNodeID(crypto.GetNodeID(&dinfo.PublicKey))[:]).String()
infos[addr] = info
}
return Info{"nodes": infos}, nil
})
a.AddHandler("getNodeInfo", []string{"[box_pub_key]", "[coords]", "[nocache]"}, func(in Info) (Info, error) {
var nocache bool
if in["nocache"] != nil {
nocache = in["nocache"].(string) == "true"
}
var box_pub_key, coords string
var boxPubKey crypto.BoxPubKey
var coords []uint64
if in["box_pub_key"] == nil && in["coords"] == nil {
nodeinfo := a.core.MyNodeInfo()
var jsoninfo interface{}
@@ -279,10 +291,14 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
} else if in["box_pub_key"] == nil || in["coords"] == nil {
return Info{}, errors.New("Expecting both box_pub_key and coords")
} else {
box_pub_key = in["box_pub_key"].(string)
coords = in["coords"].(string)
if b, err := hex.DecodeString(in["box_pub_key"].(string)); err == nil {
copy(boxPubKey[:], b[:])
} else {
return Info{}, err
}
coords = util.DecodeCoordString(in["coords"].(string))
}
result, err := a.core.GetNodeInfo(box_pub_key, coords, nocache)
result, err := a.core.GetNodeInfo(boxPubKey, coords, nocache)
if err == nil {
var m map[string]interface{}
if err = json.Unmarshal(result, &m); err == nil {
@@ -472,133 +488,3 @@ func (a *AdminSocket) handleRequest(conn net.Conn) {
}
}
}
// getResponse_dot returns a response for a graphviz dot formatted
// representation of the known parts of the network. This is color-coded and
// labeled, and includes the self node, switch peers, nodes known to the DHT,
// and nodes with open sessions. The graph is structured as a tree with directed
// links leading away from the root.
/*
func (a *AdminSocket) getResponse_dot() []byte {
//self := a.getData_getSelf()
peers := a.core.GetSwitchPeers()
dht := a.core.GetDHT()
sessions := a.core.GetSessions()
// Start building a tree from all known nodes
type nodeInfo struct {
name string
key string
parent string
port uint64
options string
}
infos := make(map[string]nodeInfo)
// Get coords as a slice of strings, FIXME? this looks very fragile
coordSlice := func(coords string) []string {
tmp := strings.Replace(coords, "[", "", -1)
tmp = strings.Replace(tmp, "]", "", -1)
return strings.Split(tmp, " ")
}
// First fill the tree with all known nodes, no parents
addInfo := func(nodes []admin_nodeInfo, options string, tag string) {
for _, node := range nodes {
n := node.asMap()
info := nodeInfo{
key: n["coords"].(string),
options: options,
}
if len(tag) > 0 {
info.name = fmt.Sprintf("%s\n%s", n["ip"].(string), tag)
} else {
info.name = n["ip"].(string)
}
coordsSplit := coordSlice(info.key)
if len(coordsSplit) != 0 {
portStr := coordsSplit[len(coordsSplit)-1]
portUint, err := strconv.ParseUint(portStr, 10, 64)
if err == nil {
info.port = portUint
}
}
infos[info.key] = info
}
}
addInfo(dht, "fillcolor=\"#ffffff\" style=filled fontname=\"sans serif\"", "Known in DHT") // white
addInfo(sessions, "fillcolor=\"#acf3fd\" style=filled fontname=\"sans serif\"", "Open session") // blue
addInfo(peers, "fillcolor=\"#ffffb5\" style=filled fontname=\"sans serif\"", "Connected peer") // yellow
addInfo(append([]admin_nodeInfo(nil), *self), "fillcolor=\"#a5ff8a\" style=filled fontname=\"sans serif\"", "This node") // green
// Now go through and create placeholders for any missing nodes
for _, info := range infos {
// This is ugly string manipulation
coordsSplit := coordSlice(info.key)
for idx := range coordsSplit {
key := fmt.Sprintf("[%v]", strings.Join(coordsSplit[:idx], " "))
newInfo, isIn := infos[key]
if isIn {
continue
}
newInfo.name = "?"
newInfo.key = key
newInfo.options = "fontname=\"sans serif\" style=dashed color=\"#999999\" fontcolor=\"#999999\""
coordsSplit := coordSlice(newInfo.key)
if len(coordsSplit) != 0 {
portStr := coordsSplit[len(coordsSplit)-1]
portUint, err := strconv.ParseUint(portStr, 10, 64)
if err == nil {
newInfo.port = portUint
}
}
infos[key] = newInfo
}
}
// Now go through and attach parents
for _, info := range infos {
pSplit := coordSlice(info.key)
if len(pSplit) > 0 {
pSplit = pSplit[:len(pSplit)-1]
}
info.parent = fmt.Sprintf("[%v]", strings.Join(pSplit, " "))
infos[info.key] = info
}
// Finally, get a sorted list of keys, which we use to organize the output
var keys []string
for _, info := range infos {
keys = append(keys, info.key)
}
// sort
sort.SliceStable(keys, func(i, j int) bool {
return keys[i] < keys[j]
})
sort.SliceStable(keys, func(i, j int) bool {
return infos[keys[i]].port < infos[keys[j]].port
})
// Now print it all out
var out []byte
put := func(s string) {
out = append(out, []byte(s)...)
}
put("digraph {\n")
// First set the labels
for _, key := range keys {
info := infos[key]
put(fmt.Sprintf("\"%v\" [ label = \"%v\" %v ];\n", info.key, info.name, info.options))
}
// Then print the tree structure
for _, key := range keys {
info := infos[key]
if info.key == info.parent {
continue
} // happens for the root, skip it
port := fmt.Sprint(info.port)
style := "fontname=\"sans serif\""
if infos[info.parent].name == "?" || infos[info.key].name == "?" {
style = "fontname=\"sans serif\" style=dashed color=\"#999999\" fontcolor=\"#999999\""
}
put(fmt.Sprintf(" \"%+v\" -> \"%+v\" [ label = \"%v\" %s ];\n", info.parent, info.key, port, style))
}
put("}\n")
return out
}
*/

View File

@@ -74,11 +74,11 @@ type SessionFirewall struct {
// TunnelRouting contains the crypto-key routing tables for tunneling
type TunnelRouting struct {
Enable bool `comment:"Enable or disable tunnel routing."`
IPv6Destinations map[string]string `comment:"IPv6 CIDR subnets, mapped to the EncryptionPublicKey to which they\nshould be routed, e.g. { \"aaaa:bbbb:cccc::/e\": \"boxpubkey\", ... }"`
IPv6Sources []string `comment:"Optional IPv6 source subnets which are allowed to be tunnelled in\naddition to this node's Yggdrasil address/subnet. If not\nspecified, only traffic originating from this node's Yggdrasil\naddress or subnet will be tunnelled."`
IPv4Destinations map[string]string `comment:"IPv4 CIDR subnets, mapped to the EncryptionPublicKey to which they\nshould be routed, e.g. { \"a.b.c.d/e\": \"boxpubkey\", ... }"`
IPv4Sources []string `comment:"IPv4 source subnets which are allowed to be tunnelled. Unlike for\nIPv6, this option is required for bridging IPv4 traffic. Only\ntraffic with a source matching these subnets will be tunnelled."`
Enable bool `comment:"Enable or disable tunnel routing."`
IPv6RemoteSubnets map[string]string `comment:"IPv6 subnets belonging to remote nodes, mapped to the node's public\nkey, e.g. { \"aaaa:bbbb:cccc::/e\": \"boxpubkey\", ... }"`
IPv6LocalSubnets []string `comment:"IPv6 subnets belonging to this node's end of the tunnels. Only traffic\nfrom these ranges (or the Yggdrasil node's IPv6 address/subnet)\nwill be tunnelled."`
IPv4RemoteSubnets map[string]string `comment:"IPv4 subnets belonging to remote nodes, mapped to the node's public\nkey, e.g. { \"a.b.c.d/e\": \"boxpubkey\", ... }"`
IPv4LocalSubnets []string `comment:"IPv4 subnets belonging to this node's end of the tunnels. Only traffic\nfrom these ranges will be tunnelled."`
}
// SwitchOptions contains tuning options for the switch

View File

@@ -66,15 +66,15 @@ func (t *TunAdapter) SetupAdminHandlers(a *admin.AdminSocket) {
t.ckr.setEnabled(enabled)
return admin.Info{"enabled": enabled}, nil
})
a.AddHandler("addSourceSubnet", []string{"subnet"}, func(in admin.Info) (admin.Info, error) {
if err := t.ckr.addSourceSubnet(in["subnet"].(string)); err == nil {
a.AddHandler("addLocalSubnet", []string{"subnet"}, func(in admin.Info) (admin.Info, error) {
if err := t.ckr.addLocalSubnet(in["subnet"].(string)); err == nil {
return admin.Info{"added": []string{in["subnet"].(string)}}, nil
} else {
return admin.Info{"not_added": []string{in["subnet"].(string)}}, errors.New("Failed to add source subnet")
}
})
a.AddHandler("addRoute", []string{"subnet", "box_pub_key"}, func(in admin.Info) (admin.Info, error) {
if err := t.ckr.addRoute(in["subnet"].(string), in["box_pub_key"].(string)); err == nil {
a.AddHandler("addRemoteSubnet", []string{"subnet", "box_pub_key"}, func(in admin.Info) (admin.Info, error) {
if err := t.ckr.addRemoteSubnet(in["subnet"].(string), in["box_pub_key"].(string)); err == nil {
return admin.Info{"added": []string{fmt.Sprintf("%s via %s", in["subnet"].(string), in["box_pub_key"].(string))}}, nil
} else {
return admin.Info{"not_added": []string{fmt.Sprintf("%s via %s", in["subnet"].(string), in["box_pub_key"].(string))}}, errors.New("Failed to add route")
@@ -87,8 +87,8 @@ func (t *TunAdapter) SetupAdminHandlers(a *admin.AdminSocket) {
subnets = append(subnets, subnet.String())
}
}
getSourceSubnets(t.ckr.ipv4sources)
getSourceSubnets(t.ckr.ipv6sources)
getSourceSubnets(t.ckr.ipv4locals)
getSourceSubnets(t.ckr.ipv6locals)
return admin.Info{"source_subnets": subnets}, nil
})
a.AddHandler("getRoutes", []string{}, func(in admin.Info) (admin.Info, error) {
@@ -98,19 +98,19 @@ func (t *TunAdapter) SetupAdminHandlers(a *admin.AdminSocket) {
routes[ckr.subnet.String()] = hex.EncodeToString(ckr.destination[:])
}
}
getRoutes(t.ckr.ipv4routes)
getRoutes(t.ckr.ipv6routes)
getRoutes(t.ckr.ipv4remotes)
getRoutes(t.ckr.ipv6remotes)
return admin.Info{"routes": routes}, nil
})
a.AddHandler("removeSourceSubnet", []string{"subnet"}, func(in admin.Info) (admin.Info, error) {
if err := t.ckr.removeSourceSubnet(in["subnet"].(string)); err == nil {
a.AddHandler("removeLocalSubnet", []string{"subnet"}, func(in admin.Info) (admin.Info, error) {
if err := t.ckr.removeLocalSubnet(in["subnet"].(string)); err == nil {
return admin.Info{"removed": []string{in["subnet"].(string)}}, nil
} else {
return admin.Info{"not_removed": []string{in["subnet"].(string)}}, errors.New("Failed to remove source subnet")
}
})
a.AddHandler("removeRoute", []string{"subnet", "box_pub_key"}, func(in admin.Info) (admin.Info, error) {
if err := t.ckr.removeRoute(in["subnet"].(string), in["box_pub_key"].(string)); err == nil {
a.AddHandler("removeRemoteSubnet", []string{"subnet", "box_pub_key"}, func(in admin.Info) (admin.Info, error) {
if err := t.ckr.removeRemoteSubnet(in["subnet"].(string), in["box_pub_key"].(string)); err == nil {
return admin.Info{"removed": []string{fmt.Sprintf("%s via %s", in["subnet"].(string), in["box_pub_key"].(string))}}, nil
} else {
return admin.Info{"not_removed": []string{fmt.Sprintf("%s via %s", in["subnet"].(string), in["box_pub_key"].(string))}}, errors.New("Failed to remove route")

View File

@@ -21,15 +21,15 @@ type cryptokey struct {
tun *TunAdapter
enabled atomic.Value // bool
reconfigure chan chan error
ipv4routes []cryptokey_route
ipv6routes []cryptokey_route
ipv4remotes []cryptokey_route
ipv6remotes []cryptokey_route
ipv4cache map[address.Address]cryptokey_route
ipv6cache map[address.Address]cryptokey_route
ipv4sources []net.IPNet
ipv6sources []net.IPNet
mutexroutes sync.RWMutex
ipv4locals []net.IPNet
ipv6locals []net.IPNet
mutexremotes sync.RWMutex
mutexcaches sync.RWMutex
mutexsources sync.RWMutex
mutexlocals sync.RWMutex
}
type cryptokey_route struct {
@@ -65,43 +65,43 @@ func (c *cryptokey) configure() error {
c.setEnabled(current.TunnelRouting.Enable)
// Clear out existing routes
c.mutexroutes.Lock()
c.ipv6routes = make([]cryptokey_route, 0)
c.ipv4routes = make([]cryptokey_route, 0)
c.mutexroutes.Unlock()
c.mutexremotes.Lock()
c.ipv6remotes = make([]cryptokey_route, 0)
c.ipv4remotes = make([]cryptokey_route, 0)
c.mutexremotes.Unlock()
// Add IPv6 routes
for ipv6, pubkey := range current.TunnelRouting.IPv6Destinations {
if err := c.addRoute(ipv6, pubkey); err != nil {
for ipv6, pubkey := range current.TunnelRouting.IPv6RemoteSubnets {
if err := c.addRemoteSubnet(ipv6, pubkey); err != nil {
return err
}
}
// Add IPv4 routes
for ipv4, pubkey := range current.TunnelRouting.IPv4Destinations {
if err := c.addRoute(ipv4, pubkey); err != nil {
for ipv4, pubkey := range current.TunnelRouting.IPv4RemoteSubnets {
if err := c.addRemoteSubnet(ipv4, pubkey); err != nil {
return err
}
}
// Clear out existing sources
c.mutexsources.Lock()
c.ipv6sources = make([]net.IPNet, 0)
c.ipv4sources = make([]net.IPNet, 0)
c.mutexsources.Unlock()
c.mutexlocals.Lock()
c.ipv6locals = make([]net.IPNet, 0)
c.ipv4locals = make([]net.IPNet, 0)
c.mutexlocals.Unlock()
// Add IPv6 sources
c.ipv6sources = make([]net.IPNet, 0)
for _, source := range current.TunnelRouting.IPv6Sources {
if err := c.addSourceSubnet(source); err != nil {
c.ipv6locals = make([]net.IPNet, 0)
for _, source := range current.TunnelRouting.IPv6LocalSubnets {
if err := c.addLocalSubnet(source); err != nil {
return err
}
}
// Add IPv4 sources
c.ipv4sources = make([]net.IPNet, 0)
for _, source := range current.TunnelRouting.IPv4Sources {
if err := c.addSourceSubnet(source); err != nil {
c.ipv4locals = make([]net.IPNet, 0)
for _, source := range current.TunnelRouting.IPv4LocalSubnets {
if err := c.addLocalSubnet(source); err != nil {
return err
}
}
@@ -128,35 +128,21 @@ func (c *cryptokey) isEnabled() bool {
// Check whether the given address (with the address length specified in bytes)
// matches either the current node's address, the node's routed subnet or the
// list of subnets specified in IPv4Sources/IPv6Sources.
func (c *cryptokey) isValidSource(addr address.Address, addrlen int) bool {
c.mutexsources.RLock()
defer c.mutexsources.RUnlock()
ip := net.IP(addr[:addrlen])
if addrlen == net.IPv6len {
// Does this match our node's address?
if bytes.Equal(addr[:16], c.tun.addr[:16]) {
return true
}
// Does this match our node's subnet?
if bytes.Equal(addr[:8], c.tun.subnet[:8]) {
return true
}
}
// list of subnets specified in ipv4locals/ipv6locals.
func (c *cryptokey) isValidLocalAddress(addr address.Address, addrlen int) bool {
c.mutexlocals.RLock()
defer c.mutexlocals.RUnlock()
// Does it match a configured CKR source?
if c.isEnabled() {
ip := net.IP(addr[:addrlen])
// Build our references to the routing sources
var routingsources *[]net.IPNet
// Check if the prefix is IPv4 or IPv6
if addrlen == net.IPv6len {
routingsources = &c.ipv6sources
routingsources = &c.ipv6locals
} else if addrlen == net.IPv4len {
routingsources = &c.ipv4sources
routingsources = &c.ipv4locals
} else {
return false
}
@@ -174,9 +160,9 @@ func (c *cryptokey) isValidSource(addr address.Address, addrlen int) bool {
// Adds a source subnet, which allows traffic with these source addresses to
// be tunnelled using crypto-key routing.
func (c *cryptokey) addSourceSubnet(cidr string) error {
c.mutexsources.Lock()
defer c.mutexsources.Unlock()
func (c *cryptokey) addLocalSubnet(cidr string) error {
c.mutexlocals.Lock()
defer c.mutexlocals.Unlock()
// Is the CIDR we've been given valid?
_, ipnet, err := net.ParseCIDR(cidr)
@@ -192,9 +178,9 @@ func (c *cryptokey) addSourceSubnet(cidr string) error {
// Check if the prefix is IPv4 or IPv6
if prefixsize == net.IPv6len*8 {
routingsources = &c.ipv6sources
routingsources = &c.ipv6locals
} else if prefixsize == net.IPv4len*8 {
routingsources = &c.ipv4sources
routingsources = &c.ipv4locals
} else {
return errors.New("Unexpected prefix size")
}
@@ -214,10 +200,10 @@ func (c *cryptokey) addSourceSubnet(cidr string) error {
// Adds a destination route for the given CIDR to be tunnelled to the node
// with the given BoxPubKey.
func (c *cryptokey) addRoute(cidr string, dest string) error {
c.mutexroutes.Lock()
func (c *cryptokey) addRemoteSubnet(cidr string, dest string) error {
c.mutexremotes.Lock()
c.mutexcaches.Lock()
defer c.mutexroutes.Unlock()
defer c.mutexremotes.Unlock()
defer c.mutexcaches.Unlock()
// Is the CIDR we've been given valid?
@@ -235,10 +221,10 @@ func (c *cryptokey) addRoute(cidr string, dest string) error {
// Check if the prefix is IPv4 or IPv6
if prefixsize == net.IPv6len*8 {
routingtable = &c.ipv6routes
routingtable = &c.ipv6remotes
routingcache = &c.ipv6cache
} else if prefixsize == net.IPv4len*8 {
routingtable = &c.ipv4routes
routingtable = &c.ipv4remotes
routingcache = &c.ipv4cache
} else {
return errors.New("Unexpected prefix size")
@@ -323,14 +309,14 @@ func (c *cryptokey) getPublicKeyForAddress(addr address.Address, addrlen int) (c
c.mutexcaches.RUnlock()
c.mutexroutes.RLock()
defer c.mutexroutes.RUnlock()
c.mutexremotes.RLock()
defer c.mutexremotes.RUnlock()
// Check if the prefix is IPv4 or IPv6
if addrlen == net.IPv6len {
routingtable = &c.ipv6routes
routingtable = &c.ipv6remotes
} else if addrlen == net.IPv4len {
routingtable = &c.ipv4routes
routingtable = &c.ipv4remotes
} else {
return crypto.BoxPubKey{}, errors.New("Unexpected prefix size")
}
@@ -339,7 +325,7 @@ func (c *cryptokey) getPublicKeyForAddress(addr address.Address, addrlen int) (c
ip := make(net.IP, addrlen)
copy(ip[:addrlen], addr[:])
// Check if we have a route. At this point c.ipv6routes should be
// Check if we have a route. At this point c.ipv6remotes should be
// pre-sorted so that the most specific routes are first
for _, route := range *routingtable {
// Does this subnet match the given IP?
@@ -366,14 +352,14 @@ func (c *cryptokey) getPublicKeyForAddress(addr address.Address, addrlen int) (c
}
// No route was found if we got to this point
return crypto.BoxPubKey{}, errors.New(fmt.Sprintf("No route to %s", ip.String()))
return crypto.BoxPubKey{}, fmt.Errorf("no route to %s", ip.String())
}
// Removes a source subnet, which allows traffic with these source addresses to
// be tunnelled using crypto-key routing.
func (c *cryptokey) removeSourceSubnet(cidr string) error {
c.mutexsources.Lock()
defer c.mutexsources.Unlock()
func (c *cryptokey) removeLocalSubnet(cidr string) error {
c.mutexlocals.Lock()
defer c.mutexlocals.Unlock()
// Is the CIDR we've been given valid?
_, ipnet, err := net.ParseCIDR(cidr)
@@ -389,9 +375,9 @@ func (c *cryptokey) removeSourceSubnet(cidr string) error {
// Check if the prefix is IPv4 or IPv6
if prefixsize == net.IPv6len*8 {
routingsources = &c.ipv6sources
routingsources = &c.ipv6locals
} else if prefixsize == net.IPv4len*8 {
routingsources = &c.ipv4sources
routingsources = &c.ipv4locals
} else {
return errors.New("Unexpected prefix size")
}
@@ -409,10 +395,10 @@ func (c *cryptokey) removeSourceSubnet(cidr string) error {
// Removes a destination route for the given CIDR to be tunnelled to the node
// with the given BoxPubKey.
func (c *cryptokey) removeRoute(cidr string, dest string) error {
c.mutexroutes.Lock()
func (c *cryptokey) removeRemoteSubnet(cidr string, dest string) error {
c.mutexremotes.Lock()
c.mutexcaches.Lock()
defer c.mutexroutes.Unlock()
defer c.mutexremotes.Unlock()
defer c.mutexcaches.Unlock()
// Is the CIDR we've been given valid?
@@ -430,10 +416,10 @@ func (c *cryptokey) removeRoute(cidr string, dest string) error {
// Check if the prefix is IPv4 or IPv6
if prefixsize == net.IPv6len*8 {
routingtable = &c.ipv6routes
routingtable = &c.ipv6remotes
routingcache = &c.ipv6cache
} else if prefixsize == net.IPv4len*8 {
routingtable = &c.ipv4routes
routingtable = &c.ipv4remotes
routingcache = &c.ipv4cache
} else {
return errors.New("Unexpected prefix size")

View File

@@ -1,10 +1,12 @@
package tuntap
import (
"bytes"
"errors"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
"golang.org/x/net/icmp"
@@ -53,15 +55,14 @@ func (s *tunConn) reader() (err error) {
}
s.tun.log.Debugln("Starting conn reader for", s.conn.String())
defer s.tun.log.Debugln("Stopping conn reader for", s.conn.String())
var n int
b := make([]byte, 65535)
for {
select {
case <-s.stop:
return nil
default:
}
if n, err = s.conn.Read(b); err != nil {
var bs []byte
if bs, err = s.conn.ReadNoCopy(); err != nil {
if e, eok := err.(yggdrasil.ConnError); eok && !e.Temporary() {
if e.Closed() {
s.tun.log.Debugln(s.conn.String(), "TUN/TAP conn read debug:", err)
@@ -70,14 +71,68 @@ func (s *tunConn) reader() (err error) {
}
return e
}
} else if n > 0 {
bs := append(util.GetBytes(), b[:n]...)
select {
case s.tun.send <- bs:
} else if len(bs) > 0 {
ipv4 := len(bs) > 20 && bs[0]&0xf0 == 0x40
ipv6 := len(bs) > 40 && bs[0]&0xf0 == 0x60
isCGA := true
// Check source addresses
switch {
case ipv6 && bs[8] == 0x02 && bytes.Equal(s.addr[:16], bs[8:24]): // source
case ipv6 && bs[8] == 0x03 && bytes.Equal(s.snet[:8], bs[8:16]): // source
default:
util.PutBytes(bs)
isCGA = false
}
// Check destiantion addresses
switch {
case ipv6 && bs[24] == 0x02 && bytes.Equal(s.tun.addr[:16], bs[24:40]): // destination
case ipv6 && bs[24] == 0x03 && bytes.Equal(s.tun.subnet[:8], bs[24:32]): // destination
default:
isCGA = false
}
// Decide how to handle the packet
var skip bool
switch {
case isCGA: // Allowed
case s.tun.ckr.isEnabled() && (ipv4 || ipv6):
var srcAddr address.Address
var dstAddr address.Address
var addrlen int
if ipv4 {
copy(srcAddr[:], bs[12:16])
copy(dstAddr[:], bs[16:20])
addrlen = 4
}
if ipv6 {
copy(srcAddr[:], bs[8:24])
copy(dstAddr[:], bs[24:40])
addrlen = 16
}
if !s.tun.ckr.isValidLocalAddress(dstAddr, addrlen) {
// The destination address isn't in our CKR allowed range
skip = true
} else if key, err := s.tun.ckr.getPublicKeyForAddress(srcAddr, addrlen); err == nil {
srcNodeID := crypto.GetNodeID(&key)
if s.conn.RemoteAddr() == *srcNodeID {
// This is the one allowed CKR case, where source and destination addresses are both good
} else {
// The CKR key associated with this address doesn't match the sender's NodeID
skip = true
}
} else {
// We have no CKR route for this source address
skip = true
}
default:
skip = true
}
if skip {
util.PutBytes(bs)
continue
}
s.tun.send <- bs
s.stillAlive()
} else {
util.PutBytes(bs)
}
}
}
@@ -96,12 +151,72 @@ func (s *tunConn) writer() error {
select {
case <-s.stop:
return nil
case b, ok := <-s.send:
case bs, ok := <-s.send:
if !ok {
return errors.New("send closed")
}
// TODO write timeout and close
if _, err := s.conn.Write(b); err != nil {
v4 := len(bs) > 20 && bs[0]&0xf0 == 0x40
v6 := len(bs) > 40 && bs[0]&0xf0 == 0x60
isCGA := true
// Check source addresses
switch {
case v6 && bs[8] == 0x02 && bytes.Equal(s.tun.addr[:16], bs[8:24]): // source
case v6 && bs[8] == 0x03 && bytes.Equal(s.tun.subnet[:8], bs[8:16]): // source
default:
isCGA = false
}
// Check destiantion addresses
switch {
case v6 && bs[24] == 0x02 && bytes.Equal(s.addr[:16], bs[24:40]): // destination
case v6 && bs[24] == 0x03 && bytes.Equal(s.snet[:8], bs[24:32]): // destination
default:
isCGA = false
}
// Decide how to handle the packet
var skip bool
switch {
case isCGA: // Allowed
case s.tun.ckr.isEnabled() && (v4 || v6):
var srcAddr address.Address
var dstAddr address.Address
var addrlen int
if v4 {
copy(srcAddr[:], bs[12:16])
copy(dstAddr[:], bs[16:20])
addrlen = 4
}
if v6 {
copy(srcAddr[:], bs[8:24])
copy(dstAddr[:], bs[24:40])
addrlen = 16
}
if !s.tun.ckr.isValidLocalAddress(srcAddr, addrlen) {
// The source address isn't in our CKR allowed range
skip = true
} else if key, err := s.tun.ckr.getPublicKeyForAddress(dstAddr, addrlen); err == nil {
dstNodeID := crypto.GetNodeID(&key)
if s.conn.RemoteAddr() == *dstNodeID {
// This is the one allowed CKR case, where source and destination addresses are both good
} else {
// The CKR key associated with this address doesn't match the sender's NodeID
skip = true
}
} else {
// We have no CKR route for this destination address... why do we have the packet in the first place?
skip = true
}
default:
skip = true
}
if skip {
util.PutBytes(bs)
continue
}
msg := yggdrasil.FlowKeyMessage{
FlowKey: util.GetFlowKey(bs),
Message: bs,
}
if err := s.conn.WriteNoCopy(msg); err != nil {
if e, eok := err.(yggdrasil.ConnError); !eok {
if e.Closed() {
s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err)
@@ -112,9 +227,9 @@ func (s *tunConn) writer() error {
// TODO: This currently isn't aware of IPv4 for CKR
ptb := &icmp.PacketTooBig{
MTU: int(e.PacketMaximumSize()),
Data: b[:900],
Data: bs[:900],
}
if packet, err := CreateICMPv6(b[8:24], b[24:40], ipv6.ICMPTypePacketTooBig, 0, ptb); err == nil {
if packet, err := CreateICMPv6(bs[8:24], bs[24:40], ipv6.ICMPTypePacketTooBig, 0, ptb); err == nil {
s.tun.send <- packet
}
} else {
@@ -127,7 +242,6 @@ func (s *tunConn) writer() error {
} else {
s.stillAlive()
}
util.PutBytes(b)
}
}
}

View File

@@ -2,7 +2,6 @@ package tuntap
import (
"bytes"
"errors"
"net"
"time"
@@ -22,24 +21,6 @@ func (tun *TunAdapter) writer() error {
continue
}
if tun.iface.IsTAP() {
var dstAddr address.Address
if b[0]&0xf0 == 0x60 {
if len(b) < 40 {
//panic("Tried to send a packet shorter than an IPv6 header...")
util.PutBytes(b)
continue
}
copy(dstAddr[:16], b[24:])
} else if b[0]&0xf0 == 0x40 {
if len(b) < 20 {
//panic("Tried to send a packet shorter than an IPv4 header...")
util.PutBytes(b)
continue
}
copy(dstAddr[:4], b[16:])
} else {
return errors.New("Invalid address family")
}
sendndp := func(dstAddr address.Address) {
neigh, known := tun.icmpv6.getNeighbor(dstAddr)
known = known && (time.Since(neigh.lastsolicitation).Seconds() < 30)
@@ -48,6 +29,7 @@ func (tun *TunAdapter) writer() error {
}
}
peermac := net.HardwareAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
var dstAddr address.Address
var peerknown bool
if b[0]&0xf0 == 0x40 {
dstAddr = tun.addr
@@ -69,7 +51,6 @@ func (tun *TunAdapter) writer() error {
} else {
// Nothing has been discovered, try to discover the destination
sendndp(tun.addr)
}
if peerknown {
var proto ethernet.Ethertype
@@ -139,15 +120,14 @@ func (tun *TunAdapter) readerPacketHandler(ch chan []byte) {
continue
}
}
// Shift forward to avoid leaking bytes off the front of the slide when we eventually store it
bs = append(recvd[:0], bs...)
if offset != 0 {
// Shift forward to avoid leaking bytes off the front of the slice when we eventually store it
bs = append(recvd[:0], bs...)
}
// From the IP header, work out what our source and destination addresses
// and node IDs are. We will need these in order to work out where to send
// the packet
var srcAddr address.Address
var dstAddr address.Address
var dstNodeID *crypto.NodeID
var dstNodeIDMask *crypto.NodeID
var dstSnet address.Subnet
var addrlen int
n := len(bs)
@@ -164,7 +144,6 @@ func (tun *TunAdapter) readerPacketHandler(ch chan []byte) {
}
// IPv6 address
addrlen = 16
copy(srcAddr[:addrlen], bs[8:])
copy(dstAddr[:addrlen], bs[24:])
copy(dstSnet[:addrlen/2], bs[24:])
} else if bs[0]&0xf0 == 0x40 {
@@ -178,37 +157,29 @@ func (tun *TunAdapter) readerPacketHandler(ch chan []byte) {
}
// IPv4 address
addrlen = 4
copy(srcAddr[:addrlen], bs[12:])
copy(dstAddr[:addrlen], bs[16:])
} else {
// Unknown address length or protocol, so drop the packet and ignore it
tun.log.Traceln("Unknown packet type, dropping")
continue
}
if tun.ckr.isEnabled() && !tun.ckr.isValidSource(srcAddr, addrlen) {
// The packet had a source address that doesn't belong to us or our
// configured crypto-key routing source subnets
continue
}
if !dstAddr.IsValid() && !dstSnet.IsValid() {
if key, err := tun.ckr.getPublicKeyForAddress(dstAddr, addrlen); err == nil {
// A public key was found, get the node ID for the search
dstNodeID = crypto.GetNodeID(&key)
// Do a quick check to ensure that the node ID refers to a vaild
// Yggdrasil address or subnet - this might be superfluous
addr := *address.AddrForNodeID(dstNodeID)
copy(dstAddr[:], addr[:])
copy(dstSnet[:], addr[:])
// Are we certain we looked up a valid node?
if !dstAddr.IsValid() && !dstSnet.IsValid() {
continue
if tun.ckr.isEnabled() {
if addrlen != 16 || (!dstAddr.IsValid() && !dstSnet.IsValid()) {
if key, err := tun.ckr.getPublicKeyForAddress(dstAddr, addrlen); err == nil {
// A public key was found, get the node ID for the search
dstNodeID := crypto.GetNodeID(&key)
dstAddr = *address.AddrForNodeID(dstNodeID)
dstSnet = *address.SubnetForNodeID(dstNodeID)
addrlen = 16
}
} else {
// No public key was found in the CKR table so we've exhausted our options
continue
}
}
if addrlen != 16 || (!dstAddr.IsValid() && !dstSnet.IsValid()) {
// Couldn't find this node's ygg IP
continue
}
// Do we have an active connection for this node address?
var dstNodeID, dstNodeIDMask *crypto.NodeID
tun.mutex.RLock()
session, isIn := tun.addrToConn[dstAddr]
if !isIn || session == nil {
@@ -260,11 +231,8 @@ func (tun *TunAdapter) readerPacketHandler(ch chan []byte) {
tun.mutex.Unlock()
if tc != nil {
for _, packet := range packets {
select {
case tc.send <- packet:
default:
util.PutBytes(packet)
}
p := packet // Possibly required because of how range
tc.send <- p
}
}
}()
@@ -274,21 +242,18 @@ func (tun *TunAdapter) readerPacketHandler(ch chan []byte) {
}
// If we have a connection now, try writing to it
if isIn && session != nil {
select {
case session.send <- bs:
default:
util.PutBytes(bs)
}
session.send <- bs
}
}
}
func (tun *TunAdapter) reader() error {
recvd := make([]byte, 65535+tun_ETHER_HEADER_LENGTH)
toWorker := make(chan []byte, 32)
defer close(toWorker)
go tun.readerPacketHandler(toWorker)
for {
// Get a slice to store the packet in
recvd := util.ResizeBytes(util.GetBytes(), 65535+tun_ETHER_HEADER_LENGTH)
// Wait for a packet to be delivered to us through the TUN/TAP adapter
n, err := tun.iface.Read(recvd)
if err != nil {
@@ -298,9 +263,10 @@ func (tun *TunAdapter) reader() error {
panic(err)
}
if n == 0 {
util.PutBytes(recvd)
continue
}
bs := append(util.GetBytes(), recvd[:n]...)
toWorker <- bs
// Send the packet to the worker
toWorker <- recvd[:n]
}
}

View File

@@ -5,11 +5,7 @@ package tuntap
// The linux platform specific tun parts
import (
"errors"
"fmt"
"net"
"github.com/docker/libcontainer/netlink"
"github.com/vishvananda/netlink"
water "github.com/yggdrasil-network/water"
)
@@ -51,35 +47,21 @@ func (tun *TunAdapter) setup(ifname string, iftapmode bool, addr string, mtu int
// to exist on the system, but this will fail if Netlink is not present in the
// kernel (it nearly always is).
func (tun *TunAdapter) setupAddress(addr string) error {
// Set address
var netIF *net.Interface
ifces, err := net.Interfaces()
nladdr, err := netlink.ParseAddr(addr)
if err != nil {
return err
}
for _, ifce := range ifces {
if ifce.Name == tun.iface.Name() {
var newIF = ifce
netIF = &newIF // Don't point inside ifces, it's apparently unsafe?...
}
}
if netIF == nil {
return errors.New(fmt.Sprintf("Failed to find interface: %s", tun.iface.Name()))
}
ip, ipNet, err := net.ParseCIDR(addr)
nlintf, err := netlink.LinkByName(tun.iface.Name())
if err != nil {
return err
}
err = netlink.NetworkLinkAddIp(netIF, ip, ipNet)
if err != nil {
if err := netlink.AddrAdd(nlintf, nladdr); err != nil {
return err
}
err = netlink.NetworkSetMTU(netIF, tun.mtu)
if err != nil {
if err := netlink.LinkSetMTU(nlintf, tun.mtu); err != nil {
return err
}
netlink.NetworkLinkUp(netIF)
if err != nil {
if err := netlink.LinkSetUp(nlintf); err != nil {
return err
}
return nil

View File

@@ -2,9 +2,13 @@ package util
// These are misc. utility functions that didn't really fit anywhere else
import "runtime"
import "sync"
import "time"
import (
"runtime"
"strconv"
"strings"
"sync"
"time"
)
// A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere.
func Yield() {
@@ -22,27 +26,25 @@ func UnlockThread() {
}
// This is used to buffer recently used slices of bytes, to prevent allocations in the hot loops.
var byteStoreMutex sync.Mutex
var byteStore [][]byte
var byteStore = sync.Pool{New: func() interface{} { return []byte(nil) }}
// Gets an empty slice from the byte store.
func GetBytes() []byte {
byteStoreMutex.Lock()
defer byteStoreMutex.Unlock()
if len(byteStore) > 0 {
var bs []byte
bs, byteStore = byteStore[len(byteStore)-1][:0], byteStore[:len(byteStore)-1]
return bs
} else {
return nil
}
return byteStore.Get().([]byte)[:0]
}
// Puts a slice in the store.
func PutBytes(bs []byte) {
byteStoreMutex.Lock()
defer byteStoreMutex.Unlock()
byteStore = append(byteStore, bs)
byteStore.Put(bs)
}
// Gets a slice of the appropriate length, reusing existing slice capacity when possible
func ResizeBytes(bs []byte, length int) []byte {
if cap(bs) >= length {
return bs[:length]
} else {
return make([]byte, length)
}
}
// This is a workaround to go's broken timer implementation
@@ -91,3 +93,54 @@ func Difference(a, b []string) []string {
}
return ab
}
// DecodeCoordString decodes a string representing coordinates in [1 2 3] format
// and returns a []uint64.
func DecodeCoordString(in string) (out []uint64) {
s := strings.Trim(in, "[]")
t := strings.Split(s, " ")
for _, a := range t {
if u, err := strconv.ParseUint(a, 0, 64); err == nil {
out = append(out, u)
}
}
return out
}
// GetFlowLabel takes an IP packet as an argument and returns some information about the traffic flow.
// For IPv4 packets, this is derived from the source and destination protocol and port numbers.
// For IPv6 packets, this is derived from the FlowLabel field of the packet if this was set, otherwise it's handled like IPv4.
// The FlowKey is then used internally by Yggdrasil for congestion control.
func GetFlowKey(bs []byte) uint64 {
// Work out the flowkey - this is used to determine which switch queue
// traffic will be pushed to in the event of congestion
var flowkey uint64
// Get the IP protocol version from the packet
switch bs[0] & 0xf0 {
case 0x40: // IPv4 packet
// Check the packet meets minimum UDP packet length
if len(bs) >= 24 {
// Is the protocol TCP, UDP or SCTP?
if bs[9] == 0x06 || bs[9] == 0x11 || bs[9] == 0x84 {
ihl := bs[0] & 0x0f * 4 // Header length
flowkey = uint64(bs[9])<<32 /* proto */ |
uint64(bs[ihl+0])<<24 | uint64(bs[ihl+1])<<16 /* sport */ |
uint64(bs[ihl+2])<<8 | uint64(bs[ihl+3]) /* dport */
}
}
case 0x60: // IPv6 packet
// Check if the flowlabel was specified in the packet header
flowkey = uint64(bs[1]&0x0f)<<16 | uint64(bs[2])<<8 | uint64(bs[3])
// If the flowlabel isn't present, make protokey from proto | sport | dport
// if the packet meets minimum UDP packet length
if flowkey == 0 && len(bs) >= 48 {
// Is the protocol TCP, UDP or SCTP?
if bs[6] == 0x06 || bs[6] == 0x11 || bs[6] == 0x84 {
flowkey = uint64(bs[6])<<32 /* proto */ |
uint64(bs[40])<<24 | uint64(bs[41])<<16 /* sport */ |
uint64(bs[42])<<8 | uint64(bs[43]) /* dport */
}
}
}
return flowkey
}

29
src/util/workerpool.go Normal file
View File

@@ -0,0 +1,29 @@
package util
import "runtime"
var workerPool chan func()
func init() {
maxProcs := runtime.GOMAXPROCS(0)
if maxProcs < 1 {
maxProcs = 1
}
workerPool = make(chan func(), maxProcs)
for idx := 0; idx < maxProcs; idx++ {
go func() {
for f := range workerPool {
f()
}
}()
}
}
// WorkerGo submits a job to a pool of GOMAXPROCS worker goroutines.
// This is meant for short non-blocking functions f() where you could just go f(),
// but you want some kind of backpressure to prevent spawning endless goroutines.
// WorkerGo returns as soon as the function is queued to run, not when it finishes.
// In Yggdrasil, these workers are used for certain cryptographic operations.
func WorkerGo(f func()) {
workerPool <- f
}

22
src/version/version.go Normal file
View File

@@ -0,0 +1,22 @@
package version
var buildName string
var buildVersion string
// BuildName gets the current build name. This is usually injected if built
// from git, or returns "unknown" otherwise.
func BuildName() string {
if buildName == "" {
return "yggdrasilctl"
}
return buildName
}
// BuildVersion gets the current build version. This is usually injected if
// built from git, or returns "unknown" otherwise.
func BuildVersion() string {
if buildVersion == "" {
return "unknown"
}
return buildVersion
}

View File

@@ -6,8 +6,6 @@ import (
"fmt"
"net"
"sort"
"strconv"
"strings"
"sync/atomic"
"time"
@@ -34,7 +32,7 @@ type Peer struct {
// to a given node.
type SwitchPeer struct {
PublicKey crypto.BoxPubKey
Coords []byte
Coords []uint64
BytesSent uint64
BytesRecvd uint64
Port uint64
@@ -46,14 +44,14 @@ type SwitchPeer struct {
// DHT searches.
type DHTEntry struct {
PublicKey crypto.BoxPubKey
Coords []byte
Coords []uint64
LastSeen time.Duration
}
// DHTRes represents a DHT response, as returned by DHTPing.
type DHTRes struct {
PublicKey crypto.BoxPubKey // key of the sender
Coords []byte // coords of the sender
Coords []uint64 // coords of the sender
Dest crypto.NodeID // the destination node ID
Infos []DHTEntry // response
}
@@ -85,7 +83,7 @@ type SwitchQueue struct {
// Session represents an open session with another node.
type Session struct {
PublicKey crypto.BoxPubKey
Coords []byte
Coords []uint64
BytesSent uint64
BytesRecvd uint64
MTU uint16
@@ -138,7 +136,7 @@ func (c *Core) GetSwitchPeers() []SwitchPeer {
}
coords := elem.locator.getCoords()
info := SwitchPeer{
Coords: append([]byte{}, coords...),
Coords: append([]uint64{}, wire_coordsBytestoUint64s(coords)...),
BytesSent: atomic.LoadUint64(&peer.bytesSent),
BytesRecvd: atomic.LoadUint64(&peer.bytesRecvd),
Port: uint64(elem.port),
@@ -166,7 +164,7 @@ func (c *Core) GetDHT() []DHTEntry {
})
for _, v := range dhtentry {
info := DHTEntry{
Coords: append([]byte{}, v.coords...),
Coords: append([]uint64{}, wire_coordsBytestoUint64s(v.coords)...),
LastSeen: now.Sub(v.recv),
}
copy(info.PublicKey[:], v.key[:])
@@ -214,7 +212,7 @@ func (c *Core) GetSessions() []Session {
var session Session
workerFunc := func() {
session = Session{
Coords: append([]byte{}, sinfo.coords...),
Coords: append([]uint64{}, wire_coordsBytestoUint64s(sinfo.coords)...),
MTU: sinfo.getMTU(),
BytesSent: sinfo.bytesSent,
BytesRecvd: sinfo.bytesRecvd,
@@ -243,24 +241,6 @@ func (c *Core) GetSessions() []Session {
return sessions
}
// BuildName gets the current build name. This is usually injected if built
// from git, or returns "unknown" otherwise.
func BuildName() string {
if buildName == "" {
return "yggdrasil"
}
return buildName
}
// BuildVersion gets the current build version. This is usually injected if
// built from git, or returns "unknown" otherwise.
func BuildVersion() string {
if buildVersion == "" {
return "unknown"
}
return buildVersion
}
// ConnListen returns a listener for Yggdrasil session connections.
func (c *Core) ConnListen() (*Listener, error) {
c.sessions.listenerMutex.Lock()
@@ -311,9 +291,9 @@ func (c *Core) EncryptionPublicKey() string {
}
// Coords returns the current coordinates of the node.
func (c *Core) Coords() []byte {
func (c *Core) Coords() []uint64 {
table := c.switchTable.table.Load().(lookupTable)
return table.self.getCoords()
return wire_coordsBytestoUint64s(table.self.getCoords())
}
// Address gets the IPv6 address of the Yggdrasil node. This is always a /128
@@ -336,8 +316,8 @@ func (c *Core) MyNodeInfo() NodeInfoPayload {
return c.router.nodeinfo.getNodeInfo()
}
// SetNodeInfo the lcal nodeinfo. Note that nodeinfo can be any value or struct,
// it will be serialised into JSON automatically.
// SetNodeInfo sets the local nodeinfo. Note that nodeinfo can be any value or
// struct, it will be serialised into JSON automatically.
func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) {
c.router.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy)
}
@@ -346,30 +326,7 @@ func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) {
// key and coordinates specified. The third parameter specifies whether a cached
// result is acceptable - this results in less traffic being generated than is
// necessary when, e.g. crawling the network.
func (c *Core) GetNodeInfo(keyString, coordString string, nocache bool) (NodeInfoPayload, error) {
var key crypto.BoxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil {
return NodeInfoPayload{}, err
} else {
copy(key[:], keyBytes)
}
if !nocache {
if response, err := c.router.nodeinfo.getCachedNodeInfo(key); err == nil {
return response, nil
}
}
var coords []byte
for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") {
if cstr == "" {
// Special case, happens if trimmed is the empty string, e.g. this is the root
continue
}
if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil {
return NodeInfoPayload{}, err
} else {
coords = append(coords, uint8(u64))
}
}
func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool) (NodeInfoPayload, error) {
response := make(chan *NodeInfoPayload, 1)
sendNodeInfoRequest := func() {
c.router.nodeinfo.addCallback(key, func(nodeinfo *NodeInfoPayload) {
@@ -379,7 +336,7 @@ func (c *Core) GetNodeInfo(keyString, coordString string, nocache bool) (NodeInf
default:
}
})
c.router.nodeinfo.sendNodeInfo(key, coords, false)
c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false)
}
c.router.doAdmin(sendNodeInfoRequest)
go func() {
@@ -389,7 +346,7 @@ func (c *Core) GetNodeInfo(keyString, coordString string, nocache bool) (NodeInf
for res := range response {
return *res, nil
}
return NodeInfoPayload{}, fmt.Errorf("getNodeInfo timeout: %s", keyString)
return NodeInfoPayload{}, fmt.Errorf("getNodeInfo timeout: %s", hex.EncodeToString(key[:]))
}
// SetSessionGatekeeper allows you to configure a handler function for deciding
@@ -477,64 +434,38 @@ func (c *Core) RemoveAllowedEncryptionPublicKey(bstr string) (err error) {
// DHTPing sends a DHT ping to the node with the provided key and coords,
// optionally looking up the specified target NodeID.
func (c *Core) DHTPing(keyString, coordString, targetString string) (DHTRes, error) {
var key crypto.BoxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil {
return DHTRes{}, err
} else {
copy(key[:], keyBytes)
}
var coords []byte
for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") {
if cstr == "" {
// Special case, happens if trimmed is the empty string, e.g. this is the root
continue
}
if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil {
return DHTRes{}, err
} else {
coords = append(coords, uint8(u64))
}
}
func (c *Core) DHTPing(key crypto.BoxPubKey, coords []uint64, target *crypto.NodeID) (DHTRes, error) {
resCh := make(chan *dhtRes, 1)
info := dhtInfo{
key: key,
coords: coords,
coords: wire_coordsUint64stoBytes(coords),
}
target := *info.getNodeID()
if targetString == "none" {
// Leave the default target in place
} else if targetBytes, err := hex.DecodeString(targetString); err != nil {
return DHTRes{}, err
} else if len(targetBytes) != len(target) {
return DHTRes{}, errors.New("Incorrect target NodeID length")
} else {
var target crypto.NodeID
copy(target[:], targetBytes)
if target == nil {
target = info.getNodeID()
}
rq := dhtReqKey{info.key, target}
rq := dhtReqKey{info.key, *target}
sendPing := func() {
c.dht.addCallback(&rq, func(res *dhtRes) {
resCh <- res
})
c.dht.ping(&info, &target)
c.dht.ping(&info, &rq.dest)
}
c.router.doAdmin(sendPing)
// TODO: do something better than the below...
res := <-resCh
if res != nil {
r := DHTRes{
Coords: append([]byte{}, res.Coords...),
Coords: append([]uint64{}, wire_coordsBytestoUint64s(res.Coords)...),
}
copy(r.PublicKey[:], res.Key[:])
for _, i := range res.Infos {
e := DHTEntry{
Coords: append([]byte{}, i.coords...),
Coords: append([]uint64{}, wire_coordsBytestoUint64s(i.coords)...),
}
copy(e.PublicKey[:], i.key[:])
r.Infos = append(r.Infos, e)
}
return r, nil
}
return DHTRes{}, fmt.Errorf("DHT ping timeout: %s", keyString)
return DHTRes{}, fmt.Errorf("DHT ping timeout: %s", hex.EncodeToString(key[:]))
}

View File

@@ -57,7 +57,6 @@ type Conn struct {
core *Core
readDeadline atomic.Value // time.Time // TODO timer
writeDeadline atomic.Value // time.Time // TODO timer
cancel util.Cancellation
mutex sync.RWMutex // protects the below
nodeID *crypto.NodeID
nodeMask *crypto.NodeID
@@ -71,7 +70,6 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session
nodeID: nodeID,
nodeMask: nodeMask,
session: session,
cancel: util.NewCancellation(),
}
return &conn
}
@@ -82,7 +80,7 @@ func (c *Conn) String() string {
return fmt.Sprintf("conn=%p", c)
}
// This should never be called from the router goroutine
// This should never be called from the router goroutine, used in the dial functions
func (c *Conn) search() error {
var sinfo *searchInfo
var isIn bool
@@ -122,133 +120,122 @@ func (c *Conn) search() error {
return nil
}
func (c *Conn) getDeadlineCancellation(value *atomic.Value) util.Cancellation {
// Used in session keep-alive traffic in Conn.Write
func (c *Conn) doSearch() {
routerWork := func() {
// Check to see if there is a search already matching the destination
sinfo, isIn := c.core.searches.searches[*c.nodeID]
if !isIn {
// Nothing was found, so create a new search
searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
// Start the search
sinfo.continueSearch()
}
}
go func() { c.core.router.admin <- routerWork }()
}
func (c *Conn) getDeadlineCancellation(value *atomic.Value) (util.Cancellation, bool) {
if deadline, ok := value.Load().(time.Time); ok {
// A deadline is set, so return a Cancellation that uses it
return util.CancellationWithDeadline(c.cancel, deadline)
c := util.CancellationWithDeadline(c.session.cancel, deadline)
return c, true
} else {
// No cancellation was set, so return a child cancellation with no timeout
return util.CancellationChild(c.cancel)
// No deadline was set, so just return the existinc cancellation and a dummy value
return c.session.cancel, false
}
}
func (c *Conn) Read(b []byte) (int, error) {
// Take a copy of the session object
sinfo := c.session
cancel := c.getDeadlineCancellation(&c.readDeadline)
defer cancel.Cancel(nil)
var bs []byte
for {
// Wait for some traffic to come through from the session
select {
case <-cancel.Finished():
if cancel.Error() == util.CancellationTimeoutError {
return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
} else {
return 0, ConnError{errors.New("session closed"), false, false, true, 0}
}
case p, ok := <-sinfo.recv:
// If the session is closed then do nothing
if !ok {
return 0, ConnError{errors.New("session closed"), false, false, true, 0}
}
var err error
sessionFunc := func() {
defer util.PutBytes(p.Payload)
// If the nonce is bad then drop the packet and return an error
if !sinfo.nonceIsOK(&p.Nonce) {
err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0}
return
}
// Decrypt the packet
var isOK bool
bs, isOK = crypto.BoxOpen(&sinfo.sharedSesKey, p.Payload, &p.Nonce)
// Check if we were unable to decrypt the packet for some reason and
// return an error if we couldn't
if !isOK {
err = ConnError{errors.New("packet dropped due to decryption failure"), false, true, false, 0}
return
}
// Update the session
sinfo.updateNonce(&p.Nonce)
sinfo.time = time.Now()
sinfo.bytesRecvd += uint64(len(bs))
}
sinfo.doFunc(sessionFunc)
// Something went wrong in the session worker so abort
if err != nil {
if ce, ok := err.(*ConnError); ok && ce.Temporary() {
continue
}
return 0, err
}
// Copy results to the output slice and clean up
copy(b, bs)
util.PutBytes(bs)
// If we've reached this point then everything went to plan, return the
// number of bytes we populated back into the given slice
return len(bs), nil
// Used internally by Read, the caller is responsible for util.PutBytes when they're done.
func (c *Conn) ReadNoCopy() ([]byte, error) {
cancel, doCancel := c.getDeadlineCancellation(&c.readDeadline)
if doCancel {
defer cancel.Cancel(nil)
}
// Wait for some traffic to come through from the session
select {
case <-cancel.Finished():
if cancel.Error() == util.CancellationTimeoutError {
return nil, ConnError{errors.New("read timeout"), true, false, false, 0}
} else {
return nil, ConnError{errors.New("session closed"), false, false, true, 0}
}
case bs := <-c.session.recv:
return bs, nil
}
}
func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
sinfo := c.session
var packet []byte
written := len(b)
// Implements net.Conn.Read
func (c *Conn) Read(b []byte) (int, error) {
bs, err := c.ReadNoCopy()
if err != nil {
return 0, err
}
n := len(bs)
if len(bs) > len(b) {
n = len(b)
err = ConnError{errors.New("read buffer too small for entire packet"), false, true, false, 0}
}
// Copy results to the output slice and clean up
copy(b, bs)
util.PutBytes(bs)
// Return the number of bytes copied to the slice, along with any error
return n, err
}
// Used internally by Write, the caller must not reuse the argument bytes when no error occurs
func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error {
var err error
sessionFunc := func() {
// Does the packet exceed the permitted size for the session?
if uint16(len(b)) > sinfo.getMTU() {
written, err = 0, ConnError{errors.New("packet too big"), true, false, false, int(sinfo.getMTU())}
if uint16(len(msg.Message)) > c.session.getMTU() {
err = ConnError{errors.New("packet too big"), true, false, false, int(c.session.getMTU())}
return
}
// Encrypt the packet
payload, nonce := crypto.BoxSeal(&sinfo.sharedSesKey, b, &sinfo.myNonce)
defer util.PutBytes(payload)
// Construct the wire packet to send to the router
p := wire_trafficPacket{
Coords: sinfo.coords,
Handle: sinfo.theirHandle,
Nonce: *nonce,
Payload: payload,
}
packet = p.encode()
sinfo.bytesSent += uint64(len(b))
// The rest of this work is session keep-alive traffic
doSearch := func() {
routerWork := func() {
// Check to see if there is a search already matching the destination
sinfo, isIn := c.core.searches.searches[*c.nodeID]
if !isIn {
// Nothing was found, so create a new search
searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
}
// Continue the search
sinfo.continueSearch()
}
go func() { c.core.router.admin <- routerWork }()
}
switch {
case time.Since(sinfo.time) > 6*time.Second:
if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second {
case time.Since(c.session.time) > 6*time.Second:
if c.session.time.Before(c.session.pingTime) && time.Since(c.session.pingTime) > 6*time.Second {
// TODO double check that the above condition is correct
doSearch()
c.doSearch()
} else {
sinfo.core.sessions.ping(sinfo)
c.core.sessions.ping(c.session)
}
case sinfo.reset && sinfo.pingTime.Before(sinfo.time):
sinfo.core.sessions.ping(sinfo)
case c.session.reset && c.session.pingTime.Before(c.session.time):
c.core.sessions.ping(c.session)
default: // Don't do anything, to keep traffic throttled
}
}
sinfo.doFunc(sessionFunc)
// Give the packet to the router
if written > 0 {
sinfo.core.router.out(packet)
c.session.doFunc(sessionFunc)
if err == nil {
cancel, doCancel := c.getDeadlineCancellation(&c.writeDeadline)
if doCancel {
defer cancel.Cancel(nil)
}
select {
case <-cancel.Finished():
if cancel.Error() == util.CancellationTimeoutError {
err = ConnError{errors.New("write timeout"), true, false, false, 0}
} else {
err = ConnError{errors.New("session closed"), false, false, true, 0}
}
case c.session.send <- msg:
}
}
return err
}
// Implements net.Conn.Write
func (c *Conn) Write(b []byte) (int, error) {
written := len(b)
msg := FlowKeyMessage{Message: append(util.GetBytes(), b...)}
err := c.WriteNoCopy(msg)
if err != nil {
util.PutBytes(msg.Message)
written = 0
}
// Finally return the number of bytes we wrote
return written, err
}
@@ -257,10 +244,9 @@ func (c *Conn) Close() (err error) {
defer c.mutex.Unlock()
if c.session != nil {
// Close the session, if it hasn't been closed already
c.core.router.doAdmin(c.session.close)
}
if e := c.cancel.Cancel(errors.New("connection closed")); e != nil {
err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil {
err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
}
}
return
}

View File

@@ -10,11 +10,9 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
)
var buildName string
var buildVersion string
// The Core object represents the Yggdrasil node. You should create a Core
// object for each Yggdrasil node you plan to run.
type Core struct {
@@ -164,10 +162,10 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState,
Previous: *nc,
}
if name := BuildName(); name != "unknown" {
if name := version.BuildName(); name != "unknown" {
c.log.Infoln("Build name:", name)
}
if version := BuildVersion(); version != "unknown" {
if version := version.BuildVersion(); version != "unknown" {
c.log.Infoln("Build version:", version)
}

View File

@@ -556,8 +556,10 @@ func DEBUG_simLinkPeers(p, q *peer) {
goWorkers := func(source, dest *peer) {
source.linkOut = make(chan []byte, 1)
send := make(chan []byte, 1)
source.out = func(bs []byte) {
send <- bs
source.out = func(bss [][]byte) {
for _, bs := range bss {
send <- bs
}
}
go source.linkLoop()
go func() {

View File

@@ -37,7 +37,7 @@ type linkInfo struct {
type linkInterfaceMsgIO interface {
readMsg() ([]byte, error)
writeMsg([]byte) (int, error)
writeMsgs([][]byte) (int, error)
close() error
// These are temporary workarounds to stream semantics
_sendMetaBytes([]byte) error
@@ -179,7 +179,10 @@ func (intf *linkInterface) handler() error {
// That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc.
intf.link.core.log.Debugln("DEBUG: found existing interface for", intf.name)
intf.msgIO.close()
<-oldIntf.closed
if !intf.incoming {
// Block outgoing connection attempts until the existing connection closes
<-oldIntf.closed
}
return nil
} else {
intf.closed = make(chan struct{})
@@ -204,11 +207,11 @@ func (intf *linkInterface) handler() error {
intf.link.core.peers.removePeer(intf.peer.port)
}()
// Finish setting up the peer struct
out := make(chan []byte, 1)
out := make(chan [][]byte, 1)
defer close(out)
intf.peer.out = func(msg []byte) {
intf.peer.out = func(msgs [][]byte) {
defer func() { recover() }()
out <- msg
out <- msgs
}
intf.peer.linkOut = make(chan []byte, 1)
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
@@ -231,12 +234,12 @@ func (intf *linkInterface) handler() error {
interval := 4 * time.Second
tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp
defer util.TimerStop(tcpTimer)
send := func(bs []byte) {
send := func(bss [][]byte) {
sendBlocked.Reset(time.Second)
intf.msgIO.writeMsg(bs)
size, _ := intf.msgIO.writeMsgs(bss)
util.TimerStop(sendBlocked)
select {
case signalSent <- len(bs) > 0:
case signalSent <- size > 0:
default:
}
}
@@ -244,7 +247,7 @@ func (intf *linkInterface) handler() error {
// First try to send any link protocol traffic
select {
case msg := <-intf.peer.linkOut:
send(msg)
send([][]byte{msg})
continue
default:
}
@@ -256,19 +259,21 @@ func (intf *linkInterface) handler() error {
case <-tcpTimer.C:
intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil)
send([][]byte{nil})
case <-sendAck:
intf.link.core.log.Tracef("Sending ack to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil)
send([][]byte{nil})
case msg := <-intf.peer.linkOut:
send(msg)
case msg, ok := <-out:
send([][]byte{msg})
case msgs, ok := <-out:
if !ok {
return
}
send(msg)
util.PutBytes(msg)
send(msgs)
for _, msg := range msgs {
util.PutBytes(msg)
}
select {
case signalReady <- struct{}{}:
default:

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/version"
)
type nodeinfo struct {
@@ -99,8 +100,8 @@ func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) error {
m.myNodeInfoMutex.Lock()
defer m.myNodeInfoMutex.Unlock()
defaults := map[string]interface{}{
"buildname": BuildName(),
"buildversion": BuildVersion(),
"buildname": version.BuildName(),
"buildversion": version.BuildVersion(),
"buildplatform": runtime.GOOS,
"buildarch": runtime.GOARCH,
}

View File

@@ -109,7 +109,7 @@ type peer struct {
linkOut (chan []byte) // used for protocol traffic (to bypass queues)
doSend (chan struct{}) // tell the linkLoop to send a switchMsg
dinfo (chan *dhtInfo) // used to keep the DHT working
out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
close func() // Called when a peer is removed, to close the underlying connection, or via admin api
}
@@ -250,11 +250,15 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
}
// This just calls p.out(packet) for now.
func (p *peer) sendPacket(packet []byte) {
func (p *peer) sendPackets(packets [][]byte) {
// Is there ever a case where something more complicated is needed?
// What if p.out blocks?
atomic.AddUint64(&p.bytesSent, uint64(len(packet)))
p.out(packet)
var size int
for _, packet := range packets {
size += len(packet)
}
atomic.AddUint64(&p.bytesSent, uint64(size))
p.out(packets)
}
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.

View File

@@ -39,10 +39,10 @@ type router struct {
reconfigure chan chan error
addr address.Address
subnet address.Subnet
in <-chan []byte // packets we received from the network, link to peer's "out"
out func([]byte) // packets we're sending to the network, link to peer's "in"
reset chan struct{} // signal that coords changed (re-init sessions/dht)
admin chan func() // pass a lambda for the admin socket to query stuff
in <-chan [][]byte // packets we received from the network, link to peer's "out"
out func([]byte) // packets we're sending to the network, link to peer's "in"
reset chan struct{} // signal that coords changed (re-init sessions/dht)
admin chan func() // pass a lambda for the admin socket to query stuff
nodeinfo nodeinfo
}
@@ -52,7 +52,7 @@ func (r *router) init(core *Core) {
r.reconfigure = make(chan chan error, 1)
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
in := make(chan []byte, 1) // TODO something better than this...
in := make(chan [][]byte, 1) // TODO something better than this...
self := linkInterface{
name: "(self)",
info: linkInfo{
@@ -62,7 +62,7 @@ func (r *router) init(core *Core) {
},
}
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
p.out = func(packet []byte) { in <- packet }
p.out = func(packets [][]byte) { in <- packets }
r.in = in
out := make(chan []byte, 32)
go func() {
@@ -114,8 +114,10 @@ func (r *router) mainLoop() {
defer ticker.Stop()
for {
select {
case p := <-r.in:
r.handleIn(p)
case ps := <-r.in:
for _, p := range ps {
r.handleIn(p)
}
case info := <-r.core.dht.peers:
r.core.dht.insertPeer(info)
case <-r.reset:
@@ -127,7 +129,6 @@ func (r *router) mainLoop() {
r.core.switchTable.doMaintenance()
r.core.dht.doMaintenance()
r.core.sessions.cleanup()
util.GetBytes() // To slowly drain things
}
case f := <-r.admin:
f()
@@ -163,11 +164,12 @@ func (r *router) handleTraffic(packet []byte) {
}
sinfo, isIn := r.core.sessions.getSessionForHandle(&p.Handle)
if !isIn {
util.PutBytes(p.Payload)
return
}
select {
case sinfo.recv <- &p: // FIXME ideally this should be front drop
default:
case sinfo.fromRouter <- p:
case <-sinfo.cancel.Finished():
util.PutBytes(p.Payload)
}
}

View File

@@ -65,17 +65,11 @@ func (s *searches) init(core *Core) {
// Creates a new search info, adds it to the searches struct, and returns a pointer to the info.
func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo {
now := time.Now()
//for dest, sinfo := range s.searches {
// if now.Sub(sinfo.time) > time.Minute {
// delete(s.searches, dest)
// }
//}
info := searchInfo{
core: s.core,
dest: *dest,
mask: *mask,
time: now.Add(-time.Second),
time: time.Now(),
callback: callback,
}
s.searches[*dest] = &info
@@ -138,9 +132,11 @@ func (sinfo *searchInfo) addToSearch(res *dhtRes) {
// Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping.
func (sinfo *searchInfo) doSearchStep() {
if len(sinfo.toVisit) == 0 {
// Dead end, do cleanup
delete(sinfo.core.searches.searches, sinfo.dest)
sinfo.callback(nil, errors.New("search reached dead end"))
if time.Since(sinfo.time) > search_RETRY_TIME {
// Dead end and no response in too long, do cleanup
delete(sinfo.core.searches.searches, sinfo.dest)
sinfo.callback(nil, errors.New("search reached dead end"))
}
return
}
// Send to the next search target
@@ -149,15 +145,12 @@ func (sinfo *searchInfo) doSearchStep() {
rq := dhtReqKey{next.key, sinfo.dest}
sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes)
sinfo.core.dht.ping(next, &sinfo.dest)
sinfo.time = time.Now()
}
// If we've recenty sent a ping for this search, do nothing.
// Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME.
func (sinfo *searchInfo) continueSearch() {
if time.Since(sinfo.time) < search_RETRY_TIME {
return
}
sinfo.time = time.Now()
sinfo.doSearchStep()
// In case the search dies, try to spawn another thread later
// Note that this will spawn multiple parallel searches as time passes
@@ -209,6 +202,8 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool {
if sess == nil {
// nil if the DHT search finished but the session wasn't allowed
sinfo.callback(nil, errors.New("session not allowed"))
// Cleanup
delete(sinfo.core.searches.searches, res.Dest)
return true
}
_, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key)

View File

@@ -6,46 +6,71 @@ package yggdrasil
import (
"bytes"
"container/heap"
"errors"
"sync"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
)
// Duration that we keep track of old nonces per session, to allow some out-of-order packet delivery
const nonceWindow = time.Second
// A heap of nonces, used with a map[nonce]time to allow out-of-order packets a little time to arrive without rejecting them
type nonceHeap []crypto.BoxNonce
func (h nonceHeap) Len() int { return len(h) }
func (h nonceHeap) Less(i, j int) bool { return h[i].Minus(&h[j]) < 0 }
func (h nonceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *nonceHeap) Push(x interface{}) { *h = append(*h, x.(crypto.BoxNonce)) }
func (h *nonceHeap) Pop() interface{} {
l := len(*h)
var n crypto.BoxNonce
n, *h = (*h)[l-1], (*h)[:l-1]
return n
}
func (h nonceHeap) peek() *crypto.BoxNonce { return &h[len(h)-1] }
// All the information we know about an active session.
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
type sessionInfo struct {
mutex sync.Mutex // Protects all of the below, use it any time you read/chance the contents of a session
core *Core //
reconfigure chan chan error //
theirAddr address.Address //
theirSubnet address.Subnet //
theirPermPub crypto.BoxPubKey //
theirSesPub crypto.BoxPubKey //
mySesPub crypto.BoxPubKey //
mySesPriv crypto.BoxPrivKey //
sharedSesKey crypto.BoxSharedKey // derived from session keys
theirHandle crypto.Handle //
myHandle crypto.Handle //
theirNonce crypto.BoxNonce //
theirNonceMask uint64 //
myNonce crypto.BoxNonce //
theirMTU uint16 //
myMTU uint16 //
wasMTUFixed bool // Was the MTU fixed by a receive error?
timeOpened time.Time // Time the sessino was opened
time time.Time // Time we last received a packet
mtuTime time.Time // time myMTU was last changed
pingTime time.Time // time the first ping was sent since the last received packet
pingSend time.Time // time the last ping was sent
coords []byte // coords of destination
reset bool // reset if coords change
tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation
bytesSent uint64 // Bytes of real traffic sent in this session
bytesRecvd uint64 // Bytes of real traffic received in this session
recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
mutex sync.Mutex // Protects all of the below, use it any time you read/chance the contents of a session
core *Core //
reconfigure chan chan error //
theirAddr address.Address //
theirSubnet address.Subnet //
theirPermPub crypto.BoxPubKey //
theirSesPub crypto.BoxPubKey //
mySesPub crypto.BoxPubKey //
mySesPriv crypto.BoxPrivKey //
sharedSesKey crypto.BoxSharedKey // derived from session keys
theirHandle crypto.Handle //
myHandle crypto.Handle //
theirNonce crypto.BoxNonce //
theirNonceHeap nonceHeap // priority queue to keep track of the lowest nonce we recently accepted
theirNonceMap map[crypto.BoxNonce]time.Time // time we added each nonce to the heap
myNonce crypto.BoxNonce //
theirMTU uint16 //
myMTU uint16 //
wasMTUFixed bool // Was the MTU fixed by a receive error?
timeOpened time.Time // Time the sessino was opened
time time.Time // Time we last received a packet
mtuTime time.Time // time myMTU was last changed
pingTime time.Time // time the first ping was sent since the last received packet
pingSend time.Time // time the last ping was sent
coords []byte // coords of destination
reset bool // reset if coords change
tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation
bytesSent uint64 // Bytes of real traffic sent in this session
bytesRecvd uint64 // Bytes of real traffic received in this session
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
cancel util.Cancellation // Used to terminate workers
fromRouter chan wire_trafficPacket // Received packets go here, to be decrypted by the session
recv chan []byte // Decrypted packets go here, picked up by the associated Conn
send chan FlowKeyMessage // Packets with optional flow key go here, to be encrypted and sent
}
func (sinfo *sessionInfo) doFunc(f func()) {
@@ -82,7 +107,8 @@ func (s *sessionInfo) update(p *sessionPing) bool {
s.theirHandle = p.Handle
s.sharedSesKey = *crypto.GetSharedKey(&s.mySesPriv, &s.theirSesPub)
s.theirNonce = crypto.BoxNonce{}
s.theirNonceMask = 0
s.theirNonceHeap = nil
s.theirNonceMap = make(map[crypto.BoxNonce]time.Time)
}
if p.MTU >= 1280 || p.MTU == 0 {
s.theirMTU = p.MTU
@@ -203,6 +229,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.pingTime = now
sinfo.pingSend = now
sinfo.init = make(chan struct{})
sinfo.cancel = util.NewCancellation()
higher := false
for idx := range ss.core.boxPub {
if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] {
@@ -222,9 +249,17 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.myHandle = *crypto.NewHandle()
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.recv = make(chan *wire_trafficPacket, 32)
sinfo.fromRouter = make(chan wire_trafficPacket, 1)
sinfo.recv = make(chan []byte, 32)
sinfo.send = make(chan FlowKeyMessage, 32)
ss.sinfos[sinfo.myHandle] = &sinfo
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
go func() {
// Run cleanup when the session is canceled
<-sinfo.cancel.Finished()
sinfo.core.router.doAdmin(sinfo.close)
}()
go sinfo.startWorkers()
return &sinfo
}
@@ -335,39 +370,39 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) {
func (ss *sessions) handlePing(ping *sessionPing) {
// Get the corresponding session (or create a new session)
sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub)
// Check if the session is allowed
// TODO: this check may need to be moved
if !isIn && !ss.isSessionAllowed(&ping.SendPermPub, false) {
return
}
// Create the session if it doesn't already exist
if !isIn {
ss.createSession(&ping.SendPermPub)
sinfo, isIn = ss.getByTheirPerm(&ping.SendPermPub)
if !isIn {
panic("This should not happen")
}
switch {
case isIn: // Session already exists
case !ss.isSessionAllowed(&ping.SendPermPub, false): // Session is not allowed
case ping.IsPong: // This is a response, not an initial ping, so ignore it.
default:
ss.listenerMutex.Lock()
// Check and see if there's a Listener waiting to accept connections
// TODO: this should not block if nothing is accepting
if !ping.IsPong && ss.listener != nil {
if ss.listener != nil {
// This is a ping from an allowed node for which no session exists, and we have a listener ready to handle sessions.
// We need to create a session and pass it to the listener.
sinfo = ss.createSession(&ping.SendPermPub)
if s, _ := ss.getByTheirPerm(&ping.SendPermPub); s != sinfo {
panic("This should not happen")
}
conn := newConn(ss.core, crypto.GetNodeID(&sinfo.theirPermPub), &crypto.NodeID{}, sinfo)
for i := range conn.nodeMask {
conn.nodeMask[i] = 0xFF
}
ss.listener.conn <- conn
c := ss.listener.conn
go func() { c <- conn }()
}
ss.listenerMutex.Unlock()
}
sinfo.doFunc(func() {
// Update the session
if !sinfo.update(ping) { /*panic("Should not happen in testing")*/
return
}
if !ping.IsPong {
ss.sendPingPong(sinfo, true)
}
})
if sinfo != nil {
sinfo.doFunc(func() {
// Update the session
if !sinfo.update(ping) { /*panic("Should not happen in testing")*/
return
}
if !ping.IsPong {
ss.sendPingPong(sinfo, true)
}
})
}
}
// Get the MTU of the session.
@@ -386,27 +421,40 @@ func (sinfo *sessionInfo) getMTU() uint16 {
// Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received.
func (sinfo *sessionInfo) nonceIsOK(theirNonce *crypto.BoxNonce) bool {
// The bitmask is to allow for some non-duplicate out-of-order packets
diff := theirNonce.Minus(&sinfo.theirNonce)
if diff > 0 {
if theirNonce.Minus(&sinfo.theirNonce) > 0 {
// This is newer than the newest nonce we've seen
return true
}
return ^sinfo.theirNonceMask&(0x01<<uint64(-diff)) != 0
if len(sinfo.theirNonceHeap) > 0 {
if theirNonce.Minus(sinfo.theirNonceHeap.peek()) > 0 {
if _, isIn := sinfo.theirNonceMap[*theirNonce]; !isIn {
// This nonce is recent enough that we keep track of older nonces, but it's not one we've seen yet
return true
}
}
}
return false
}
// Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce
func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) {
// Shift nonce mask if needed
// Set bit
diff := theirNonce.Minus(&sinfo.theirNonce)
if diff > 0 {
// This nonce is newer, so shift the window before setting the bit, and update theirNonce in the session info.
sinfo.theirNonceMask <<= uint64(diff)
sinfo.theirNonceMask &= 0x01
sinfo.theirNonce = *theirNonce
} else {
// This nonce is older, so set the bit but do not shift the window.
sinfo.theirNonceMask &= 0x01 << uint64(-diff)
// Start with some cleanup
for len(sinfo.theirNonceHeap) > 64 {
if time.Since(sinfo.theirNonceMap[*sinfo.theirNonceHeap.peek()]) < nonceWindow {
// This nonce is still fairly new, so keep it around
break
}
// TODO? reallocate the map in some cases, to free unused map space?
delete(sinfo.theirNonceMap, *sinfo.theirNonceHeap.peek())
heap.Pop(&sinfo.theirNonceHeap)
}
if theirNonce.Minus(&sinfo.theirNonce) > 0 {
// This nonce is the newest we've seen, so make a note of that
sinfo.theirNonce = *theirNonce
}
// Add it to the heap/map so we know not to allow it again
heap.Push(&sinfo.theirNonceHeap, *theirNonce)
sinfo.theirNonceMap[*theirNonce] = time.Now()
}
// Resets all sessions to an uninitialized state.
@@ -418,3 +466,203 @@ func (ss *sessions) reset() {
})
}
}
////////////////////////////////////////////////////////////////////////////////
//////////////////////////// Worker Functions Below ////////////////////////////
////////////////////////////////////////////////////////////////////////////////
func (sinfo *sessionInfo) startWorkers() {
go sinfo.recvWorker()
go sinfo.sendWorker()
}
type FlowKeyMessage struct {
FlowKey uint64
Message []byte
}
func (sinfo *sessionInfo) recvWorker() {
// TODO move theirNonce etc into a struct that gets stored here, passed in over a channel
// Since there's no reason for anywhere else in the session code to need to *read* it...
// Only needs to be updated from the outside if a ping resets it...
// That would get rid of the need to take a mutex for the sessionFunc
var callbacks []chan func()
doRecv := func(p wire_trafficPacket) {
var bs []byte
var err error
var k crypto.BoxSharedKey
sessionFunc := func() {
if !sinfo.nonceIsOK(&p.Nonce) {
err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0}
return
}
k = sinfo.sharedSesKey
}
sinfo.doFunc(sessionFunc)
if err != nil {
util.PutBytes(p.Payload)
return
}
var isOK bool
ch := make(chan func(), 1)
poolFunc := func() {
bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
callback := func() {
util.PutBytes(p.Payload)
if !isOK {
util.PutBytes(bs)
return
}
sessionFunc = func() {
if k != sinfo.sharedSesKey || !sinfo.nonceIsOK(&p.Nonce) {
// The session updated in the mean time, so return an error
err = ConnError{errors.New("session updated during crypto operation"), false, true, false, 0}
return
}
sinfo.updateNonce(&p.Nonce)
sinfo.time = time.Now()
sinfo.bytesRecvd += uint64(len(bs))
}
sinfo.doFunc(sessionFunc)
if err != nil {
// Not sure what else to do with this packet, I guess just drop it
util.PutBytes(bs)
} else {
// Pass the packet to the buffer for Conn.Read
select {
case <-sinfo.cancel.Finished():
util.PutBytes(bs)
case sinfo.recv <- bs:
}
}
}
ch <- callback
}
// Send to the worker and wait for it to finish
util.WorkerGo(poolFunc)
callbacks = append(callbacks, ch)
}
fromHelper := make(chan wire_trafficPacket, 1)
go func() {
var buf []wire_trafficPacket
for {
for len(buf) > 0 {
select {
case <-sinfo.cancel.Finished():
return
case p := <-sinfo.fromRouter:
buf = append(buf, p)
for len(buf) > 64 { // Based on nonce window size
util.PutBytes(buf[0].Payload)
buf = buf[1:]
}
case fromHelper <- buf[0]:
buf = buf[1:]
}
}
select {
case <-sinfo.cancel.Finished():
return
case p := <-sinfo.fromRouter:
buf = append(buf, p)
}
}
}()
select {
case <-sinfo.cancel.Finished():
return
case <-sinfo.init:
// Wait until the session has finished initializing before processing any packets
}
for {
for len(callbacks) > 0 {
select {
case f := <-callbacks[0]:
callbacks = callbacks[1:]
f()
case <-sinfo.cancel.Finished():
return
case p := <-fromHelper:
doRecv(p)
}
}
select {
case <-sinfo.cancel.Finished():
return
case p := <-fromHelper:
doRecv(p)
}
}
}
func (sinfo *sessionInfo) sendWorker() {
// TODO move info that this worker needs here, send updates via a channel
// Otherwise we need to take a mutex to avoid races with update()
var callbacks []chan func()
doSend := func(msg FlowKeyMessage) {
var p wire_trafficPacket
var k crypto.BoxSharedKey
sessionFunc := func() {
sinfo.bytesSent += uint64(len(msg.Message))
p = wire_trafficPacket{
Coords: append([]byte(nil), sinfo.coords...),
Handle: sinfo.theirHandle,
Nonce: sinfo.myNonce,
}
if msg.FlowKey != 0 {
// Helps ensure that traffic from this flow ends up in a separate queue from other flows
// The zero padding relies on the fact that the self-peer is always on port 0
p.Coords = append(p.Coords, 0)
p.Coords = wire_put_uint64(msg.FlowKey, p.Coords)
}
sinfo.myNonce.Increment()
k = sinfo.sharedSesKey
}
// Get the mutex-protected info needed to encrypt the packet
sinfo.doFunc(sessionFunc)
ch := make(chan func(), 1)
poolFunc := func() {
// Encrypt the packet
p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce)
// The callback will send the packet
callback := func() {
// Encoding may block on a util.GetBytes(), so kept out of the worker pool
packet := p.encode()
// Cleanup
util.PutBytes(msg.Message)
util.PutBytes(p.Payload)
// Send the packet
sinfo.core.router.out(packet)
}
ch <- callback
}
// Send to the worker and wait for it to finish
util.WorkerGo(poolFunc)
callbacks = append(callbacks, ch)
}
select {
case <-sinfo.cancel.Finished():
return
case <-sinfo.init:
// Wait until the session has finished initializing before processing any packets
}
for {
for len(callbacks) > 0 {
select {
case f := <-callbacks[0]:
callbacks = callbacks[1:]
f()
case <-sinfo.cancel.Finished():
return
case msg := <-sinfo.send:
doSend(msg)
}
}
select {
case <-sinfo.cancel.Finished():
return
case bs := <-sinfo.send:
doSend(bs)
}
}
}

View File

@@ -1,9 +1,11 @@
package yggdrasil
import (
"bufio"
"errors"
"fmt"
"io"
"net"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
)
@@ -13,9 +15,8 @@ var _ = linkInterfaceMsgIO(&stream{})
type stream struct {
rwc io.ReadWriteCloser
inputBuffer []byte // Incoming packet stream
frag [2 * streamMsgSize]byte // Temporary data read off the underlying rwc, on its way to the inputBuffer
outputBuffer [2 * streamMsgSize]byte // Temporary data about to be written to the rwc
inputBuffer *bufio.Reader
outputBuffer net.Buffers
}
func (s *stream) close() error {
@@ -30,53 +31,33 @@ func (s *stream) init(rwc io.ReadWriteCloser) {
// TODO have this also do the metadata handshake and create the peer struct
s.rwc = rwc
// TODO call something to do the metadata exchange
s.inputBuffer = bufio.NewReaderSize(s.rwc, 2*streamMsgSize)
}
// writeMsg writes a message with stream padding, and is *not* thread safe.
func (s *stream) writeMsg(bs []byte) (int, error) {
func (s *stream) writeMsgs(bss [][]byte) (int, error) {
buf := s.outputBuffer[:0]
buf = append(buf, streamMsg[:]...)
buf = wire_put_uint64(uint64(len(bs)), buf)
padLen := len(buf)
buf = append(buf, bs...)
var bn int
for bn < len(buf) {
n, err := s.rwc.Write(buf[bn:])
bn += n
if err != nil {
l := bn - padLen
if l < 0 {
l = 0
}
return l, err
}
var written int
for _, bs := range bss {
buf = append(buf, streamMsg[:])
buf = append(buf, wire_encode_uint64(uint64(len(bs))))
buf = append(buf, bs)
written += len(bs)
}
return len(bs), nil
s.outputBuffer = buf[:0] // So we can reuse the same underlying array later
_, err := buf.WriteTo(s.rwc)
// TODO only include number of bytes from bs *successfully* written?
return written, err
}
// readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe.
func (s *stream) readMsg() ([]byte, error) {
for {
buf := s.inputBuffer
msg, ok, err := stream_chopMsg(&buf)
switch {
case err != nil:
// Something in the stream format is corrupt
bs, err := s.readMsgFromBuffer()
if err != nil {
return nil, fmt.Errorf("message error: %v", err)
case ok:
// Copy the packet into bs, shift the buffer, and return
msg = append(util.GetBytes(), msg...)
s.inputBuffer = append(s.inputBuffer[:0], buf...)
return msg, nil
default:
// Wait for the underlying reader to return enough info for us to proceed
n, err := s.rwc.Read(s.frag[:])
if n > 0 {
s.inputBuffer = append(s.inputBuffer, s.frag[:n]...)
} else if err != nil {
return nil, err
}
}
return bs, err
}
}
@@ -108,34 +89,30 @@ func (s *stream) _recvMetaBytes() ([]byte, error) {
return metaBytes, nil
}
// This takes a pointer to a slice as an argument. It checks if there's a
// complete message and, if so, slices out those parts and returns the message,
// true, and nil. If there's no error, but also no complete message, it returns
// nil, false, and nil. If there's an error, it returns nil, false, and the
// error, which the reader then handles (currently, by returning from the
// reader, which causes the connection to close).
func stream_chopMsg(bs *[]byte) ([]byte, bool, error) {
// Returns msg, ok, err
if len(*bs) < len(streamMsg) {
return nil, false, nil
// Reads bytes from the underlying rwc and returns 1 full message
func (s *stream) readMsgFromBuffer() ([]byte, error) {
pad := streamMsg // Copy
_, err := io.ReadFull(s.inputBuffer, pad[:])
if err != nil {
return nil, err
} else if pad != streamMsg {
return nil, errors.New("bad message")
}
for idx := range streamMsg {
if (*bs)[idx] != streamMsg[idx] {
return nil, false, errors.New("bad message")
lenSlice := make([]byte, 0, 10)
// FIXME this nextByte stuff depends on wire.go format, kind of ugly to have it here
nextByte := byte(0xff)
for nextByte > 127 {
nextByte, err = s.inputBuffer.ReadByte()
if err != nil {
return nil, err
}
lenSlice = append(lenSlice, nextByte)
}
msgLen, msgLenLen := wire_decode_uint64((*bs)[len(streamMsg):])
msgLen, _ := wire_decode_uint64(lenSlice)
if msgLen > streamMsgSize {
return nil, false, errors.New("oversized message")
return nil, errors.New("oversized message")
}
msgBegin := len(streamMsg) + msgLenLen
msgEnd := msgBegin + int(msgLen)
if msgLenLen == 0 || len(*bs) < msgEnd {
// We don't have the full message
// Need to buffer this and wait for the rest to come in
return nil, false, nil
}
msg := (*bs)[msgBegin:msgEnd]
(*bs) = (*bs)[msgEnd:]
return msg, true, nil
msg := util.ResizeBytes(util.GetBytes(), int(msgLen))
_, err = io.ReadFull(s.inputBuffer, msg)
return msg, err
}

View File

@@ -630,6 +630,16 @@ func switch_getPacketStreamID(packet []byte) string {
return string(switch_getPacketCoords(packet))
}
// Returns the flowlabel from a given set of coords
func switch_getFlowLabelFromCoords(in []byte) []byte {
for i, v := range in {
if v == 0 {
return in[i+1:]
}
}
return []byte{}
}
// Find the best port for a given set of coords
func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
table := t.getTable()
@@ -667,19 +677,28 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) boo
var update bool
switch {
case to == nil:
//nothing
// no port was found, ignore it
case !isIdle:
//nothing
// the port is busy, ignore it
case best == nil:
// this is the first idle port we've found, so select it until we find a
// better candidate port to use instead
update = true
case cinfo.dist < bestDist:
// the port takes a shorter path/is more direct than our current
// candidate, so select that instead
update = true
case cinfo.dist > bestDist:
//nothing
case thisTime.Before(bestTime):
// the port takes a longer path/is less direct than our current candidate,
// ignore it
case thisTime.After(bestTime):
// all else equal, this port was used more recently than our current
// candidate, so choose that instead. this should mean that, in low
// traffic scenarios, we consistently pick the same link which helps with
// packet ordering
update = true
default:
//nothing
// the search for a port has finished
}
if update {
best = to
@@ -690,12 +709,11 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) boo
if best != nil {
// Send to the best idle next hop
delete(idle, best.port)
best.sendPacket(packet)
best.sendPackets([][]byte{packet})
return true
} else {
// Didn't find anyone idle to send it to
return false
}
// Didn't find anyone idle to send it to
return false
}
// Info about a buffered packet
@@ -766,39 +784,49 @@ func (t *switchTable) handleIdle(port switchPort) bool {
if to == nil {
return true
}
var best string
var bestPriority float64
var packets [][]byte
var psize int
t.queues.cleanup(t)
now := time.Now()
for streamID, buf := range t.queues.bufs {
// Filter over the streams that this node is closer to
// Keep the one with the smallest queue
packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes)
priority := float64(now.Sub(packet.time)) / float64(buf.size)
if priority > bestPriority && t.portIsCloser(coords, port) {
best = streamID
bestPriority = priority
for psize < 65535 {
var best string
var bestPriority float64
for streamID, buf := range t.queues.bufs {
// Filter over the streams that this node is closer to
// Keep the one with the smallest queue
packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes)
priority := float64(now.Sub(packet.time)) / float64(buf.size)
if priority > bestPriority && t.portIsCloser(coords, port) {
best = streamID
bestPriority = priority
}
}
}
if bestPriority != 0 {
buf := t.queues.bufs[best]
var packet switch_packetInfo
// TODO decide if this should be LIFO or FIFO
packet, buf.packets = buf.packets[0], buf.packets[1:]
buf.size -= uint64(len(packet.bytes))
t.queues.size -= uint64(len(packet.bytes))
if len(buf.packets) == 0 {
delete(t.queues.bufs, best)
if bestPriority != 0 {
buf := t.queues.bufs[best]
var packet switch_packetInfo
// TODO decide if this should be LIFO or FIFO
packet, buf.packets = buf.packets[0], buf.packets[1:]
buf.size -= uint64(len(packet.bytes))
t.queues.size -= uint64(len(packet.bytes))
if len(buf.packets) == 0 {
delete(t.queues.bufs, best)
} else {
// Need to update the map, since buf was retrieved by value
t.queues.bufs[best] = buf
}
packets = append(packets, packet.bytes)
psize += len(packet.bytes)
} else {
// Need to update the map, since buf was retrieved by value
t.queues.bufs[best] = buf
// Finished finding packets
break
}
to.sendPacket(packet.bytes)
return true
} else {
return false
}
if len(packets) > 0 {
to.sendPackets(packets)
return true
}
return false
}
// The switch worker does routing lookups and sends packets to where they need to be
@@ -808,23 +836,29 @@ func (t *switchTable) doWorker() {
// Keep sending packets to the router
self := t.core.peers.getPorts()[0]
for bs := range sendingToRouter {
self.sendPacket(bs)
self.sendPackets([][]byte{bs})
}
}()
go func() {
// Keep taking packets from the idle worker and sending them to the above whenever it's idle, keeping anything extra in a (fifo, head-drop) buffer
var buf [][]byte
var size int
for {
buf = append(buf, <-t.toRouter)
bs := <-t.toRouter
size += len(bs)
buf = append(buf, bs)
for len(buf) > 0 {
select {
case bs := <-t.toRouter:
size += len(bs)
buf = append(buf, bs)
for len(buf) > 32 {
for size > int(t.queueTotalMaxSize) {
size -= len(buf[0])
util.PutBytes(buf[0])
buf = buf[1:]
}
case sendingToRouter <- buf[0]:
size -= len(buf[0])
buf = buf[1:]
}
}

View File

@@ -115,6 +115,29 @@ func wire_decode_coords(packet []byte) ([]byte, int) {
return packet[coordBegin:coordEnd], coordEnd
}
// Converts a []uint64 set of coords to a []byte set of coords.
func wire_coordsUint64stoBytes(in []uint64) (out []byte) {
for _, coord := range in {
c := wire_encode_uint64(coord)
out = append(out, c...)
}
return out
}
// Converts a []byte set of coords to a []uint64 set of coords.
func wire_coordsBytestoUint64s(in []byte) (out []uint64) {
offset := 0
for {
coord, length := wire_decode_uint64(in[offset:])
if length == 0 {
break
}
out = append(out, coord)
offset += length
}
return out
}
////////////////////////////////////////////////////////////////////////////////
// Encodes a swtichMsg into its wire format.