Clean up router, tweaks

This commit is contained in:
Neil Alexander 2019-04-22 23:12:13 +01:00
parent 6e528799e9
commit e1a2d666bf
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
3 changed files with 28 additions and 107 deletions

View File

@ -2,6 +2,12 @@ package tuntap
// This manages the tun driver to send/recv packets to/from applications
// TODO: Crypto-key routing
// TODO: Set MTU of session properly
// TODO: Reject packets that exceed session MTU
// TODO: Connection timeouts (call Close() when done)
// TODO: Keep packet that was used to set up a session and send it when done
import (
"encoding/hex"
"errors"
@ -38,9 +44,9 @@ type TunAdapter struct {
icmpv6 ICMPv6
mtu int
iface *water.Interface
mutex sync.RWMutex // Protects the below
addrToConn map[address.Address]*yggdrasil.Conn
subnetToConn map[address.Subnet]*yggdrasil.Conn
mutex sync.RWMutex // Protects the below
addrToConn map[address.Address]*yggdrasil.Conn // Managed by connReader
subnetToConn map[address.Subnet]*yggdrasil.Conn // Managed by connReader
isOpen bool
}
@ -312,7 +318,7 @@ func (tun *TunAdapter) ifaceReader() error {
}
}
// If we have a connection now, try writing to it
if conn != nil {
if isIn && conn != nil {
// If we have an open connection, either because we already had one or
// because we opened one above, try writing the packet to it
w, err := conn.Write(bs[:n])

View File

@ -110,21 +110,24 @@ func (c *Conn) Read(b []byte) (int, error) {
c.mutex.RLock()
sinfo := c.session
c.mutex.RUnlock()
// If there is a search in progress then wait for the result
if searching, ok := c.searching.Load().(bool); ok && searching {
<-c.searchwait
}
// If the session is not initialised, do nothing. Currently in this instance
// in a write, we would trigger a new session, but it doesn't make sense for
// us to block forever here if the session will not reopen.
// TODO: should this return an error or just a zero-length buffer?
if sinfo == nil || !sinfo.init {
// block
<-c.searchwait
// return 0, errors.New("session is closed")
time.Sleep(time.Second)
return 0, errors.New("session is closed")
}
// Wait for some traffic to come through from the session
fmt.Println("Start select")
fmt.Println(c.String(), "Start select")
select {
// TODO...
case p, ok := <-c.recv:
fmt.Println("Finish select")
fmt.Println(c.String(), "Finish select")
// If the session is closed then do nothing
if !ok {
return 0, errors.New("session is closed")
@ -176,8 +179,9 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
c.mutex.RLock()
sinfo := c.session
c.mutex.RUnlock()
// A search is already taking place so wait for it to finish
if sinfo == nil || !sinfo.init {
// If there is a search in progress then wait for the result
if searching, ok := c.searching.Load().(bool); ok && searching {
<-c.searchwait
}
// If the session doesn't exist, or isn't initialised (which probably means
// that the search didn't complete successfully) then try to search again

View File

@ -24,6 +24,7 @@ package yggdrasil
import (
//"bytes"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
@ -38,43 +39,12 @@ type router struct {
reconfigure chan chan error
addr address.Address
subnet address.Subnet
in <-chan []byte // packets we received from the network, link to peer's "out"
out func([]byte) // packets we're sending to the network, link to peer's "in"
toRecv chan router_recvPacket // packets to handle via recvPacket()
recv chan<- []byte // place where the adapter pulls received packets from
//send <-chan []byte // place where the adapter puts outgoing packets
reject chan<- RejectedPacket // place where we send error packets back to adapter
reset chan struct{} // signal that coords changed (re-init sessions/dht)
admin chan func() // pass a lambda for the admin socket to query stuff
cryptokey cryptokey
nodeinfo nodeinfo
}
// Packet and session info, used to check that the packet matches a valid IP range or CKR prefix before sending to the adapter.
type router_recvPacket struct {
bs []byte
sinfo *sessionInfo
}
// RejectedPacketReason is the type code used to represent the reason that a
// packet was rejected.
type RejectedPacketReason int
const (
// The router rejected the packet because it exceeds the session MTU for the
// given destination. In TUN/TAP, this results in the generation of an ICMPv6
// Packet Too Big message.
PacketTooBig = 1 + iota
)
// RejectedPacket represents a rejected packet from the router. This is passed
// back to the adapter so that the adapter can respond appropriately, e.g. in
// the case of TUN/TAP, a "PacketTooBig" reason can be used to generate an
// ICMPv6 Packet Too Big response.
type RejectedPacket struct {
Reason RejectedPacketReason
Packet []byte
Detail interface{}
in <-chan []byte // packets we received from the network, link to peer's "out"
out func([]byte) // packets we're sending to the network, link to peer's "in"
reset chan struct{} // signal that coords changed (re-init sessions/dht)
admin chan func() // pass a lambda for the admin socket to query stuff
cryptokey cryptokey
nodeinfo nodeinfo
}
// Initializes the router struct, which includes setting up channels to/from the adapter.
@ -121,13 +91,6 @@ func (r *router) init(core *Core) {
}
}()
r.out = func(packet []byte) { out2 <- packet }
r.toRecv = make(chan router_recvPacket, 32)
//recv := make(chan []byte, 32)
//send := make(chan []byte, 32)
reject := make(chan RejectedPacket, 32)
//r.recv = recv
//r.send = send
r.reject = reject
r.reset = make(chan struct{}, 1)
r.admin = make(chan func(), 32)
r.nodeinfo.init(r.core)
@ -153,12 +116,8 @@ func (r *router) mainLoop() {
defer ticker.Stop()
for {
select {
case rp := <-r.toRecv:
r.recvPacket(rp.bs, rp.sinfo)
case p := <-r.in:
r.handleIn(p)
//case p := <-r.send:
// r.sendPacket(p)
case info := <-r.core.dht.peers:
r.core.dht.insertPeer(info)
case <-r.reset:
@ -356,54 +315,6 @@ func (r *router) sendPacket(bs []byte) {
}
*/
// Called for incoming traffic by the session worker for that connection.
// Checks that the IP address is correct (matches the session) and passes the packet to the adapter.
func (r *router) recvPacket(bs []byte, sinfo *sessionInfo) {
// Note: called directly by the session worker, not the router goroutine
if len(bs) < 24 {
util.PutBytes(bs)
return
}
var sourceAddr address.Address
var dest address.Address
var snet address.Subnet
var addrlen int
if bs[0]&0xf0 == 0x60 {
// IPv6 address
addrlen = 16
copy(sourceAddr[:addrlen], bs[8:])
copy(dest[:addrlen], bs[24:])
copy(snet[:addrlen/2], bs[8:])
} else if bs[0]&0xf0 == 0x40 {
// IPv4 address
addrlen = 4
copy(sourceAddr[:addrlen], bs[12:])
copy(dest[:addrlen], bs[16:])
} else {
// Unknown address length
return
}
// Check that the packet is destined for either our Yggdrasil address or
// subnet, or that it matches one of the crypto-key routing source routes
if !r.cryptokey.isValidSource(dest, addrlen) {
util.PutBytes(bs)
return
}
// See whether the packet they sent should have originated from this session
switch {
case sourceAddr.IsValid() && sourceAddr == sinfo.theirAddr:
case snet.IsValid() && snet == sinfo.theirSubnet:
default:
key, err := r.cryptokey.getPublicKeyForAddress(sourceAddr, addrlen)
if err != nil || key != sinfo.theirPermPub {
util.PutBytes(bs)
return
}
}
//go func() { r.recv<-bs }()
r.recv <- bs
}
// Checks incoming traffic type and passes it to the appropriate handler.
func (r *router) handleIn(packet []byte) {
pType, pTypeLen := wire_decode_uint64(packet)