make removePeers work for TCP connections and minor admin cleanup

This commit is contained in:
Arceliar 2018-05-05 17:14:03 -05:00
parent d34e0f92c8
commit cdedd304af
5 changed files with 64 additions and 95 deletions

View File

@ -5,6 +5,7 @@ import "os"
import "bytes" import "bytes"
import "errors" import "errors"
import "fmt" import "fmt"
import "net/url"
import "sort" import "sort"
import "strings" import "strings"
import "strconv" import "strconv"
@ -57,18 +58,18 @@ func (a *admin) init(c *Core, listenaddr string) {
a.addHandler("getSessions", nil, func(out *[]byte, _ ...string) { a.addHandler("getSessions", nil, func(out *[]byte, _ ...string) {
*out = []byte(a.printInfos(a.getData_getSessions())) *out = []byte(a.printInfos(a.getData_getSessions()))
}) })
a.addHandler("addPeer", []string{"<peer>"}, func(out *[]byte, saddr ...string) { a.addHandler("addPeer", []string{"<port>"}, func(out *[]byte, saddr ...string) {
if a.addPeer(saddr[0]) == nil { if a.addPeer(saddr[0]) == nil {
*out = []byte("Adding peer: " + saddr[0] + "\n") *out = []byte("Adding peer: " + saddr[0] + "\n")
} else { } else {
*out = []byte("Failed to add peer: " + saddr[0] + "\n") *out = []byte("Failed to add peer: " + saddr[0] + "\n")
} }
}) })
a.addHandler("removePeer", []string{"<peer>"}, func(out *[]byte, saddr ...string) { a.addHandler("removePeer", []string{"<port>"}, func(out *[]byte, sport ...string) {
if a.removePeer(saddr[0]) == nil { if a.removePeer(sport[0]) == nil {
*out = []byte("Removing peer: " + saddr[0] + "\n") *out = []byte("Removing peer: " + sport[0] + "\n")
} else { } else {
*out = []byte("Failed to remove peer: " + saddr[0] + "\n") *out = []byte("Failed to remove peer: " + sport[0] + "\n")
} }
}) })
a.addHandler("setTunTap", []string{"<ifname|auto|none>", "[<tun|tap>]", "[<mtu>]"}, func(out *[]byte, ifparams ...string) { a.addHandler("setTunTap", []string{"<ifname|auto|none>", "[<tun|tap>]", "[<mtu>]"}, func(out *[]byte, ifparams ...string) {
@ -191,60 +192,43 @@ func (a *admin) printInfos(infos []admin_nodeInfo) string {
return strings.Join(out, "\n") return strings.Join(out, "\n")
} }
func (a *admin) addPeer(p string) error { func (a *admin) addPeer(addr string) error {
pAddr := p u, err := url.Parse(addr)
if p[:4] == "tcp:" || p[:4] == "udp:" { if err == nil {
pAddr = p[4:] switch strings.ToLower(u.Scheme) {
} case "tcp":
switch { a.core.DEBUG_addTCPConn(u.Host)
case len(p) >= 4 && p[:4] == "udp:": case "udp":
// Connect to peer over UDP a.core.DEBUG_maybeSendUDPKeys(u.Host)
udpAddr, err := net.ResolveUDPAddr("udp", pAddr) case "socks":
if err != nil { a.core.DEBUG_addSOCKSConn(u.Host, u.Path[1:])
return err default:
return errors.New("invalid peer: " + addr)
} }
var addr connAddr } else {
addr.fromUDPAddr(udpAddr) // no url scheme provided
a.core.udp.mutex.RLock() addr = strings.ToLower(addr)
_, isIn := a.core.udp.conns[addr] if strings.HasPrefix(addr, "udp:") {
a.core.udp.mutex.RUnlock() a.core.DEBUG_maybeSendUDPKeys(addr[4:])
if !isIn { return nil
a.core.udp.sendKeys(addr) } else {
if strings.HasPrefix(addr, "tcp:") {
addr = addr[4:]
}
a.core.DEBUG_addTCPConn(addr)
return nil
} }
return nil return errors.New("invalid peer: " + addr)
case len(p) >= 4 && p[:4] == "tcp:":
default:
// Connect to peer over TCP
_, err := net.ResolveTCPAddr("tcp", pAddr)
if err != nil {
return err
}
a.core.tcp.call(p)
} }
return nil return nil
} }
func (a *admin) removePeer(p string) error { func (a *admin) removePeer(p string) error {
pAddr := p iport, err := strconv.Atoi(p)
if p[:4] == "tcp:" || p[:4] == "udp:" { if err != nil {
pAddr = p[4:] return err
}
switch {
case len(p) >= 4 && p[:4] == "udp:":
// Connect to peer over UDP
udpAddr, err := net.ResolveUDPAddr("udp", pAddr)
if err != nil {
return err
}
var addr connAddr
addr.fromUDPAddr(udpAddr)
a.core.udp.sendClose(addr)
return nil
case len(p) >= 4 && p[:4] == "tcp:":
default:
// Connect to peer over TCP
return errors.New("Removing TCP peer not yet supported")
} }
a.core.peers.removePeer(switchPort(iport))
return nil return nil
} }

View File

@ -12,9 +12,7 @@ import "golang.org/x/net/proxy"
import "fmt" import "fmt"
import "net" import "net"
import "net/url"
import "log" import "log"
import "strings"
import "regexp" import "regexp"
// Core // Core
@ -313,29 +311,9 @@ func (c *Core) DEBUG_maybeSendUDPKeys(saddr string) {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
func (c *Core) DEBUG_addPeer(addr string) { func (c *Core) DEBUG_addPeer(addr string) {
u, err := url.Parse(addr) err := c.admin.addPeer(addr)
if err == nil { if err != nil {
switch strings.ToLower(u.Scheme) { panic(err)
case "tcp":
c.DEBUG_addTCPConn(u.Host)
case "udp":
c.DEBUG_maybeSendUDPKeys(u.Host)
case "socks":
c.DEBUG_addSOCKSConn(u.Host, u.Path[1:])
default:
panic("invalid peer: " + addr)
}
} else {
// no url scheme provided
addr = strings.ToLower(addr)
if strings.HasPrefix(addr, "udp:") {
c.DEBUG_maybeSendUDPKeys(addr[4:])
} else {
if strings.HasPrefix(addr, "tcp:") {
addr = addr[4:]
}
c.DEBUG_addTCPConn(addr)
}
} }
} }

View File

@ -73,6 +73,8 @@ type peer struct {
// Specifically, processing switch messages, signing, and verifying sigs // Specifically, processing switch messages, signing, and verifying sigs
// Resets at the start of each tick // Resets at the start of each tick
throttle uint8 throttle uint8
// Called when a peer is removed, to close the underlying connection, or via admin api
close func()
} }
const peer_Throttle = 1 const peer_Throttle = 1
@ -123,6 +125,26 @@ func (ps *peers) newPeer(box *boxPubKey,
return &p return &p
} }
func (ps *peers) removePeer(port switchPort) {
// TODO? store linkIn in the peer struct, close it here? (once)
if port == 0 {
return
} // Can't remove self peer
ps.mutex.Lock()
oldPorts := ps.getPorts()
p, isIn := oldPorts[port]
newPorts := make(map[switchPort]*peer)
for k, v := range oldPorts {
newPorts[k] = v
}
delete(newPorts, port)
ps.putPorts(newPorts)
ps.mutex.Unlock()
if isIn && p.close != nil {
p.close()
}
}
func (p *peer) linkLoop(in <-chan []byte) { func (p *peer) linkLoop(in <-chan []byte) {
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
defer ticker.Stop() defer ticker.Stop()

View File

@ -238,19 +238,12 @@ func (iface *tcpInterface) handler(sock net.Conn) {
util_putBytes(msg) util_putBytes(msg)
} }
} }
p.close = func() { sock.Close() }
setNoDelay(sock, true) setNoDelay(sock, true)
go p.linkLoop(linkIn) go p.linkLoop(linkIn)
defer func() { defer func() {
// Put all of our cleanup here... // Put all of our cleanup here...
p.core.peers.mutex.Lock() p.core.peers.removePeer(p.port)
oldPorts := p.core.peers.getPorts()
newPorts := make(map[switchPort]*peer)
for k, v := range oldPorts {
newPorts[k] = v
}
delete(newPorts, p.port)
p.core.peers.putPorts(newPorts)
p.core.peers.mutex.Unlock()
close(linkIn) close(linkIn)
}() }()
them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) them, _, _ := net.SplitHostPort(sock.RemoteAddr().String())

View File

@ -115,20 +115,11 @@ func (iface *udpInterface) startConn(info *connInfo) {
iface.mutex.Lock() iface.mutex.Lock()
delete(iface.conns, info.addr) delete(iface.conns, info.addr)
iface.mutex.Unlock() iface.mutex.Unlock()
iface.core.peers.mutex.Lock() iface.core.peers.removePeer(info.peer.port)
oldPorts := iface.core.peers.getPorts()
newPorts := make(map[switchPort]*peer)
for k, v := range oldPorts {
newPorts[k] = v
}
delete(newPorts, info.peer.port)
iface.core.peers.putPorts(newPorts)
iface.core.peers.mutex.Unlock()
close(info.linkIn) close(info.linkIn)
close(info.keysIn) close(info.keysIn)
close(info.closeIn) close(info.closeIn)
close(info.out) close(info.out)
iface.sendClose(info.addr)
iface.core.log.Println("Removing peer:", info.name) iface.core.log.Println("Removing peer:", info.name)
}() }()
for { for {
@ -296,6 +287,7 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
} }
}() }()
//*/ //*/
conn.peer.close = func() { iface.sendClose(conn.addr) }
iface.mutex.Lock() iface.mutex.Lock()
iface.conns[addr] = conn iface.conns[addr] = conn
iface.mutex.Unlock() iface.mutex.Unlock()