Merge pull request #734 from yggdrasil-network/develop-future

Future → Develop
This commit is contained in:
Neil Alexander 2020-10-11 16:45:24 +01:00 committed by GitHub
commit 9eb4981ac1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1086 additions and 899 deletions

View File

@ -215,15 +215,9 @@ workflows:
build: build:
jobs: jobs:
- lint - lint
- build-linux: - build-linux
requires: - build-macos
- lint - build-other
- build-macos:
requires:
- lint
- build-other:
requires:
- lint
- upload: - upload:
requires: requires:
- build-linux - build-linux

61
cmd/yggdrasilsim/dial.go Normal file
View File

@ -0,0 +1,61 @@
package main
import (
"fmt"
"sort"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
)
func doListen(recvNode *simNode) {
// TODO be able to stop the listeners somehow so they don't leak across different tests
for {
c, err := recvNode.listener.Accept()
if err != nil {
panic(err)
}
c.Close()
}
}
func dialTest(sendNode, recvNode *simNode) {
if sendNode.id == recvNode.id {
fmt.Println("Skipping dial to self")
return
}
var mask crypto.NodeID
for idx := range mask {
mask[idx] = 0xff
}
for {
c, err := sendNode.dialer.DialByNodeIDandMask(nil, &recvNode.nodeID, &mask)
if c != nil {
c.Close()
return
}
if err != nil {
fmt.Println("Dial failed:", err)
}
time.Sleep(time.Second)
}
}
func dialStore(store nodeStore) {
var nodeIdxs []int
for idx, n := range store {
nodeIdxs = append(nodeIdxs, idx)
go doListen(n)
}
sort.Slice(nodeIdxs, func(i, j int) bool {
return nodeIdxs[i] < nodeIdxs[j]
})
for _, idx := range nodeIdxs {
sendNode := store[idx]
for _, jdx := range nodeIdxs {
recvNode := store[jdx]
fmt.Printf("Dialing from node %d to node %d / %d...\n", idx, jdx, len(store))
dialTest(sendNode, recvNode)
}
}
}

6
cmd/yggdrasilsim/main.go Normal file
View File

@ -0,0 +1,6 @@
package main
func main() {
store := makeStoreSquareGrid(4)
dialStore(store)
}

28
cmd/yggdrasilsim/node.go Normal file
View File

@ -0,0 +1,28 @@
package main
import (
"io/ioutil"
"github.com/gologme/log"
"github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
)
type simNode struct {
core yggdrasil.Core
id int
nodeID crypto.NodeID
dialer *yggdrasil.Dialer
listener *yggdrasil.Listener
}
func newNode(id int) *simNode {
n := simNode{id: id}
n.core.Start(config.GenerateConfig(), log.New(ioutil.Discard, "", 0))
n.nodeID = *n.core.NodeID()
n.dialer, _ = n.core.ConnDialer()
n.listener, _ = n.core.ConnListen()
return &n
}

41
cmd/yggdrasilsim/store.go Normal file
View File

@ -0,0 +1,41 @@
package main
type nodeStore map[int]*simNode
func makeStoreSingle() nodeStore {
s := make(nodeStore)
s[0] = newNode(0)
return s
}
func linkNodes(a *simNode, b *simNode) {
la := a.core.NewSimlink()
lb := b.core.NewSimlink()
la.SetDestination(lb)
lb.SetDestination(la)
la.Start()
lb.Start()
}
func makeStoreSquareGrid(sideLength int) nodeStore {
store := make(nodeStore)
nNodes := sideLength * sideLength
idxs := make([]int, 0, nNodes)
// TODO shuffle nodeIDs
for idx := 1; idx <= nNodes; idx++ {
idxs = append(idxs, idx)
}
for _, idx := range idxs {
n := newNode(idx)
store[idx] = n
}
for idx := 0; idx < nNodes; idx++ {
if (idx % sideLength) != 0 {
linkNodes(store[idxs[idx]], store[idxs[idx-1]])
}
if idx >= sideLength {
linkNodes(store[idxs[idx]], store[idxs[idx-sideLength]])
}
}
return store
}

View File

@ -17,12 +17,11 @@ import (
"crypto/rand" "crypto/rand"
"crypto/sha512" "crypto/sha512"
"encoding/hex" "encoding/hex"
"sync"
"golang.org/x/crypto/curve25519" "golang.org/x/crypto/curve25519"
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
"golang.org/x/crypto/nacl/box" "golang.org/x/crypto/nacl/box"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -225,29 +224,36 @@ func GetSharedKey(myPrivKey *BoxPrivKey,
return (*BoxSharedKey)(&shared) return (*BoxSharedKey)(&shared)
} }
// pool is used internally by BoxOpen and BoxSeal to avoid allocating temporary space
var pool = sync.Pool{New: func() interface{} { return []byte(nil) }}
// BoxOpen returns a message and true if it successfully opens a crypto box using the provided shared key and nonce. // BoxOpen returns a message and true if it successfully opens a crypto box using the provided shared key and nonce.
// The boxed input slice's backing array is reused for the unboxed output when possible.
func BoxOpen(shared *BoxSharedKey, func BoxOpen(shared *BoxSharedKey,
boxed []byte, boxed []byte,
nonce *BoxNonce) ([]byte, bool) { nonce *BoxNonce) ([]byte, bool) {
out := util.GetBytes()
s := (*[BoxSharedKeyLen]byte)(shared) s := (*[BoxSharedKeyLen]byte)(shared)
n := (*[BoxNonceLen]byte)(nonce) n := (*[BoxNonceLen]byte)(nonce)
unboxed, success := box.OpenAfterPrecomputation(out, boxed, n, s) temp := append(pool.Get().([]byte), boxed...)
unboxed, success := box.OpenAfterPrecomputation(boxed[:0], temp, n, s)
pool.Put(temp[:0])
return unboxed, success return unboxed, success
} }
// BoxSeal seals a crypto box using the provided shared key, returning the box and the nonce needed to decrypt it. // BoxSeal seals a crypto box using the provided shared key, returning the box and the nonce needed to decrypt it.
// If nonce is nil, a random BoxNonce will be used and returned. // If nonce is nil, a random BoxNonce will be used and returned.
// If nonce is non-nil, then nonce.Increment() will be called before using it, and the incremented BoxNonce is what is returned. // If nonce is non-nil, then nonce.Increment() will be called before using it, and the incremented BoxNonce is what is returned.
// The unboxed input slice's backing array is reused for the boxed output when possible.
func BoxSeal(shared *BoxSharedKey, unboxed []byte, nonce *BoxNonce) ([]byte, *BoxNonce) { func BoxSeal(shared *BoxSharedKey, unboxed []byte, nonce *BoxNonce) ([]byte, *BoxNonce) {
if nonce == nil { if nonce == nil {
nonce = NewBoxNonce() nonce = NewBoxNonce()
} }
nonce.Increment() nonce.Increment()
out := util.GetBytes()
s := (*[BoxSharedKeyLen]byte)(shared) s := (*[BoxSharedKeyLen]byte)(shared)
n := (*[BoxNonceLen]byte)(nonce) n := (*[BoxNonceLen]byte)(nonce)
boxed := box.SealAfterPrecomputation(out, unboxed, n, s) temp := append(pool.Get().([]byte), unboxed...)
boxed := box.SealAfterPrecomputation(unboxed[:0], temp, n, s)
pool.Put(temp[:0])
return boxed, nonce return boxed, nonce
} }

View File

@ -44,13 +44,11 @@ func (s *tunConn) _read(bs []byte) (err error) {
select { select {
case <-s.stop: case <-s.stop:
err = errors.New("session was already closed") err = errors.New("session was already closed")
util.PutBytes(bs)
return return
default: default:
} }
if len(bs) == 0 { if len(bs) == 0 {
err = errors.New("read packet with 0 size") err = errors.New("read packet with 0 size")
util.PutBytes(bs)
return return
} }
ipv4 := len(bs) > 20 && bs[0]&0xf0 == 0x40 ipv4 := len(bs) > 20 && bs[0]&0xf0 == 0x40
@ -107,7 +105,6 @@ func (s *tunConn) _read(bs []byte) (err error) {
} }
if skip { if skip {
err = errors.New("address not allowed") err = errors.New("address not allowed")
util.PutBytes(bs)
return return
} }
s.tun.writer.writeFrom(s, bs) s.tun.writer.writeFrom(s, bs)
@ -125,7 +122,6 @@ func (s *tunConn) _write(bs []byte) (err error) {
select { select {
case <-s.stop: case <-s.stop:
err = errors.New("session was already closed") err = errors.New("session was already closed")
util.PutBytes(bs)
return return
default: default:
} }
@ -183,7 +179,6 @@ func (s *tunConn) _write(bs []byte) (err error) {
} }
if skip { if skip {
err = errors.New("address not allowed") err = errors.New("address not allowed")
util.PutBytes(bs)
return return
} }
msg := yggdrasil.FlowKeyMessage{ msg := yggdrasil.FlowKeyMessage{

View File

@ -3,7 +3,6 @@ package tuntap
import ( import (
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
@ -14,6 +13,7 @@ const TUN_OFFSET_BYTES = 4
type tunWriter struct { type tunWriter struct {
phony.Inbox phony.Inbox
tun *TunAdapter tun *TunAdapter
buf [TUN_OFFSET_BYTES + 65536]byte
} }
func (w *tunWriter) writeFrom(from phony.Actor, b []byte) { func (w *tunWriter) writeFrom(from phony.Actor, b []byte) {
@ -25,15 +25,13 @@ func (w *tunWriter) writeFrom(from phony.Actor, b []byte) {
// write is pretty loose with the memory safety rules, e.g. it assumes it can // write is pretty loose with the memory safety rules, e.g. it assumes it can
// read w.tun.iface.IsTap() safely // read w.tun.iface.IsTap() safely
func (w *tunWriter) _write(b []byte) { func (w *tunWriter) _write(b []byte) {
defer util.PutBytes(b)
var written int var written int
var err error var err error
n := len(b) n := len(b)
if n == 0 { if n == 0 {
return return
} }
temp := append(util.ResizeBytes(util.GetBytes(), TUN_OFFSET_BYTES), b...) temp := append(w.buf[:TUN_OFFSET_BYTES], b...)
defer util.PutBytes(temp)
written, err = w.tun.iface.Write(temp, TUN_OFFSET_BYTES) written, err = w.tun.iface.Write(temp, TUN_OFFSET_BYTES)
if err != nil { if err != nil {
w.tun.Act(w, func() { w.tun.Act(w, func() {
@ -51,22 +49,23 @@ func (w *tunWriter) _write(b []byte) {
type tunReader struct { type tunReader struct {
phony.Inbox phony.Inbox
tun *TunAdapter tun *TunAdapter
buf [TUN_OFFSET_BYTES + 65536]byte
} }
func (r *tunReader) _read() { func (r *tunReader) _read() {
// Get a slice to store the packet in // Get a slice to store the packet in
recvd := util.ResizeBytes(util.GetBytes(), int(r.tun.mtu)+TUN_OFFSET_BYTES)
// Wait for a packet to be delivered to us through the TUN adapter // Wait for a packet to be delivered to us through the TUN adapter
n, err := r.tun.iface.Read(recvd, TUN_OFFSET_BYTES) n, err := r.tun.iface.Read(r.buf[:], TUN_OFFSET_BYTES)
if n <= TUN_OFFSET_BYTES || err != nil { if n <= TUN_OFFSET_BYTES || err != nil {
r.tun.log.Errorln("Error reading TUN:", err) r.tun.log.Errorln("Error reading TUN:", err)
ferr := r.tun.iface.Flush() ferr := r.tun.iface.Flush()
if ferr != nil { if ferr != nil {
r.tun.log.Errorln("Unable to flush packets:", ferr) r.tun.log.Errorln("Unable to flush packets:", ferr)
} }
util.PutBytes(recvd)
} else { } else {
r.tun.handlePacketFrom(r, recvd[TUN_OFFSET_BYTES:n+TUN_OFFSET_BYTES], err) bs := make([]byte, n, n+crypto.BoxOverhead) // extra capacity for later...
copy(bs, r.buf[TUN_OFFSET_BYTES:n+TUN_OFFSET_BYTES])
r.tun.handlePacketFrom(r, bs, err)
} }
if err == nil { if err == nil {
// Now read again // Now read again
@ -175,7 +174,6 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) {
_, known := tun.dials[dstString] _, known := tun.dials[dstString]
tun.dials[dstString] = append(tun.dials[dstString], bs) tun.dials[dstString] = append(tun.dials[dstString], bs)
for len(tun.dials[dstString]) > 32 { for len(tun.dials[dstString]) > 32 {
util.PutBytes(tun.dials[dstString][0])
tun.dials[dstString] = tun.dials[dstString][1:] tun.dials[dstString] = tun.dials[dstString][1:]
} }
if !known { if !known {

View File

@ -1,21 +0,0 @@
//+build mobile
package util
import "runtime/debug"
func init() {
debug.SetGCPercent(25)
}
// GetBytes always returns a nil slice on mobile platforms.
func GetBytes() []byte {
return nil
}
// PutBytes does literally nothing on mobile platforms.
// This is done rather than keeping a free list of bytes on platforms with memory constraints.
// It's needed to help keep memory usage low enough to fall under the limits set for e.g. iOS NEPacketTunnelProvider apps.
func PutBytes(bs []byte) {
return
}

View File

@ -1,18 +0,0 @@
//+build !mobile
package util
import "sync"
// This is used to buffer recently used slices of bytes, to prevent allocations in the hot loops.
var byteStore = sync.Pool{New: func() interface{} { return []byte(nil) }}
// GetBytes returns a 0-length (possibly nil) slice of bytes from a free list, so it may have a larger capacity.
func GetBytes() []byte {
return byteStore.Get().([]byte)[:0]
}
// PutBytes stores a slice in a free list, where it can potentially be reused to prevent future allocations.
func PutBytes(bs []byte) {
byteStore.Put(bs)
}

View File

@ -110,7 +110,8 @@ type Session struct {
// there is exactly one entry then this node is not connected to any other nodes // there is exactly one entry then this node is not connected to any other nodes
// and is therefore isolated. // and is therefore isolated.
func (c *Core) GetPeers() []Peer { func (c *Core) GetPeers() []Peer {
ports := c.peers.ports.Load().(map[switchPort]*peer) var ports map[switchPort]*peer
phony.Block(&c.peers, func() { ports = c.peers.ports })
var peers []Peer var peers []Peer
var ps []switchPort var ps []switchPort
for port := range ports { for port := range ports {
@ -122,10 +123,10 @@ func (c *Core) GetPeers() []Peer {
var info Peer var info Peer
phony.Block(p, func() { phony.Block(p, func() {
info = Peer{ info = Peer{
Endpoint: p.intf.name, Endpoint: p.intf.name(),
BytesSent: p.bytesSent, BytesSent: p.bytesSent,
BytesRecvd: p.bytesRecvd, BytesRecvd: p.bytesRecvd,
Protocol: p.intf.info.linkType, Protocol: p.intf.interfaceType(),
Port: uint64(port), Port: uint64(port),
Uptime: time.Since(p.firstSeen), Uptime: time.Since(p.firstSeen),
} }
@ -143,10 +144,14 @@ func (c *Core) GetPeers() []Peer {
// isolated or not connected to any peers. // isolated or not connected to any peers.
func (c *Core) GetSwitchPeers() []SwitchPeer { func (c *Core) GetSwitchPeers() []SwitchPeer {
var switchpeers []SwitchPeer var switchpeers []SwitchPeer
table := c.switchTable.table.Load().(lookupTable) var table *lookupTable
peers := c.peers.ports.Load().(map[switchPort]*peer) var ports map[switchPort]*peer
phony.Block(&c.peers, func() {
table = c.peers.table
ports = c.peers.ports
})
for _, elem := range table.elems { for _, elem := range table.elems {
peer, isIn := peers[elem.port] peer, isIn := ports[elem.port]
if !isIn { if !isIn {
continue continue
} }
@ -158,8 +163,8 @@ func (c *Core) GetSwitchPeers() []SwitchPeer {
BytesSent: peer.bytesSent, BytesSent: peer.bytesSent,
BytesRecvd: peer.bytesRecvd, BytesRecvd: peer.bytesRecvd,
Port: uint64(elem.port), Port: uint64(elem.port),
Protocol: peer.intf.info.linkType, Protocol: peer.intf.interfaceType(),
Endpoint: peer.intf.info.remote, Endpoint: peer.intf.remote(),
} }
copy(info.PublicKey[:], peer.box[:]) copy(info.PublicKey[:], peer.box[:])
}) })
@ -194,34 +199,6 @@ func (c *Core) GetDHT() []DHTEntry {
return dhtentries return dhtentries
} }
// GetSwitchQueues returns information about the switch queues that are
// currently in effect. These values can change within an instant.
func (c *Core) GetSwitchQueues() SwitchQueues {
var switchqueues SwitchQueues
switchTable := &c.switchTable
getSwitchQueues := func() {
switchqueues = SwitchQueues{
Count: uint64(len(switchTable.queues.bufs)),
Size: switchTable.queues.size,
HighestCount: uint64(switchTable.queues.maxbufs),
HighestSize: switchTable.queues.maxsize,
MaximumSize: switchTable.queues.totalMaxSize,
}
for k, v := range switchTable.queues.bufs {
nexthop := switchTable.bestPortForCoords([]byte(k))
queue := SwitchQueue{
ID: k,
Size: v.size,
Packets: uint64(len(v.packets)),
Port: uint64(nexthop),
}
switchqueues.Queues = append(switchqueues.Queues, queue)
}
}
phony.Block(&c.switchTable, getSwitchQueues)
return switchqueues
}
// GetSessions returns a list of open sessions from this node to other nodes. // GetSessions returns a list of open sessions from this node to other nodes.
func (c *Core) GetSessions() []Session { func (c *Core) GetSessions() []Session {
var sessions []Session var sessions []Session
@ -280,14 +257,14 @@ func (c *Core) ConnDialer() (*Dialer, error) {
// "Listen" configuration item, e.g. // "Listen" configuration item, e.g.
// tcp://a.b.c.d:e // tcp://a.b.c.d:e
func (c *Core) ListenTCP(uri string) (*TcpListener, error) { func (c *Core) ListenTCP(uri string) (*TcpListener, error) {
return c.link.tcp.listen(uri, nil) return c.links.tcp.listen(uri, nil)
} }
// ListenTLS starts a new TLS listener. The input URI should match that of the // ListenTLS starts a new TLS listener. The input URI should match that of the
// "Listen" configuration item, e.g. // "Listen" configuration item, e.g.
// tls://a.b.c.d:e // tls://a.b.c.d:e
func (c *Core) ListenTLS(uri string) (*TcpListener, error) { func (c *Core) ListenTLS(uri string) (*TcpListener, error) {
return c.link.tcp.listen(uri, c.link.tcp.tls.forListener) return c.links.tcp.listen(uri, c.links.tcp.tls.forListener)
} }
// NodeID gets the node ID. This is derived from your router encryption keys. // NodeID gets the node ID. This is derived from your router encryption keys.
@ -324,8 +301,11 @@ func (c *Core) EncryptionPublicKey() string {
// connected to any other nodes (effectively making you the root of a // connected to any other nodes (effectively making you the root of a
// single-node network). // single-node network).
func (c *Core) Coords() []uint64 { func (c *Core) Coords() []uint64 {
table := c.switchTable.table.Load().(lookupTable) var coords []byte
return wire_coordsBytestoUint64s(table.self.getCoords()) phony.Block(&c.router, func() {
coords = c.router.table.self.getCoords()
})
return wire_coordsBytestoUint64s(coords)
} }
// Address gets the IPv6 address of the Yggdrasil node. This is always a /128 // Address gets the IPv6 address of the Yggdrasil node. This is always a /128
@ -485,12 +465,14 @@ func (c *Core) RemovePeer(addr string, sintf string) error {
} }
} }
ports := c.peers.ports.Load().(map[switchPort]*peer) c.peers.Act(nil, func() {
for p, peer := range ports { ports := c.peers.ports
if addr == peer.intf.name { for _, peer := range ports {
c.peers.removePeer(p) if addr == peer.intf.name() {
c.peers._removePeer(peer)
}
} }
} })
return nil return nil
} }
@ -502,13 +484,17 @@ func (c *Core) RemovePeer(addr string, sintf string) error {
// This does not add the peer to the peer list, so if the connection drops, the // This does not add the peer to the peer list, so if the connection drops, the
// peer will not be called again automatically. // peer will not be called again automatically.
func (c *Core) CallPeer(addr string, sintf string) error { func (c *Core) CallPeer(addr string, sintf string) error {
return c.link.call(addr, sintf) return c.links.call(addr, sintf)
} }
// DisconnectPeer disconnects a peer once. This should be specified as a port // DisconnectPeer disconnects a peer once. This should be specified as a port
// number. // number.
func (c *Core) DisconnectPeer(port uint64) error { func (c *Core) DisconnectPeer(port uint64) error {
c.peers.removePeer(switchPort(port)) c.peers.Act(nil, func() {
if p, isIn := c.peers.ports[switchPort(port)]; isIn {
p.Act(&c.peers, p._removeSelf)
}
})
return nil return nil
} }

View File

@ -251,7 +251,6 @@ func (c *Conn) Read(b []byte) (int, error) {
} }
// Copy results to the output slice and clean up // Copy results to the output slice and clean up
copy(b, bs) copy(b, bs)
util.PutBytes(bs)
// Return the number of bytes copied to the slice, along with any error // Return the number of bytes copied to the slice, along with any error
return n, err return n, err
} }
@ -322,10 +321,11 @@ func (c *Conn) writeNoCopy(msg FlowKeyMessage) error {
// returned. // returned.
func (c *Conn) Write(b []byte) (int, error) { func (c *Conn) Write(b []byte) (int, error) {
written := len(b) written := len(b)
msg := FlowKeyMessage{Message: append(util.GetBytes(), b...)} bs := make([]byte, 0, len(b)+crypto.BoxOverhead)
bs = append(bs, b...)
msg := FlowKeyMessage{Message: bs}
err := c.writeNoCopy(msg) err := c.writeNoCopy(msg)
if err != nil { if err != nil {
util.PutBytes(msg.Message)
written = 0 written = 0
} }
return written, err return written, err

View File

@ -29,7 +29,7 @@ type Core struct {
switchTable switchTable switchTable switchTable
peers peers peers peers
router router router router
link link links links
log *log.Logger log *log.Logger
addPeerTimer *time.Timer addPeerTimer *time.Timer
} }
@ -165,7 +165,7 @@ func (c *Core) _start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState
return nil, err return nil, err
} }
if err := c.link.init(c); err != nil { if err := c.links.init(c); err != nil {
c.log.Errorln("Failed to start link interfaces") c.log.Errorln("Failed to start link interfaces")
return nil, err return nil, err
} }
@ -197,9 +197,11 @@ func (c *Core) _stop() {
if c.addPeerTimer != nil { if c.addPeerTimer != nil {
c.addPeerTimer.Stop() c.addPeerTimer.Stop()
} }
c.link.stop() c.links.stop()
/* FIXME this deadlocks, need a waitgroup or something to coordinate shutdown
for _, peer := range c.GetPeers() { for _, peer := range c.GetPeers() {
c.DisconnectPeer(peer.Port) c.DisconnectPeer(peer.Port)
} }
*/
c.log.Infoln("Stopped") c.log.Infoln("Stopped")
} }

View File

@ -89,6 +89,11 @@ func (t *dht) reconfigure() {
// Resets the DHT in response to coord changes. // Resets the DHT in response to coord changes.
// This empties all info from the DHT and drops outstanding requests. // This empties all info from the DHT and drops outstanding requests.
func (t *dht) reset() { func (t *dht) reset() {
for _, info := range t.table {
if t.isImportant(info) {
t.ping(info, nil)
}
}
t.reqs = make(map[dhtReqKey]time.Time) t.reqs = make(map[dhtReqKey]time.Time)
t.table = make(map[crypto.NodeID]*dhtInfo) t.table = make(map[crypto.NodeID]*dhtInfo)
t.imp = nil t.imp = nil
@ -144,12 +149,8 @@ func (t *dht) insert(info *dhtInfo) {
// Insert a peer into the table if it hasn't been pinged lately, to keep peers from dropping // Insert a peer into the table if it hasn't been pinged lately, to keep peers from dropping
func (t *dht) insertPeer(info *dhtInfo) { func (t *dht) insertPeer(info *dhtInfo) {
oldInfo, isIn := t.table[*info.getNodeID()] t.insert(info) // FIXME this resets timers / ping counts / etc, so it seems kind of dangerous
if !isIn || time.Since(oldInfo.recv) > dht_max_delay+30*time.Second { t.ping(info, nil) // This is a quick fix to the above, ping them immediately...
// TODO? also check coords?
newInfo := *info // Insert a copy
t.insert(&newInfo)
}
} }
// Return true if first/second/third are (partially) ordered correctly. // Return true if first/second/third are (partially) ordered correctly.
@ -186,11 +187,9 @@ func dht_ordered(first, second, third *crypto.NodeID) bool {
// Update info about the node that sent the request. // Update info about the node that sent the request.
func (t *dht) handleReq(req *dhtReq) { func (t *dht) handleReq(req *dhtReq) {
// Send them what they asked for // Send them what they asked for
loc := t.router.core.switchTable.getLocator()
coords := loc.getCoords()
res := dhtRes{ res := dhtRes{
Key: t.router.core.boxPub, Key: t.router.core.boxPub,
Coords: coords, Coords: t.router.table.self.getCoords(),
Dest: req.Dest, Dest: req.Dest,
Infos: t.lookup(&req.Dest, false), Infos: t.lookup(&req.Dest, false),
} }
@ -302,11 +301,9 @@ func (t *dht) ping(info *dhtInfo, target *crypto.NodeID) {
if target == nil { if target == nil {
target = &t.nodeID target = &t.nodeID
} }
loc := t.router.core.switchTable.getLocator()
coords := loc.getCoords()
req := dhtReq{ req := dhtReq{
Key: t.router.core.boxPub, Key: t.router.core.boxPub,
Coords: coords, Coords: t.router.table.self.getCoords(),
Dest: *target, Dest: *target,
} }
t.sendReq(&req, info) t.sendReq(&req, info)
@ -380,7 +377,7 @@ func (t *dht) getImportant() []*dhtInfo {
}) })
// Keep the ones that are no further than the closest seen so far // Keep the ones that are no further than the closest seen so far
minDist := ^uint64(0) minDist := ^uint64(0)
loc := t.router.core.switchTable.getLocator() loc := t.router.table.self
important := infos[:0] important := infos[:0]
for _, info := range infos { for _, info := range infos {
dist := uint64(loc.dist(info.coords)) dist := uint64(loc.dist(info.coords))
@ -418,7 +415,7 @@ func (t *dht) isImportant(ninfo *dhtInfo) bool {
} }
important := t.getImportant() important := t.getImportant()
// Check if ninfo is of equal or greater importance to what we already know // Check if ninfo is of equal or greater importance to what we already know
loc := t.router.core.switchTable.getLocator() loc := t.router.table.self
ndist := uint64(loc.dist(ninfo.coords)) ndist := uint64(loc.dist(ninfo.coords))
minDist := ^uint64(0) minDist := ^uint64(0)
for _, info := range important { for _, info := range important {

View File

@ -21,12 +21,12 @@ import (
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
type link struct { type links struct {
core *Core core *Core
mutex sync.RWMutex // protects interfaces below mutex sync.RWMutex // protects links below
interfaces map[linkInfo]*linkInterface links map[linkInfo]*link
tcp tcp // TCP interface support tcp tcp // TCP interface support
stopped chan struct{} stopped chan struct{}
// TODO timeout (to remove from switch), read from config.ReadTimeout // TODO timeout (to remove from switch), read from config.ReadTimeout
} }
@ -38,7 +38,7 @@ type linkInfo struct {
remote string // Remote name or address remote string // Remote name or address
} }
type linkInterfaceMsgIO interface { type linkMsgIO interface {
readMsg() ([]byte, error) readMsg() ([]byte, error)
writeMsgs([][]byte) (int, error) writeMsgs([][]byte) (int, error)
close() error close() error
@ -47,26 +47,26 @@ type linkInterfaceMsgIO interface {
_recvMetaBytes() ([]byte, error) _recvMetaBytes() ([]byte, error)
} }
type linkInterface struct { type link struct {
name string lname string
link *link links *links
peer *peer peer *peer
options linkOptions options linkOptions
msgIO linkInterfaceMsgIO msgIO linkMsgIO
info linkInfo info linkInfo
incoming bool incoming bool
force bool force bool
closed chan struct{} closed chan struct{}
reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch reader linkReader // Reads packets, notifies this link, passes packets to switch
writer linkWriter // Writes packets, notifies this linkInterface writer linkWriter // Writes packets, notifies this link
phony.Inbox // Protects the below phony.Inbox // Protects the below
sendTimer *time.Timer // Fires to signal that sending is blocked sendTimer *time.Timer // Fires to signal that sending is blocked
keepAliveTimer *time.Timer // Fires to send keep-alive traffic keepAliveTimer *time.Timer // Fires to send keep-alive traffic
stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
inSwitch bool // True if the switch is tracking this link readUnblocked bool // True if we've sent a read message unblocking this peer in the switch
stalled bool // True if we haven't been receiving any response traffic writeUnblocked bool // True if we've sent a write message unblocking this peer in the swithc
unstalled bool // False if an idle notification to the switch hasn't been sent because we stalled (or are first starting up) shutdown bool // True if we're shutting down, avoids sending some messages that could race with new peers being crated in the same port
} }
type linkOptions struct { type linkOptions struct {
@ -74,10 +74,10 @@ type linkOptions struct {
pinnedEd25519Keys map[crypto.SigPubKey]struct{} pinnedEd25519Keys map[crypto.SigPubKey]struct{}
} }
func (l *link) init(c *Core) error { func (l *links) init(c *Core) error {
l.core = c l.core = c
l.mutex.Lock() l.mutex.Lock()
l.interfaces = make(map[linkInfo]*linkInterface) l.links = make(map[linkInfo]*link)
l.mutex.Unlock() l.mutex.Unlock()
l.stopped = make(chan struct{}) l.stopped = make(chan struct{})
@ -89,11 +89,11 @@ func (l *link) init(c *Core) error {
return nil return nil
} }
func (l *link) reconfigure() { func (l *links) reconfigure() {
l.tcp.reconfigure() l.tcp.reconfigure()
} }
func (l *link) call(uri string, sintf string) error { func (l *links) call(uri string, sintf string) error {
u, err := url.Parse(uri) u, err := url.Parse(uri)
if err != nil { if err != nil {
return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err) return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err)
@ -140,7 +140,7 @@ func (l *link) call(uri string, sintf string) error {
return nil return nil
} }
func (l *link) listen(uri string) error { func (l *links) listen(uri string) error {
u, err := url.Parse(uri) u, err := url.Parse(uri)
if err != nil { if err != nil {
return fmt.Errorf("listener %s is not correctly formatted (%s)", uri, err) return fmt.Errorf("listener %s is not correctly formatted (%s)", uri, err)
@ -157,11 +157,11 @@ func (l *link) listen(uri string) error {
} }
} }
func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool, options linkOptions) (*linkInterface, error) { func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool, options linkOptions) (*link, error) {
// Technically anything unique would work for names, but let's pick something human readable, just for debugging // Technically anything unique would work for names, but let's pick something human readable, just for debugging
intf := linkInterface{ intf := link{
name: name, lname: name,
link: l, links: l,
options: options, options: options,
msgIO: msgIO, msgIO: msgIO,
info: linkInfo{ info: linkInfo{
@ -173,12 +173,13 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st
force: force, force: force,
} }
intf.writer.intf = &intf intf.writer.intf = &intf
intf.writer.worker = make(chan [][]byte, 1)
intf.reader.intf = &intf intf.reader.intf = &intf
intf.reader.err = make(chan error) intf.reader.err = make(chan error)
return &intf, nil return &intf, nil
} }
func (l *link) stop() error { func (l *links) stop() error {
close(l.stopped) close(l.stopped)
if err := l.tcp.stop(); err != nil { if err := l.tcp.stop(); err != nil {
return err return err
@ -186,12 +187,21 @@ func (l *link) stop() error {
return nil return nil
} }
func (intf *linkInterface) handler() error { func (intf *link) handler() error {
// TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later
go func() {
for bss := range intf.writer.worker {
intf.msgIO.writeMsgs(bss)
}
}()
defer intf.writer.Act(nil, func() {
intf.writer.closed = true
close(intf.writer.worker)
})
myLinkPub, myLinkPriv := crypto.NewBoxKeys() myLinkPub, myLinkPriv := crypto.NewBoxKeys()
meta := version_getBaseMetadata() meta := version_getBaseMetadata()
meta.box = intf.link.core.boxPub meta.box = intf.links.core.boxPub
meta.sig = intf.link.core.sigPub meta.sig = intf.links.core.sigPub
meta.link = *myLinkPub meta.link = *myLinkPub
metaBytes := meta.encode() metaBytes := meta.encode()
// TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer) // TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer)
@ -214,26 +224,26 @@ func (intf *linkInterface) handler() error {
} }
base := version_getBaseMetadata() base := version_getBaseMetadata()
if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer {
intf.link.core.log.Errorln("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) intf.links.core.log.Errorln("Failed to connect to node: " + intf.lname + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer))
return errors.New("failed to connect: wrong version") return errors.New("failed to connect: wrong version")
} }
// Check if the remote side matches the keys we expected. This is a bit of a weak // Check if the remote side matches the keys we expected. This is a bit of a weak
// check - in future versions we really should check a signature or something like that. // check - in future versions we really should check a signature or something like that.
if pinned := intf.options.pinnedCurve25519Keys; pinned != nil { if pinned := intf.options.pinnedCurve25519Keys; pinned != nil {
if _, allowed := pinned[meta.box]; !allowed { if _, allowed := pinned[meta.box]; !allowed {
intf.link.core.log.Errorf("Failed to connect to node: %q sent curve25519 key that does not match pinned keys", intf.name) intf.links.core.log.Errorf("Failed to connect to node: %q sent curve25519 key that does not match pinned keys", intf.name)
return fmt.Errorf("failed to connect: host sent curve25519 key that does not match pinned keys") return fmt.Errorf("failed to connect: host sent curve25519 key that does not match pinned keys")
} }
} }
if pinned := intf.options.pinnedEd25519Keys; pinned != nil { if pinned := intf.options.pinnedEd25519Keys; pinned != nil {
if _, allowed := pinned[meta.sig]; !allowed { if _, allowed := pinned[meta.sig]; !allowed {
intf.link.core.log.Errorf("Failed to connect to node: %q sent ed25519 key that does not match pinned keys", intf.name) intf.links.core.log.Errorf("Failed to connect to node: %q sent ed25519 key that does not match pinned keys", intf.name)
return fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys") return fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys")
} }
} }
// Check if we're authorized to connect to this key / IP // Check if we're authorized to connect to this key / IP
if intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { if intf.incoming && !intf.force && !intf.links.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
intf.link.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", intf.links.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:]))
intf.msgIO.close() intf.msgIO.close()
return nil return nil
@ -241,52 +251,54 @@ func (intf *linkInterface) handler() error {
// Check if we already have a link to this node // Check if we already have a link to this node
intf.info.box = meta.box intf.info.box = meta.box
intf.info.sig = meta.sig intf.info.sig = meta.sig
intf.link.mutex.Lock() intf.links.mutex.Lock()
if oldIntf, isIn := intf.link.interfaces[intf.info]; isIn { if oldIntf, isIn := intf.links.links[intf.info]; isIn {
intf.link.mutex.Unlock() intf.links.mutex.Unlock()
// FIXME we should really return an error and let the caller block instead // FIXME we should really return an error and let the caller block instead
// That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc.
intf.link.core.log.Debugln("DEBUG: found existing interface for", intf.name) intf.links.core.log.Debugln("DEBUG: found existing interface for", intf.name)
intf.msgIO.close() intf.msgIO.close()
if !intf.incoming { if !intf.incoming {
// Block outgoing connection attempts until the existing connection closes // Block outgoing connection attempts until the existing connection closes
<-oldIntf.closed <-oldIntf.closed
} }
return nil return nil
} else {
intf.closed = make(chan struct{})
intf.links.links[intf.info] = intf
defer func() {
intf.links.mutex.Lock()
delete(intf.links.links, intf.info)
intf.links.mutex.Unlock()
close(intf.closed)
}()
intf.links.core.log.Debugln("DEBUG: registered interface for", intf.name)
} }
intf.closed = make(chan struct{}) intf.links.mutex.Unlock()
intf.link.interfaces[intf.info] = intf
defer func() {
intf.link.mutex.Lock()
delete(intf.link.interfaces, intf.info)
intf.link.mutex.Unlock()
close(intf.closed)
}()
intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name)
intf.link.mutex.Unlock()
// Create peer // Create peer
shared := crypto.GetSharedKey(myLinkPriv, &meta.link) shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }) phony.Block(&intf.links.core.peers, func() {
// FIXME don't use phony.Block, it's bad practice, even if it's safe here
intf.peer = intf.links.core.peers._newPeer(&meta.box, &meta.sig, shared, intf)
})
if intf.peer == nil { if intf.peer == nil {
return errors.New("failed to create peer") return errors.New("failed to create peer")
} }
defer func() { defer func() {
// More cleanup can go here // More cleanup can go here
intf.link.core.peers.removePeer(intf.peer.port) intf.Act(nil, func() {
intf.shutdown = true
intf.peer.Act(intf, intf.peer._removeSelf)
})
}() }()
intf.peer.out = func(msgs [][]byte) {
intf.writer.sendFrom(intf.peer, msgs, false)
}
intf.peer.linkOut = func(bs []byte) {
intf.writer.sendFrom(intf.peer, [][]byte{bs}, true)
}
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
themAddrString := net.IP(themAddr[:]).String() themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
intf.link.core.log.Infof("Connected %s: %s, source %s", intf.links.core.log.Infof("Connected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
// Start things // Start things
go intf.peer.start() go intf.peer.start()
intf.Act(nil, intf._notifyIdle)
intf.reader.Act(nil, intf.reader._read) intf.reader.Act(nil, intf.reader._read)
// Wait for the reader to finish // Wait for the reader to finish
// TODO find a way to do this without keeping live goroutines around // TODO find a way to do this without keeping live goroutines around
@ -294,7 +306,7 @@ func (intf *linkInterface) handler() error {
defer close(done) defer close(done)
go func() { go func() {
select { select {
case <-intf.link.stopped: case <-intf.links.stopped:
intf.msgIO.close() intf.msgIO.close()
case <-done: case <-done:
} }
@ -302,10 +314,10 @@ func (intf *linkInterface) handler() error {
err = <-intf.reader.err err = <-intf.reader.err
// TODO don't report an error if it's just a 'use of closed network connection' // TODO don't report an error if it's just a 'use of closed network connection'
if err != nil { if err != nil {
intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s", intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
} else { } else {
intf.link.core.log.Infof("Disconnected %s: %s, source %s", intf.links.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
} }
return err return err
@ -313,6 +325,60 @@ func (intf *linkInterface) handler() error {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// link needs to match the linkInterface type needed by the peers
type linkInterface interface {
out([][]byte)
linkOut([]byte)
close()
// These next ones are only used by the API
name() string
local() string
remote() string
interfaceType() string
}
func (intf *link) out(bss [][]byte) {
intf.Act(nil, func() {
// nil to prevent it from blocking if the link is somehow frozen
// this is safe because another packet won't be sent until the link notifies
// the peer that it's ready for one
intf.writer.sendFrom(nil, bss)
})
}
func (intf *link) linkOut(bs []byte) {
intf.Act(nil, func() {
// nil to prevent it from blocking if the link is somehow frozen
// FIXME this is hypothetically not safe, the peer shouldn't be sending
// additional packets until this one finishes, otherwise this could leak
// memory if writing happens slower than link packets are generated...
// that seems unlikely, so it's a lesser evil than deadlocking for now
intf.writer.sendFrom(nil, [][]byte{bs})
})
}
func (intf *link) close() {
intf.Act(nil, func() { intf.msgIO.close() })
}
func (intf *link) name() string {
return intf.lname
}
func (intf *link) local() string {
return intf.info.local
}
func (intf *link) remote() string {
return intf.info.remote
}
func (intf *link) interfaceType() string {
return intf.info.linkType
}
////////////////////////////////////////////////////////////////////////////////
const ( const (
sendTime = 1 * time.Second // How long to wait before deciding a send is blocked sendTime = 1 * time.Second // How long to wait before deciding a send is blocked
keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send
@ -321,85 +387,78 @@ const (
) )
// notify the intf that we're currently sending // notify the intf that we're currently sending
func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { func (intf *link) notifySending(size int) {
intf.Act(&intf.writer, func() { intf.Act(&intf.writer, func() {
if !isLinkTraffic {
intf.inSwitch = false
}
intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
intf._cancelStallTimer() if intf.keepAliveTimer != nil {
intf.keepAliveTimer.Stop()
intf.keepAliveTimer = nil
}
intf.peer.notifyBlocked(intf)
}) })
} }
// called by an AfterFunc if we seem to be blocked in a send syscall for a long time
func (intf *linkInterface) _notifySyscall() {
intf.link.core.switchTable.Act(intf, func() {
intf.link.core.switchTable._sendingIn(intf.peer.port)
})
}
// we just sent something, so cancel any pending timer to send keep-alive traffic
func (intf *linkInterface) _cancelStallTimer() {
if intf.stallTimer != nil {
intf.stallTimer.Stop()
intf.stallTimer = nil
}
}
// This gets called from a time.AfterFunc, and notifies the switch that we appear // This gets called from a time.AfterFunc, and notifies the switch that we appear
// to have gotten blocked on a write, so the switch should start routing traffic // to have gotten blocked on a write, so the switch should start routing traffic
// through other links, if alternatives exist // through other links, if alternatives exist
func (intf *linkInterface) notifyBlockedSend() { func (intf *link) notifyBlockedSend() {
intf.Act(nil, func() { intf.Act(nil, func() {
if intf.sendTimer != nil { if intf.sendTimer != nil {
//As far as we know, we're still trying to send, and the timer fired. //As far as we know, we're still trying to send, and the timer fired.
intf.link.core.switchTable.blockPeer(intf.peer.port) intf.sendTimer.Stop()
intf.sendTimer = nil
if !intf.shutdown && intf.writeUnblocked {
intf.writeUnblocked = false
intf.links.core.switchTable.blockPeer(intf, intf.peer.port, true)
}
} }
}) })
} }
// notify the intf that we've finished sending, returning the peer to the switch // notify the intf that we've finished sending, returning the peer to the switch
func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { func (intf *link) notifySent(size int) {
intf.Act(&intf.writer, func() { intf.Act(&intf.writer, func() {
intf.sendTimer.Stop() if intf.sendTimer != nil {
intf.sendTimer = nil intf.sendTimer.Stop()
if !isLinkTraffic { intf.sendTimer = nil
intf._notifySwitch()
} }
if intf.keepAliveTimer != nil {
// TODO? unset this when we start sending, not when we finish...
intf.keepAliveTimer.Stop()
intf.keepAliveTimer = nil
}
intf._notifyIdle()
if size > 0 && intf.stallTimer == nil { if size > 0 && intf.stallTimer == nil {
intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
} }
if !intf.shutdown && !intf.writeUnblocked {
intf.writeUnblocked = true
intf.links.core.switchTable.unblockPeer(intf, intf.peer.port, true)
}
}) })
} }
// Notify the switch that we're ready for more traffic, assuming we're not in a stalled state // Notify the peer that we're ready for more traffic
func (intf *linkInterface) _notifySwitch() { func (intf *link) _notifyIdle() {
if !intf.inSwitch { intf.peer.Act(intf, intf.peer._handleIdle)
if intf.stalled {
intf.unstalled = false
} else {
intf.inSwitch = true
intf.link.core.switchTable.Act(intf, func() {
intf.link.core.switchTable._idleIn(intf.peer.port)
})
}
}
} }
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
func (intf *linkInterface) notifyStalled() { func (intf *link) notifyStalled() {
intf.Act(nil, func() { // Sent from a time.AfterFunc intf.Act(nil, func() { // Sent from a time.AfterFunc
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
intf.stalled = true if !intf.shutdown && intf.readUnblocked {
intf.link.core.switchTable.blockPeer(intf.peer.port) intf.readUnblocked = false
intf.links.core.switchTable.blockPeer(intf, intf.peer.port, false)
}
} }
}) })
} }
// reset the close timer // reset the close timer
func (intf *linkInterface) notifyReading() { func (intf *link) notifyReading() {
intf.Act(&intf.reader, func() { intf.Act(&intf.reader, func() {
if intf.closeTimer != nil { if intf.closeTimer != nil {
intf.closeTimer.Stop() intf.closeTimer.Stop()
@ -409,30 +468,29 @@ func (intf *linkInterface) notifyReading() {
} }
// wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic // wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic
func (intf *linkInterface) notifyRead(size int) { func (intf *link) notifyRead(size int) {
intf.Act(&intf.reader, func() { intf.Act(&intf.reader, func() {
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
} }
intf.stalled = false if size > 0 && intf.keepAliveTimer == nil {
if !intf.unstalled { intf.keepAliveTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
intf._notifySwitch()
intf.unstalled = true
} }
if size > 0 && intf.stallTimer == nil { if !intf.shutdown && !intf.readUnblocked {
intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) intf.readUnblocked = true
intf.links.core.switchTable.unblockPeer(intf, intf.peer.port, false)
} }
}) })
} }
// We need to send keep-alive traffic now // We need to send keep-alive traffic now
func (intf *linkInterface) notifyDoKeepAlive() { func (intf *link) notifyDoKeepAlive() {
intf.Act(nil, func() { // Sent from a time.AfterFunc intf.Act(nil, func() { // Sent from a time.AfterFunc
if intf.stallTimer != nil { if intf.keepAliveTimer != nil {
intf.stallTimer.Stop() intf.keepAliveTimer.Stop()
intf.stallTimer = nil intf.keepAliveTimer = nil
intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic intf.writer.sendFrom(nil, [][]byte{nil}) // Empty keep-alive traffic
} }
}) })
} }
@ -441,34 +499,23 @@ func (intf *linkInterface) notifyDoKeepAlive() {
type linkWriter struct { type linkWriter struct {
phony.Inbox phony.Inbox
intf *linkInterface intf *link
worker chan [][]byte
closed bool
} }
func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte) {
w.Act(from, func() { w.Act(from, func() {
if w.closed {
return
}
var size int var size int
for _, bs := range bss { for _, bs := range bss {
size += len(bs) size += len(bs)
} }
w.intf.notifySending(size, isLinkTraffic) w.intf.notifySending(size)
// start a timer that will fire if we get stuck in writeMsgs for an oddly long time w.worker <- bss
var once sync.Once w.intf.notifySent(size)
timer := time.AfterFunc(time.Millisecond, func() {
// 1 ms is kind of arbitrary
// the rationale is that this should be very long compared to a syscall
// but it's still short compared to end-to-end latency or human perception
once.Do(func() {
w.intf.Act(nil, w.intf._notifySyscall)
})
})
w.intf.msgIO.writeMsgs(bss)
// Make sure we either stop the timer from doing anything or wait until it's done
once.Do(func() { timer.Stop() })
w.intf.notifySent(size, isLinkTraffic)
// Cleanup
for _, bs := range bss {
util.PutBytes(bs)
}
}) })
} }
@ -476,7 +523,7 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool
type linkReader struct { type linkReader struct {
phony.Inbox phony.Inbox
intf *linkInterface intf *link
err chan error err chan error
} }

View File

@ -18,6 +18,7 @@ type nodeinfo struct {
myNodeInfo NodeInfoPayload myNodeInfo NodeInfoPayload
callbacks map[crypto.BoxPubKey]nodeinfoCallback callbacks map[crypto.BoxPubKey]nodeinfoCallback
cache map[crypto.BoxPubKey]nodeinfoCached cache map[crypto.BoxPubKey]nodeinfoCached
table *lookupTable
} }
type nodeinfoCached struct { type nodeinfoCached struct {
@ -187,9 +188,9 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse
} }
func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) { func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) {
table := m.core.switchTable.table.Load().(lookupTable) loc := m.table.self
nodeinfo := nodeinfoReqRes{ nodeinfo := nodeinfoReqRes{
SendCoords: table.self.getCoords(), SendCoords: loc.getCoords(),
IsResponse: isResponse, IsResponse: isResponse,
NodeInfo: m._getNodeInfo(), NodeInfo: m._getNodeInfo(),
} }

View File

@ -0,0 +1,118 @@
package yggdrasil
import (
"container/heap"
"time"
)
// TODO separate queues per e.g. traffic flow
// For now, we put everything in queue
type pqStreamID string
type pqPacketInfo struct {
packet []byte
time time.Time
}
type pqStream struct {
id pqStreamID
infos []pqPacketInfo
size uint64
}
type packetQueue struct {
streams []pqStream
size uint64
}
// drop will remove a packet from the queue, returning it to the pool
// returns true if a packet was removed, false otherwise
func (q *packetQueue) drop() bool {
if q.size == 0 {
return false
}
var longestIdx int
for idx := range q.streams {
if q.streams[idx].size > q.streams[longestIdx].size {
longestIdx = idx
}
}
stream := q.streams[longestIdx]
info := stream.infos[0]
if len(stream.infos) > 1 {
stream.infos = stream.infos[1:]
stream.size -= uint64(len(info.packet))
q.streams[longestIdx] = stream
q.size -= uint64(len(info.packet))
heap.Fix(q, longestIdx)
} else {
heap.Remove(q, longestIdx)
}
pool_putBytes(info.packet)
return true
}
func (q *packetQueue) push(packet []byte) {
id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now
info := pqPacketInfo{packet: packet, time: time.Now()}
for idx := range q.streams {
if q.streams[idx].id == id {
q.streams[idx].infos = append(q.streams[idx].infos, info)
q.streams[idx].size += uint64(len(packet))
q.size += uint64(len(packet))
return
}
}
stream := pqStream{id: id, size: uint64(len(packet))}
stream.infos = append(stream.infos, info)
heap.Push(q, stream)
}
func (q *packetQueue) pop() ([]byte, bool) {
if q.size > 0 {
stream := q.streams[0]
info := stream.infos[0]
if len(stream.infos) > 1 {
stream.infos = stream.infos[1:]
stream.size -= uint64(len(info.packet))
q.streams[0] = stream
q.size -= uint64(len(info.packet))
heap.Fix(q, 0)
} else {
heap.Remove(q, 0)
}
return info.packet, true
}
return nil, false
}
////////////////////////////////////////////////////////////////////////////////
// Interface methods for packetQueue to satisfy heap.Interface
func (q *packetQueue) Len() int {
return len(q.streams)
}
func (q *packetQueue) Less(i, j int) bool {
return q.streams[i].infos[0].time.Before(q.streams[j].infos[0].time)
}
func (q *packetQueue) Swap(i, j int) {
q.streams[i], q.streams[j] = q.streams[j], q.streams[i]
}
func (q *packetQueue) Push(x interface{}) {
stream := x.(pqStream)
q.streams = append(q.streams, stream)
q.size += stream.size
}
func (q *packetQueue) Pop() interface{} {
idx := len(q.streams) - 1
stream := q.streams[idx]
q.streams = q.streams[:idx]
q.size -= stream.size
return stream
}

View File

@ -6,12 +6,9 @@ package yggdrasil
import ( import (
"encoding/hex" "encoding/hex"
"sync"
"sync/atomic"
"time" "time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
@ -21,17 +18,17 @@ import (
// In most cases, this involves passing the packet to the handler for outgoing traffic to another peer. // In most cases, this involves passing the packet to the handler for outgoing traffic to another peer.
// In other cases, its link protocol traffic is used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. // In other cases, its link protocol traffic is used to build the spanning tree, in which case this checks signatures and passes the message along to the switch.
type peers struct { type peers struct {
phony.Inbox
core *Core core *Core
mutex sync.Mutex // Synchronize writes to atomic ports map[switchPort]*peer // use CoW semantics, share updated version with each peer
ports atomic.Value //map[switchPort]*peer, use CoW semantics table *lookupTable // Sent from switch, share updated version with each peer
} }
// Initializes the peers struct. // Initializes the peers struct.
func (ps *peers) init(c *Core) { func (ps *peers) init(c *Core) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
ps.putPorts(make(map[switchPort]*peer))
ps.core = c ps.core = c
ps.ports = make(map[switchPort]*peer)
ps.table = new(lookupTable)
} }
func (ps *peers) reconfigure() { func (ps *peers) reconfigure() {
@ -80,54 +77,62 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string {
return ps.core.config.Current.AllowedEncryptionPublicKeys return ps.core.config.Current.AllowedEncryptionPublicKeys
} }
// Atomically gets a map[switchPort]*peer of known peers.
func (ps *peers) getPorts() map[switchPort]*peer {
return ps.ports.Load().(map[switchPort]*peer)
}
// Stores a map[switchPort]*peer (note that you should take a mutex before store operations to avoid conflicts with other nodes attempting to read/change/store at the same time).
func (ps *peers) putPorts(ports map[switchPort]*peer) {
ps.ports.Store(ports)
}
// Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic // Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
type peer struct { type peer struct {
phony.Inbox phony.Inbox
core *Core core *Core
intf *linkInterface intf linkInterface
port switchPort port switchPort
box crypto.BoxPubKey box crypto.BoxPubKey
sig crypto.SigPubKey sig crypto.SigPubKey
shared crypto.BoxSharedKey shared crypto.BoxSharedKey
linkShared crypto.BoxSharedKey linkShared crypto.BoxSharedKey
endpoint string endpoint string
firstSeen time.Time // To track uptime for getPeers firstSeen time.Time // To track uptime for getPeers
linkOut func([]byte) // used for protocol traffic (bypasses the switch) dinfo *dhtInfo // used to keep the DHT working
dinfo *dhtInfo // used to keep the DHT working
out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
done (chan struct{}) // closed to exit the linkLoop
close func() // Called when a peer is removed, to close the underlying connection, or via admin api
// The below aren't actually useful internally, they're just gathered for getPeers statistics // The below aren't actually useful internally, they're just gathered for getPeers statistics
bytesSent uint64 bytesSent uint64
bytesRecvd uint64 bytesRecvd uint64
ports map[switchPort]*peer
table *lookupTable
queue packetQueue
max uint64
seq uint64 // this and idle are used to detect when to drop packets from queue
idle bool
drop bool // set to true if we're dropping packets from the queue
}
func (ps *peers) updateTables(from phony.Actor, table *lookupTable) {
ps.Act(from, func() {
ps.table = table
ps._updatePeers()
})
}
func (ps *peers) _updatePeers() {
ports := ps.ports
table := ps.table
for _, peer := range ps.ports {
p := peer // peer is mutated during iteration
p.Act(ps, func() {
p.ports = ports
p.table = table
})
}
} }
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer { func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf linkInterface) *peer {
now := time.Now() now := time.Now()
p := peer{box: *box, p := peer{box: *box,
core: ps.core,
intf: intf,
sig: *sig, sig: *sig,
shared: *crypto.GetSharedKey(&ps.core.boxPriv, box), shared: *crypto.GetSharedKey(&ps.core.boxPriv, box),
linkShared: *linkShared, linkShared: *linkShared,
firstSeen: now, firstSeen: now,
done: make(chan struct{}),
close: closer,
core: ps.core,
intf: intf,
} }
ps.mutex.Lock() oldPorts := ps.ports
defer ps.mutex.Unlock()
oldPorts := ps.getPorts()
newPorts := make(map[switchPort]*peer) newPorts := make(map[switchPort]*peer)
for k, v := range oldPorts { for k, v := range oldPorts {
newPorts[k] = v newPorts[k] = v
@ -139,63 +144,62 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare
break break
} }
} }
ps.putPorts(newPorts) ps.ports = newPorts
ps._updatePeers()
return &p return &p
} }
// Removes a peer for a given port, if one exists. func (p *peer) _removeSelf() {
func (ps *peers) removePeer(port switchPort) { p.core.peers.Act(p, func() {
if port == 0 { p.core.peers._removePeer(p)
return
} // Can't remove self peer
phony.Block(&ps.core.router, func() {
ps.core.switchTable.forgetPeer(port)
}) })
ps.mutex.Lock() }
oldPorts := ps.getPorts()
p, isIn := oldPorts[port] // Removes a peer for a given port, if one exists.
func (ps *peers) _removePeer(p *peer) {
if q := ps.ports[p.port]; p.port == 0 || q != p {
return
} // Can't remove self peer or nonexistant peer
ps.core.switchTable.forgetPeer(ps, p.port)
oldPorts := ps.ports
newPorts := make(map[switchPort]*peer) newPorts := make(map[switchPort]*peer)
for k, v := range oldPorts { for k, v := range oldPorts {
newPorts[k] = v newPorts[k] = v
} }
delete(newPorts, port) delete(newPorts, p.port)
ps.putPorts(newPorts) p.intf.close()
ps.mutex.Unlock() ps.ports = newPorts
if isIn { ps._updatePeers()
if p.close != nil {
p.close()
}
close(p.done)
}
} }
// If called, sends a notification to each peer that they should send a new switch message. // If called, sends a notification to each peer that they should send a new switch message.
// Mainly called by the switch after an update. // Mainly called by the switch after an update.
func (ps *peers) sendSwitchMsgs(from phony.Actor) { func (ps *peers) sendSwitchMsgs(from phony.Actor) {
ports := ps.getPorts() ps.Act(from, func() {
for _, p := range ports { for _, peer := range ps.ports {
if p.port == 0 { p := peer
continue if p.port == 0 {
continue
}
p.Act(ps, p._sendSwitchMsg)
} }
p.Act(from, p._sendSwitchMsg) })
} }
func (ps *peers) updateDHT(from phony.Actor) {
ps.Act(from, func() {
for _, peer := range ps.ports {
p := peer
if p.port == 0 {
continue
}
p.Act(ps, p._updateDHT)
}
})
} }
// This must be launched in a separate goroutine by whatever sets up the peer struct. // This must be launched in a separate goroutine by whatever sets up the peer struct.
// It handles link protocol traffic.
func (p *peer) start() { func (p *peer) start() {
var updateDHT func()
updateDHT = func() {
phony.Block(p, func() {
select {
case <-p.done:
default:
p._updateDHT()
time.AfterFunc(time.Second, updateDHT)
}
})
}
updateDHT()
// Just for good measure, immediately send a switch message to this peer when we start // Just for good measure, immediately send a switch message to this peer when we start
p.Act(nil, p._sendSwitchMsg) p.Act(nil, p._sendSwitchMsg)
} }
@ -229,37 +233,81 @@ func (p *peer) _handlePacket(packet []byte) {
case wire_LinkProtocolTraffic: case wire_LinkProtocolTraffic:
p._handleLinkTraffic(packet) p._handleLinkTraffic(packet)
default: default:
util.PutBytes(packet)
} }
} }
// Get the coords of a packet without decoding
func peer_getPacketCoords(packet []byte) []byte {
_, pTypeLen := wire_decode_uint64(packet)
coords, _ := wire_decode_coords(packet[pTypeLen:])
return coords
}
// Called to handle traffic or protocolTraffic packets. // Called to handle traffic or protocolTraffic packets.
// In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node.
func (p *peer) _handleTraffic(packet []byte) { func (p *peer) _handleTraffic(packet []byte) {
table := p.core.switchTable.getTable() if _, isIn := p.table.elems[p.port]; !isIn && p.port != 0 {
if _, isIn := table.elems[p.port]; !isIn && p.port != 0 {
// Drop traffic if the peer isn't in the switch // Drop traffic if the peer isn't in the switch
return return
} }
p.core.switchTable.packetInFrom(p, packet) coords := peer_getPacketCoords(packet)
next := p.table.lookup(coords)
if nPeer, isIn := p.ports[next]; isIn {
nPeer.sendPacketFrom(p, packet)
}
//p.core.switchTable.packetInFrom(p, packet)
} }
func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { func (p *peer) sendPacketFrom(from phony.Actor, packet []byte) {
p.Act(from, func() { p.Act(from, func() {
p._sendPackets(packets) p._sendPacket(packet)
}) })
} }
// This just calls p.out(packet) for now. func (p *peer) _sendPacket(packet []byte) {
func (p *peer) _sendPackets(packets [][]byte) { p.queue.push(packet)
// Is there ever a case where something more complicated is needed? if p.idle {
// What if p.out blocks? p.idle = false
var size int p._handleIdle()
for _, packet := range packets { } else if p.drop {
size += len(packet) for p.queue.size > p.max {
p.queue.drop()
}
} }
p.bytesSent += uint64(size) }
p.out(packets)
func (p *peer) _handleIdle() {
var packets [][]byte
var size uint64
for {
if packet, success := p.queue.pop(); success {
packets = append(packets, packet)
size += uint64(len(packet))
} else {
break
}
}
p.seq++
if len(packets) > 0 {
p.bytesSent += uint64(size)
p.intf.out(packets)
p.max = p.queue.size
} else {
p.idle = true
}
p.drop = false
}
func (p *peer) notifyBlocked(from phony.Actor) {
p.Act(from, func() {
seq := p.seq
p.Act(nil, func() {
if seq == p.seq {
p.drop = true
p.max = 2*p.queue.size + streamMsgSize
}
})
})
} }
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
@ -277,7 +325,7 @@ func (p *peer) _sendLinkPacket(packet []byte) {
Payload: bs, Payload: bs,
} }
packet = linkPacket.encode() packet = linkPacket.encode()
p.linkOut(packet) p.intf.linkOut(packet)
} }
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic. // Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.
@ -307,13 +355,12 @@ func (p *peer) _handleLinkTraffic(bs []byte) {
case wire_SwitchMsg: case wire_SwitchMsg:
p._handleSwitchMsg(payload) p._handleSwitchMsg(payload)
default: default:
util.PutBytes(bs)
} }
} }
// Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them. // Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them.
func (p *peer) _sendSwitchMsg() { func (p *peer) _sendSwitchMsg() {
msg := p.core.switchTable.getMsg() msg := p.table.getMsg()
if msg == nil { if msg == nil {
return return
} }
@ -335,7 +382,8 @@ func (p *peer) _handleSwitchMsg(packet []byte) {
return return
} }
if len(msg.Hops) < 1 { if len(msg.Hops) < 1 {
p.core.peers.removePeer(p.port) p._removeSelf()
return
} }
var loc switchLocator var loc switchLocator
prevKey := msg.Root prevKey := msg.Root
@ -346,23 +394,31 @@ func (p *peer) _handleSwitchMsg(packet []byte) {
loc.coords = append(loc.coords, hop.Port) loc.coords = append(loc.coords, hop.Port)
bs := getBytesForSig(&hop.Next, &sigMsg) bs := getBytesForSig(&hop.Next, &sigMsg)
if !crypto.Verify(&prevKey, bs, &hop.Sig) { if !crypto.Verify(&prevKey, bs, &hop.Sig) {
p.core.peers.removePeer(p.port) p._removeSelf()
return
} }
prevKey = hop.Next prevKey = hop.Next
} }
p.core.switchTable.handleMsg(&msg, p.port) p.core.switchTable.Act(p, func() {
if !p.core.switchTable.checkRoot(&msg) { if !p.core.switchTable._checkRoot(&msg) {
// Bad switch message // Bad switch message
p.dinfo = nil p.Act(&p.core.switchTable, func() {
return p.dinfo = nil
} })
// Pass a message to the dht informing it that this peer (still) exists } else {
loc.coords = loc.coords[:len(loc.coords)-1] // handle the message
p.dinfo = &dhtInfo{ p.core.switchTable._handleMsg(&msg, p.port, false)
key: p.box, p.Act(&p.core.switchTable, func() {
coords: loc.getCoords(), // Pass a message to the dht informing it that this peer (still) exists
} loc.coords = loc.coords[:len(loc.coords)-1]
p._updateDHT() p.dinfo = &dhtInfo{
key: p.box,
coords: loc.getCoords(),
}
p._updateDHT()
})
}
})
} }
// This generates the bytes that we sign or check the signature of for a switchMsg. // This generates the bytes that we sign or check the signature of for a switchMsg.

20
src/yggdrasil/pool.go Normal file
View File

@ -0,0 +1,20 @@
package yggdrasil
import "sync"
// Used internally to reduce allocations in the hot loop
// I.e. packets being switched or between the crypto and the switch
// For safety reasons, these must not escape this package
var pool = sync.Pool{New: func() interface{} { return []byte(nil) }}
func pool_getBytes(size int) []byte {
bs := pool.Get().([]byte)
if cap(bs) < size {
bs = make([]byte, size)
}
return bs[:size]
}
func pool_putBytes(bs []byte) {
pool.Put(bs)
}

View File

@ -29,7 +29,6 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
@ -46,6 +45,9 @@ type router struct {
nodeinfo nodeinfo nodeinfo nodeinfo
searches searches searches searches
sessions sessions sessions sessions
intf routerInterface
peer *peer
table *lookupTable // has a copy of our locator
} }
// Initializes the router struct, which includes setting up channels to/from the adapter. // Initializes the router struct, which includes setting up channels to/from the adapter.
@ -53,17 +55,15 @@ func (r *router) init(core *Core) {
r.core = core r.core = core
r.addr = *address.AddrForNodeID(&r.dht.nodeID) r.addr = *address.AddrForNodeID(&r.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.dht.nodeID)
self := linkInterface{ r.intf.router = r
name: "(self)", phony.Block(&r.core.peers, func() {
info: linkInfo{ // FIXME don't block here!
local: "(self)", r.peer = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &r.intf)
remote: "(self)", })
linkType: "self", r.peer.Act(r, r.peer._handleIdle)
}, r.out = func(bs []byte) {
r.peer.handlePacketFrom(r, bs)
} }
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
p.out = func(packets [][]byte) { r.handlePackets(p, packets) }
r.out = func(bs []byte) { p.handlePacketFrom(r, bs) }
r.nodeinfo.init(r.core) r.nodeinfo.init(r.core)
r.core.config.Mutex.RLock() r.core.config.Mutex.RLock()
r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy) r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy)
@ -73,6 +73,21 @@ func (r *router) init(core *Core) {
r.sessions.init(r) r.sessions.init(r)
} }
func (r *router) updateTable(from phony.Actor, table *lookupTable) {
r.Act(from, func() {
r.table = table
r.nodeinfo.Act(r, func() {
r.nodeinfo.table = table
})
for _, ses := range r.sessions.sinfos {
sinfo := ses
sinfo.Act(r, func() {
sinfo.table = table
})
}
})
}
// Reconfigures the router and any child modules. This should only ever be run // Reconfigures the router and any child modules. This should only ever be run
// by the router actor. // by the router actor.
func (r *router) reconfigure() { func (r *router) reconfigure() {
@ -97,15 +112,6 @@ func (r *router) start() error {
return nil return nil
} }
// In practice, the switch will call this with 1 packet
func (r *router) handlePackets(from phony.Actor, packets [][]byte) {
r.Act(from, func() {
for _, packet := range packets {
r._handlePacket(packet)
}
})
}
// Insert a peer info into the dht, TODO? make the dht a separate actor // Insert a peer info into the dht, TODO? make the dht a separate actor
func (r *router) insertPeer(from phony.Actor, info *dhtInfo) { func (r *router) insertPeer(from phony.Actor, info *dhtInfo) {
r.Act(from, func() { r.Act(from, func() {
@ -126,7 +132,7 @@ func (r *router) reset(from phony.Actor) {
func (r *router) doMaintenance() { func (r *router) doMaintenance() {
phony.Block(r, func() { phony.Block(r, func() {
// Any periodic maintenance stuff goes here // Any periodic maintenance stuff goes here
r.core.switchTable.doMaintenance() r.core.switchTable.doMaintenance(r)
r.dht.doMaintenance() r.dht.doMaintenance()
r.sessions.cleanup() r.sessions.cleanup()
}) })
@ -151,14 +157,12 @@ func (r *router) _handlePacket(packet []byte) {
// Handles incoming traffic, i.e. encapuslated ordinary IPv6 packets. // Handles incoming traffic, i.e. encapuslated ordinary IPv6 packets.
// Passes them to the crypto session worker to be decrypted and sent to the adapter. // Passes them to the crypto session worker to be decrypted and sent to the adapter.
func (r *router) _handleTraffic(packet []byte) { func (r *router) _handleTraffic(packet []byte) {
defer util.PutBytes(packet)
p := wire_trafficPacket{} p := wire_trafficPacket{}
if !p.decode(packet) { if !p.decode(packet) {
return return
} }
sinfo, isIn := r.sessions.getSessionForHandle(&p.Handle) sinfo, isIn := r.sessions.getSessionForHandle(&p.Handle)
if !isIn { if !isIn {
util.PutBytes(p.Payload)
return return
} }
sinfo.recv(r, &p) sinfo.recv(r, &p)
@ -204,7 +208,6 @@ func (r *router) _handleProto(packet []byte) {
case wire_DHTLookupResponse: case wire_DHTLookupResponse:
r._handleDHTRes(bs, &p.FromKey) r._handleDHTRes(bs, &p.FromKey)
default: default:
util.PutBytes(packet)
} }
} }
@ -252,3 +255,35 @@ func (r *router) _handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) {
req.SendPermPub = *fromKey req.SendPermPub = *fromKey
r.nodeinfo.handleNodeInfo(r, &req) r.nodeinfo.handleNodeInfo(r, &req)
} }
////////////////////////////////////////////////////////////////////////////////
// routerInterface is a helper that implements linkInterface
type routerInterface struct {
router *router
}
func (intf *routerInterface) out(bss [][]byte) {
// Note that this is run in the peer's goroutine
intf.router.Act(intf.router.peer, func() {
for _, bs := range bss {
intf.router._handlePacket(bs)
}
})
// This should now immediately make the peer idle again
// So the self-peer shouldn't end up buffering anything
// We let backpressure act as a throttle instead
intf.router.peer._handleIdle()
}
func (intf *routerInterface) linkOut(_ []byte) {}
func (intf *routerInterface) close() {}
func (intf *routerInterface) name() string { return "(self)" }
func (intf *routerInterface) local() string { return "(self)" }
func (intf *routerInterface) remote() string { return "(self)" }
func (intf *routerInterface) interfaceType() string { return "self" }

View File

@ -161,11 +161,10 @@ func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) {
// Initially start a search // Initially start a search
func (sinfo *searchInfo) startSearch() { func (sinfo *searchInfo) startSearch() {
loc := sinfo.searches.router.core.switchTable.getLocator()
var infos []*dhtInfo var infos []*dhtInfo
infos = append(infos, &dhtInfo{ infos = append(infos, &dhtInfo{
key: sinfo.searches.router.core.boxPub, key: sinfo.searches.router.core.boxPub,
coords: loc.getCoords(), coords: sinfo.searches.router.table.self.getCoords(),
}) })
// Start the search by asking ourself, useful if we're the destination // Start the search by asking ourself, useful if we're the destination
sinfo.continueSearch(infos) sinfo.continueSearch(infos)

View File

@ -16,9 +16,6 @@ import (
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
// Duration that we keep track of old nonces per session, to allow some out-of-order packet delivery
const nonceWindow = time.Second
// All the information we know about an active session. // All the information we know about an active session.
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
type sessionInfo struct { type sessionInfo struct {
@ -52,6 +49,7 @@ type sessionInfo struct {
cancel util.Cancellation // Used to terminate workers cancel util.Cancellation // Used to terminate workers
conn *Conn // The associated Conn object conn *Conn // The associated Conn object
callbacks []chan func() // Finished work from crypto workers callbacks []chan func() // Finished work from crypto workers
table *lookupTable // table.self is a locator where we get our coords
} }
// Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. // Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU.
@ -217,6 +215,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.myHandle = *crypto.NewHandle() sinfo.myHandle = *crypto.NewHandle()
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.table = ss.router.table
ss.sinfos[sinfo.myHandle] = &sinfo ss.sinfos[sinfo.myHandle] = &sinfo
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
return &sinfo return &sinfo
@ -266,8 +265,7 @@ func (ss *sessions) removeSession(sinfo *sessionInfo) {
// Returns a session ping appropriate for the given session info. // Returns a session ping appropriate for the given session info.
func (sinfo *sessionInfo) _getPing() sessionPing { func (sinfo *sessionInfo) _getPing() sessionPing {
loc := sinfo.sessions.router.core.switchTable.getLocator() coords := sinfo.table.self.getCoords()
coords := loc.getCoords()
ping := sessionPing{ ping := sessionPing{
SendPermPub: sinfo.sessions.router.core.boxPub, SendPermPub: sinfo.sessions.router.core.boxPub,
Handle: sinfo.myHandle, Handle: sinfo.myHandle,
@ -393,14 +391,9 @@ func (sinfo *sessionInfo) _getMTU() MTU {
return sinfo.myMTU return sinfo.myMTU
} }
// Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received. // Checks if a packet's nonce is newer than any previously received
func (sinfo *sessionInfo) _nonceIsOK(theirNonce *crypto.BoxNonce) bool { func (sinfo *sessionInfo) _nonceIsOK(theirNonce *crypto.BoxNonce) bool {
// The bitmask is to allow for some non-duplicate out-of-order packets return theirNonce.Minus(&sinfo.theirNonce) > 0
if theirNonce.Minus(&sinfo.theirNonce) > 0 {
// This is newer than the newest nonce we've seen
return true
}
return time.Since(sinfo.time) < nonceWindow
} }
// Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce // Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce
@ -455,12 +448,9 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
select { select {
case <-sinfo.init: case <-sinfo.init:
default: default:
// TODO find a better way to drop things until initialized
util.PutBytes(p.Payload)
return return
} }
if !sinfo._nonceIsOK(&p.Nonce) { if !sinfo._nonceIsOK(&p.Nonce) {
util.PutBytes(p.Payload)
return return
} }
k := sinfo.sharedSesKey k := sinfo.sharedSesKey
@ -470,11 +460,9 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
poolFunc := func() { poolFunc := func() {
bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce) bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
callback := func() { callback := func() {
util.PutBytes(p.Payload)
if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) { if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) {
// Either we failed to decrypt, or the session was updated, or we // Either we failed to decrypt, or the session was updated, or we
// received this packet in the mean time // received this packet in the mean time
util.PutBytes(bs)
return return
} }
sinfo._updateNonce(&p.Nonce) sinfo._updateNonce(&p.Nonce)
@ -492,8 +480,6 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
select { select {
case <-sinfo.init: case <-sinfo.init:
default: default:
// TODO find a better way to drop things until initialized
util.PutBytes(msg.Message)
return return
} }
sinfo.bytesSent += uint64(len(msg.Message)) sinfo.bytesSent += uint64(len(msg.Message))
@ -512,14 +498,8 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
ch := make(chan func(), 1) ch := make(chan func(), 1)
poolFunc := func() { poolFunc := func() {
p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce) p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce)
packet := p.encode()
callback := func() { callback := func() {
// Encoding may block on a util.GetBytes(), so kept out of the worker pool
packet := p.encode()
// Cleanup
util.PutBytes(msg.Message)
util.PutBytes(p.Payload)
// Send the packet
// TODO replace this with a send to the peer struct if that becomes an actor
sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.Act(sinfo, func() {
sinfo.sessions.router.out(packet) sinfo.sessions.router.out(packet)
}) })

91
src/yggdrasil/simlink.go Normal file
View File

@ -0,0 +1,91 @@
package yggdrasil
import (
"errors"
"github.com/Arceliar/phony"
)
type Simlink struct {
phony.Inbox
rch chan []byte
dest *Simlink
link *link
started bool
}
func (s *Simlink) readMsg() ([]byte, error) {
bs, ok := <-s.rch
if !ok {
return nil, errors.New("read from closed Simlink")
}
return bs, nil
}
func (s *Simlink) _recvMetaBytes() ([]byte, error) {
return s.readMsg()
}
func (s *Simlink) _sendMetaBytes(bs []byte) error {
_, err := s.writeMsgs([][]byte{bs})
return err
}
func (s *Simlink) close() error {
defer func() { recover() }()
close(s.rch)
return nil
}
func (s *Simlink) writeMsgs(msgs [][]byte) (int, error) {
if s.dest == nil {
return 0, errors.New("write to unpaired Simlink")
}
var size int
for _, msg := range msgs {
size += len(msg)
bs := append([]byte(nil), msg...)
phony.Block(s, func() {
s.dest.Act(s, func() {
defer func() { recover() }()
s.dest.rch <- bs
})
})
}
return size, nil
}
func (c *Core) NewSimlink() *Simlink {
s := &Simlink{rch: make(chan []byte, 1)}
n := "Simlink"
var err error
s.link, err = c.links.create(s, n, n, n, n, false, true, linkOptions{})
if err != nil {
panic(err)
}
return s
}
func (s *Simlink) SetDestination(dest *Simlink) error {
var err error
phony.Block(s, func() {
if s.dest != nil {
err = errors.New("destination already set")
} else {
s.dest = dest
}
})
return err
}
func (s *Simlink) Start() error {
var err error
phony.Block(s, func() {
if s.started {
err = errors.New("already started")
} else {
s.started = true
go s.link.handler()
}
})
return err
}

View File

@ -6,12 +6,10 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
// Test that this matches the interface we expect // Test that this matches the interface we expect
var _ = linkInterfaceMsgIO(&stream{}) var _ = linkMsgIO(&stream{})
type stream struct { type stream struct {
rwc io.ReadWriteCloser rwc io.ReadWriteCloser
@ -46,6 +44,9 @@ func (s *stream) writeMsgs(bss [][]byte) (int, error) {
} }
s.outputBuffer = buf[:0] // So we can reuse the same underlying array later s.outputBuffer = buf[:0] // So we can reuse the same underlying array later
_, err := buf.WriteTo(s.rwc) _, err := buf.WriteTo(s.rwc)
for _, bs := range bss {
pool_putBytes(bs)
}
// TODO only include number of bytes from bs *successfully* written? // TODO only include number of bytes from bs *successfully* written?
return written, err return written, err
} }
@ -112,7 +113,7 @@ func (s *stream) readMsgFromBuffer() ([]byte, error) {
if msgLen > streamMsgSize { if msgLen > streamMsgSize {
return nil, errors.New("oversized message") return nil, errors.New("oversized message")
} }
msg := util.ResizeBytes(util.GetBytes(), int(msgLen)) msg := pool_getBytes(int(msgLen))
_, err = io.ReadFull(s.inputBuffer, msg) _, err = io.ReadFull(s.inputBuffer, msg)
return msg, err return msg, err
} }

View File

@ -12,13 +12,9 @@ package yggdrasil
// A little annoying to do with constant changes from backpressure // A little annoying to do with constant changes from backpressure
import ( import (
"math/rand"
"sync"
"sync/atomic"
"time" "time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
@ -97,6 +93,20 @@ func (l *switchLocator) dist(dest []byte) int {
return dist return dist
} }
func (l *switchLocator) ldist(sl *switchLocator) int {
lca := -1
for idx := 0; idx < len(l.coords); idx++ {
if idx >= len(sl.coords) {
break
}
if l.coords[idx] != sl.coords[idx] {
break
}
lca = idx
}
return len(l.coords) + len(sl.coords) - 2*(lca+1)
}
// Gets coords in wire encoded format, with *no* length prefix. // Gets coords in wire encoded format, with *no* length prefix.
func (l *switchLocator) getCoords() []byte { func (l *switchLocator) getCoords() []byte {
bs := make([]byte, 0, len(l.coords)) bs := make([]byte, 0, len(l.coords))
@ -126,14 +136,19 @@ func (x *switchLocator) isAncestorOf(y *switchLocator) bool {
// Information about a peer, used by the switch to build the tree and eventually make routing decisions. // Information about a peer, used by the switch to build the tree and eventually make routing decisions.
type peerInfo struct { type peerInfo struct {
key crypto.SigPubKey // ID of this peer key crypto.SigPubKey // ID of this peer
locator switchLocator // Should be able to respond with signatures upon request locator switchLocator // Should be able to respond with signatures upon request
degree uint64 // Self-reported degree degree uint64 // Self-reported degree
time time.Time // Time this node was last seen time time.Time // Time this node was last seen
faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower
port switchPort // Interface number of this peer port switchPort // Interface number of this peer
msg switchMsg // The wire switchMsg used msg switchMsg // The wire switchMsg used
blocked bool // True if the link is blocked, used to avoid parenting a blocked link readBlock bool // True if the link notified us of a read that blocked too long
writeBlock bool // True of the link notified us of a write that blocked too long
}
func (pinfo *peerInfo) blocked() bool {
return pinfo.readBlock || pinfo.writeBlock
} }
// This is just a uint64 with a named type for clarity reasons. // This is just a uint64 with a named type for clarity reasons.
@ -144,12 +159,15 @@ type tableElem struct {
port switchPort port switchPort
locator switchLocator locator switchLocator
time time.Time time time.Time
next map[switchPort]*tableElem
} }
// This is the subset of the information about all peers needed to make routing decisions, and it stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go). // This is the subset of the information about all peers needed to make routing decisions, and it stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go).
type lookupTable struct { type lookupTable struct {
self switchLocator self switchLocator
elems map[switchPort]tableElem elems map[switchPort]tableElem // all switch peers, just for sanity checks + API/debugging
_start tableElem // used for lookups
_msg switchMsg
} }
// This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically. // This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically.
@ -158,7 +176,6 @@ type switchData struct {
// All data that's mutable and used by exported Table methods // All data that's mutable and used by exported Table methods
// To be read/written with atomic.Value Store/Load calls // To be read/written with atomic.Value Store/Load calls
locator switchLocator locator switchLocator
seq uint64 // Sequence number, reported to peers, so they know about changes
peers map[switchPort]peerInfo peers map[switchPort]peerInfo
msg *switchMsg msg *switchMsg
} }
@ -167,17 +184,11 @@ type switchData struct {
type switchTable struct { type switchTable struct {
core *Core core *Core
key crypto.SigPubKey // Our own key key crypto.SigPubKey // Our own key
phony.Inbox // Owns the below
time time.Time // Time when locator.tstamp was last updated time time.Time // Time when locator.tstamp was last updated
drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root
mutex sync.RWMutex // Lock for reads/writes of switchData
parent switchPort // Port of whatever peer is our parent, or self if we're root parent switchPort // Port of whatever peer is our parent, or self if we're root
data switchData // data switchData //
updater atomic.Value // *sync.Once
table atomic.Value // lookupTable
phony.Inbox // Owns the below
queues switch_buffers // Queues - not atomic so ONLY use through the actor
idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor
sending map[switchPort]struct{} // peers known to be blocked in a send (somehow)
} }
// Minimum allowed total size of switch queues. // Minimum allowed total size of switch queues.
@ -191,47 +202,27 @@ func (t *switchTable) init(core *Core) {
locator := switchLocator{root: t.key, tstamp: now.Unix()} locator := switchLocator{root: t.key, tstamp: now.Unix()}
peers := make(map[switchPort]peerInfo) peers := make(map[switchPort]peerInfo)
t.data = switchData{locator: locator, peers: peers} t.data = switchData{locator: locator, peers: peers}
t.updater.Store(&sync.Once{})
t.table.Store(lookupTable{})
t.drop = make(map[crypto.SigPubKey]int64) t.drop = make(map[crypto.SigPubKey]int64)
phony.Block(t, func() { phony.Block(t, t._updateTable)
core.config.Mutex.RLock()
if core.config.Current.SwitchOptions.MaxTotalQueueSize > SwitchQueueTotalMinSize {
t.queues.totalMaxSize = core.config.Current.SwitchOptions.MaxTotalQueueSize
} else {
t.queues.totalMaxSize = SwitchQueueTotalMinSize
}
core.config.Mutex.RUnlock()
t.queues.bufs = make(map[string]switch_buffer)
t.idle = make(map[switchPort]struct{})
t.sending = make(map[switchPort]struct{})
})
} }
func (t *switchTable) reconfigure() { func (t *switchTable) reconfigure() {
// This is where reconfiguration would go, if we had anything useful to do. // This is where reconfiguration would go, if we had anything useful to do.
t.core.link.reconfigure() t.core.links.reconfigure()
t.core.peers.reconfigure() t.core.peers.reconfigure()
} }
// Safely gets a copy of this node's locator.
func (t *switchTable) getLocator() switchLocator {
t.mutex.RLock()
defer t.mutex.RUnlock()
return t.data.locator.clone()
}
// Regular maintenance to possibly timeout/reset the root and similar. // Regular maintenance to possibly timeout/reset the root and similar.
func (t *switchTable) doMaintenance() { func (t *switchTable) doMaintenance(from phony.Actor) {
// Periodic maintenance work to keep things internally consistent t.Act(from, func() {
t.mutex.Lock() // Write lock // Periodic maintenance work to keep things internally consistent
defer t.mutex.Unlock() // Release lock when we're done t._cleanRoot()
t.cleanRoot() t._cleanDropped()
t.cleanDropped() })
} }
// Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out. // Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out.
func (t *switchTable) cleanRoot() { func (t *switchTable) _cleanRoot() {
// TODO rethink how this is done?... // TODO rethink how this is done?...
// Get rid of the root if it looks like its timed out // Get rid of the root if it looks like its timed out
now := time.Now() now := time.Now()
@ -255,59 +246,79 @@ func (t *switchTable) cleanRoot() {
t.parent = switchPort(0) t.parent = switchPort(0)
t.time = now t.time = now
if t.data.locator.root != t.key { if t.data.locator.root != t.key {
t.data.seq++ defer t.core.router.reset(nil)
t.updater.Store(&sync.Once{})
t.core.router.reset(nil)
} }
t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
t._updateTable() // updates base copy of switch msg in lookupTable
t.core.peers.sendSwitchMsgs(t) t.core.peers.sendSwitchMsgs(t)
} }
} }
// Blocks and, if possible, unparents a peer // Blocks and, if possible, unparents a peer
func (t *switchTable) blockPeer(port switchPort) { func (t *switchTable) blockPeer(from phony.Actor, port switchPort, isWrite bool) {
t.mutex.Lock() t.Act(from, func() {
defer t.mutex.Unlock() peer, isIn := t.data.peers[port]
peer, isIn := t.data.peers[port] switch {
if !isIn { case isIn && !isWrite && !peer.readBlock:
return peer.readBlock = true
} case isIn && isWrite && !peer.writeBlock:
peer.blocked = true peer.writeBlock = true
t.data.peers[port] = peer default:
if port != t.parent { return
return
}
t.parent = 0
for _, info := range t.data.peers {
if info.port == port {
continue
} }
t.unlockedHandleMsg(&info.msg, info.port, true) t.data.peers[port] = peer
} defer t._updateTable()
t.unlockedHandleMsg(&peer.msg, peer.port, true) if port != t.parent {
return
}
t.parent = 0
for _, info := range t.data.peers {
if info.port == port {
continue
}
t._handleMsg(&info.msg, info.port, true)
}
t._handleMsg(&peer.msg, peer.port, true)
})
}
func (t *switchTable) unblockPeer(from phony.Actor, port switchPort, isWrite bool) {
t.Act(from, func() {
peer, isIn := t.data.peers[port]
switch {
case isIn && !isWrite && peer.readBlock:
peer.readBlock = false
case isIn && isWrite && peer.writeBlock:
peer.writeBlock = false
default:
return
}
t.data.peers[port] = peer
t._updateTable()
})
} }
// Removes a peer. // Removes a peer.
// Must be called by the router actor with a lambda that calls this. // Must be called by the router actor with a lambda that calls this.
// If the removed peer was this node's parent, it immediately tries to find a new parent. // If the removed peer was this node's parent, it immediately tries to find a new parent.
func (t *switchTable) forgetPeer(port switchPort) { func (t *switchTable) forgetPeer(from phony.Actor, port switchPort) {
t.mutex.Lock() t.Act(from, func() {
defer t.mutex.Unlock() delete(t.data.peers, port)
delete(t.data.peers, port) defer t._updateTable()
t.updater.Store(&sync.Once{}) if port != t.parent {
if port != t.parent { return
return }
} t.parent = 0
t.parent = 0 for _, info := range t.data.peers {
for _, info := range t.data.peers { t._handleMsg(&info.msg, info.port, true)
t.unlockedHandleMsg(&info.msg, info.port, true) }
} })
} }
// Dropped is a list of roots that are better than the current root, but stopped sending new timestamps. // Dropped is a list of roots that are better than the current root, but stopped sending new timestamps.
// If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos. // If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos.
// This function is called periodically to do that cleanup. // This function is called periodically to do that cleanup.
func (t *switchTable) cleanDropped() { func (t *switchTable) _cleanDropped() {
// TODO? only call this after root changes, not periodically // TODO? only call this after root changes, not periodically
for root := range t.drop { for root := range t.drop {
if !firstIsBetter(&root, &t.data.locator.root) { if !firstIsBetter(&root, &t.data.locator.root) {
@ -333,9 +344,7 @@ type switchMsgHop struct {
} }
// This returns a *switchMsg to a copy of this node's current switchMsg, which can safely have additional information appended to Hops and sent to a peer. // This returns a *switchMsg to a copy of this node's current switchMsg, which can safely have additional information appended to Hops and sent to a peer.
func (t *switchTable) getMsg() *switchMsg { func (t *switchTable) _getMsg() *switchMsg {
t.mutex.RLock()
defer t.mutex.RUnlock()
if t.parent == 0 { if t.parent == 0 {
return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp} return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp}
} else if parent, isIn := t.data.peers[t.parent]; isIn { } else if parent, isIn := t.data.peers[t.parent]; isIn {
@ -347,14 +356,18 @@ func (t *switchTable) getMsg() *switchMsg {
} }
} }
func (t *lookupTable) getMsg() *switchMsg {
msg := t._msg
msg.Hops = append([]switchMsgHop(nil), t._msg.Hops...)
return &msg
}
// This function checks that the root information in a switchMsg is OK. // This function checks that the root information in a switchMsg is OK.
// In particular, that the root is better, or else the same as the current root but with a good timestamp, and that this root+timestamp haven't been dropped due to timeout. // In particular, that the root is better, or else the same as the current root but with a good timestamp, and that this root+timestamp haven't been dropped due to timeout.
func (t *switchTable) checkRoot(msg *switchMsg) bool { func (t *switchTable) _checkRoot(msg *switchMsg) bool {
// returns false if it's a dropped root, not a better root, or has an older timestamp // returns false if it's a dropped root, not a better root, or has an older timestamp
// returns true otherwise // returns true otherwise
// used elsewhere to keep inserting peers into the dht only if root info is OK // used elsewhere to keep inserting peers into the dht only if root info is OK
t.mutex.RLock()
defer t.mutex.RUnlock()
dropTstamp, isIn := t.drop[msg.Root] dropTstamp, isIn := t.drop[msg.Root]
switch { switch {
case isIn && dropTstamp >= msg.TStamp: case isIn && dropTstamp >= msg.TStamp:
@ -370,20 +383,13 @@ func (t *switchTable) checkRoot(msg *switchMsg) bool {
} }
} }
// This is a mutexed wrapper to unlockedHandleMsg, and is called by the peer structs in peers.go to pass a switchMsg for that peer into the switch.
func (t *switchTable) handleMsg(msg *switchMsg, fromPort switchPort) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.unlockedHandleMsg(msg, fromPort, false)
}
// This updates the switch with information about a peer. // This updates the switch with information about a peer.
// Then the tricky part, it decides if it should update our own locator as a result. // Then the tricky part, it decides if it should update our own locator as a result.
// That happens if this node is already our parent, or is advertising a better root, or is advertising a better path to the same root, etc... // That happens if this node is already our parent, or is advertising a better root, or is advertising a better path to the same root, etc...
// There are a lot of very delicate order sensitive checks here, so its' best to just read the code if you need to understand what it's doing. // There are a lot of very delicate order sensitive checks here, so its' best to just read the code if you need to understand what it's doing.
// It's very important to not change the order of the statements in the case function unless you're absolutely sure that it's safe, including safe if used alongside nodes that used the previous order. // It's very important to not change the order of the statements in the case function unless you're absolutely sure that it's safe, including safe if used alongside nodes that used the previous order.
// Set the third arg to true if you're reprocessing an old message, e.g. to find a new parent after one disconnects, to avoid updating some timing related things. // Set the third arg to true if you're reprocessing an old message, e.g. to find a new parent after one disconnects, to avoid updating some timing related things.
func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, reprocessing bool) { func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessing bool) {
// TODO directly use a switchMsg instead of switchMessage + sigs // TODO directly use a switchMsg instead of switchMessage + sigs
now := time.Now() now := time.Now()
// Set up the sender peerInfo // Set up the sender peerInfo
@ -397,6 +403,9 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
sender.key = prevKey sender.key = prevKey
prevKey = hop.Next prevKey = hop.Next
} }
if sender.key == t.key {
return // Don't peer with ourself via different interfaces
}
sender.msg = *msg sender.msg = *msg
sender.port = fromPort sender.port = fromPort
sender.time = now sender.time = now
@ -426,7 +435,8 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
if reprocessing { if reprocessing {
sender.faster = oldSender.faster sender.faster = oldSender.faster
sender.time = oldSender.time sender.time = oldSender.time
sender.blocked = oldSender.blocked sender.readBlock = oldSender.readBlock
sender.writeBlock = oldSender.writeBlock
} else { } else {
sender.faster = make(map[switchPort]uint64, len(oldSender.faster)) sender.faster = make(map[switchPort]uint64, len(oldSender.faster))
for port, peer := range t.data.peers { for port, peer := range t.data.peers {
@ -449,6 +459,9 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
} }
} }
} }
if sender.blocked() != oldSender.blocked() {
doUpdate = true
}
// Update sender // Update sender
t.data.peers[fromPort] = sender t.data.peers[fromPort] = sender
// Decide if we should also update our root info to make the sender our parent // Decide if we should also update our root info to make the sender our parent
@ -486,10 +499,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
case sender.faster[t.parent] >= switch_faster_threshold: case sender.faster[t.parent] >= switch_faster_threshold:
// The is reliably faster than the current parent. // The is reliably faster than the current parent.
updateRoot = true updateRoot = true
case !sender.blocked && oldParent.blocked: case !sender.blocked() && oldParent.blocked():
// Replace a blocked parent // Replace a blocked parent
updateRoot = true updateRoot = true
case reprocessing && sender.blocked && !oldParent.blocked: case reprocessing && sender.blocked() && !oldParent.blocked():
// Don't replace an unblocked parent when reprocessing // Don't replace an unblocked parent when reprocessing
case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]: case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]:
// The sender seems to be reliably faster than the current parent, so switch to them instead. // The sender seems to be reliably faster than the current parent, so switch to them instead.
@ -506,74 +519,109 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
if peer.port == sender.port { if peer.port == sender.port {
continue continue
} }
t.unlockedHandleMsg(&peer.msg, peer.port, true) t._handleMsg(&peer.msg, peer.port, true)
} }
// Process the sender last, to avoid keeping them as a parent if at all possible. // Process the sender last, to avoid keeping them as a parent if at all possible.
t.unlockedHandleMsg(&sender.msg, sender.port, true) t._handleMsg(&sender.msg, sender.port, true)
case now.Sub(t.time) < switch_throttle: case now.Sub(t.time) < switch_throttle:
// We've already gotten an update from this root recently, so ignore this one to avoid flooding. // We've already gotten an update from this root recently, so ignore this one to avoid flooding.
case sender.locator.tstamp > t.data.locator.tstamp: case sender.locator.tstamp > t.data.locator.tstamp:
// The timestamp was updated, so we need to update locally and send to our peers. // The timestamp was updated, so we need to update locally and send to our peers.
updateRoot = true updateRoot = true
} }
// Note that we depend on the LIFO order of the stack of defers here...
if updateRoot { if updateRoot {
doUpdate = true
if !equiv(&sender.locator, &t.data.locator) { if !equiv(&sender.locator, &t.data.locator) {
doUpdate = true defer t.core.router.reset(t)
t.data.seq++
t.core.router.reset(nil)
} }
if t.data.locator.tstamp != sender.locator.tstamp { if t.data.locator.tstamp != sender.locator.tstamp {
t.time = now t.time = now
} }
t.data.locator = sender.locator t.data.locator = sender.locator
t.parent = sender.port t.parent = sender.port
t.core.peers.sendSwitchMsgs(t) defer t.core.peers.sendSwitchMsgs(t)
} }
if true || doUpdate { if doUpdate {
t.updater.Store(&sync.Once{}) t._updateTable()
} }
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// The rest of these are related to the switch worker // The rest of these are related to the switch lookup table
// This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions. func (t *switchTable) _updateTable() {
func (t *switchTable) updateTable() {
// WARNING this should only be called from within t.data.updater.Do()
// It relies on the sync.Once for synchronization with messages and lookups
// TODO use a pre-computed faster lookup table
// Instead of checking distance for every destination every time
// Array of structs, indexed by first coord that differs from self
// Each struct has stores the best port to forward to, and a next coord map
// Move to struct, then iterate over coord maps until you dead end
// The last port before the dead end should be the closest
t.mutex.RLock()
defer t.mutex.RUnlock()
newTable := lookupTable{ newTable := lookupTable{
self: t.data.locator.clone(), self: t.data.locator.clone(),
elems: make(map[switchPort]tableElem, len(t.data.peers)), elems: make(map[switchPort]tableElem, len(t.data.peers)),
_msg: *t._getMsg(),
} }
newTable._init()
for _, pinfo := range t.data.peers { for _, pinfo := range t.data.peers {
//if !pinfo.forward { continue } if pinfo.blocked() || pinfo.locator.root != newTable.self.root {
if pinfo.locator.root != newTable.self.root {
continue continue
} }
loc := pinfo.locator.clone() loc := pinfo.locator.clone()
loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link
newTable.elems[pinfo.port] = tableElem{ elem := tableElem{
locator: loc, locator: loc,
port: pinfo.port, port: pinfo.port,
time: pinfo.time, time: pinfo.time,
} }
newTable._insert(&elem)
newTable.elems[pinfo.port] = elem
} }
t.table.Store(newTable) t.core.peers.updateTables(t, &newTable)
t.core.router.updateTable(t, &newTable)
} }
// Returns a copy of the atomically-updated table used for switch lookups func (t *lookupTable) _init() {
func (t *switchTable) getTable() lookupTable { // WARNING: this relies on the convention that the self port is 0
t.updater.Load().(*sync.Once).Do(t.updateTable) self := tableElem{locator: t.self} // create self elem
return t.table.Load().(lookupTable) t._start = self // initialize _start to self
t._insert(&self) // insert self into table
}
func (t *lookupTable) _insert(elem *tableElem) {
// This is a helper that should only be run during _updateTable
here := &t._start
for idx := 0; idx <= len(elem.locator.coords); idx++ {
refLoc := here.locator
refLoc.coords = refLoc.coords[:idx] // Note that this is length idx (starts at length 0)
oldDist := refLoc.ldist(&here.locator)
newDist := refLoc.ldist(&elem.locator)
var update bool
switch {
case newDist < oldDist: // new elem is closer to this point in the tree
update = true
case newDist > oldDist: // new elem is too far
case elem.locator.tstamp > refLoc.tstamp: // new elem has a closer timestamp
update = true
case elem.locator.tstamp < refLoc.tstamp: // new elem's timestamp is too old
case elem.time.Before(here.time): // same dist+timestamp, but new elem delivered it faster
update = true
}
if update {
here.port = elem.port
here.locator = elem.locator
here.time = elem.time
// Problem: here is a value, so this doesn't actually update anything...
}
if idx < len(elem.locator.coords) {
if here.next == nil {
here.next = make(map[switchPort]*tableElem)
}
var next *tableElem
var ok bool
if next, ok = here.next[elem.locator.coords[idx]]; !ok {
nextVal := *elem
next = &nextVal
here.next[next.locator.coords[idx]] = next
}
here = next
}
}
} }
// Starts the switch worker // Starts the switch worker
@ -583,306 +631,17 @@ func (t *switchTable) start() error {
return nil return nil
} }
type closerInfo struct { func (t *lookupTable) lookup(coords []byte) switchPort {
elem tableElem var offset int
dist int here := &t._start
} for offset < len(coords) {
port, l := wire_decode_uint64(coords[offset:])
// Return a map of ports onto distance, keeping only ports closer to the destination than this node offset += l
// If the map is empty (or nil), then no peer is closer if next, ok := here.next[switchPort(port)]; ok {
func (t *switchTable) getCloser(dest []byte) []closerInfo { here = next
table := t.getTable()
myDist := table.self.dist(dest)
if myDist == 0 {
// Skip the iteration step if it's impossible to be closer
return nil
}
t.queues.closer = t.queues.closer[:0]
for _, info := range table.elems {
dist := info.locator.dist(dest)
if dist < myDist {
t.queues.closer = append(t.queues.closer, closerInfo{info, dist})
}
}
return t.queues.closer
}
// Returns true if the peer is closer to the destination than ourself
func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool {
table := t.getTable()
if info, isIn := table.elems[port]; isIn {
theirDist := info.locator.dist(dest)
myDist := table.self.dist(dest)
return theirDist < myDist
}
return false
}
// Get the coords of a packet without decoding
func switch_getPacketCoords(packet []byte) []byte {
_, pTypeLen := wire_decode_uint64(packet)
coords, _ := wire_decode_coords(packet[pTypeLen:])
return coords
}
// Returns a unique string for each stream of traffic
// Equal to coords
// The sender may append arbitrary info to the end of coords (as long as it's begins with a 0x00) to designate separate traffic streams
// Currently, it's the IPv6 next header type and the first 2 uint16 of the next header
// This is equivalent to the TCP/UDP protocol numbers and the source / dest ports
// TODO figure out if something else would make more sense (other transport protocols?)
func switch_getPacketStreamID(packet []byte) string {
return string(switch_getPacketCoords(packet))
}
// Returns the flowlabel from a given set of coords
func switch_getFlowLabelFromCoords(in []byte) []byte {
for i, v := range in {
if v == 0 {
return in[i+1:]
}
}
return []byte{}
}
// Find the best port for a given set of coords
func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
table := t.getTable()
var best switchPort
bestDist := table.self.dist(coords)
for to, elem := range table.elems {
dist := elem.locator.dist(coords)
if !(dist < bestDist) {
continue
}
best = to
bestDist = dist
}
return best
}
// Handle an incoming packet
// Either send it to ourself, or to the first idle peer that's free
// Returns true if the packet has been handled somehow, false if it should be queued
func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sending map[switchPort]struct{}) bool {
coords := switch_getPacketCoords(packet)
closer := t.getCloser(coords)
if len(closer) == 0 {
// TODO? call the router directly, and remove the whole concept of a self peer?
self := t.core.peers.getPorts()[0]
self.sendPacketsFrom(t, [][]byte{packet})
return true
}
var best *closerInfo
ports := t.core.peers.getPorts()
for _, cinfo := range closer {
to := ports[cinfo.elem.port]
//_, isIdle := idle[cinfo.elem.port]
_, isSending := sending[cinfo.elem.port]
var update bool
switch {
case to == nil:
// no port was found, ignore it
case isSending:
// the port is busy, ignore it
case best == nil:
// this is the first idle port we've found, so select it until we find a
// better candidate port to use instead
update = true
case cinfo.dist < best.dist:
// the port takes a shorter path/is more direct than our current
// candidate, so select that instead
update = true
case cinfo.dist > best.dist:
// the port takes a longer path/is less direct than our current candidate,
// ignore it
case cinfo.elem.locator.tstamp > best.elem.locator.tstamp:
// has a newer tstamp from the root, so presumably a better path
update = true
case cinfo.elem.locator.tstamp < best.elem.locator.tstamp:
// has a n older tstamp, so presumably a worse path
case cinfo.elem.time.Before(best.elem.time):
// same tstamp, but got it earlier, so presumably a better path
//t.core.log.Println("DEBUG new best:", best.elem.time, cinfo.elem.time)
update = true
default:
// the search for a port has finished
}
if update {
b := cinfo // because cinfo gets mutated by the iteration
best = &b
}
}
if best != nil {
if _, isIdle := idle[best.elem.port]; isIdle {
delete(idle, best.elem.port)
ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
return true
}
}
// Didn't find anyone idle to send it to
return false
}
// Info about a buffered packet
type switch_packetInfo struct {
bytes []byte
time time.Time // Timestamp of when the packet arrived
}
// Used to keep track of buffered packets
type switch_buffer struct {
packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large
size uint64 // Total queue size in bytes
}
type switch_buffers struct {
totalMaxSize uint64
bufs map[string]switch_buffer // Buffers indexed by StreamID
size uint64 // Total size of all buffers, in bytes
maxbufs int
maxsize uint64
closer []closerInfo // Scratch space
}
func (b *switch_buffers) _cleanup(t *switchTable) {
for streamID, buf := range b.bufs {
// Remove queues for which we have no next hop
packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes)
if len(t.getCloser(coords)) == 0 {
for _, packet := range buf.packets {
util.PutBytes(packet.bytes)
}
b.size -= buf.size
delete(b.bufs, streamID)
}
}
for b.size > b.totalMaxSize {
// Drop a random queue
target := rand.Uint64() % b.size
var size uint64 // running total
for streamID, buf := range b.bufs {
size += buf.size
if size < target {
continue
}
var packet switch_packetInfo
packet, buf.packets = buf.packets[0], buf.packets[1:]
buf.size -= uint64(len(packet.bytes))
b.size -= uint64(len(packet.bytes))
util.PutBytes(packet.bytes)
if len(buf.packets) == 0 {
delete(b.bufs, streamID)
} else {
// Need to update the map, since buf was retrieved by value
b.bufs[streamID] = buf
}
break
}
}
}
// Handles incoming idle notifications
// Loops over packets and sends the newest one that's OK for this peer to send
// Returns true if the peer is no longer idle, false if it should be added to the idle list
func (t *switchTable) _handleIdle(port switchPort) bool {
// TODO? only send packets for which this is the best next hop that isn't currently blocked sending
to := t.core.peers.getPorts()[port]
if to == nil {
return true
}
var packets [][]byte
var psize int
t.queues._cleanup(t)
now := time.Now()
for psize < 65535 {
var best *string
var bestPriority float64
for streamID, buf := range t.queues.bufs {
// Filter over the streams that this node is closer to
// Keep the one with the smallest queue
packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes)
priority := float64(now.Sub(packet.time)) / float64(buf.size)
if priority >= bestPriority && t.portIsCloser(coords, port) {
b := streamID // copy since streamID is mutated in the loop
best = &b
bestPriority = priority
}
}
if best != nil {
buf := t.queues.bufs[*best]
var packet switch_packetInfo
// TODO decide if this should be LIFO or FIFO
packet, buf.packets = buf.packets[0], buf.packets[1:]
buf.size -= uint64(len(packet.bytes))
t.queues.size -= uint64(len(packet.bytes))
if len(buf.packets) == 0 {
delete(t.queues.bufs, *best)
} else {
// Need to update the map, since buf was retrieved by value
t.queues.bufs[*best] = buf
}
packets = append(packets, packet.bytes)
psize += len(packet.bytes)
} else { } else {
// Finished finding packets
break break
} }
} }
if len(packets) > 0 { return here.port
to.sendPacketsFrom(t, packets)
return true
}
return false
}
func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) {
t.Act(from, func() {
t._packetIn(bytes)
})
}
func (t *switchTable) _packetIn(bytes []byte) {
// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
if !t._handleIn(bytes, t.idle, t.sending) {
// There's nobody free to take it right now, so queue it for later
packet := switch_packetInfo{bytes, time.Now()}
streamID := switch_getPacketStreamID(packet.bytes)
buf, bufExists := t.queues.bufs[streamID]
buf.packets = append(buf.packets, packet)
buf.size += uint64(len(packet.bytes))
t.queues.size += uint64(len(packet.bytes))
// Keep a track of the max total queue size
if t.queues.size > t.queues.maxsize {
t.queues.maxsize = t.queues.size
}
t.queues.bufs[streamID] = buf
if !bufExists {
// Keep a track of the max total queue count. Only recalculate this
// when the queue is new because otherwise repeating len(dict) might
// cause unnecessary processing overhead
if len(t.queues.bufs) > t.queues.maxbufs {
t.queues.maxbufs = len(t.queues.bufs)
}
}
t.queues._cleanup(t)
}
}
func (t *switchTable) _idleIn(port switchPort) {
// Try to find something to send to this peer
delete(t.sending, port)
if !t._handleIdle(port) {
// Didn't find anything ready to send yet, so stay idle
t.idle[port] = struct{}{}
}
}
func (t *switchTable) _sendingIn(port switchPort) {
if _, isIn := t.idle[port]; !isIn {
t.sending[port] = struct{}{}
}
} }

View File

@ -34,7 +34,7 @@ const tcp_ping_interval = (default_timeout * 2 / 3)
// The TCP listener and information about active TCP connections, to avoid duplication. // The TCP listener and information about active TCP connections, to avoid duplication.
type tcp struct { type tcp struct {
link *link links *links
waitgroup sync.WaitGroup waitgroup sync.WaitGroup
mutex sync.Mutex // Protecting the below mutex sync.Mutex // Protecting the below
listeners map[string]*TcpListener listeners map[string]*TcpListener
@ -95,8 +95,8 @@ func (t *tcp) getAddr() *net.TCPAddr {
} }
// Initializes the struct. // Initializes the struct.
func (t *tcp) init(l *link) error { func (t *tcp) init(l *links) error {
t.link = l t.links = l
t.tls.init(t) t.tls.init(t)
t.mutex.Lock() t.mutex.Lock()
t.calls = make(map[string]struct{}) t.calls = make(map[string]struct{})
@ -104,9 +104,9 @@ func (t *tcp) init(l *link) error {
t.listeners = make(map[string]*TcpListener) t.listeners = make(map[string]*TcpListener)
t.mutex.Unlock() t.mutex.Unlock()
t.link.core.config.Mutex.RLock() t.links.core.config.Mutex.RLock()
defer t.link.core.config.Mutex.RUnlock() defer t.links.core.config.Mutex.RUnlock()
for _, listenaddr := range t.link.core.config.Current.Listen { for _, listenaddr := range t.links.core.config.Current.Listen {
switch listenaddr[:6] { switch listenaddr[:6] {
case "tcp://": case "tcp://":
if _, err := t.listen(listenaddr[6:], nil); err != nil { if _, err := t.listen(listenaddr[6:], nil); err != nil {
@ -117,7 +117,7 @@ func (t *tcp) init(l *link) error {
return err return err
} }
default: default:
t.link.core.log.Errorln("Failed to add listener: listener", listenaddr, "is not correctly formatted, ignoring") t.links.core.log.Errorln("Failed to add listener: listener", listenaddr, "is not correctly formatted, ignoring")
} }
} }
@ -135,35 +135,35 @@ func (t *tcp) stop() error {
} }
func (t *tcp) reconfigure() { func (t *tcp) reconfigure() {
t.link.core.config.Mutex.RLock() t.links.core.config.Mutex.RLock()
added := util.Difference(t.link.core.config.Current.Listen, t.link.core.config.Previous.Listen) added := util.Difference(t.links.core.config.Current.Listen, t.links.core.config.Previous.Listen)
deleted := util.Difference(t.link.core.config.Previous.Listen, t.link.core.config.Current.Listen) deleted := util.Difference(t.links.core.config.Previous.Listen, t.links.core.config.Current.Listen)
t.link.core.config.Mutex.RUnlock() t.links.core.config.Mutex.RUnlock()
if len(added) > 0 || len(deleted) > 0 { if len(added) > 0 || len(deleted) > 0 {
for _, a := range added { for _, a := range added {
switch a[:6] { switch a[:6] {
case "tcp://": case "tcp://":
if _, err := t.listen(a[6:], nil); err != nil { if _, err := t.listen(a[6:], nil); err != nil {
t.link.core.log.Errorln("Error adding TCP", a[6:], "listener:", err) t.links.core.log.Errorln("Error adding TCP", a[6:], "listener:", err)
} }
case "tls://": case "tls://":
if _, err := t.listen(a[6:], t.tls.forListener); err != nil { if _, err := t.listen(a[6:], t.tls.forListener); err != nil {
t.link.core.log.Errorln("Error adding TLS", a[6:], "listener:", err) t.links.core.log.Errorln("Error adding TLS", a[6:], "listener:", err)
} }
default: default:
t.link.core.log.Errorln("Failed to add listener: listener", a, "is not correctly formatted, ignoring") t.links.core.log.Errorln("Failed to add listener: listener", a, "is not correctly formatted, ignoring")
} }
} }
for _, d := range deleted { for _, d := range deleted {
if d[:6] != "tcp://" && d[:6] != "tls://" { if d[:6] != "tcp://" && d[:6] != "tls://" {
t.link.core.log.Errorln("Failed to delete listener: listener", d, "is not correctly formatted, ignoring") t.links.core.log.Errorln("Failed to delete listener: listener", d, "is not correctly formatted, ignoring")
continue continue
} }
t.mutex.Lock() t.mutex.Lock()
if listener, ok := t.listeners[d[6:]]; ok { if listener, ok := t.listeners[d[6:]]; ok {
t.mutex.Unlock() t.mutex.Unlock()
listener.Stop() listener.Stop()
t.link.core.log.Infoln("Stopped TCP listener:", d[6:]) t.links.core.log.Infoln("Stopped TCP listener:", d[6:])
} else { } else {
t.mutex.Unlock() t.mutex.Unlock()
} }
@ -210,13 +210,13 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) {
t.mutex.Unlock() t.mutex.Unlock()
// And here we go! // And here we go!
defer func() { defer func() {
t.link.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String()) t.links.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String())
l.Listener.Close() l.Listener.Close()
t.mutex.Lock() t.mutex.Lock()
delete(t.listeners, listenaddr) delete(t.listeners, listenaddr)
t.mutex.Unlock() t.mutex.Unlock()
}() }()
t.link.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String()) t.links.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String())
go func() { go func() {
<-l.stop <-l.stop
l.Listener.Close() l.Listener.Close()
@ -225,7 +225,7 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) {
for { for {
sock, err := l.Listener.Accept() sock, err := l.Listener.Accept()
if err != nil { if err != nil {
t.link.core.log.Errorln("Failed to accept connection:", err) t.links.core.log.Errorln("Failed to accept connection:", err)
return return
} }
t.waitgroup.Add(1) t.waitgroup.Add(1)
@ -355,7 +355,7 @@ func (t *tcp) call(saddr string, options tcpOptions, sintf string) {
} }
conn, err = dialer.Dial("tcp", dst.String()) conn, err = dialer.Dial("tcp", dst.String())
if err != nil { if err != nil {
t.link.core.log.Debugf("Failed to dial %s: %s", callproto, err) t.links.core.log.Debugf("Failed to dial %s: %s", callproto, err)
return return
} }
t.waitgroup.Add(1) t.waitgroup.Add(1)
@ -372,7 +372,7 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) {
if options.upgrade != nil { if options.upgrade != nil {
var err error var err error
if sock, err = options.upgrade.upgrade(sock); err != nil { if sock, err = options.upgrade.upgrade(sock); err != nil {
t.link.core.log.Errorln("TCP handler upgrade failed:", err) t.links.core.log.Errorln("TCP handler upgrade failed:", err)
return return
} }
upgraded = true upgraded = true
@ -405,17 +405,20 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) {
if laddr.IsValid() || lsubnet.IsValid() { if laddr.IsValid() || lsubnet.IsValid() {
// The local address is with the network address/prefix range // The local address is with the network address/prefix range
// This would route ygg over ygg, which we don't want // This would route ygg over ygg, which we don't want
t.link.core.log.Debugln("Dropping ygg-tunneled connection", local, remote) // FIXME ideally this check should happen outside of the core library
// Maybe dial/listen at the application level
// Then pass a net.Conn to the core library (after these kinds of checks are done)
t.links.core.log.Debugln("Dropping ygg-tunneled connection", local, remote)
return return
} }
} }
force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast() force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast()
link, err := t.link.core.link.create(&stream, name, proto, local, remote, incoming, force, options.linkOptions) link, err := t.links.create(&stream, name, proto, local, remote, incoming, force, options.linkOptions)
if err != nil { if err != nil {
t.link.core.log.Println(err) t.links.core.log.Println(err)
panic(err) panic(err)
} }
t.link.core.log.Debugln("DEBUG: starting handler for", name) t.links.core.log.Debugln("DEBUG: starting handler for", name)
err = link.handler() err = link.handler()
t.link.core.log.Debugln("DEBUG: stopped handler for", name, err) t.links.core.log.Debugln("DEBUG: stopped handler for", name, err)
} }

View File

@ -20,10 +20,10 @@ func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error {
// Log any errors // Log any errors
if bbr != nil { if bbr != nil {
t.link.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, SetsockoptString error:", bbr) t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, SetsockoptString error:", bbr)
} }
if control != nil { if control != nil {
t.link.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, Control error:", control) t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, Control error:", control)
} }
// Return nil because errors here are not considered fatal for the connection, it just means congestion control is suboptimal // Return nil because errors here are not considered fatal for the connection, it just means congestion control is suboptimal
@ -38,7 +38,7 @@ func (t *tcp) getControl(sintf string) func(string, string, syscall.RawConn) err
} }
c.Control(btd) c.Control(btd)
if err != nil { if err != nil {
t.link.core.log.Debugln("Failed to set SO_BINDTODEVICE:", sintf) t.links.core.log.Debugln("Failed to set SO_BINDTODEVICE:", sintf)
} }
return t.tcpContext(network, address, c) return t.tcpContext(network, address, c)
} }

View File

@ -34,7 +34,7 @@ func (t *tcptls) init(tcp *tcp) {
} }
edpriv := make(ed25519.PrivateKey, ed25519.PrivateKeySize) edpriv := make(ed25519.PrivateKey, ed25519.PrivateKeySize)
copy(edpriv[:], tcp.link.core.sigPriv[:]) copy(edpriv[:], tcp.links.core.sigPriv[:])
certBuf := &bytes.Buffer{} certBuf := &bytes.Buffer{}
@ -42,7 +42,7 @@ func (t *tcptls) init(tcp *tcp) {
pubtemp := x509.Certificate{ pubtemp := x509.Certificate{
SerialNumber: big.NewInt(1), SerialNumber: big.NewInt(1),
Subject: pkix.Name{ Subject: pkix.Name{
CommonName: hex.EncodeToString(tcp.link.core.sigPub[:]), CommonName: hex.EncodeToString(tcp.links.core.sigPub[:]),
}, },
NotBefore: time.Now(), NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour * 24 * 365), NotAfter: time.Now().Add(time.Hour * 24 * 365),

View File

@ -9,7 +9,6 @@ package yggdrasil
import ( import (
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
const ( const (
@ -230,8 +229,9 @@ type wire_trafficPacket struct {
} }
// Encodes a wire_trafficPacket into its wire format. // Encodes a wire_trafficPacket into its wire format.
// The returned slice was taken from the pool.
func (p *wire_trafficPacket) encode() []byte { func (p *wire_trafficPacket) encode() []byte {
bs := util.GetBytes() bs := pool_getBytes(0)
bs = wire_put_uint64(wire_Traffic, bs) bs = wire_put_uint64(wire_Traffic, bs)
bs = wire_put_coords(p.Coords, bs) bs = wire_put_coords(p.Coords, bs)
bs = append(bs, p.Handle[:]...) bs = append(bs, p.Handle[:]...)
@ -241,7 +241,9 @@ func (p *wire_trafficPacket) encode() []byte {
} }
// Decodes an encoded wire_trafficPacket into the struct, returning true if successful. // Decodes an encoded wire_trafficPacket into the struct, returning true if successful.
// Either way, the argument slice is added to the pool.
func (p *wire_trafficPacket) decode(bs []byte) bool { func (p *wire_trafficPacket) decode(bs []byte) bool {
defer pool_putBytes(bs)
var pType uint64 var pType uint64
switch { switch {
case !wire_chop_uint64(&pType, &bs): case !wire_chop_uint64(&pType, &bs):
@ -255,7 +257,7 @@ func (p *wire_trafficPacket) decode(bs []byte) bool {
case !wire_chop_slice(p.Nonce[:], &bs): case !wire_chop_slice(p.Nonce[:], &bs):
return false return false
} }
p.Payload = append(util.GetBytes(), bs...) p.Payload = append(p.Payload, bs...)
return true return true
} }