mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2024-11-27 12:05:23 +00:00
Merge pull request #360 from neilalexander/multilink
Support for multiple listeners
This commit is contained in:
commit
a0e6edd219
@ -25,7 +25,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
|
|||||||
- in case of vulnerabilities.
|
- in case of vulnerabilities.
|
||||||
-->
|
-->
|
||||||
|
|
||||||
## [0.3.3] - 2018-02-18
|
## [0.3.3] - 2019-02-18
|
||||||
### Added
|
### Added
|
||||||
- Dynamic reconfiguration, which allows reloading the configuration file to make changes during runtime by sending a `SIGHUP` signal (note: this only works with `-useconffile` and not `-useconf` and currently reconfiguring TUN/TAP is not supported)
|
- Dynamic reconfiguration, which allows reloading the configuration file to make changes during runtime by sending a `SIGHUP` signal (note: this only works with `-useconffile` and not `-useconf` and currently reconfiguring TUN/TAP is not supported)
|
||||||
- Support for building Yggdrasil as an iOS or Android framework if the appropriate tools (e.g. `gomobile`/`gobind` + SDKs) are available
|
- Support for building Yggdrasil as an iOS or Android framework if the appropriate tools (e.g. `gomobile`/`gobind` + SDKs) are available
|
||||||
|
@ -134,12 +134,20 @@ func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeCo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Do a quick check for old-format Listen statement so that mapstructure
|
||||||
|
// doesn't fail and crash
|
||||||
|
if listen, ok := dat["Listen"].(string); ok {
|
||||||
|
if strings.HasPrefix(listen, "tcp://") {
|
||||||
|
dat["Listen"] = []string{listen}
|
||||||
|
} else {
|
||||||
|
dat["Listen"] = []string{"tcp://" + listen}
|
||||||
|
}
|
||||||
|
}
|
||||||
// Overlay our newly mapped configuration onto the autoconf node config that
|
// Overlay our newly mapped configuration onto the autoconf node config that
|
||||||
// we generated above.
|
// we generated above.
|
||||||
if err = mapstructure.Decode(dat, &cfg); err != nil {
|
if err = mapstructure.Decode(dat, &cfg); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return cfg
|
return cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@ import (
|
|||||||
|
|
||||||
// NodeConfig defines all configuration values needed to run a signle yggdrasil node
|
// NodeConfig defines all configuration values needed to run a signle yggdrasil node
|
||||||
type NodeConfig struct {
|
type NodeConfig struct {
|
||||||
Listen string `comment:"Listen address for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."`
|
Listen []string `comment:"Listen addresses for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."`
|
||||||
AdminListen string `comment:"Listen address for admin connections. Default is to listen for local\nconnections either on TCP/9001 or a UNIX socket depending on your\nplatform. Use this value for yggdrasilctl -endpoint=X. To disable\nthe admin socket, use the value \"none\" instead."`
|
AdminListen string `comment:"Listen address for admin connections. Default is to listen for local\nconnections either on TCP/9001 or a UNIX socket depending on your\nplatform. Use this value for yggdrasilctl -endpoint=X. To disable\nthe admin socket, use the value \"none\" instead."`
|
||||||
Peers []string `comment:"List of connection strings for static peers in URI format, e.g.\ntcp://a.b.c.d:e or socks://a.b.c.d:e/f.g.h.i:j."`
|
Peers []string `comment:"List of connection strings for static peers in URI format, e.g.\ntcp://a.b.c.d:e or socks://a.b.c.d:e/f.g.h.i:j."`
|
||||||
InterfacePeers map[string][]string `comment:"List of connection strings for static peers in URI format, arranged\nby source interface, e.g. { \"eth0\": [ tcp://a.b.c.d:e ] }. Note that\nSOCKS peerings will NOT be affected by this option and should go in\nthe \"Peers\" section instead."`
|
InterfacePeers map[string][]string `comment:"List of connection strings for static peers in URI format, arranged\nby source interface, e.g. { \"eth0\": [ tcp://a.b.c.d:e ] }. Note that\nSOCKS peerings will NOT be affected by this option and should go in\nthe \"Peers\" section instead."`
|
||||||
@ -30,13 +30,6 @@ type NodeConfig struct {
|
|||||||
SwitchOptions SwitchOptions `comment:"Advanced options for tuning the switch. Normally you will not need\nto edit these options."`
|
SwitchOptions SwitchOptions `comment:"Advanced options for tuning the switch. Normally you will not need\nto edit these options."`
|
||||||
NodeInfoPrivacy bool `comment:"By default, nodeinfo contains some defaults including the platform,\narchitecture and Yggdrasil version. These can help when surveying\nthe network and diagnosing network routing problems. Enabling\nnodeinfo privacy prevents this, so that only items specified in\n\"NodeInfo\" are sent back if specified."`
|
NodeInfoPrivacy bool `comment:"By default, nodeinfo contains some defaults including the platform,\narchitecture and Yggdrasil version. These can help when surveying\nthe network and diagnosing network routing problems. Enabling\nnodeinfo privacy prevents this, so that only items specified in\n\"NodeInfo\" are sent back if specified."`
|
||||||
NodeInfo map[string]interface{} `comment:"Optional node info. This must be a { \"key\": \"value\", ... } map\nor set as null. This is entirely optional but, if set, is visible\nto the whole network on request."`
|
NodeInfo map[string]interface{} `comment:"Optional node info. This must be a { \"key\": \"value\", ... } map\nor set as null. This is entirely optional but, if set, is visible\nto the whole network on request."`
|
||||||
//Net NetConfig `comment:"Extended options for connecting to peers over other networks."`
|
|
||||||
}
|
|
||||||
|
|
||||||
// NetConfig defines network/proxy related configuration values
|
|
||||||
type NetConfig struct {
|
|
||||||
Tor TorConfig `comment:"Experimental options for configuring peerings over Tor."`
|
|
||||||
I2P I2PConfig `comment:"Experimental options for configuring peerings over I2P."`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SessionFirewall controls the session firewall configuration
|
// SessionFirewall controls the session firewall configuration
|
||||||
@ -71,18 +64,16 @@ type SwitchOptions struct {
|
|||||||
// isAutoconf is that the TCP and UDP ports will likely end up with different
|
// isAutoconf is that the TCP and UDP ports will likely end up with different
|
||||||
// port numbers.
|
// port numbers.
|
||||||
func GenerateConfig(isAutoconf bool) *NodeConfig {
|
func GenerateConfig(isAutoconf bool) *NodeConfig {
|
||||||
// Create a new core.
|
|
||||||
//core := Core{}
|
|
||||||
// Generate encryption keys.
|
// Generate encryption keys.
|
||||||
bpub, bpriv := crypto.NewBoxKeys()
|
bpub, bpriv := crypto.NewBoxKeys()
|
||||||
spub, spriv := crypto.NewSigKeys()
|
spub, spriv := crypto.NewSigKeys()
|
||||||
// Create a node configuration and populate it.
|
// Create a node configuration and populate it.
|
||||||
cfg := NodeConfig{}
|
cfg := NodeConfig{}
|
||||||
if isAutoconf {
|
if isAutoconf {
|
||||||
cfg.Listen = "[::]:0"
|
cfg.Listen = []string{"tcp://[::]:0"}
|
||||||
} else {
|
} else {
|
||||||
r1 := rand.New(rand.NewSource(time.Now().UnixNano()))
|
r1 := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
cfg.Listen = fmt.Sprintf("[::]:%d", r1.Intn(65534-32768)+32768)
|
cfg.Listen = []string{fmt.Sprintf("tcp://[::]:%d", r1.Intn(65534-32768)+32768)}
|
||||||
}
|
}
|
||||||
cfg.AdminListen = defaults.GetDefaults().DefaultAdminListen
|
cfg.AdminListen = defaults.GetDefaults().DefaultAdminListen
|
||||||
cfg.EncryptionPublicKey = hex.EncodeToString(bpub[:])
|
cfg.EncryptionPublicKey = hex.EncodeToString(bpub[:])
|
||||||
|
@ -1,8 +0,0 @@
|
|||||||
package config
|
|
||||||
|
|
||||||
// I2PConfig is the configuration structure for i2p related configuration
|
|
||||||
type I2PConfig struct {
|
|
||||||
Keyfile string // private key file or empty string for ephemeral keys
|
|
||||||
Addr string // address of i2p api connector
|
|
||||||
Enabled bool
|
|
||||||
}
|
|
@ -1,8 +0,0 @@
|
|||||||
package config
|
|
||||||
|
|
||||||
// TorConfig is the configuration structure for Tor Proxy related values
|
|
||||||
type TorConfig struct {
|
|
||||||
OnionKeyfile string // hidden service private key for ADD_ONION (currently unimplemented)
|
|
||||||
ControlAddr string // tor control port address
|
|
||||||
Enabled bool
|
|
||||||
}
|
|
@ -76,3 +76,20 @@ func FuncTimeout(f func(), timeout time.Duration) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This calculates the difference between two arrays and returns items
|
||||||
|
// that appear in A but not in B - useful somewhat when reconfiguring
|
||||||
|
// and working out what configuration items changed
|
||||||
|
func Difference(a, b []string) []string {
|
||||||
|
ab := []string{}
|
||||||
|
mb := map[string]bool{}
|
||||||
|
for _, x := range b {
|
||||||
|
mb[x] = true
|
||||||
|
}
|
||||||
|
for _, x := range a {
|
||||||
|
if !mb[x] {
|
||||||
|
ab = append(ab, x)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ab
|
||||||
|
}
|
||||||
|
@ -591,18 +591,9 @@ func (a *admin) printInfos(infos []admin_nodeInfo) string {
|
|||||||
|
|
||||||
// addPeer triggers a connection attempt to a node.
|
// addPeer triggers a connection attempt to a node.
|
||||||
func (a *admin) addPeer(addr string, sintf string) error {
|
func (a *admin) addPeer(addr string, sintf string) error {
|
||||||
u, err := url.Parse(addr)
|
err := a.core.link.call(addr, sintf)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
switch strings.ToLower(u.Scheme) {
|
return err
|
||||||
case "tcp":
|
|
||||||
a.core.tcp.connect(u.Host, sintf)
|
|
||||||
case "socks":
|
|
||||||
a.core.tcp.connectSOCKS(u.Host, u.Path[1:])
|
|
||||||
default:
|
|
||||||
return errors.New("invalid peer: " + addr)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return errors.New("invalid peer: " + addr)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -684,7 +675,8 @@ func (a *admin) getData_getPeers() []admin_nodeInfo {
|
|||||||
{"uptime", int(time.Since(p.firstSeen).Seconds())},
|
{"uptime", int(time.Since(p.firstSeen).Seconds())},
|
||||||
{"bytes_sent", atomic.LoadUint64(&p.bytesSent)},
|
{"bytes_sent", atomic.LoadUint64(&p.bytesSent)},
|
||||||
{"bytes_recvd", atomic.LoadUint64(&p.bytesRecvd)},
|
{"bytes_recvd", atomic.LoadUint64(&p.bytesRecvd)},
|
||||||
{"endpoint", p.endpoint},
|
{"proto", p.intf.info.linkType},
|
||||||
|
{"endpoint", p.intf.name},
|
||||||
{"box_pub_key", hex.EncodeToString(p.box[:])},
|
{"box_pub_key", hex.EncodeToString(p.box[:])},
|
||||||
}
|
}
|
||||||
peerInfos = append(peerInfos, info)
|
peerInfos = append(peerInfos, info)
|
||||||
@ -710,7 +702,8 @@ func (a *admin) getData_getSwitchPeers() []admin_nodeInfo {
|
|||||||
{"port", elem.port},
|
{"port", elem.port},
|
||||||
{"bytes_sent", atomic.LoadUint64(&peer.bytesSent)},
|
{"bytes_sent", atomic.LoadUint64(&peer.bytesSent)},
|
||||||
{"bytes_recvd", atomic.LoadUint64(&peer.bytesRecvd)},
|
{"bytes_recvd", atomic.LoadUint64(&peer.bytesRecvd)},
|
||||||
{"endpoint", peer.endpoint},
|
{"proto", peer.intf.info.linkType},
|
||||||
|
{"endpoint", peer.intf.info.remote},
|
||||||
{"box_pub_key", hex.EncodeToString(peer.box[:])},
|
{"box_pub_key", hex.EncodeToString(peer.box[:])},
|
||||||
}
|
}
|
||||||
peerInfos = append(peerInfos, info)
|
peerInfos = append(peerInfos, info)
|
||||||
|
@ -7,9 +7,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type awdl struct {
|
type awdl struct {
|
||||||
link *link
|
link *link
|
||||||
mutex sync.RWMutex // protects interfaces below
|
reconfigure chan chan error
|
||||||
interfaces map[string]*awdlInterface
|
mutex sync.RWMutex // protects interfaces below
|
||||||
|
interfaces map[string]*awdlInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
type awdlInterface struct {
|
type awdlInterface struct {
|
||||||
@ -49,8 +50,15 @@ func (a *awdl) init(l *link) error {
|
|||||||
a.link = l
|
a.link = l
|
||||||
a.mutex.Lock()
|
a.mutex.Lock()
|
||||||
a.interfaces = make(map[string]*awdlInterface)
|
a.interfaces = make(map[string]*awdlInterface)
|
||||||
|
a.reconfigure = make(chan chan error, 1)
|
||||||
a.mutex.Unlock()
|
a.mutex.Unlock()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for e := range a.reconfigure {
|
||||||
|
e <- nil
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,7 +44,6 @@ type Core struct {
|
|||||||
admin admin
|
admin admin
|
||||||
searches searches
|
searches searches
|
||||||
multicast multicast
|
multicast multicast
|
||||||
tcp tcpInterface
|
|
||||||
link link
|
link link
|
||||||
log *log.Logger
|
log *log.Logger
|
||||||
}
|
}
|
||||||
@ -144,7 +143,7 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) {
|
|||||||
c.router.tun.reconfigure,
|
c.router.tun.reconfigure,
|
||||||
c.router.cryptokey.reconfigure,
|
c.router.cryptokey.reconfigure,
|
||||||
c.switchTable.reconfigure,
|
c.switchTable.reconfigure,
|
||||||
c.tcp.reconfigure,
|
c.link.reconfigure,
|
||||||
c.multicast.reconfigure,
|
c.multicast.reconfigure,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -205,11 +204,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
|
|||||||
|
|
||||||
c.init()
|
c.init()
|
||||||
|
|
||||||
if err := c.tcp.init(c); err != nil {
|
|
||||||
c.log.Errorln("Failed to start TCP interface")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.link.init(c); err != nil {
|
if err := c.link.init(c); err != nil {
|
||||||
c.log.Errorln("Failed to start link interfaces")
|
c.log.Errorln("Failed to start link interfaces")
|
||||||
return err
|
return err
|
||||||
|
@ -97,7 +97,15 @@ func (c *Core) DEBUG_getPeers() *peers {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ps *peers) DEBUG_newPeer(box crypto.BoxPubKey, sig crypto.SigPubKey, link crypto.BoxSharedKey) *peer {
|
func (ps *peers) DEBUG_newPeer(box crypto.BoxPubKey, sig crypto.SigPubKey, link crypto.BoxSharedKey) *peer {
|
||||||
return ps.newPeer(&box, &sig, &link, "(simulator)", nil)
|
sim := linkInterface{
|
||||||
|
name: "(simulator)",
|
||||||
|
info: linkInfo{
|
||||||
|
local: "(simulator)",
|
||||||
|
remote: "(simulator)",
|
||||||
|
linkType: "sim",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return ps.newPeer(&box, &sig, &link, &sim, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -449,19 +457,19 @@ func (c *Core) DEBUG_addSOCKSConn(socksaddr, peeraddr string) {
|
|||||||
|
|
||||||
//*
|
//*
|
||||||
func (c *Core) DEBUG_setupAndStartGlobalTCPInterface(addrport string) {
|
func (c *Core) DEBUG_setupAndStartGlobalTCPInterface(addrport string) {
|
||||||
c.config.Listen = addrport
|
c.config.Listen = []string{addrport}
|
||||||
if err := c.tcp.init(c /*, addrport, 0*/); err != nil {
|
if err := c.link.init(c); err != nil {
|
||||||
c.log.Println("Failed to start TCP interface:", err)
|
c.log.Println("Failed to start interfaces:", err)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Core) DEBUG_getGlobalTCPAddr() *net.TCPAddr {
|
func (c *Core) DEBUG_getGlobalTCPAddr() *net.TCPAddr {
|
||||||
return c.tcp.serv.Addr().(*net.TCPAddr)
|
return c.link.tcp.getAddr()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Core) DEBUG_addTCPConn(saddr string) {
|
func (c *Core) DEBUG_addTCPConn(saddr string) {
|
||||||
c.tcp.call(saddr, nil, "")
|
c.link.tcp.call(saddr, nil, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
//*/
|
//*/
|
||||||
|
@ -6,8 +6,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
//"sync/atomic"
|
//"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -17,10 +19,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type link struct {
|
type link struct {
|
||||||
core *Core
|
core *Core
|
||||||
mutex sync.RWMutex // protects interfaces below
|
reconfigure chan chan error
|
||||||
interfaces map[linkInfo]*linkInterface
|
mutex sync.RWMutex // protects interfaces below
|
||||||
awdl awdl // AWDL interface support
|
interfaces map[linkInfo]*linkInterface
|
||||||
|
awdl awdl // AWDL interface support
|
||||||
|
tcp tcp // TCP interface support
|
||||||
// TODO timeout (to remove from switch), read from config.ReadTimeout
|
// TODO timeout (to remove from switch), read from config.ReadTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,16 +60,72 @@ func (l *link) init(c *Core) error {
|
|||||||
l.core = c
|
l.core = c
|
||||||
l.mutex.Lock()
|
l.mutex.Lock()
|
||||||
l.interfaces = make(map[linkInfo]*linkInterface)
|
l.interfaces = make(map[linkInfo]*linkInterface)
|
||||||
|
l.reconfigure = make(chan chan error)
|
||||||
l.mutex.Unlock()
|
l.mutex.Unlock()
|
||||||
|
|
||||||
if err := l.awdl.init(l); err != nil {
|
if err := l.tcp.init(l); err != nil {
|
||||||
l.core.log.Errorln("Failed to start AWDL interface")
|
c.log.Errorln("Failed to start TCP interface")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := l.awdl.init(l); err != nil {
|
||||||
|
c.log.Errorln("Failed to start AWDL interface")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
e := <-l.reconfigure
|
||||||
|
tcpresponse := make(chan error)
|
||||||
|
awdlresponse := make(chan error)
|
||||||
|
l.tcp.reconfigure <- tcpresponse
|
||||||
|
if err := <-tcpresponse; err != nil {
|
||||||
|
e <- err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
l.awdl.reconfigure <- awdlresponse
|
||||||
|
if err := <-awdlresponse; err != nil {
|
||||||
|
e <- err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
e <- nil
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *link) call(uri string, sintf string) error {
|
||||||
|
u, err := url.Parse(uri)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/")
|
||||||
|
switch u.Scheme {
|
||||||
|
case "tcp":
|
||||||
|
l.tcp.call(u.Host, nil, sintf)
|
||||||
|
case "socks":
|
||||||
|
l.tcp.call(pathtokens[0], u.Host, sintf)
|
||||||
|
default:
|
||||||
|
return errors.New("unknown call scheme: " + u.Scheme)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *link) listen(uri string) error {
|
||||||
|
u, err := url.Parse(uri)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
switch u.Scheme {
|
||||||
|
case "tcp":
|
||||||
|
_, err := l.tcp.listen(u.Host)
|
||||||
|
return err
|
||||||
|
default:
|
||||||
|
return errors.New("unknown listen scheme: " + u.Scheme)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) {
|
func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) {
|
||||||
// Technically anything unique would work for names, but lets pick something human readable, just for debugging
|
// Technically anything unique would work for names, but lets pick something human readable, just for debugging
|
||||||
intf := linkInterface{
|
intf := linkInterface{
|
||||||
@ -147,7 +207,7 @@ func (intf *linkInterface) handler() error {
|
|||||||
intf.link.mutex.Unlock()
|
intf.link.mutex.Unlock()
|
||||||
// Create peer
|
// Create peer
|
||||||
shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
|
shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
|
||||||
intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name, func() { intf.msgIO.close() })
|
intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() })
|
||||||
if intf.peer == nil {
|
if intf.peer == nil {
|
||||||
return errors.New("failed to create peer")
|
return errors.New("failed to create peer")
|
||||||
}
|
}
|
||||||
@ -201,11 +261,11 @@ func (intf *linkInterface) handler() error {
|
|||||||
// Now block until something is ready or the timer triggers keepalive traffic
|
// Now block until something is ready or the timer triggers keepalive traffic
|
||||||
select {
|
select {
|
||||||
case <-tcpTimer.C:
|
case <-tcpTimer.C:
|
||||||
intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s",
|
intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s",
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||||
send(nil)
|
send(nil)
|
||||||
case <-sendAck:
|
case <-sendAck:
|
||||||
intf.link.core.log.Debugf("Sending ack to %s: %s, source %s",
|
intf.link.core.log.Tracef("Sending ack to %s: %s, source %s",
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||||
send(nil)
|
send(nil)
|
||||||
case msg := <-intf.peer.linkOut:
|
case msg := <-intf.peer.linkOut:
|
||||||
@ -220,7 +280,7 @@ func (intf *linkInterface) handler() error {
|
|||||||
case signalReady <- struct{}{}:
|
case signalReady <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
//intf.link.core.log.Debugf("Sending packet to %s: %s, source %s",
|
//intf.link.core.log.Tracef("Sending packet to %s: %s, source %s",
|
||||||
// strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
// strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -271,7 +331,7 @@ func (intf *linkInterface) handler() error {
|
|||||||
sendTimerRunning = true
|
sendTimerRunning = true
|
||||||
}
|
}
|
||||||
if !gotMsg {
|
if !gotMsg {
|
||||||
intf.link.core.log.Debugf("Received ack from %s: %s, source %s",
|
intf.link.core.log.Tracef("Received ack from %s: %s, source %s",
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||||
}
|
}
|
||||||
case sentMsg, ok := <-signalSent:
|
case sentMsg, ok := <-signalSent:
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/net/ipv6"
|
"golang.org/x/net/ipv6"
|
||||||
@ -16,19 +15,16 @@ type multicast struct {
|
|||||||
reconfigure chan chan error
|
reconfigure chan chan error
|
||||||
sock *ipv6.PacketConn
|
sock *ipv6.PacketConn
|
||||||
groupAddr string
|
groupAddr string
|
||||||
myAddr *net.TCPAddr
|
listeners map[string]*tcpListener
|
||||||
myAddrMutex sync.RWMutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *multicast) init(core *Core) {
|
func (m *multicast) init(core *Core) {
|
||||||
m.core = core
|
m.core = core
|
||||||
m.reconfigure = make(chan chan error, 1)
|
m.reconfigure = make(chan chan error, 1)
|
||||||
|
m.listeners = make(map[string]*tcpListener)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
e := <-m.reconfigure
|
e := <-m.reconfigure
|
||||||
m.myAddrMutex.Lock()
|
|
||||||
m.myAddr = m.core.tcp.getAddr()
|
|
||||||
m.myAddrMutex.Unlock()
|
|
||||||
e <- nil
|
e <- nil
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -68,13 +64,13 @@ func (m *multicast) start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *multicast) interfaces() []net.Interface {
|
func (m *multicast) interfaces() map[string]net.Interface {
|
||||||
// Get interface expressions from config
|
// Get interface expressions from config
|
||||||
m.core.configMutex.RLock()
|
m.core.configMutex.RLock()
|
||||||
exprs := m.core.config.MulticastInterfaces
|
exprs := m.core.config.MulticastInterfaces
|
||||||
m.core.configMutex.RUnlock()
|
m.core.configMutex.RUnlock()
|
||||||
// Ask the system for network interfaces
|
// Ask the system for network interfaces
|
||||||
var interfaces []net.Interface
|
interfaces := make(map[string]net.Interface)
|
||||||
allifaces, err := net.Interfaces()
|
allifaces, err := net.Interfaces()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -94,12 +90,14 @@ func (m *multicast) interfaces() []net.Interface {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, expr := range exprs {
|
for _, expr := range exprs {
|
||||||
|
// Compile each regular expression
|
||||||
e, err := regexp.Compile(expr)
|
e, err := regexp.Compile(expr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
// Does the interface match the regular expression? Store it if so
|
||||||
if e.MatchString(iface.Name) {
|
if e.MatchString(iface.Name) {
|
||||||
interfaces = append(interfaces, iface)
|
interfaces[iface.Name] = iface
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -107,10 +105,6 @@ func (m *multicast) interfaces() []net.Interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *multicast) announce() {
|
func (m *multicast) announce() {
|
||||||
var anAddr net.TCPAddr
|
|
||||||
m.myAddrMutex.Lock()
|
|
||||||
m.myAddr = m.core.tcp.getAddr()
|
|
||||||
m.myAddrMutex.Unlock()
|
|
||||||
groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr)
|
groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -120,33 +114,66 @@ func (m *multicast) announce() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
for _, iface := range m.interfaces() {
|
interfaces := m.interfaces()
|
||||||
m.sock.JoinGroup(&iface, groupAddr)
|
// There might be interfaces that we configured listeners for but are no
|
||||||
|
// longer up - if that's the case then we should stop the listeners
|
||||||
|
for name, listener := range m.listeners {
|
||||||
|
if _, ok := interfaces[name]; !ok {
|
||||||
|
listener.stop <- true
|
||||||
|
delete(m.listeners, name)
|
||||||
|
m.core.log.Debugln("No longer multicasting on", name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Now that we have a list of valid interfaces from the operating system,
|
||||||
|
// we can start checking if we can send multicasts on them
|
||||||
|
for _, iface := range interfaces {
|
||||||
|
// Find interface addresses
|
||||||
addrs, err := iface.Addrs()
|
addrs, err := iface.Addrs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
m.myAddrMutex.RLock()
|
|
||||||
anAddr.Port = m.myAddr.Port
|
|
||||||
m.myAddrMutex.RUnlock()
|
|
||||||
for _, addr := range addrs {
|
for _, addr := range addrs {
|
||||||
addrIP, _, _ := net.ParseCIDR(addr.String())
|
addrIP, _, _ := net.ParseCIDR(addr.String())
|
||||||
|
// Ignore IPv4 addresses
|
||||||
if addrIP.To4() != nil {
|
if addrIP.To4() != nil {
|
||||||
continue
|
continue
|
||||||
} // IPv6 only
|
}
|
||||||
|
// Ignore non-link-local addresses
|
||||||
if !addrIP.IsLinkLocalUnicast() {
|
if !addrIP.IsLinkLocalUnicast() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
anAddr.IP = addrIP
|
// Join the multicast group
|
||||||
anAddr.Zone = iface.Name
|
m.sock.JoinGroup(&iface, groupAddr)
|
||||||
destAddr.Zone = iface.Name
|
// Try and see if we already have a TCP listener for this interface
|
||||||
msg := []byte(anAddr.String())
|
var listener *tcpListener
|
||||||
m.sock.WriteTo(msg, nil, destAddr)
|
if l, ok := m.listeners[iface.Name]; !ok || l.listener == nil {
|
||||||
|
// No listener was found - let's create one
|
||||||
|
listenaddr := fmt.Sprintf("[%s%%%s]:0", addrIP, iface.Name)
|
||||||
|
if li, err := m.core.link.tcp.listen(listenaddr); err == nil {
|
||||||
|
m.core.log.Debugln("Started multicasting on", iface.Name)
|
||||||
|
// Store the listener so that we can stop it later if needed
|
||||||
|
m.listeners[iface.Name] = li
|
||||||
|
listener = li
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// An existing listener was found
|
||||||
|
listener = m.listeners[iface.Name]
|
||||||
|
}
|
||||||
|
// Make sure nothing above failed for some reason
|
||||||
|
if listener == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Get the listener details and construct the multicast beacon
|
||||||
|
lladdr := listener.listener.Addr().String()
|
||||||
|
if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil {
|
||||||
|
destAddr.Zone = iface.Name
|
||||||
|
msg := []byte(a.String())
|
||||||
|
m.sock.WriteTo(msg, nil, destAddr)
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
}
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second * 15)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -183,6 +210,6 @@ func (m *multicast) listen() {
|
|||||||
}
|
}
|
||||||
addr.Zone = from.Zone
|
addr.Zone = from.Zone
|
||||||
saddr := addr.String()
|
saddr := addr.String()
|
||||||
m.core.tcp.connect(saddr, addr.Zone)
|
m.core.link.call("tcp://"+saddr, addr.Zone)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -98,6 +98,7 @@ type peer struct {
|
|||||||
bytesRecvd uint64 // To track bandwidth usage for getPeers
|
bytesRecvd uint64 // To track bandwidth usage for getPeers
|
||||||
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
|
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
|
||||||
core *Core
|
core *Core
|
||||||
|
intf *linkInterface
|
||||||
port switchPort
|
port switchPort
|
||||||
box crypto.BoxPubKey
|
box crypto.BoxPubKey
|
||||||
sig crypto.SigPubKey
|
sig crypto.SigPubKey
|
||||||
@ -113,18 +114,19 @@ type peer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
|
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
|
||||||
func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string, closer func()) *peer {
|
func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
p := peer{box: *box,
|
p := peer{box: *box,
|
||||||
sig: *sig,
|
sig: *sig,
|
||||||
shared: *crypto.GetSharedKey(&ps.core.boxPriv, box),
|
shared: *crypto.GetSharedKey(&ps.core.boxPriv, box),
|
||||||
linkShared: *linkShared,
|
linkShared: *linkShared,
|
||||||
endpoint: endpoint,
|
|
||||||
firstSeen: now,
|
firstSeen: now,
|
||||||
doSend: make(chan struct{}, 1),
|
doSend: make(chan struct{}, 1),
|
||||||
dinfo: make(chan *dhtInfo, 1),
|
dinfo: make(chan *dhtInfo, 1),
|
||||||
close: closer,
|
close: closer,
|
||||||
core: ps.core}
|
core: ps.core,
|
||||||
|
intf: intf,
|
||||||
|
}
|
||||||
ps.mutex.Lock()
|
ps.mutex.Lock()
|
||||||
defer ps.mutex.Unlock()
|
defer ps.mutex.Unlock()
|
||||||
oldPorts := ps.getPorts()
|
oldPorts := ps.getPorts()
|
||||||
|
@ -67,7 +67,15 @@ func (r *router) init(core *Core) {
|
|||||||
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
|
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
|
||||||
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
|
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
|
||||||
in := make(chan []byte, 1) // TODO something better than this...
|
in := make(chan []byte, 1) // TODO something better than this...
|
||||||
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil)
|
self := linkInterface{
|
||||||
|
name: "(self)",
|
||||||
|
info: linkInfo{
|
||||||
|
local: "(self)",
|
||||||
|
remote: "(self)",
|
||||||
|
linkType: "self",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
|
||||||
p.out = func(packet []byte) { in <- packet }
|
p.out = func(packet []byte) { in <- packet }
|
||||||
r.in = in
|
r.in = in
|
||||||
out := make(chan []byte, 32)
|
out := make(chan []byte, 32)
|
||||||
|
@ -24,35 +24,29 @@ import (
|
|||||||
|
|
||||||
"golang.org/x/net/proxy"
|
"golang.org/x/net/proxy"
|
||||||
|
|
||||||
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
const default_timeout = 6 * time.Second
|
const default_timeout = 6 * time.Second
|
||||||
const tcp_ping_interval = (default_timeout * 2 / 3)
|
const tcp_ping_interval = (default_timeout * 2 / 3)
|
||||||
|
|
||||||
// The TCP listener and information about active TCP connections, to avoid duplication.
|
// The TCP listener and information about active TCP connections, to avoid duplication.
|
||||||
type tcpInterface struct {
|
type tcp struct {
|
||||||
core *Core
|
link *link
|
||||||
reconfigure chan chan error
|
reconfigure chan chan error
|
||||||
serv net.Listener
|
|
||||||
stop chan bool
|
|
||||||
addr string
|
|
||||||
mutex sync.Mutex // Protecting the below
|
mutex sync.Mutex // Protecting the below
|
||||||
|
listeners map[string]*tcpListener
|
||||||
calls map[string]struct{}
|
calls map[string]struct{}
|
||||||
conns map[tcpInfo](chan struct{})
|
conns map[linkInfo](chan struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring.
|
type tcpListener struct {
|
||||||
// Different address combinations are allowed, so multi-homing is still technically possible (but not necessarily advisable).
|
listener net.Listener
|
||||||
type tcpInfo struct {
|
stop chan bool
|
||||||
box crypto.BoxPubKey
|
|
||||||
sig crypto.SigPubKey
|
|
||||||
localAddr string
|
|
||||||
remoteAddr string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wrapper function to set additional options for specific connection types.
|
// Wrapper function to set additional options for specific connection types.
|
||||||
func (iface *tcpInterface) setExtraOptions(c net.Conn) {
|
func (t *tcp) setExtraOptions(c net.Conn) {
|
||||||
switch sock := c.(type) {
|
switch sock := c.(type) {
|
||||||
case *net.TCPConn:
|
case *net.TCPConn:
|
||||||
sock.SetNoDelay(true)
|
sock.SetNoDelay(true)
|
||||||
@ -62,104 +56,152 @@ func (iface *tcpInterface) setExtraOptions(c net.Conn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Returns the address of the listener.
|
// Returns the address of the listener.
|
||||||
func (iface *tcpInterface) getAddr() *net.TCPAddr {
|
func (t *tcp) getAddr() *net.TCPAddr {
|
||||||
return iface.serv.Addr().(*net.TCPAddr)
|
// TODO: Fix this, because this will currently only give a single address
|
||||||
}
|
// to multicast.go, which obviously is not great, but right now multicast.go
|
||||||
|
// doesn't have the ability to send more than one address in a packet either
|
||||||
// Attempts to initiate a connection to the provided address.
|
t.mutex.Lock()
|
||||||
func (iface *tcpInterface) connect(addr string, intf string) {
|
defer t.mutex.Unlock()
|
||||||
iface.call(addr, nil, intf)
|
for _, l := range t.listeners {
|
||||||
}
|
return l.listener.Addr().(*net.TCPAddr)
|
||||||
|
}
|
||||||
// Attempst to initiate a connection to the provided address, viathe provided socks proxy address.
|
return nil
|
||||||
func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) {
|
|
||||||
iface.call(peeraddr, &socksaddr, "")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initializes the struct.
|
// Initializes the struct.
|
||||||
func (iface *tcpInterface) init(core *Core) (err error) {
|
func (t *tcp) init(l *link) error {
|
||||||
iface.core = core
|
t.link = l
|
||||||
iface.stop = make(chan bool, 1)
|
t.reconfigure = make(chan chan error, 1)
|
||||||
iface.reconfigure = make(chan chan error, 1)
|
t.mutex.Lock()
|
||||||
|
t.calls = make(map[string]struct{})
|
||||||
|
t.conns = make(map[linkInfo](chan struct{}))
|
||||||
|
t.listeners = make(map[string]*tcpListener)
|
||||||
|
t.mutex.Unlock()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
e := <-iface.reconfigure
|
e := <-t.reconfigure
|
||||||
iface.core.configMutex.RLock()
|
t.link.core.configMutex.RLock()
|
||||||
updated := iface.core.config.Listen != iface.core.configOld.Listen
|
added := util.Difference(t.link.core.config.Listen, t.link.core.configOld.Listen)
|
||||||
iface.core.configMutex.RUnlock()
|
deleted := util.Difference(t.link.core.configOld.Listen, t.link.core.config.Listen)
|
||||||
if updated {
|
t.link.core.configMutex.RUnlock()
|
||||||
iface.stop <- true
|
if len(added) > 0 || len(deleted) > 0 {
|
||||||
iface.serv.Close()
|
for _, a := range added {
|
||||||
e <- iface.listen()
|
if a[:6] != "tcp://" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err := t.listen(a[6:]); err != nil {
|
||||||
|
e <- err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, d := range deleted {
|
||||||
|
if d[:6] != "tcp://" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
t.mutex.Lock()
|
||||||
|
if listener, ok := t.listeners[d[6:]]; ok {
|
||||||
|
t.mutex.Unlock()
|
||||||
|
listener.stop <- true
|
||||||
|
} else {
|
||||||
|
t.mutex.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
e <- nil
|
||||||
} else {
|
} else {
|
||||||
e <- nil
|
e <- nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return iface.listen()
|
t.link.core.configMutex.RLock()
|
||||||
|
defer t.link.core.configMutex.RUnlock()
|
||||||
|
for _, listenaddr := range t.link.core.config.Listen {
|
||||||
|
if listenaddr[:6] != "tcp://" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err := t.listen(listenaddr[6:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (iface *tcpInterface) listen() error {
|
func (t *tcp) listen(listenaddr string) (*tcpListener, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
iface.core.configMutex.RLock()
|
|
||||||
iface.addr = iface.core.config.Listen
|
|
||||||
iface.core.configMutex.RUnlock()
|
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
lc := net.ListenConfig{
|
lc := net.ListenConfig{
|
||||||
Control: iface.tcpContext,
|
Control: t.tcpContext,
|
||||||
}
|
}
|
||||||
iface.serv, err = lc.Listen(ctx, "tcp", iface.addr)
|
listener, err := lc.Listen(ctx, "tcp", listenaddr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
iface.mutex.Lock()
|
l := tcpListener{
|
||||||
iface.calls = make(map[string]struct{})
|
listener: listener,
|
||||||
iface.conns = make(map[tcpInfo](chan struct{}))
|
stop: make(chan bool),
|
||||||
iface.mutex.Unlock()
|
}
|
||||||
go iface.listener()
|
go t.listener(&l, listenaddr)
|
||||||
return nil
|
return &l, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runs the listener, which spawns off goroutines for incoming connections.
|
// Runs the listener, which spawns off goroutines for incoming connections.
|
||||||
func (iface *tcpInterface) listener() {
|
func (t *tcp) listener(l *tcpListener, listenaddr string) {
|
||||||
defer iface.serv.Close()
|
if l == nil {
|
||||||
iface.core.log.Infoln("Listening for TCP on:", iface.serv.Addr().String())
|
return
|
||||||
|
}
|
||||||
|
// Track the listener so that we can find it again in future
|
||||||
|
t.mutex.Lock()
|
||||||
|
if _, isIn := t.listeners[listenaddr]; isIn {
|
||||||
|
t.mutex.Unlock()
|
||||||
|
l.listener.Close()
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
t.listeners[listenaddr] = l
|
||||||
|
t.mutex.Unlock()
|
||||||
|
}
|
||||||
|
// And here we go!
|
||||||
|
accepted := make(chan bool)
|
||||||
|
defer func() {
|
||||||
|
t.link.core.log.Infoln("Stopping TCP listener on:", l.listener.Addr().String())
|
||||||
|
l.listener.Close()
|
||||||
|
t.mutex.Lock()
|
||||||
|
delete(t.listeners, listenaddr)
|
||||||
|
t.mutex.Unlock()
|
||||||
|
}()
|
||||||
|
t.link.core.log.Infoln("Listening for TCP on:", l.listener.Addr().String())
|
||||||
for {
|
for {
|
||||||
sock, err := iface.serv.Accept()
|
var sock net.Conn
|
||||||
if err != nil {
|
var err error
|
||||||
iface.core.log.Errorln("Failed to accept connection:", err)
|
// Listen in a separate goroutine, as that way it does not block us from
|
||||||
return
|
// receiving "stop" events
|
||||||
}
|
go func() {
|
||||||
|
sock, err = l.listener.Accept()
|
||||||
|
accepted <- true
|
||||||
|
}()
|
||||||
|
// Wait for either an accepted connection, or a message telling us to stop
|
||||||
|
// the TCP listener
|
||||||
select {
|
select {
|
||||||
case <-iface.stop:
|
case <-accepted:
|
||||||
iface.core.log.Errorln("Stopping listener")
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
t.link.core.log.Errorln("Failed to accept connection:", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
go iface.handler(sock, true)
|
go t.handler(sock, true, nil)
|
||||||
|
case <-l.stop:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks if we already have a connection to this node
|
|
||||||
func (iface *tcpInterface) isAlreadyConnected(info tcpInfo) bool {
|
|
||||||
iface.mutex.Lock()
|
|
||||||
defer iface.mutex.Unlock()
|
|
||||||
_, isIn := iface.conns[info]
|
|
||||||
return isIn
|
|
||||||
}
|
|
||||||
|
|
||||||
// Checks if we already are calling this address
|
// Checks if we already are calling this address
|
||||||
func (iface *tcpInterface) isAlreadyCalling(saddr string) bool {
|
func (t *tcp) isAlreadyCalling(saddr string) bool {
|
||||||
iface.mutex.Lock()
|
t.mutex.Lock()
|
||||||
defer iface.mutex.Unlock()
|
defer t.mutex.Unlock()
|
||||||
_, isIn := iface.calls[saddr]
|
_, isIn := t.calls[saddr]
|
||||||
return isIn
|
return isIn
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -168,34 +210,39 @@ func (iface *tcpInterface) isAlreadyCalling(saddr string) bool {
|
|||||||
// If the dial is successful, it launches the handler.
|
// If the dial is successful, it launches the handler.
|
||||||
// When finished, it removes the outgoing call, so reconnection attempts can be made later.
|
// When finished, it removes the outgoing call, so reconnection attempts can be made later.
|
||||||
// This all happens in a separate goroutine that it spawns.
|
// This all happens in a separate goroutine that it spawns.
|
||||||
func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) {
|
func (t *tcp) call(saddr string, options interface{}, sintf string) {
|
||||||
go func() {
|
go func() {
|
||||||
callname := saddr
|
callname := saddr
|
||||||
if sintf != "" {
|
if sintf != "" {
|
||||||
callname = fmt.Sprintf("%s/%s", saddr, sintf)
|
callname = fmt.Sprintf("%s/%s", saddr, sintf)
|
||||||
}
|
}
|
||||||
if iface.isAlreadyCalling(callname) {
|
if t.isAlreadyCalling(callname) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
iface.mutex.Lock()
|
t.mutex.Lock()
|
||||||
iface.calls[callname] = struct{}{}
|
t.calls[callname] = struct{}{}
|
||||||
iface.mutex.Unlock()
|
t.mutex.Unlock()
|
||||||
defer func() {
|
defer func() {
|
||||||
// Block new calls for a little while, to mitigate livelock scenarios
|
// Block new calls for a little while, to mitigate livelock scenarios
|
||||||
time.Sleep(default_timeout)
|
time.Sleep(default_timeout)
|
||||||
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
|
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
|
||||||
iface.mutex.Lock()
|
t.mutex.Lock()
|
||||||
delete(iface.calls, callname)
|
delete(t.calls, callname)
|
||||||
iface.mutex.Unlock()
|
t.mutex.Unlock()
|
||||||
}()
|
}()
|
||||||
var conn net.Conn
|
var conn net.Conn
|
||||||
var err error
|
var err error
|
||||||
if socksaddr != nil {
|
socksaddr, issocks := options.(string)
|
||||||
|
if issocks {
|
||||||
if sintf != "" {
|
if sintf != "" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
dialerdst, er := net.ResolveTCPAddr("tcp", socksaddr)
|
||||||
|
if er != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
var dialer proxy.Dialer
|
var dialer proxy.Dialer
|
||||||
dialer, err = proxy.SOCKS5("tcp", *socksaddr, nil, proxy.Direct)
|
dialer, err = proxy.SOCKS5("tcp", dialerdst.String(), nil, proxy.Direct)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -210,9 +257,10 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) {
|
|||||||
addr: saddr,
|
addr: saddr,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
t.handler(conn, false, dialerdst.String())
|
||||||
} else {
|
} else {
|
||||||
dialer := net.Dialer{
|
dialer := net.Dialer{
|
||||||
Control: iface.tcpContext,
|
Control: t.tcpContext,
|
||||||
}
|
}
|
||||||
if sintf != "" {
|
if sintf != "" {
|
||||||
ief, err := net.InterfaceByName(sintf)
|
ief, err := net.InterfaceByName(sintf)
|
||||||
@ -266,26 +314,34 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
t.handler(conn, false, nil)
|
||||||
}
|
}
|
||||||
iface.handler(conn, false)
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
|
func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}) {
|
||||||
defer sock.Close()
|
defer sock.Close()
|
||||||
iface.setExtraOptions(sock)
|
t.setExtraOptions(sock)
|
||||||
stream := stream{}
|
stream := stream{}
|
||||||
stream.init(sock)
|
stream.init(sock)
|
||||||
local, _, _ := net.SplitHostPort(sock.LocalAddr().String())
|
local, _, _ := net.SplitHostPort(sock.LocalAddr().String())
|
||||||
remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String())
|
remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String())
|
||||||
remotelinklocal := net.ParseIP(remote).IsLinkLocalUnicast()
|
remotelinklocal := net.ParseIP(remote).IsLinkLocalUnicast()
|
||||||
name := "tcp://" + sock.RemoteAddr().String()
|
var name string
|
||||||
link, err := iface.core.link.create(&stream, name, "tcp", local, remote, incoming, remotelinklocal)
|
var proto string
|
||||||
|
if socksaddr, issocks := options.(string); issocks {
|
||||||
|
name = "socks://" + socksaddr + "/" + sock.RemoteAddr().String()
|
||||||
|
proto = "socks"
|
||||||
|
} else {
|
||||||
|
name = "tcp://" + sock.RemoteAddr().String()
|
||||||
|
proto = "tcp"
|
||||||
|
}
|
||||||
|
link, err := t.link.core.link.create(&stream, name, proto, local, remote, incoming, remotelinklocal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
iface.core.log.Println(err)
|
t.link.core.log.Println(err)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
iface.core.log.Debugln("DEBUG: starting handler for", name)
|
t.link.core.log.Debugln("DEBUG: starting handler for", name)
|
||||||
err = link.handler()
|
err = link.handler()
|
||||||
iface.core.log.Debugln("DEBUG: stopped handler for", name, err)
|
t.link.core.log.Debugln("DEBUG: stopped handler for", name, err)
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
|
|
||||||
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
|
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
|
||||||
|
|
||||||
func (iface *tcpInterface) tcpContext(network, address string, c syscall.RawConn) error {
|
func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error {
|
||||||
var control error
|
var control error
|
||||||
var recvanyif error
|
var recvanyif error
|
||||||
|
|
||||||
|
@ -8,6 +8,6 @@ import (
|
|||||||
|
|
||||||
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
|
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
|
||||||
|
|
||||||
func (iface *tcpInterface) tcpContext(network, address string, c syscall.RawConn) error {
|
func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user