diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index 36e29658..709f7051 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -2,6 +2,12 @@ package tuntap // This manages the tun driver to send/recv packets to/from applications +// TODO: Crypto-key routing +// TODO: Set MTU of session properly +// TODO: Reject packets that exceed session MTU +// TODO: Connection timeouts (call Close() when done) +// TODO: Keep packet that was used to set up a session and send it when done + import ( "encoding/hex" "errors" @@ -38,9 +44,9 @@ type TunAdapter struct { icmpv6 ICMPv6 mtu int iface *water.Interface - mutex sync.RWMutex // Protects the below - addrToConn map[address.Address]*yggdrasil.Conn - subnetToConn map[address.Subnet]*yggdrasil.Conn + mutex sync.RWMutex // Protects the below + addrToConn map[address.Address]*yggdrasil.Conn // Managed by connReader + subnetToConn map[address.Subnet]*yggdrasil.Conn // Managed by connReader isOpen bool } @@ -312,7 +318,7 @@ func (tun *TunAdapter) ifaceReader() error { } } // If we have a connection now, try writing to it - if conn != nil { + if isIn && conn != nil { // If we have an open connection, either because we already had one or // because we opened one above, try writing the packet to it w, err := conn.Write(bs[:n]) diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index a5702d33..4ffa4b17 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -110,21 +110,24 @@ func (c *Conn) Read(b []byte) (int, error) { c.mutex.RLock() sinfo := c.session c.mutex.RUnlock() + // If there is a search in progress then wait for the result + if searching, ok := c.searching.Load().(bool); ok && searching { + <-c.searchwait + } // If the session is not initialised, do nothing. Currently in this instance // in a write, we would trigger a new session, but it doesn't make sense for // us to block forever here if the session will not reopen. // TODO: should this return an error or just a zero-length buffer? if sinfo == nil || !sinfo.init { - // block - <-c.searchwait - // return 0, errors.New("session is closed") + time.Sleep(time.Second) + return 0, errors.New("session is closed") } // Wait for some traffic to come through from the session - fmt.Println("Start select") + fmt.Println(c.String(), "Start select") select { // TODO... case p, ok := <-c.recv: - fmt.Println("Finish select") + fmt.Println(c.String(), "Finish select") // If the session is closed then do nothing if !ok { return 0, errors.New("session is closed") @@ -176,8 +179,9 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { c.mutex.RLock() sinfo := c.session c.mutex.RUnlock() - // A search is already taking place so wait for it to finish - if sinfo == nil || !sinfo.init { + // If there is a search in progress then wait for the result + if searching, ok := c.searching.Load().(bool); ok && searching { + <-c.searchwait } // If the session doesn't exist, or isn't initialised (which probably means // that the search didn't complete successfully) then try to search again diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 348a1ed6..68700723 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -24,6 +24,7 @@ package yggdrasil import ( //"bytes" + "time" "github.com/yggdrasil-network/yggdrasil-go/src/address" @@ -38,43 +39,12 @@ type router struct { reconfigure chan chan error addr address.Address subnet address.Subnet - in <-chan []byte // packets we received from the network, link to peer's "out" - out func([]byte) // packets we're sending to the network, link to peer's "in" - toRecv chan router_recvPacket // packets to handle via recvPacket() - recv chan<- []byte // place where the adapter pulls received packets from - //send <-chan []byte // place where the adapter puts outgoing packets - reject chan<- RejectedPacket // place where we send error packets back to adapter - reset chan struct{} // signal that coords changed (re-init sessions/dht) - admin chan func() // pass a lambda for the admin socket to query stuff - cryptokey cryptokey - nodeinfo nodeinfo -} - -// Packet and session info, used to check that the packet matches a valid IP range or CKR prefix before sending to the adapter. -type router_recvPacket struct { - bs []byte - sinfo *sessionInfo -} - -// RejectedPacketReason is the type code used to represent the reason that a -// packet was rejected. -type RejectedPacketReason int - -const ( - // The router rejected the packet because it exceeds the session MTU for the - // given destination. In TUN/TAP, this results in the generation of an ICMPv6 - // Packet Too Big message. - PacketTooBig = 1 + iota -) - -// RejectedPacket represents a rejected packet from the router. This is passed -// back to the adapter so that the adapter can respond appropriately, e.g. in -// the case of TUN/TAP, a "PacketTooBig" reason can be used to generate an -// ICMPv6 Packet Too Big response. -type RejectedPacket struct { - Reason RejectedPacketReason - Packet []byte - Detail interface{} + in <-chan []byte // packets we received from the network, link to peer's "out" + out func([]byte) // packets we're sending to the network, link to peer's "in" + reset chan struct{} // signal that coords changed (re-init sessions/dht) + admin chan func() // pass a lambda for the admin socket to query stuff + cryptokey cryptokey + nodeinfo nodeinfo } // Initializes the router struct, which includes setting up channels to/from the adapter. @@ -121,13 +91,6 @@ func (r *router) init(core *Core) { } }() r.out = func(packet []byte) { out2 <- packet } - r.toRecv = make(chan router_recvPacket, 32) - //recv := make(chan []byte, 32) - //send := make(chan []byte, 32) - reject := make(chan RejectedPacket, 32) - //r.recv = recv - //r.send = send - r.reject = reject r.reset = make(chan struct{}, 1) r.admin = make(chan func(), 32) r.nodeinfo.init(r.core) @@ -153,12 +116,8 @@ func (r *router) mainLoop() { defer ticker.Stop() for { select { - case rp := <-r.toRecv: - r.recvPacket(rp.bs, rp.sinfo) case p := <-r.in: r.handleIn(p) - //case p := <-r.send: - // r.sendPacket(p) case info := <-r.core.dht.peers: r.core.dht.insertPeer(info) case <-r.reset: @@ -356,54 +315,6 @@ func (r *router) sendPacket(bs []byte) { } */ -// Called for incoming traffic by the session worker for that connection. -// Checks that the IP address is correct (matches the session) and passes the packet to the adapter. -func (r *router) recvPacket(bs []byte, sinfo *sessionInfo) { - // Note: called directly by the session worker, not the router goroutine - if len(bs) < 24 { - util.PutBytes(bs) - return - } - var sourceAddr address.Address - var dest address.Address - var snet address.Subnet - var addrlen int - if bs[0]&0xf0 == 0x60 { - // IPv6 address - addrlen = 16 - copy(sourceAddr[:addrlen], bs[8:]) - copy(dest[:addrlen], bs[24:]) - copy(snet[:addrlen/2], bs[8:]) - } else if bs[0]&0xf0 == 0x40 { - // IPv4 address - addrlen = 4 - copy(sourceAddr[:addrlen], bs[12:]) - copy(dest[:addrlen], bs[16:]) - } else { - // Unknown address length - return - } - // Check that the packet is destined for either our Yggdrasil address or - // subnet, or that it matches one of the crypto-key routing source routes - if !r.cryptokey.isValidSource(dest, addrlen) { - util.PutBytes(bs) - return - } - // See whether the packet they sent should have originated from this session - switch { - case sourceAddr.IsValid() && sourceAddr == sinfo.theirAddr: - case snet.IsValid() && snet == sinfo.theirSubnet: - default: - key, err := r.cryptokey.getPublicKeyForAddress(sourceAddr, addrlen) - if err != nil || key != sinfo.theirPermPub { - util.PutBytes(bs) - return - } - } - //go func() { r.recv<-bs }() - r.recv <- bs -} - // Checks incoming traffic type and passes it to the appropriate handler. func (r *router) handleIn(packet []byte) { pType, pTypeLen := wire_decode_uint64(packet)