Merge pull request #1036 from yggdrasil-network/neil/linktweaks

Tweak link state locking, add comments, listener priority, other fixes
This commit is contained in:
Neil 2023-05-21 00:06:43 +01:00 committed by GitHub
commit 52709696a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 150 additions and 100 deletions

View File

@ -9,7 +9,6 @@ import (
"time" "time"
"github.com/Arceliar/ironwood/network" "github.com/Arceliar/ironwood/network"
"github.com/Arceliar/phony"
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
) )
@ -76,19 +75,19 @@ func (c *Core) GetPeers() []PeerInfo {
for info, state := range c.links._links { for info, state := range c.links._links {
var peerinfo PeerInfo var peerinfo PeerInfo
var conn net.Conn var conn net.Conn
phony.Block(state, func() { state.RLock()
peerinfo.URI = info.uri peerinfo.URI = info.uri
peerinfo.LastError = state._err peerinfo.LastError = state._err
peerinfo.LastErrorTime = state._errtime peerinfo.LastErrorTime = state._errtime
if c := state._conn; c != nil { if c := state._conn; c != nil {
conn = c conn = c
peerinfo.Up = true peerinfo.Up = true
peerinfo.Inbound = info.linkType == linkTypeIncoming peerinfo.Inbound = state.linkType == linkTypeIncoming
peerinfo.RXBytes = c.rx peerinfo.RXBytes = c.rx
peerinfo.TXBytes = c.tx peerinfo.TXBytes = c.tx
peerinfo.Uptime = time.Since(c.up) peerinfo.Uptime = time.Since(c.up)
} }
}) state.RUnlock()
if p, ok := conns[conn]; ok { if p, ok := conns[conn]; ok {
peerinfo.Key = p.Key peerinfo.Key = p.Key
peerinfo.Root = p.Root peerinfo.Root = p.Root

View File

@ -46,23 +46,19 @@ type linkProtocol interface {
// linkInfo is used as a map key // linkInfo is used as a map key
type linkInfo struct { type linkInfo struct {
uri string // Peering URI in complete form uri string // Peering URI in complete form
sintf string // Peering source interface (i.e. from InterfacePeers) sintf string // Peering source interface (i.e. from InterfacePeers)
linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral
} }
// link tracks the state of a connection, either persistent or non-persistent // link tracks the state of a connection, either persistent or non-persistent
type link struct { type link struct {
phony.Inbox kick chan struct{} // Attempt to reconnect now, if backing off
ctx context.Context // linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral
cancel context.CancelFunc // linkProto string // Protocol carrier of link, e.g. TCP, AWDL
kick chan struct{} // Attempt to reconnect now, if backing off sync.RWMutex // Protects the below
info linkInfo // _conn *linkConn // Connected link, if any, nil if not connected
linkProto string // Protocol carrier of link, e.g. TCP, AWDL _err error // Last error on the connection, if any
_conn *linkConn // Connected link, if any, nil if not connected _errtime time.Time // Last time an error occured
_err error // Last error on the connection, if any
_errtime time.Time // Last time an error occured
} }
type linkOptions struct { type linkOptions struct {
@ -125,16 +121,6 @@ func (l *links) shutdown() {
}) })
} }
func (l *links) isConnectedTo(info linkInfo) bool {
l.RLock()
link, ok := l._links[info]
l.RUnlock()
if !ok {
return false
}
return link._conn != nil
}
type linkError string type linkError string
func (e linkError) Error() string { return string(e) } func (e linkError) Error() string { return string(e) }
@ -149,30 +135,8 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
// have an open peering to this peer. // have an open peering to this peer.
lu := urlForLinkInfo(*u) lu := urlForLinkInfo(*u)
info := linkInfo{ info := linkInfo{
uri: lu.String(), uri: lu.String(),
sintf: sintf, sintf: sintf,
linkType: linkType,
}
l.RLock()
state, ok := l._links[info]
l.RUnlock()
if ok && state != nil {
select {
case state.kick <- struct{}{}:
default:
}
return ErrLinkAlreadyConfigured
}
// Create the link entry. This will contain the connection
// in progress (if any), any error details and a context that
// lets the link be cancelled later.
ctx, cancel := context.WithCancel(l.core.ctx)
state = &link{
info: info,
linkProto: strings.ToUpper(u.Scheme),
ctx: ctx,
cancel: cancel,
} }
// Collect together the link options, these are global options // Collect together the link options, these are global options
@ -198,9 +162,31 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
options.priority = uint8(pi) options.priority = uint8(pi)
} }
// Store the state of the link, try to connect and then run // If we think we're already connected to this peer, load up
// the handler. // the existing peer state. Try to kick the peer if possible,
// which will cause an immediate connection attempt if it is
// backing off for some reason.
l.Lock() l.Lock()
state, ok := l._links[info]
if ok && state != nil {
select {
case state.kick <- struct{}{}:
default:
}
l.Unlock()
return ErrLinkAlreadyConfigured
}
// Create the link entry. This will contain the connection
// in progress (if any), any error details and a context that
// lets the link be cancelled later.
state = &link{
linkType: linkType,
linkProto: strings.ToUpper(u.Scheme),
kick: make(chan struct{}),
}
// Store the state of the link so that it can be queried later.
l._links[info] = state l._links[info] = state
l.Unlock() l.Unlock()
@ -223,7 +209,7 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
return true return true
case <-state.kick: case <-state.kick:
return true return true
case <-ctx.Done(): case <-l.core.ctx.Done():
return false return false
} }
} }
@ -238,44 +224,75 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
defer l.Unlock() defer l.Unlock()
delete(l._links, info) delete(l._links, info)
}() }()
// This loop will run each and every time we want to attempt
// a connection to this peer.
for { for {
conn, err := l.connect(u, info, options) conn, err := l.connect(u, info, options)
if err != nil { if err != nil {
if linkType == linkTypePersistent { if linkType == linkTypePersistent {
phony.Block(state, func() { // If the link is a persistent configured peering,
state._err = err // store information about the connection error so
state._errtime = time.Now() // that we can report it through the admin socket.
}) state.Lock()
state._conn = nil
state._err = err
state._errtime = time.Now()
state.Unlock()
// Back off for a bit. If true is returned here, we
// can continue onto the next loop iteration to try
// the next connection.
if backoffNow() { if backoffNow() {
continue continue
} else { } else {
return return
} }
} else { } else {
// Ephemeral and incoming connections don't remain
// after a connection failure, so exit out of the
// loop and clean up the link entry.
break break
} }
} }
// The linkConn wrapper allows us to track the number of
// bytes written to and read from this connection without
// the help of ironwood.
lc := &linkConn{ lc := &linkConn{
Conn: conn, Conn: conn,
up: time.Now(), up: time.Now(),
} }
phony.Block(state, func() {
state._conn = lc // Update the link state with our newly wrapped connection.
state._err = nil // Clear the error state.
state._errtime = time.Time{} state.Lock()
}) state._conn = lc
if err = l.handler(&info, options, lc); err != nil && err != io.EOF { state._err = nil
state._errtime = time.Time{}
state.Unlock()
// Give the connection to the handler. The handler will block
// for the lifetime of the connection.
if err = l.handler(linkType, options, lc); err != nil && err != io.EOF {
l.core.log.Debugf("Link %s error: %s\n", info.uri, err) l.core.log.Debugf("Link %s error: %s\n", info.uri, err)
} else { } else {
backoff = 0 backoff = 0
} }
_ = conn.Close()
phony.Block(state, func() { // The handler has stopped running so the connection is dead,
state._conn = nil // try to close the underlying socket just in case and then
if state._err = err; state._err != nil { // update the link state.
state._errtime = time.Now() _ = lc.Close()
} state.Lock()
}) state._conn = nil
if state._err = err; state._err != nil {
state._errtime = time.Now()
}
state.Unlock()
// If the link is persistently configured, back off if needed
// and then try reconnecting. Otherwise, exit out.
if linkType == linkTypePersistent { if linkType == linkTypePersistent {
if backoffNow() { if backoffNow() {
continue continue
@ -314,6 +331,16 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
ctx: ctx, ctx: ctx,
Cancel: cancel, Cancel: cancel,
} }
var options linkOptions
if p := u.Query().Get("priority"); p != "" {
pi, err := strconv.ParseUint(p, 10, 8)
if err != nil {
return nil, ErrLinkPriorityInvalid
}
options.priority = uint8(pi)
}
go func() { go func() {
l.core.log.Printf("%s listener started on %s", strings.ToUpper(u.Scheme), listener.Addr()) l.core.log.Printf("%s listener started on %s", strings.ToUpper(u.Scheme), listener.Addr())
defer l.core.log.Printf("%s listener stopped on %s", strings.ToUpper(u.Scheme), listener.Addr()) defer l.core.log.Printf("%s listener stopped on %s", strings.ToUpper(u.Scheme), listener.Addr())
@ -324,41 +351,65 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
} }
go func(conn net.Conn) { go func(conn net.Conn) {
defer conn.Close() defer conn.Close()
// In order to populate a somewhat sane looking connection
// URI in the admin socket, we need to replace the host in
// the listener URL with the remote address.
pu := *u pu := *u
pu.Host = conn.RemoteAddr().String() pu.Host = conn.RemoteAddr().String()
lu := urlForLinkInfo(pu) lu := urlForLinkInfo(pu)
info := linkInfo{ info := linkInfo{
uri: lu.String(), uri: lu.String(),
sintf: sintf, sintf: sintf,
linkType: linkTypeEphemeral, // TODO: should be incoming
} }
if l.isConnectedTo(info) {
// If there's an existing link state for this link, get it.
// If this node is already connected to us, just drop the
// connection. This prevents duplicate peerings.
l.Lock()
state, ok := l._links[info]
if ok && state != nil && state._conn != nil {
l.Unlock()
return return
} }
l.RLock()
state, ok := l._links[info]
l.RUnlock()
if !ok || state == nil { if !ok || state == nil {
state = &link{ state = &link{
info: info, linkType: linkTypeIncoming,
linkProto: strings.ToUpper(u.Scheme),
kick: make(chan struct{}),
} }
} }
// The linkConn wrapper allows us to track the number of
// bytes written to and read from this connection without
// the help of ironwood.
lc := &linkConn{ lc := &linkConn{
Conn: conn, Conn: conn,
up: time.Now(), up: time.Now(),
} }
var options linkOptions
phony.Block(state, func() { // Update the link state with our newly wrapped connection.
state._conn = lc // Clear the error state.
state._err = nil state.Lock()
state.linkProto = strings.ToUpper(u.Scheme) state._conn = lc
}) state._err = nil
l.Lock() state._errtime = time.Time{}
state.Unlock()
// Store the state of the link so that it can be queried later.
l._links[info] = state l._links[info] = state
l.Unlock() l.Unlock()
if err = l.handler(&info, options, lc); err != nil && err != io.EOF {
// Give the connection to the handler. The handler will block
// for the lifetime of the connection.
if err = l.handler(linkTypeIncoming, options, lc); err != nil && err != io.EOF {
l.core.log.Debugf("Link %s error: %s\n", u.Host, err) l.core.log.Debugf("Link %s error: %s\n", u.Host, err)
} }
// The handler has stopped running so the connection is dead,
// try to close the underlying socket just in case and then
// drop the link state.
_ = lc.Close()
l.Lock() l.Lock()
delete(l._links, info) delete(l._links, info)
l.Unlock() l.Unlock()
@ -401,7 +452,7 @@ func (l *links) connect(u *url.URL, info linkInfo, options linkOptions) (net.Con
return dialer.dial(u, info, options) return dialer.dial(u, info, options)
} }
func (l *links) handler(info *linkInfo, options linkOptions, conn net.Conn) error { func (l *links) handler(linkType linkType, options linkOptions, conn net.Conn) error {
meta := version_getBaseMetadata() meta := version_getBaseMetadata()
meta.publicKey = l.core.public meta.publicKey = l.core.public
metaBytes := meta.encode() metaBytes := meta.encode()
@ -453,12 +504,12 @@ func (l *links) handler(info *linkInfo, options linkOptions, conn net.Conn) erro
break break
} }
} }
if info.linkType == linkTypeIncoming && !isallowed { if linkType == linkTypeIncoming && !isallowed {
return fmt.Errorf("node public key %q is not in AllowedPublicKeys", hex.EncodeToString(meta.publicKey)) return fmt.Errorf("node public key %q is not in AllowedPublicKeys", hex.EncodeToString(meta.publicKey))
} }
dir := "outbound" dir := "outbound"
if info.linkType == linkTypeIncoming { if linkType == linkTypeIncoming {
dir = "inbound" dir = "inbound"
} }
remoteAddr := net.IP(address.AddrForKey(meta.publicKey)[:]).String() remoteAddr := net.IP(address.AddrForKey(meta.publicKey)[:]).String()