diff --git a/src/tuntap/conn.go b/src/tuntap/conn.go index 50860f36..ce4645f3 100644 --- a/src/tuntap/conn.go +++ b/src/tuntap/conn.go @@ -3,6 +3,7 @@ package tuntap import ( "errors" + "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" ) @@ -10,11 +11,21 @@ import ( type tunConn struct { tun *TunAdapter conn *yggdrasil.Conn + addr address.Address + snet address.Subnet send chan []byte stop chan interface{} } func (s *tunConn) close() { + s.tun.mutex.Lock() + s._close_nomutex() + s.tun.mutex.Unlock() +} + +func (s *tunConn) _close_nomutex() { + delete(s.tun.addrToConn, s.addr) + delete(s.tun.subnetToConn, s.snet) close(s.stop) } @@ -32,6 +43,7 @@ func (s *tunConn) reader() error { b := make([]byte, 65535) for { go func() { + // TODO read timeout and close if n, err = s.conn.Read(b); err != nil { s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn read error:", err) return @@ -72,6 +84,7 @@ func (s *tunConn) writer() error { if !ok { return errors.New("send closed") } + // TODO write timeout and close if _, err := s.conn.Write(b); err != nil { s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn write error:", err) } diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index 310e4211..b9ff04ec 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -236,26 +236,26 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) { } // Get the remote address and subnet of the other side remoteNodeID := conn.RemoteAddr() - remoteAddr := address.AddrForNodeID(&remoteNodeID) - remoteSubnet := address.SubnetForNodeID(&remoteNodeID) + s.addr = *address.AddrForNodeID(&remoteNodeID) + s.snet = *address.SubnetForNodeID(&remoteNodeID) // Work out if this is already a destination we already know about tun.mutex.Lock() defer tun.mutex.Unlock() - atc, aok := tun.addrToConn[*remoteAddr] - stc, sok := tun.subnetToConn[*remoteSubnet] + atc, aok := tun.addrToConn[s.addr] + stc, sok := tun.subnetToConn[s.snet] // If we know about a connection for this destination already then assume it // is no longer valid and close it if aok { - atc.close() + atc._close_nomutex() err = errors.New("replaced connection for address") } else if sok { - stc.close() + stc._close_nomutex() err = errors.New("replaced connection for subnet") } // Save the session wrapper so that we can look it up quickly next time // we receive a packet through the interface for this address - tun.addrToConn[*remoteAddr] = &s - tun.subnetToConn[*remoteSubnet] = &s + tun.addrToConn[s.addr] = &s + tun.subnetToConn[s.snet] = &s // Start the connection goroutines go s.reader() go s.writer() diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index aad1dcdd..1290ad0d 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -37,6 +37,7 @@ type Conn struct { writeDeadline atomic.Value // time.Time // TODO timer searching atomic.Value // bool searchwait chan struct{} // Never reset this, it's only used for the initial search + writebuf [][]byte // Packets to be sent if/when the search finishes } // TODO func NewConn() that initializes additional fields as needed @@ -60,23 +61,13 @@ func (c *Conn) String() string { func (c *Conn) startSearch() { // The searchCompleted callback is given to the search searchCompleted := func(sinfo *sessionInfo, err error) { + defer c.searching.Store(false) // If the search failed for some reason, e.g. it hit a dead end or timed // out, then do nothing if err != nil { c.core.log.Debugln(c.String(), "DHT search failed:", err) - go func() { - time.Sleep(time.Second) - c.mutex.RLock() - closed := c.closed - c.mutex.RUnlock() - if !closed { - // Restart the search, or else Write can stay blocked forever - c.core.router.admin <- c.startSearch - } - }() return } - defer c.searching.Store(false) // Take the connection mutex c.mutex.Lock() defer c.mutex.Unlock() @@ -102,6 +93,16 @@ func (c *Conn) startSearch() { // Things were closed before the search returned // Go ahead and close it again to make sure the session is cleaned up go c.Close() + } else { + // Send any messages we may have buffered + var msgs [][]byte + msgs, c.writebuf = c.writebuf, nil + go func() { + for _, msg := range msgs { + c.Write(msg) + util.PutBytes(msg) + } + }() } } // doSearch will be called below in response to one or more conditions @@ -238,8 +239,6 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { c.mutex.RLock() sinfo := c.session c.mutex.RUnlock() - timer := getDeadlineTimer(&c.writeDeadline) - defer util.TimerStop(timer) // If the session doesn't exist, or isn't initialised (which probably means // that the search didn't complete successfully) then we may need to wait for // the search to complete or start the search again @@ -249,22 +248,15 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { // No search was already taking place so start a new one c.core.router.doAdmin(c.startSearch) } - // Wait for the search to complete - select { - case <-c.searchwait: - case <-timer.C: - return 0, ConnError{errors.New("Timeout"), true, false} - } - // Retrieve our session info again - c.mutex.RLock() - sinfo = c.session - c.mutex.RUnlock() - // If sinfo is still nil at this point then the search failed and the - // searchwait channel has been recreated, so might as well give up and - // return an error code - if sinfo == nil { - return 0, errors.New("search failed") + // Buffer the packet to be sent if/when the search is finished + c.mutex.Lock() + defer c.mutex.Unlock() + c.writebuf = append(c.writebuf, append(util.GetBytes(), b...)) + for len(c.writebuf) > 32 { + util.PutBytes(c.writebuf[0]) + c.writebuf = c.writebuf[1:] } + return len(b), nil } var packet []byte done := make(chan struct{}) @@ -283,13 +275,17 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { packet = p.encode() sinfo.bytesSent += uint64(len(b)) } + // Set up a timer so this doesn't block forever + timer := getDeadlineTimer(&c.writeDeadline) + defer util.TimerStop(timer) // Hand over to the session worker select { // Send to worker case sinfo.worker <- workerFunc: case <-timer.C: return 0, ConnError{errors.New("Timeout"), true, false} } - <-done // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff) + // Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff) + <-done // Give the packet to the router sinfo.core.router.out(packet) // Finally return the number of bytes we wrote