Fix Conn.Read/Conn.Write behavior after Conn.Close, get rid of second TUN/TAP conn reader goroutine, no longer use deadlines

This commit is contained in:
Neil Alexander 2019-07-17 21:42:17 +01:00
parent 1bf1c6eb36
commit 307b24d8cb
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
3 changed files with 54 additions and 61 deletions

View File

@ -43,7 +43,7 @@ func (s *tunConn) _close_nomutex() {
}() }()
} }
func (s *tunConn) reader() error { func (s *tunConn) reader() (err error) {
select { select {
case _, ok := <-s.stop: case _, ok := <-s.stop:
if !ok { if !ok {
@ -51,55 +51,29 @@ func (s *tunConn) reader() error {
} }
default: default:
} }
s.tun.log.Debugln("Starting conn reader for", s) s.tun.log.Debugln("Starting conn reader for", s.conn.String())
defer s.tun.log.Debugln("Stopping conn reader for", s) defer s.tun.log.Debugln("Stopping conn reader for", s.conn.String())
var n int var n int
var err error
read := make(chan bool)
b := make([]byte, 65535) b := make([]byte, 65535)
go func() {
s.tun.log.Debugln("Starting conn reader helper for", s)
defer s.tun.log.Debugln("Stopping conn reader helper for", s)
for {
s.conn.SetReadDeadline(time.Now().Add(tunConnTimeout))
if n, err = s.conn.Read(b); err != nil {
s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn read error:", err)
if e, eok := err.(yggdrasil.ConnError); eok {
s.tun.log.Debugln("Conn reader helper", s, "error:", e)
switch {
case e.Temporary():
fallthrough
case e.Timeout():
read <- false
continue
case e.Closed():
fallthrough
default:
s.close()
return
}
}
read <- false
}
read <- true
}
}()
for { for {
select { select {
case r, ok := <-read:
if r && n > 0 {
bs := append(util.GetBytes(), b[:n]...)
select {
case s.tun.send <- bs:
default:
util.PutBytes(bs)
}
}
if ok {
s.stillAlive() // TODO? Only stay alive if we read >0 bytes?
}
case <-s.stop: case <-s.stop:
return nil return nil
default:
}
if n, err = s.conn.Read(b); err != nil {
if e, eok := err.(yggdrasil.ConnError); eok && !e.Temporary() {
s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn read error:", err)
return e
}
} else if n > 0 {
bs := append(util.GetBytes(), b[:n]...)
select {
case s.tun.send <- bs:
default:
util.PutBytes(bs)
}
s.stillAlive()
} }
} }
} }
@ -112,8 +86,8 @@ func (s *tunConn) writer() error {
} }
default: default:
} }
s.tun.log.Debugln("Starting conn writer for", s) s.tun.log.Debugln("Starting conn writer for", s.conn.String())
defer s.tun.log.Debugln("Stopping conn writer for", s) defer s.tun.log.Debugln("Stopping conn writer for", s.conn.String())
for { for {
select { select {
case <-s.stop: case <-s.stop:

View File

@ -49,7 +49,7 @@ type Conn struct {
nodeID *crypto.NodeID nodeID *crypto.NodeID
nodeMask *crypto.NodeID nodeMask *crypto.NodeID
mutex sync.RWMutex mutex sync.RWMutex
closed bool close chan bool
session *sessionInfo session *sessionInfo
readDeadline atomic.Value // time.Time // TODO timer readDeadline atomic.Value // time.Time // TODO timer
writeDeadline atomic.Value // time.Time // TODO timer writeDeadline atomic.Value // time.Time // TODO timer
@ -62,6 +62,7 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session
nodeID: nodeID, nodeID: nodeID,
nodeMask: nodeMask, nodeMask: nodeMask,
session: session, session: session,
close: make(chan bool),
} }
return &conn return &conn
} }
@ -127,12 +128,14 @@ func (c *Conn) Read(b []byte) (int, error) {
for { for {
// Wait for some traffic to come through from the session // Wait for some traffic to come through from the session
select { select {
case <-c.close:
return 0, ConnError{errors.New("session closed"), false, false, true, 0}
case <-timer.C: case <-timer.C:
return 0, ConnError{errors.New("timeout"), true, false, false, 0} return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
case p, ok := <-sinfo.recv: case p, ok := <-sinfo.recv:
// If the session is closed then do nothing // If the session is closed then do nothing
if !ok { if !ok {
return 0, ConnError{errors.New("session is closed"), false, false, true, 0} return 0, ConnError{errors.New("session closed"), false, false, true, 0}
} }
defer util.PutBytes(p.Payload) defer util.PutBytes(p.Payload)
var err error var err error
@ -167,16 +170,26 @@ func (c *Conn) Read(b []byte) (int, error) {
// Hand over to the session worker // Hand over to the session worker
defer func() { defer func() {
if recover() != nil { if recover() != nil {
err = errors.New("read failed, session already closed") err = ConnError{errors.New("read failed, session already closed"), false, false, true, 0}
close(done) close(done)
} }
}() // In case we're racing with a close }() // In case we're racing with a close
select { // Send to worker // Send to worker
select {
case sinfo.worker <- workerFunc: case sinfo.worker <- workerFunc:
case <-c.close:
return 0, ConnError{errors.New("session closed"), false, false, true, 0}
case <-timer.C: case <-timer.C:
return 0, ConnError{errors.New("timeout"), true, false, false, 0} return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
}
// Wait for the worker to finish
select {
case <-done: // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff)
case <-c.close:
return 0, ConnError{errors.New("session closed"), false, false, true, 0}
case <-timer.C:
return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
} }
<-done // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff)
// Something went wrong in the session worker so abort // Something went wrong in the session worker so abort
if err != nil { if err != nil {
if ce, ok := err.(*ConnError); ok && ce.Temporary() { if ce, ok := err.(*ConnError); ok && ce.Temporary() {
@ -257,7 +270,7 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
select { // Send to worker select { // Send to worker
case sinfo.worker <- workerFunc: case sinfo.worker <- workerFunc:
case <-timer.C: case <-timer.C:
return 0, ConnError{errors.New("timeout"), true, false, false, 0} return 0, ConnError{errors.New("write timeout"), true, false, false, 0}
} }
// Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff) // Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff)
<-done <-done
@ -269,16 +282,21 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
return written, err return written, err
} }
func (c *Conn) Close() error { func (c *Conn) Close() (err error) {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
if c.session != nil { if c.session != nil {
// Close the session, if it hasn't been closed already // Close the session, if it hasn't been closed already
c.core.router.doAdmin(c.session.close) c.core.router.doAdmin(c.session.close)
} }
// This can't fail yet - TODO? func() {
c.closed = true defer func() {
return nil recover()
err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
}()
close(c.close) // Closes reader/writer goroutines
}()
return
} }
func (c *Conn) LocalAddr() crypto.NodeID { func (c *Conn) LocalAddr() crypto.NodeID {

View File

@ -439,14 +439,14 @@ func (c *Core) DEBUG_maybeSendUDPKeys(saddr string) {
*/ */
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/*
func (c *Core) DEBUG_addPeer(addr string) { func (c *Core) DEBUG_addPeer(addr string) {
err := c.admin.addPeer(addr, "") err := c.admin.addPeer(addr, "")
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
*/
/* /*
func (c *Core) DEBUG_addSOCKSConn(socksaddr, peeraddr string) { func (c *Core) DEBUG_addSOCKSConn(socksaddr, peeraddr string) {
go func() { go func() {
@ -541,13 +541,14 @@ func (c *Core) DEBUG_setIfceExpr(expr *regexp.Regexp) {
c.log.Println("DEBUG_setIfceExpr no longer implemented") c.log.Println("DEBUG_setIfceExpr no longer implemented")
} }
/*
func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) { func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) {
err := c.admin.addAllowedEncryptionPublicKey(boxStr) err := c.admin.addAllowedEncryptionPublicKey(boxStr)
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
*/
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
func DEBUG_simLinkPeers(p, q *peer) { func DEBUG_simLinkPeers(p, q *peer) {