diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 1828b556..fa05fff0 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -3,12 +3,12 @@ package yggdrasil import ( "errors" "fmt" - "sync" - "sync/atomic" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" + + "github.com/Arceliar/phony" ) // ConnError implements the net.Error interface @@ -54,10 +54,10 @@ func (e *ConnError) Closed() bool { } type Conn struct { + phony.Actor core *Core - readDeadline atomic.Value // time.Time // TODO timer - writeDeadline atomic.Value // time.Time // TODO timer - mutex sync.RWMutex // protects the below + readDeadline *time.Time + writeDeadline *time.Time nodeID *crypto.NodeID nodeMask *crypto.NodeID session *sessionInfo @@ -75,9 +75,9 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session } func (c *Conn) String() string { - c.mutex.RLock() - defer c.mutex.RUnlock() - return fmt.Sprintf("conn=%p", c) + var s string + <-c.SyncExec(func() { s = fmt.Sprintf("conn=%p", c) }) + return s } // This should never be called from the router goroutine, used in the dial functions @@ -137,10 +137,10 @@ func (c *Conn) doSearch() { go c.core.router.doAdmin(routerWork) } -func (c *Conn) getDeadlineCancellation(value *atomic.Value) (util.Cancellation, bool) { - if deadline, ok := value.Load().(time.Time); ok { +func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) { + if t != nil { // A deadline is set, so return a Cancellation that uses it - c := util.CancellationWithDeadline(c.session.cancel, deadline) + c := util.CancellationWithDeadline(c.session.cancel, *t) return c, true } else { // No deadline was set, so just return the existinc cancellation and a dummy value @@ -150,7 +150,9 @@ func (c *Conn) getDeadlineCancellation(value *atomic.Value) (util.Cancellation, // Used internally by Read, the caller is responsible for util.PutBytes when they're done. func (c *Conn) ReadNoCopy() ([]byte, error) { - cancel, doCancel := c.getDeadlineCancellation(&c.readDeadline) + var cancel util.Cancellation + var doCancel bool + <-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.readDeadline) }) if doCancel { defer cancel.Cancel(nil) } @@ -210,7 +212,9 @@ func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error { } c.session.doFunc(sessionFunc) if err == nil { - cancel, doCancel := c.getDeadlineCancellation(&c.writeDeadline) + var cancel util.Cancellation + var doCancel bool + <-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) }) if doCancel { defer cancel.Cancel(nil) } @@ -240,14 +244,14 @@ func (c *Conn) Write(b []byte) (int, error) { } func (c *Conn) Close() (err error) { - c.mutex.Lock() - defer c.mutex.Unlock() - if c.session != nil { - // Close the session, if it hasn't been closed already - if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil { - err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0} + <-c.SyncExec(func() { + if c.session != nil { + // Close the session, if it hasn't been closed already + if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil { + err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0} + } } - } + }) return } @@ -256,9 +260,9 @@ func (c *Conn) LocalAddr() crypto.NodeID { } func (c *Conn) RemoteAddr() crypto.NodeID { - c.mutex.RLock() - defer c.mutex.RUnlock() - return *c.nodeID + var n crypto.NodeID + <-c.SyncExec(func() { n = *c.nodeID }) + return n } func (c *Conn) SetDeadline(t time.Time) error { @@ -268,11 +272,11 @@ func (c *Conn) SetDeadline(t time.Time) error { } func (c *Conn) SetReadDeadline(t time.Time) error { - c.readDeadline.Store(t) + <-c.SyncExec(func() { c.readDeadline = &t }) return nil } func (c *Conn) SetWriteDeadline(t time.Time) error { - c.writeDeadline.Store(t) + <-c.SyncExec(func() { c.writeDeadline = &t }) return nil }