Link state tracking tweaks and improved shutdown

This commit is contained in:
Neil Alexander 2024-08-11 10:42:25 +01:00
parent ef989bef63
commit b1283e15f6
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
5 changed files with 40 additions and 50 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/hex" "encoding/hex"
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -41,6 +42,7 @@ type links struct {
wss *linkWSS // WSS interface support wss *linkWSS // WSS interface support
// _links can only be modified safely from within the links actor // _links can only be modified safely from within the links actor
_links map[linkInfo]*link // *link is nil if connection in progress _links map[linkInfo]*link // *link is nil if connection in progress
_listeners map[*Listener]context.CancelFunc
} }
type linkProtocol interface { type linkProtocol interface {
@ -85,13 +87,6 @@ func (l *Listener) Addr() net.Addr {
return l.listener.Addr() return l.listener.Addr()
} }
func (l *Listener) Close() error {
l.Cancel()
err := l.listener.Close()
<-l.ctx.Done()
return err
}
func (l *links) init(c *Core) error { func (l *links) init(c *Core) error {
l.core = c l.core = c
l.tcp = l.newLinkTCP() l.tcp = l.newLinkTCP()
@ -102,32 +97,18 @@ func (l *links) init(c *Core) error {
l.ws = l.newLinkWS() l.ws = l.newLinkWS()
l.wss = l.newLinkWSS() l.wss = l.newLinkWSS()
l._links = make(map[linkInfo]*link) l._links = make(map[linkInfo]*link)
l._listeners = make(map[*Listener]context.CancelFunc)
var listeners []ListenAddress
phony.Block(c, func() {
listeners = make([]ListenAddress, 0, len(c.config._listeners))
for listener := range c.config._listeners {
listeners = append(listeners, listener)
}
})
return nil return nil
} }
func (l *links) shutdown() { func (l *links) shutdown() {
phony.Block(l.tcp, func() { phony.Block(l, func() {
for l := range l.tcp._listeners { for listener := range l._listeners {
_ = l.Close() _ = listener.listener.Close()
} }
}) for _, link := range l._links {
phony.Block(l.tls, func() { _ = link._conn.Close()
for l := range l.tls._listeners {
_ = l.Close()
}
})
phony.Block(l.unix, func() {
for l := range l.unix._listeners {
_ = l.Close()
} }
}) })
} }
@ -457,11 +438,18 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
options.password = []byte(p) options.password = []byte(p)
} }
phony.Block(l, func() {
l._listeners[li] = cancel
})
go func() { go func() {
l.core.log.Infof("%s listener started on %s", strings.ToUpper(u.Scheme), listener.Addr()) l.core.log.Infof("%s listener started on %s", strings.ToUpper(u.Scheme), li.listener.Addr())
defer l.core.log.Infof("%s listener stopped on %s", strings.ToUpper(u.Scheme), listener.Addr()) defer l.core.log.Infof("%s listener stopped on %s", strings.ToUpper(u.Scheme), li.listener.Addr())
defer phony.Block(l, func() {
delete(l._listeners, li)
})
for { for {
conn, err := listener.Accept() conn, err := li.listener.Accept()
if err != nil { if err != nil {
return return
} }
@ -517,13 +505,22 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
// Store the state of the link so that it can be queried later. // Store the state of the link so that it can be queried later.
l._links[info] = state l._links[info] = state
}) })
defer phony.Block(l, func() {
if l._links[info] == state {
delete(l._links, info)
}
})
if lc == nil { if lc == nil {
return return
} }
// Give the connection to the handler. The handler will block // Give the connection to the handler. The handler will block
// for the lifetime of the connection. // for the lifetime of the connection.
if err = l.handler(linkTypeIncoming, options, lc, nil); err != nil && err != io.EOF { switch err = l.handler(linkTypeIncoming, options, lc, nil); {
case err == nil:
case errors.Is(err, io.EOF):
case errors.Is(err, net.ErrClosed):
default:
l.core.log.Debugf("Link %s error: %s\n", u.Host, err) l.core.log.Debugf("Link %s error: %s\n", u.Host, err)
} }
@ -531,11 +528,6 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
// try to close the underlying socket just in case and then // try to close the underlying socket just in case and then
// drop the link state. // drop the link state.
_ = lc.Close() _ = lc.Close()
phony.Block(l, func() {
if l._links[info] == state {
delete(l._links, info)
}
})
}(conn) }(conn)
} }
}() }()

View File

@ -15,7 +15,6 @@ type linkTCP struct {
phony.Inbox phony.Inbox
*links *links
listenconfig *net.ListenConfig listenconfig *net.ListenConfig
_listeners map[*Listener]context.CancelFunc
} }
func (l *links) newLinkTCP() *linkTCP { func (l *links) newLinkTCP() *linkTCP {
@ -24,7 +23,6 @@ func (l *links) newLinkTCP() *linkTCP {
listenconfig: &net.ListenConfig{ listenconfig: &net.ListenConfig{
KeepAlive: -1, KeepAlive: -1,
}, },
_listeners: map[*Listener]context.CancelFunc{},
} }
lt.listenconfig.Control = lt.tcpContext lt.listenconfig.Control = lt.tcpContext
return lt return lt

View File

@ -16,7 +16,6 @@ type linkTLS struct {
tcp *linkTCP tcp *linkTCP
listener *net.ListenConfig listener *net.ListenConfig
config *tls.Config config *tls.Config
_listeners map[*Listener]context.CancelFunc
} }
func (l *links) newLinkTLS(tcp *linkTCP) *linkTLS { func (l *links) newLinkTLS(tcp *linkTCP) *linkTLS {
@ -28,7 +27,6 @@ func (l *links) newLinkTLS(tcp *linkTCP) *linkTLS {
KeepAlive: -1, KeepAlive: -1,
}, },
config: l.core.config.tls.Clone(), config: l.core.config.tls.Clone(),
_listeners: map[*Listener]context.CancelFunc{},
} }
return lt return lt
} }

View File

@ -14,7 +14,6 @@ type linkUNIX struct {
*links *links
dialer *net.Dialer dialer *net.Dialer
listener *net.ListenConfig listener *net.ListenConfig
_listeners map[*Listener]context.CancelFunc
} }
func (l *links) newLinkUNIX() *linkUNIX { func (l *links) newLinkUNIX() *linkUNIX {
@ -27,7 +26,6 @@ func (l *links) newLinkUNIX() *linkUNIX {
listener: &net.ListenConfig{ listener: &net.ListenConfig{
KeepAlive: -1, KeepAlive: -1,
}, },
_listeners: map[*Listener]context.CancelFunc{},
} }
return lt return lt
} }

View File

@ -14,6 +14,7 @@ import (
type linkWS struct { type linkWS struct {
phony.Inbox phony.Inbox
*links *links
listenconfig *net.ListenConfig
} }
type linkWSConn struct { type linkWSConn struct {
@ -78,6 +79,9 @@ func (s *wsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (l *links) newLinkWS() *linkWS { func (l *links) newLinkWS() *linkWS {
lt := &linkWS{ lt := &linkWS{
links: l, links: l,
listenconfig: &net.ListenConfig{
KeepAlive: -1,
},
} }
return lt return lt
} }
@ -95,7 +99,7 @@ func (l *linkWS) dial(ctx context.Context, url *url.URL, info linkInfo, options
} }
func (l *linkWS) listen(ctx context.Context, url *url.URL, _ string) (net.Listener, error) { func (l *linkWS) listen(ctx context.Context, url *url.URL, _ string) (net.Listener, error) {
nl, err := net.Listen("tcp", url.Host) nl, err := l.listenconfig.Listen(ctx, "tcp", url.Host)
if err != nil { if err != nil {
return nil, err return nil, err
} }