diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 9e237436..18c0b880 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -62,15 +62,18 @@ type Conn struct { nodeMask *crypto.NodeID session *sessionInfo mtu uint16 + readCallback func([]byte) + readBuffer chan []byte } // TODO func NewConn() that initializes additional fields as needed func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn { conn := Conn{ - core: core, - nodeID: nodeID, - nodeMask: nodeMask, - session: session, + core: core, + nodeID: nodeID, + nodeMask: nodeMask, + session: session, + readBuffer: make(chan []byte, 1024), } return &conn } @@ -154,6 +157,45 @@ func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) } } +// SetReadCallback sets a callback which will be called whenever a packet is received. +// Note that calls to Read will fail if the callback has been set to a non-nil value. +func (c *Conn) SetReadCallback(callback func([]byte)) { + c.EnqueueFrom(nil, func() { + c._setReadCallback(callback) + }) +} + +func (c *Conn) _setReadCallback(callback func([]byte)) { + c.readCallback = callback + c._drainReadBuffer() +} + +func (c *Conn) _drainReadBuffer() { + if c.readCallback == nil { + return + } + select { + case bs := <-c.readBuffer: + c.readCallback(bs) + c.EnqueueFrom(nil, c._drainReadBuffer) // In case there's more + default: + } +} + +// Called by the session to pass a new message to the Conn +func (c *Conn) recvMsg(from phony.IActor, msg []byte) { + c.EnqueueFrom(from, func() { + if c.readCallback != nil { + c.readCallback(msg) + } else { + select { + case c.readBuffer <- msg: + default: + } + } + }) +} + // Used internally by Read, the caller is responsible for util.PutBytes when they're done. func (c *Conn) ReadNoCopy() ([]byte, error) { var cancel util.Cancellation @@ -170,7 +212,7 @@ func (c *Conn) ReadNoCopy() ([]byte, error) { } else { return nil, ConnError{errors.New("session closed"), false, false, true, 0} } - case bs := <-c.session.toConn: + case bs := <-c.readBuffer: return bs, nil } } @@ -278,6 +320,7 @@ func (c *Conn) LocalAddr() crypto.NodeID { } func (c *Conn) RemoteAddr() crypto.NodeID { + // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... var n crypto.NodeID <-c.SyncExec(func() { n = *c.nodeID }) return n @@ -290,11 +333,13 @@ func (c *Conn) SetDeadline(t time.Time) error { } func (c *Conn) SetReadDeadline(t time.Time) error { + // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... <-c.SyncExec(func() { c.readDeadline = &t }) return nil } func (c *Conn) SetWriteDeadline(t time.Time) error { + // TODO warn that this can block while waiting for the Conn actor to run, so don't call it from other actors... <-c.SyncExec(func() { c.writeDeadline = &t }) return nil } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 854eb24d..a483a059 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -70,7 +70,6 @@ type sessionInfo struct { bytesRecvd uint64 // Bytes of real traffic received in this session init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use cancel util.Cancellation // Used to terminate workers - toConn chan []byte // Decrypted packets go here, picked up by the associated Conn conn *Conn // The associated Conn object callbacks []chan func() // Finished work from crypto workers } @@ -254,7 +253,6 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.myHandle = *crypto.NewHandle() sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) - sinfo.toConn = make(chan []byte, 32) ss.sinfos[sinfo.myHandle] = &sinfo ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle go func() { @@ -505,15 +503,10 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { util.PutBytes(p.Payload) return } - switch { - case sinfo._nonceIsOK(&p.Nonce): - case len(sinfo.toConn) < cap(sinfo.toConn): - default: - // We're either full or don't like this nonce + if !sinfo._nonceIsOK(&p.Nonce) { util.PutBytes(p.Payload) return } - k := sinfo.sharedSesKey var isOK bool var bs []byte @@ -530,16 +523,7 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { sinfo._updateNonce(&p.Nonce) sinfo.time = time.Now() sinfo.bytesRecvd += uint64(len(bs)) - select { - case sinfo.toConn <- bs: - case <-sinfo.cancel.Finished(): - util.PutBytes(bs) - default: - // We seem to have filled up the buffer in the mean time - // Since we need to not block, but the conn isn't an actor, we need to drop this packet - // TODO find some nicer way to interact with the Conn... - util.PutBytes(bs) - } + sinfo.conn.recvMsg(sinfo, bs) } ch <- callback sinfo.checkCallbacks()