diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 4ffa4b17..8d36a60a 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -15,7 +15,6 @@ type Conn struct { core *Core nodeID *crypto.NodeID nodeMask *crypto.NodeID - recv chan *wire_trafficPacket // Eventually gets attached to session.recv mutex *sync.RWMutex session *sessionInfo readDeadline atomic.Value // time.Time // TODO timer @@ -33,10 +32,11 @@ func (c *Conn) startSearch() { // The searchCompleted callback is given to the search searchCompleted := func(sinfo *sessionInfo, err error) { // Make sure that any blocks on read/write operations are lifted - defer close(c.searchwait) - // Update the connection with the fact that the search completed, which - // allows another search to be triggered if necessary - c.searching.Store(false) + defer func() { + c.searching.Store(false) + close(c.searchwait) + c.searchwait = make(chan interface{}) + }() // If the search failed for some reason, e.g. it hit a dead end or timed // out, then do nothing if err != nil { @@ -64,8 +64,6 @@ func (c *Conn) startSearch() { } // doSearch will be called below in response to one or more conditions doSearch := func() { - // Store the fact that we're searching, so that we don't start additional - // searches until this one has completed c.searching.Store(true) // Check to see if there is a search already matching the destination sinfo, isIn := c.core.searches.searches[*c.nodeID] @@ -73,8 +71,6 @@ func (c *Conn) startSearch() { // Nothing was found, so create a new search sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) - // Allow writes/reads to block until the search is complete - c.searchwait = make(chan interface{}) } // Continue the search c.core.searches.continueSearch(sinfo) @@ -111,23 +107,23 @@ func (c *Conn) Read(b []byte) (int, error) { sinfo := c.session c.mutex.RUnlock() // If there is a search in progress then wait for the result - if searching, ok := c.searching.Load().(bool); ok && searching { + if sinfo == nil { + // Wait for the search to complete <-c.searchwait - } - // If the session is not initialised, do nothing. Currently in this instance - // in a write, we would trigger a new session, but it doesn't make sense for - // us to block forever here if the session will not reopen. - // TODO: should this return an error or just a zero-length buffer? - if sinfo == nil || !sinfo.init { - time.Sleep(time.Second) - return 0, errors.New("session is closed") + // Retrieve our session info again + c.mutex.RLock() + sinfo = c.session + c.mutex.RUnlock() + // If sinfo is still nil at this point then the search failed and the + // searchwait channel has been recreated, so might as well give up and + // return an error code + if sinfo == nil { + return 0, errors.New("search failed") + } } // Wait for some traffic to come through from the session - fmt.Println(c.String(), "Start select") select { - // TODO... - case p, ok := <-c.recv: - fmt.Println(c.String(), "Finish select") + case p, ok := <-sinfo.recv: // If the session is closed then do nothing if !ok { return 0, errors.New("session is closed") @@ -168,10 +164,6 @@ func (c *Conn) Read(b []byte) (int, error) { // If we've reached this point then everything went to plan, return the // number of bytes we populated back into the given slice return len(b), nil - //case <-c.recvTimeout: - //case <-c.session.closed: - // c.expired = true - // return len(b), errors.New("session is closed") } } @@ -179,12 +171,9 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { c.mutex.RLock() sinfo := c.session c.mutex.RUnlock() - // If there is a search in progress then wait for the result - if searching, ok := c.searching.Load().(bool); ok && searching { - <-c.searchwait - } // If the session doesn't exist, or isn't initialised (which probably means - // that the search didn't complete successfully) then try to search again + // that the search didn't complete successfully) then we may need to wait for + // the search to complete or start the search again if sinfo == nil || !sinfo.init { // Is a search already taking place? if searching, sok := c.searching.Load().(bool); !sok || (sok && !searching) { @@ -192,13 +181,19 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { c.core.router.doAdmin(func() { c.startSearch() }) - //return 0, errors.New("starting search") } + // Wait for the search to complete <-c.searchwait - if sinfo == nil || !sinfo.init { - return 0, errors.New("search was failed") + // Retrieve our session info again + c.mutex.RLock() + sinfo = c.session + c.mutex.RUnlock() + // If sinfo is still nil at this point then the search failed and the + // searchwait channel has been recreated, so might as well give up and + // return an error code + if sinfo == nil { + return 0, errors.New("search failed") } - //return 0, errors.New("waiting for search to complete") } // defer util.PutBytes(b) var packet []byte diff --git a/src/yggdrasil/dialer.go b/src/yggdrasil/dialer.go index 89016100..325c6b76 100644 --- a/src/yggdrasil/dialer.go +++ b/src/yggdrasil/dialer.go @@ -59,11 +59,11 @@ func (d *Dialer) Dial(network, address string) (Conn, error) { // NodeID parameters. func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (Conn, error) { conn := Conn{ - core: d.core, - mutex: &sync.RWMutex{}, - nodeID: nodeID, - nodeMask: nodeMask, - recv: make(chan *wire_trafficPacket, 32), + core: d.core, + mutex: &sync.RWMutex{}, + nodeID: nodeID, + nodeMask: nodeMask, + searchwait: make(chan interface{}), } return conn, nil } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 967d5f6b..44c83874 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -458,12 +458,12 @@ func (ss *sessions) handlePing(ping *sessionPing) { // TODO: this should not block if nothing is accepting if !ping.IsPong && ss.listener != nil { conn := &Conn{ - core: ss.core, - session: sinfo, - mutex: &sync.RWMutex{}, - nodeID: crypto.GetNodeID(&sinfo.theirPermPub), - nodeMask: &crypto.NodeID{}, - recv: sinfo.recv, + core: ss.core, + session: sinfo, + mutex: &sync.RWMutex{}, + nodeID: crypto.GetNodeID(&sinfo.theirPermPub), + nodeMask: &crypto.NodeID{}, + searchwait: make(chan interface{}), } for i := range conn.nodeMask { conn.nodeMask[i] = 0xFF