It works, sort of, amazingly

This commit is contained in:
Neil Alexander 2019-04-22 23:58:59 +01:00
parent e1a2d666bf
commit d7a1c04748
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
3 changed files with 41 additions and 46 deletions

View File

@ -15,7 +15,6 @@ type Conn struct {
core *Core core *Core
nodeID *crypto.NodeID nodeID *crypto.NodeID
nodeMask *crypto.NodeID nodeMask *crypto.NodeID
recv chan *wire_trafficPacket // Eventually gets attached to session.recv
mutex *sync.RWMutex mutex *sync.RWMutex
session *sessionInfo session *sessionInfo
readDeadline atomic.Value // time.Time // TODO timer readDeadline atomic.Value // time.Time // TODO timer
@ -33,10 +32,11 @@ func (c *Conn) startSearch() {
// The searchCompleted callback is given to the search // The searchCompleted callback is given to the search
searchCompleted := func(sinfo *sessionInfo, err error) { searchCompleted := func(sinfo *sessionInfo, err error) {
// Make sure that any blocks on read/write operations are lifted // Make sure that any blocks on read/write operations are lifted
defer close(c.searchwait) defer func() {
// Update the connection with the fact that the search completed, which
// allows another search to be triggered if necessary
c.searching.Store(false) c.searching.Store(false)
close(c.searchwait)
c.searchwait = make(chan interface{})
}()
// If the search failed for some reason, e.g. it hit a dead end or timed // If the search failed for some reason, e.g. it hit a dead end or timed
// out, then do nothing // out, then do nothing
if err != nil { if err != nil {
@ -64,8 +64,6 @@ func (c *Conn) startSearch() {
} }
// doSearch will be called below in response to one or more conditions // doSearch will be called below in response to one or more conditions
doSearch := func() { doSearch := func() {
// Store the fact that we're searching, so that we don't start additional
// searches until this one has completed
c.searching.Store(true) c.searching.Store(true)
// Check to see if there is a search already matching the destination // Check to see if there is a search already matching the destination
sinfo, isIn := c.core.searches.searches[*c.nodeID] sinfo, isIn := c.core.searches.searches[*c.nodeID]
@ -73,8 +71,6 @@ func (c *Conn) startSearch() {
// Nothing was found, so create a new search // Nothing was found, so create a new search
sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo)
// Allow writes/reads to block until the search is complete
c.searchwait = make(chan interface{})
} }
// Continue the search // Continue the search
c.core.searches.continueSearch(sinfo) c.core.searches.continueSearch(sinfo)
@ -111,23 +107,23 @@ func (c *Conn) Read(b []byte) (int, error) {
sinfo := c.session sinfo := c.session
c.mutex.RUnlock() c.mutex.RUnlock()
// If there is a search in progress then wait for the result // If there is a search in progress then wait for the result
if searching, ok := c.searching.Load().(bool); ok && searching { if sinfo == nil {
// Wait for the search to complete
<-c.searchwait <-c.searchwait
// Retrieve our session info again
c.mutex.RLock()
sinfo = c.session
c.mutex.RUnlock()
// If sinfo is still nil at this point then the search failed and the
// searchwait channel has been recreated, so might as well give up and
// return an error code
if sinfo == nil {
return 0, errors.New("search failed")
} }
// If the session is not initialised, do nothing. Currently in this instance
// in a write, we would trigger a new session, but it doesn't make sense for
// us to block forever here if the session will not reopen.
// TODO: should this return an error or just a zero-length buffer?
if sinfo == nil || !sinfo.init {
time.Sleep(time.Second)
return 0, errors.New("session is closed")
} }
// Wait for some traffic to come through from the session // Wait for some traffic to come through from the session
fmt.Println(c.String(), "Start select")
select { select {
// TODO... case p, ok := <-sinfo.recv:
case p, ok := <-c.recv:
fmt.Println(c.String(), "Finish select")
// If the session is closed then do nothing // If the session is closed then do nothing
if !ok { if !ok {
return 0, errors.New("session is closed") return 0, errors.New("session is closed")
@ -168,10 +164,6 @@ func (c *Conn) Read(b []byte) (int, error) {
// If we've reached this point then everything went to plan, return the // If we've reached this point then everything went to plan, return the
// number of bytes we populated back into the given slice // number of bytes we populated back into the given slice
return len(b), nil return len(b), nil
//case <-c.recvTimeout:
//case <-c.session.closed:
// c.expired = true
// return len(b), errors.New("session is closed")
} }
} }
@ -179,12 +171,9 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
c.mutex.RLock() c.mutex.RLock()
sinfo := c.session sinfo := c.session
c.mutex.RUnlock() c.mutex.RUnlock()
// If there is a search in progress then wait for the result
if searching, ok := c.searching.Load().(bool); ok && searching {
<-c.searchwait
}
// If the session doesn't exist, or isn't initialised (which probably means // If the session doesn't exist, or isn't initialised (which probably means
// that the search didn't complete successfully) then try to search again // that the search didn't complete successfully) then we may need to wait for
// the search to complete or start the search again
if sinfo == nil || !sinfo.init { if sinfo == nil || !sinfo.init {
// Is a search already taking place? // Is a search already taking place?
if searching, sok := c.searching.Load().(bool); !sok || (sok && !searching) { if searching, sok := c.searching.Load().(bool); !sok || (sok && !searching) {
@ -192,13 +181,19 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
c.core.router.doAdmin(func() { c.core.router.doAdmin(func() {
c.startSearch() c.startSearch()
}) })
//return 0, errors.New("starting search")
} }
// Wait for the search to complete
<-c.searchwait <-c.searchwait
if sinfo == nil || !sinfo.init { // Retrieve our session info again
return 0, errors.New("search was failed") c.mutex.RLock()
sinfo = c.session
c.mutex.RUnlock()
// If sinfo is still nil at this point then the search failed and the
// searchwait channel has been recreated, so might as well give up and
// return an error code
if sinfo == nil {
return 0, errors.New("search failed")
} }
//return 0, errors.New("waiting for search to complete")
} }
// defer util.PutBytes(b) // defer util.PutBytes(b)
var packet []byte var packet []byte

View File

@ -63,7 +63,7 @@ func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (Conn, err
mutex: &sync.RWMutex{}, mutex: &sync.RWMutex{},
nodeID: nodeID, nodeID: nodeID,
nodeMask: nodeMask, nodeMask: nodeMask,
recv: make(chan *wire_trafficPacket, 32), searchwait: make(chan interface{}),
} }
return conn, nil return conn, nil
} }

View File

@ -463,7 +463,7 @@ func (ss *sessions) handlePing(ping *sessionPing) {
mutex: &sync.RWMutex{}, mutex: &sync.RWMutex{},
nodeID: crypto.GetNodeID(&sinfo.theirPermPub), nodeID: crypto.GetNodeID(&sinfo.theirPermPub),
nodeMask: &crypto.NodeID{}, nodeMask: &crypto.NodeID{},
recv: sinfo.recv, searchwait: make(chan interface{}),
} }
for i := range conn.nodeMask { for i := range conn.nodeMask {
conn.nodeMask[i] = 0xFF conn.nodeMask[i] = 0xFF