From df1239b054aee0c124193daff5e6f3f0b231af34 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Wed, 25 Nov 2020 02:44:13 -0600 Subject: [PATCH 1/3] attempting to debug/fix a possible goroutine leak --- src/yggdrasil/link.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 5222644b..779c435d 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -460,9 +460,6 @@ func (intf *link) notifyStalled() { // reset the close timer func (intf *link) notifyReading() { intf.Act(&intf.reader, func() { - if intf.closeTimer != nil { - intf.closeTimer.Stop() - } intf.closeTimer = time.AfterFunc(closeTime, func() { intf.msgIO.close() }) }) } @@ -470,6 +467,7 @@ func (intf *link) notifyReading() { // wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic func (intf *link) notifyRead(size int) { intf.Act(&intf.reader, func() { + intf.closeTimer.Stop() if intf.stallTimer != nil { intf.stallTimer.Stop() intf.stallTimer = nil From 1daf3e7bd70491f763d45905bcf43ffb1b95426e Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 13 Dec 2020 16:16:14 -0600 Subject: [PATCH 2/3] remove link.go block on oldIntf if we already have a connection to the same node, this spams connections, so it's not a good long-term fix if that's where the goroutine leak is --- src/yggdrasil/link.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 779c435d..63c9a309 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -252,16 +252,12 @@ func (intf *link) handler() error { intf.info.box = meta.box intf.info.sig = meta.sig intf.links.mutex.Lock() - if oldIntf, isIn := intf.links.links[intf.info]; isIn { + if _, isIn := intf.links.links[intf.info]; isIn { intf.links.mutex.Unlock() // FIXME we should really return an error and let the caller block instead // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. intf.links.core.log.Debugln("DEBUG: found existing interface for", intf.name) intf.msgIO.close() - if !intf.incoming { - // Block outgoing connection attempts until the existing connection closes - <-oldIntf.closed - } return nil } else { intf.closed = make(chan struct{}) From a8810c7ee9bff05e8fd74d68ab83f9b944b5c622 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 13 Dec 2020 16:29:03 -0600 Subject: [PATCH 3/3] if the link handler exits early due to an existing connection, then have it return a channel to that connection which closes when the connection is closed, so we can choose to block on that to avoid spamming connection attempts with dial --- src/yggdrasil/link.go | 28 ++++++++++++++-------------- src/yggdrasil/tcp.go | 17 +++++++++++------ 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 63c9a309..2ee2f312 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -187,7 +187,7 @@ func (l *links) stop() error { return nil } -func (intf *link) handler() error { +func (intf *link) handler() (chan struct{}, error) { // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later go func() { for bss := range intf.writer.worker { @@ -207,38 +207,38 @@ func (intf *link) handler() error { // TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer) var err error if !util.FuncTimeout(func() { err = intf.msgIO._sendMetaBytes(metaBytes) }, 30*time.Second) { - return errors.New("timeout on metadata send") + return nil, errors.New("timeout on metadata send") } if err != nil { - return err + return nil, err } if !util.FuncTimeout(func() { metaBytes, err = intf.msgIO._recvMetaBytes() }, 30*time.Second) { - return errors.New("timeout on metadata recv") + return nil, errors.New("timeout on metadata recv") } if err != nil { - return err + return nil, err } meta = version_metadata{} if !meta.decode(metaBytes) || !meta.check() { - return errors.New("failed to decode metadata") + return nil, errors.New("failed to decode metadata") } base := version_getBaseMetadata() if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { intf.links.core.log.Errorln("Failed to connect to node: " + intf.lname + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) - return errors.New("failed to connect: wrong version") + return nil, errors.New("failed to connect: wrong version") } // Check if the remote side matches the keys we expected. This is a bit of a weak // check - in future versions we really should check a signature or something like that. if pinned := intf.options.pinnedCurve25519Keys; pinned != nil { if _, allowed := pinned[meta.box]; !allowed { intf.links.core.log.Errorf("Failed to connect to node: %q sent curve25519 key that does not match pinned keys", intf.name) - return fmt.Errorf("failed to connect: host sent curve25519 key that does not match pinned keys") + return nil, fmt.Errorf("failed to connect: host sent curve25519 key that does not match pinned keys") } } if pinned := intf.options.pinnedEd25519Keys; pinned != nil { if _, allowed := pinned[meta.sig]; !allowed { intf.links.core.log.Errorf("Failed to connect to node: %q sent ed25519 key that does not match pinned keys", intf.name) - return fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys") + return nil, fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys") } } // Check if we're authorized to connect to this key / IP @@ -246,19 +246,19 @@ func (intf *link) handler() error { intf.links.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) intf.msgIO.close() - return nil + return nil, nil } // Check if we already have a link to this node intf.info.box = meta.box intf.info.sig = meta.sig intf.links.mutex.Lock() - if _, isIn := intf.links.links[intf.info]; isIn { + if oldIntf, isIn := intf.links.links[intf.info]; isIn { intf.links.mutex.Unlock() // FIXME we should really return an error and let the caller block instead // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. intf.links.core.log.Debugln("DEBUG: found existing interface for", intf.name) intf.msgIO.close() - return nil + return oldIntf.closed, nil } else { intf.closed = make(chan struct{}) intf.links.links[intf.info] = intf @@ -278,7 +278,7 @@ func (intf *link) handler() error { intf.peer = intf.links.core.peers._newPeer(&meta.box, &meta.sig, shared, intf) }) if intf.peer == nil { - return errors.New("failed to create peer") + return nil, errors.New("failed to create peer") } defer func() { // More cleanup can go here @@ -316,7 +316,7 @@ func (intf *link) handler() error { intf.links.core.log.Infof("Disconnected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) } - return err + return nil, err } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index ad9d872e..10c2b1eb 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -299,7 +299,9 @@ func (t *tcp) call(saddr string, options tcpOptions, sintf string) { } t.waitgroup.Add(1) options.socksPeerAddr = conn.RemoteAddr().String() - t.handler(conn, false, options) + if ch := t.handler(conn, false, options); ch != nil { + <-ch + } } else { dst, err := net.ResolveTCPAddr("tcp", saddr) if err != nil { @@ -365,12 +367,14 @@ func (t *tcp) call(saddr string, options tcpOptions, sintf string) { return } t.waitgroup.Add(1) - t.handler(conn, false, options) + if ch := t.handler(conn, false, options); ch != nil { + <-ch + } } }() } -func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) { +func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) chan struct{} { defer t.waitgroup.Done() // Happens after sock.close defer sock.Close() t.setExtraOptions(sock) @@ -379,7 +383,7 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) { var err error if sock, err = options.upgrade.upgrade(sock); err != nil { t.links.core.log.Errorln("TCP handler upgrade failed:", err) - return + return nil } upgraded = true } @@ -415,7 +419,7 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) { // Maybe dial/listen at the application level // Then pass a net.Conn to the core library (after these kinds of checks are done) t.links.core.log.Debugln("Dropping ygg-tunneled connection", local, remote) - return + return nil } } force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast() @@ -425,6 +429,7 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) { panic(err) } t.links.core.log.Debugln("DEBUG: starting handler for", name) - err = link.handler() + ch, err := link.handler() t.links.core.log.Debugln("DEBUG: stopped handler for", name, err) + return ch }