a couple race fixes and use timer.AfterFunc instead of sleeping goroutines or ticker in a few places

This commit is contained in:
Arceliar 2019-08-25 17:00:02 -05:00
parent a3d4d8125b
commit 502f2937a9
11 changed files with 66 additions and 98 deletions

View File

@ -23,7 +23,7 @@ type tunConn struct {
addr address.Address addr address.Address
snet address.Subnet snet address.Subnet
stop chan struct{} stop chan struct{}
alive chan struct{} alive *time.Timer // From calling time.AfterFunc
} }
func (s *tunConn) close() { func (s *tunConn) close() {
@ -40,10 +40,6 @@ func (s *tunConn) _close_nomutex() {
defer func() { recover() }() defer func() { recover() }()
close(s.stop) // Closes reader/writer goroutines close(s.stop) // Closes reader/writer goroutines
}() }()
func() {
defer func() { recover() }()
close(s.alive) // Closes timeout goroutine
}()
} }
func (s *tunConn) _read(bs []byte) (err error) { func (s *tunConn) _read(bs []byte) (err error) {
@ -228,27 +224,8 @@ func (s *tunConn) _write(bs []byte) (err error) {
} }
func (s *tunConn) stillAlive() { func (s *tunConn) stillAlive() {
defer func() { recover() }() if s.alive != nil {
select { s.alive.Stop()
case s.alive <- struct{}{}:
default:
}
}
func (s *tunConn) checkForTimeouts() error {
timer := time.NewTimer(tunConnTimeout)
defer util.TimerStop(timer)
defer s.close()
for {
select {
case _, ok := <-s.alive:
if !ok {
return errors.New("connection closed")
}
util.TimerStop(timer)
timer.Reset(tunConnTimeout)
case <-timer.C:
return errors.New("timed out")
}
} }
s.alive = time.AfterFunc(tunConnTimeout, s.close)
} }

View File

@ -230,7 +230,6 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) {
tun: tun, tun: tun,
conn: conn, conn: conn,
stop: make(chan struct{}), stop: make(chan struct{}),
alive: make(chan struct{}, 1),
} }
c = &s c = &s
// Get the remote address and subnet of the other side // Get the remote address and subnet of the other side
@ -255,13 +254,13 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) {
// we receive a packet through the interface for this address // we receive a packet through the interface for this address
tun.addrToConn[s.addr] = &s tun.addrToConn[s.addr] = &s
tun.subnetToConn[s.snet] = &s tun.subnetToConn[s.snet] = &s
// Set the read callback and start the timeout goroutine // Set the read callback and start the timeout
conn.SetReadCallback(func(bs []byte) { conn.SetReadCallback(func(bs []byte) {
s.RecvFrom(conn, func() { s.RecvFrom(conn, func() {
s._read(bs) s._read(bs)
}) })
}) })
go s.checkForTimeouts() s.RecvFrom(nil, s.stillAlive)
// Return // Return
return c, err return c, err
} }

View File

@ -344,10 +344,8 @@ func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool)
c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false) c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false)
} }
c.router.doAdmin(sendNodeInfoRequest) c.router.doAdmin(sendNodeInfoRequest)
go func() { timer := time.AfterFunc(6*time.Second, func() { close(response) })
time.Sleep(6 * time.Second) defer timer.Stop()
close(response)
}()
for res := range response { for res := range response {
return *res, nil return *res, nil
} }

View File

@ -552,7 +552,7 @@ func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
func DEBUG_simLinkPeers(p, q *peer) { func DEBUG_simLinkPeers(p, q *peer) {
// Sets q.out() to point to p and starts p.linkLoop() // Sets q.out() to point to p and starts p.start()
goWorkers := func(source, dest *peer) { goWorkers := func(source, dest *peer) {
source.linkOut = make(chan []byte, 1) source.linkOut = make(chan []byte, 1)
send := make(chan []byte, 1) send := make(chan []byte, 1)
@ -561,7 +561,7 @@ func DEBUG_simLinkPeers(p, q *peer) {
send <- bs send <- bs
} }
} }
go source.linkLoop() go source.start()
go func() { go func() {
var packets [][]byte var packets [][]byte
for { for {

View File

@ -216,7 +216,7 @@ func (intf *linkInterface) handler() error {
intf.link.core.log.Infof("Connected %s: %s, source %s", intf.link.core.log.Infof("Connected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
// Start the link loop // Start the link loop
go intf.peer.linkLoop() go intf.peer.start()
// Start the writer // Start the writer
signalReady := make(chan struct{}, 1) signalReady := make(chan struct{}, 1)
signalSent := make(chan bool, 1) signalSent := make(chan bool, 1)

View File

@ -47,8 +47,8 @@ func (m *nodeinfo) init(core *Core) {
m.callbacks = make(map[crypto.BoxPubKey]nodeinfoCallback) m.callbacks = make(map[crypto.BoxPubKey]nodeinfoCallback)
m.cache = make(map[crypto.BoxPubKey]nodeinfoCached) m.cache = make(map[crypto.BoxPubKey]nodeinfoCached)
go func() { var f func()
for { f = func() {
m.callbacksMutex.Lock() m.callbacksMutex.Lock()
for boxPubKey, callback := range m.callbacks { for boxPubKey, callback := range m.callbacks {
if time.Since(callback.created) > time.Minute { if time.Since(callback.created) > time.Minute {
@ -63,9 +63,9 @@ func (m *nodeinfo) init(core *Core) {
} }
} }
m.cacheMutex.Unlock() m.cacheMutex.Unlock()
time.Sleep(time.Second * 30) time.AfterFunc(time.Second*30, f)
} }
}() go f()
} }
// Add a callback for a nodeinfo lookup // Add a callback for a nodeinfo lookup

View File

@ -184,18 +184,21 @@ func (ps *peers) sendSwitchMsgs(from phony.Actor) {
// This must be launched in a separate goroutine by whatever sets up the peer struct. // This must be launched in a separate goroutine by whatever sets up the peer struct.
// It handles link protocol traffic. // It handles link protocol traffic.
func (p *peer) linkLoop() { func (p *peer) start() {
tick := time.NewTicker(time.Second) var updateDHT func()
defer tick.Stop() updateDHT = func() {
<-p.SyncExec(p._sendSwitchMsg) // Startup message <-p.SyncExec(func() {
for {
select { select {
case <-p.done: case <-p.done:
return default:
case _ = <-tick.C: p._updateDHT()
<-p.SyncExec(p._updateDHT) time.AfterFunc(time.Second, updateDHT)
} }
})
} }
updateDHT()
// Just for good measure, immediately send a switch message to this peer when we start
<-p.SyncExec(p._sendSwitchMsg)
} }
func (p *peer) _updateDHT() { func (p *peer) _updateDHT() {

View File

@ -92,7 +92,7 @@ func (r *router) reconfigure(e chan error) {
// Starts the tickerLoop goroutine. // Starts the tickerLoop goroutine.
func (r *router) start() error { func (r *router) start() error {
r.core.log.Infoln("Starting router") r.core.log.Infoln("Starting router")
go r.tickerLoop() go r.doMaintenance()
return nil return nil
} }
@ -122,18 +122,14 @@ func (r *router) reset(from phony.Actor) {
// TODO remove reconfigure so this is just a ticker loop // TODO remove reconfigure so this is just a ticker loop
// and then find something better than a ticker loop to schedule things... // and then find something better than a ticker loop to schedule things...
func (r *router) tickerLoop() { func (r *router) doMaintenance() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
<-ticker.C
<-r.SyncExec(func() { <-r.SyncExec(func() {
// Any periodic maintenance stuff goes here // Any periodic maintenance stuff goes here
r.core.switchTable.doMaintenance() r.core.switchTable.doMaintenance()
r.dht.doMaintenance() r.dht.doMaintenance()
r.sessions.cleanup() r.sessions.cleanup()
}) })
} time.AfterFunc(time.Second, r.doMaintenance)
} }
// Checks incoming traffic type and passes it to the appropriate handler. // Checks incoming traffic type and passes it to the appropriate handler.

View File

@ -152,18 +152,16 @@ func (sinfo *searchInfo) continueSearch() {
// In case the search dies, try to spawn another thread later // In case the search dies, try to spawn another thread later
// Note that this will spawn multiple parallel searches as time passes // Note that this will spawn multiple parallel searches as time passes
// Any that die aren't restarted, but a new one will start later // Any that die aren't restarted, but a new one will start later
retryLater := func() { time.AfterFunc(search_RETRY_TIME, func() {
sinfo.searches.router.RecvFrom(nil, func() {
// FIXME this keeps the search alive forever if not for the searches map, fix that // FIXME this keeps the search alive forever if not for the searches map, fix that
newSearchInfo := sinfo.searches.searches[sinfo.dest] newSearchInfo := sinfo.searches.searches[sinfo.dest]
if newSearchInfo != sinfo { if newSearchInfo != sinfo {
return return
} }
sinfo.continueSearch() sinfo.continueSearch()
} })
go func() { })
time.Sleep(search_RETRY_TIME)
sinfo.searches.router.doAdmin(retryLater)
}()
} }
// Calls create search, and initializes the iterative search parts of the struct before returning it. // Calls create search, and initializes the iterative search parts of the struct before returning it.

View File

@ -262,7 +262,6 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.sessions.removeSession(&sinfo) sinfo.sessions.removeSession(&sinfo)
}) })
}() }()
//go sinfo.startWorkers()
return &sinfo return &sinfo
} }

View File

@ -200,10 +200,8 @@ func (t *switchTable) init(core *Core) {
} }
func (t *switchTable) reconfigure(e chan error) { func (t *switchTable) reconfigure(e chan error) {
go func() {
defer close(e) defer close(e)
// This is where reconfiguration would go, if we had anything useful to do. // This is where reconfiguration would go, if we had anything useful to do.
}()
} }
// Safely gets a copy of this node's locator. // Safely gets a copy of this node's locator.