Restore removePeer method

This commit is contained in:
Neil Alexander 2023-10-22 10:27:41 +01:00
parent 80e56eafcd
commit 73c6c25bd9
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
3 changed files with 66 additions and 40 deletions

View File

@ -1,5 +1,10 @@
package admin
import (
"fmt"
"net/url"
)
type RemovePeerRequest struct {
Uri string `json:"uri"`
Sintf string `json:"interface,omitempty"`
@ -8,5 +13,9 @@ type RemovePeerRequest struct {
type RemovePeerResponse struct{}
func (a *AdminSocket) removePeerHandler(req *RemovePeerRequest, res *RemovePeerResponse) error {
return a.core.RemovePeer(req.Uri, req.Sintf)
u, err := url.Parse(req.Uri)
if err != nil {
return fmt.Errorf("unable to parse peering URI: %w", err)
}
return a.core.RemovePeer(u, req.Sintf)
}

View File

@ -3,7 +3,6 @@ package core
import (
"crypto/ed25519"
"encoding/json"
"fmt"
"net"
"net/url"
"sync/atomic"
@ -192,28 +191,8 @@ func (c *Core) AddPeer(u *url.URL, sintf string) error {
// RemovePeer removes a peer. The peer should be specified in URI format, see AddPeer.
// The peer is not disconnected immediately.
func (c *Core) RemovePeer(uri string, sourceInterface string) error {
return fmt.Errorf("not implemented yet")
/*
var err error
phony.Block(c, func() {
peer := Peer{uri, sourceInterface}
linkInfo, ok := c.config._peers[peer]
if !ok {
err = fmt.Errorf("peer not configured")
return
}
if ok && linkInfo != nil {
c.links.Act(nil, func() {
if link := c.links._links[*linkInfo]; link != nil {
_ = link.conn.Close()
}
})
}
delete(c.config._peers, peer)
})
return err
*/
func (c *Core) RemovePeer(u *url.URL, sintf string) error {
return c.links.remove(u, sintf, linkTypePersistent)
}
// CallPeer calls a peer once. This should be specified in the peer URI format,

View File

@ -53,6 +53,8 @@ type linkInfo struct {
// link tracks the state of a connection, either persistent or non-persistent
type link struct {
ctx context.Context // Connection context
cancel context.CancelFunc // Stop future redial attempts (when peer removed)
kick chan struct{} // Attempt to reconnect now, if backing off
linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral
linkProto string // Protocol carrier of link, e.g. TCP, AWDL
@ -129,6 +131,7 @@ type linkError string
func (e linkError) Error() string { return string(e) }
const ErrLinkAlreadyConfigured = linkError("peer is already configured")
const ErrLinkNotConfigured = linkError("peer is not configured")
const ErrLinkPriorityInvalid = linkError("priority value is invalid")
const ErrLinkPinnedKeyInvalid = linkError("pinned public key is invalid")
const ErrLinkPasswordInvalid = linkError("password is invalid")
@ -199,6 +202,7 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
linkProto: strings.ToUpper(u.Scheme),
kick: make(chan struct{}),
}
state.ctx, state.cancel = context.WithCancel(l.core.ctx)
// Store the state of the link so that it can be queried later.
l._links[info] = state
@ -218,12 +222,14 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
backoff++
duration := time.Second * time.Duration(math.Exp2(float64(backoff)))
select {
case <-time.After(duration):
return true
case <-state.kick:
return true
case <-state.ctx.Done():
return false
case <-l.core.ctx.Done():
return false
case <-time.After(duration):
return true
}
}
@ -232,19 +238,25 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
// then the loop will run endlessly, using backoffs as needed.
// Otherwise the loop will end, cleaning up the link entry.
go func() {
defer func() {
phony.Block(l, func() {
defer phony.Block(l, func() {
if l._links[info] == state {
delete(l._links, info)
}
})
}()
// This loop will run each and every time we want to attempt
// a connection to this peer.
// TODO get rid of this loop, this is *exactly* what time.AfterFunc is for, we should just send a signal to the links actor to kick off a goroutine as needed
for {
conn, err := l.connect(u, info, options)
select {
case <-state.ctx.Done():
// The peering context has been cancelled, so don't try
// to dial again.
return
default:
}
conn, err := l.connect(state.ctx, u, info, options)
if err != nil {
if linkType == linkTypePersistent {
// If the link is a persistent configured peering,
@ -319,13 +331,39 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
}
return
}
break
}
}()
})
return retErr
}
func (l *links) remove(u *url.URL, sintf string, linkType linkType) error {
var retErr error
phony.Block(l, func() {
// Generate the link info and see whether we think we already
// have an open peering to this peer.
lu := urlForLinkInfo(*u)
info := linkInfo{
uri: lu.String(),
sintf: sintf,
}
// If this peer is already configured then we will close the
// connection and stop it from retrying.
state, ok := l._links[info]
if ok && state != nil {
state.cancel()
if conn := state._conn; conn != nil {
retErr = conn.Close()
}
return
}
retErr = ErrLinkNotConfigured
})
return retErr
}
func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
ctx, cancel := context.WithCancel(l.core.ctx)
var protocol linkProtocol
@ -453,7 +491,7 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
return li, nil
}
func (l *links) connect(u *url.URL, info linkInfo, options linkOptions) (net.Conn, error) {
func (l *links) connect(ctx context.Context, u *url.URL, info linkInfo, options linkOptions) (net.Conn, error) {
var dialer linkProtocol
switch strings.ToLower(u.Scheme) {
case "tcp":
@ -485,7 +523,7 @@ func (l *links) connect(u *url.URL, info linkInfo, options linkOptions) (net.Con
default:
return nil, ErrLinkUnrecognisedSchema
}
return dialer.dial(l.core.ctx, u, info, options)
return dialer.dial(ctx, u, info, options)
}
func (l *links) handler(linkType linkType, options linkOptions, conn net.Conn) error {