Centralise call/listen functions in link.go

This commit is contained in:
Neil Alexander 2019-03-04 22:45:35 +00:00
parent 61774aed3b
commit 88925d3e06
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
4 changed files with 45 additions and 41 deletions

View File

@ -573,18 +573,9 @@ func (a *admin) printInfos(infos []admin_nodeInfo) string {
// addPeer triggers a connection attempt to a node. // addPeer triggers a connection attempt to a node.
func (a *admin) addPeer(addr string, sintf string) error { func (a *admin) addPeer(addr string, sintf string) error {
u, err := url.Parse(addr) err := a.core.link.call(addr, sintf)
if err == nil { if err != nil {
switch strings.ToLower(u.Scheme) { return err
case "tcp":
a.core.link.tcp.connect(u.Host, sintf)
case "socks":
a.core.link.tcp.connectSOCKS(u.Host, u.Path[1:])
default:
return errors.New("invalid peer: " + addr)
}
} else {
return errors.New("invalid peer: " + addr)
} }
return nil return nil
} }

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"net/url"
"strings" "strings"
"sync" "sync"
@ -68,21 +69,20 @@ func (l *link) init(c *Core) error {
} }
if err := l.awdl.init(l); err != nil { if err := l.awdl.init(l); err != nil {
l.core.log.Errorln("Failed to start AWDL interface") c.log.Errorln("Failed to start AWDL interface")
return err return err
} }
go func() { go func() {
for { for {
e := <-l.reconfigure e := <-l.reconfigure
tcpresponse := make(chan error) response := make(chan error)
awdlresponse := make(chan error) l.tcp.reconfigure <- response
l.tcp.reconfigure <- tcpresponse if err := <-response; err != nil {
l.awdl.reconfigure <- awdlresponse
if err := <-tcpresponse; err != nil {
e <- err e <- err
} }
if err := <-awdlresponse; err != nil { l.awdl.reconfigure <- response
if err := <-response; err != nil {
e <- err e <- err
} }
e <- nil e <- nil
@ -92,6 +92,36 @@ func (l *link) init(c *Core) error {
return nil return nil
} }
func (l *link) call(uri string, sintf string) error {
u, err := url.Parse(uri)
if err != nil {
return err
}
pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/")
switch u.Scheme {
case "tcp":
l.tcp.call(u.Host, nil, sintf)
case "socks":
l.tcp.call(pathtokens[0], &u.Host, sintf)
default:
return errors.New("unknown call scheme: " + u.Scheme)
}
return nil
}
func (l *link) listen(uri string) error {
u, err := url.Parse(uri)
if err != nil {
return err
}
switch u.Scheme {
case "tcp":
return l.tcp.listen(u.Host)
default:
return errors.New("unknown listen scheme: " + u.Scheme)
}
}
func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) {
// Technically anything unique would work for names, but lets pick something human readable, just for debugging // Technically anything unique would work for names, but lets pick something human readable, just for debugging
intf := linkInterface{ intf := linkInterface{

View File

@ -183,6 +183,6 @@ func (m *multicast) listen() {
} }
addr.Zone = from.Zone addr.Zone = from.Zone
saddr := addr.String() saddr := addr.String()
m.core.link.tcp.connect(saddr, addr.Zone) m.core.link.call("tcp://"+saddr, addr.Zone)
} }
} }

View File

@ -64,16 +64,6 @@ func (t *tcp) getAddr() *net.TCPAddr {
return nil return nil
} }
// Attempts to initiate a connection to the provided address.
func (t *tcp) connect(addr string, intf string) {
t.call(addr, nil, intf)
}
// Attempst to initiate a connection to the provided address, viathe provided socks proxy address.
func (t *tcp) connectSOCKS(socksaddr, peeraddr string) {
t.call(peeraddr, &socksaddr, "")
}
// Initializes the struct. // Initializes the struct.
func (t *tcp) init(l *link) error { func (t *tcp) init(l *link) error {
t.link = l t.link = l
@ -104,14 +94,6 @@ func (t *tcp) init(l *link) error {
} }
for _, delete := range deleted { for _, delete := range deleted {
t.link.core.log.Warnln("Removing listener", delete, "not currently implemented") t.link.core.log.Warnln("Removing listener", delete, "not currently implemented")
/*t.mutex.Lock()
if listener, ok := t.listeners[delete]; ok {
listener.Close()
}
if listener, ok := t.listenerstops[delete]; ok {
listener <- true
}
t.mutex.Unlock()*/
} }
e <- nil e <- nil
} else { } else {
@ -202,7 +184,7 @@ func (t *tcp) isAlreadyCalling(saddr string) bool {
// If the dial is successful, it launches the handler. // If the dial is successful, it launches the handler.
// When finished, it removes the outgoing call, so reconnection attempts can be made later. // When finished, it removes the outgoing call, so reconnection attempts can be made later.
// This all happens in a separate goroutine that it spawns. // This all happens in a separate goroutine that it spawns.
func (t *tcp) call(saddr string, socksaddr *string, sintf string) { func (t *tcp) call(saddr string, options interface{}, sintf string) {
go func() { go func() {
callname := saddr callname := saddr
if sintf != "" { if sintf != "" {
@ -224,12 +206,13 @@ func (t *tcp) call(saddr string, socksaddr *string, sintf string) {
}() }()
var conn net.Conn var conn net.Conn
var err error var err error
if socksaddr != nil { socksaddr, issocks := options.(string)
if issocks {
if sintf != "" { if sintf != "" {
return return
} }
var dialer proxy.Dialer var dialer proxy.Dialer
dialer, err = proxy.SOCKS5("tcp", *socksaddr, nil, proxy.Direct) dialer, err = proxy.SOCKS5("tcp", socksaddr, nil, proxy.Direct)
if err != nil { if err != nil {
return return
} }