From 6e528799e9fa6e9c8f547f71a2d1450a6faa0650 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 22 Apr 2019 22:38:37 +0100 Subject: [PATCH] Conn Read/Write operations will block while search completes --- src/tuntap/tun.go | 2 +- src/yggdrasil/conn.go | 23 +++++++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index c65c15e8..36e29658 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -216,7 +216,7 @@ func (tun *TunAdapter) connReader(conn *yggdrasil.Conn) error { for { n, err := conn.Read(b) if err != nil { - //tun.log.Errorln(conn.String(), "TUN/TAP conn read error:", err) + tun.log.Errorln(conn.String(), "TUN/TAP conn read error:", err) continue } if n == 0 { diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 903152d2..a5702d33 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -21,6 +21,7 @@ type Conn struct { readDeadline atomic.Value // time.Time // TODO timer writeDeadline atomic.Value // time.Time // TODO timer searching atomic.Value // bool + searchwait chan interface{} } func (c *Conn) String() string { @@ -31,6 +32,8 @@ func (c *Conn) String() string { func (c *Conn) startSearch() { // The searchCompleted callback is given to the search searchCompleted := func(sinfo *sessionInfo, err error) { + // Make sure that any blocks on read/write operations are lifted + defer close(c.searchwait) // Update the connection with the fact that the search completed, which // allows another search to be triggered if necessary c.searching.Store(false) @@ -70,6 +73,8 @@ func (c *Conn) startSearch() { // Nothing was found, so create a new search sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) + // Allow writes/reads to block until the search is complete + c.searchwait = make(chan interface{}) } // Continue the search c.core.searches.continueSearch(sinfo) @@ -110,12 +115,16 @@ func (c *Conn) Read(b []byte) (int, error) { // us to block forever here if the session will not reopen. // TODO: should this return an error or just a zero-length buffer? if sinfo == nil || !sinfo.init { - return 0, errors.New("session is closed") + // block + <-c.searchwait + // return 0, errors.New("session is closed") } // Wait for some traffic to come through from the session + fmt.Println("Start select") select { // TODO... case p, ok := <-c.recv: + fmt.Println("Finish select") // If the session is closed then do nothing if !ok { return 0, errors.New("session is closed") @@ -167,6 +176,9 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { c.mutex.RLock() sinfo := c.session c.mutex.RUnlock() + // A search is already taking place so wait for it to finish + if sinfo == nil || !sinfo.init { + } // If the session doesn't exist, or isn't initialised (which probably means // that the search didn't complete successfully) then try to search again if sinfo == nil || !sinfo.init { @@ -176,10 +188,13 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { c.core.router.doAdmin(func() { c.startSearch() }) - return 0, errors.New("starting search") + //return 0, errors.New("starting search") } - // A search is already taking place so wait for it to finish - return 0, errors.New("waiting for search to complete") + <-c.searchwait + if sinfo == nil || !sinfo.init { + return 0, errors.New("search was failed") + } + //return 0, errors.New("waiting for search to complete") } // defer util.PutBytes(b) var packet []byte