From be8db0c120b48c233e29318138cb62cda94a2296 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Mar 2019 17:52:57 +0000 Subject: [PATCH] Support multiple TCP listeners --- cmd/yggdrasil/main.go | 10 ++- src/config/config.go | 6 +- src/yggdrasil/link.go | 9 ++- src/yggdrasil/tcp.go | 155 +++++++++++++++++++++--------------- src/yggdrasil/tcp_darwin.go | 2 +- src/yggdrasil/tcp_other.go | 2 +- 6 files changed, 110 insertions(+), 74 deletions(-) diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index aa5a7494..e0c764e7 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -134,12 +134,20 @@ func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeCo } } } + // Do a quick check for old-format Listen statement so that mapstructure + // doesn't fail and crash + if listen, ok := dat["Listen"].(string); ok { + if strings.HasPrefix(listen, "tcp://") { + dat["Listen"] = []string{listen} + } else { + dat["Listen"] = []string{"tcp://" + listen} + } + } // Overlay our newly mapped configuration onto the autoconf node config that // we generated above. if err = mapstructure.Decode(dat, &cfg); err != nil { panic(err) } - return cfg } diff --git a/src/config/config.go b/src/config/config.go index 14b16490..807ce25b 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -12,7 +12,7 @@ import ( // NodeConfig defines all configuration values needed to run a signle yggdrasil node type NodeConfig struct { - Listen string `comment:"Listen address for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."` + Listen []string `comment:"Listen address for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."` AdminListen string `comment:"Listen address for admin connections. Default is to listen for local\nconnections either on TCP/9001 or a UNIX socket depending on your\nplatform. Use this value for yggdrasilctl -endpoint=X. To disable\nthe admin socket, use the value \"none\" instead."` Peers []string `comment:"List of connection strings for static peers in URI format, e.g.\ntcp://a.b.c.d:e or socks://a.b.c.d:e/f.g.h.i:j."` InterfacePeers map[string][]string `comment:"List of connection strings for static peers in URI format, arranged\nby source interface, e.g. { \"eth0\": [ tcp://a.b.c.d:e ] }. Note that\nSOCKS peerings will NOT be affected by this option and should go in\nthe \"Peers\" section instead."` @@ -79,10 +79,10 @@ func GenerateConfig(isAutoconf bool) *NodeConfig { // Create a node configuration and populate it. cfg := NodeConfig{} if isAutoconf { - cfg.Listen = "[::]:0" + cfg.Listen = []string{"tcp://[::]:0"} } else { r1 := rand.New(rand.NewSource(time.Now().UnixNano())) - cfg.Listen = fmt.Sprintf("[::]:%d", r1.Intn(65534-32768)+32768) + cfg.Listen = []string{fmt.Sprintf("[::]:%d", r1.Intn(65534-32768)+32768)} } cfg.AdminListen = defaults.GetDefaults().DefaultAdminListen cfg.EncryptionPublicKey = hex.EncodeToString(bpub[:]) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 8c03e086..277f24c0 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -21,8 +21,9 @@ type link struct { core *Core mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface - awdl awdl // AWDL interface support - tcp tcpInterface // TCP interface support + handlers map[string]linkListener + awdl awdl // AWDL interface support + tcp tcp // TCP interface support // TODO timeout (to remove from switch), read from config.ReadTimeout } @@ -34,6 +35,10 @@ type linkInfo struct { remote string // Remote name or address } +type linkListener interface { + init(*link) error +} + type linkInterfaceMsgIO interface { readMsg() ([]byte, error) writeMsg([]byte) (int, error) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 989480db..45b15f94 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -31,13 +31,12 @@ const default_timeout = 6 * time.Second const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. -type tcpInterface struct { +type tcp struct { link *link reconfigure chan chan error - serv net.Listener stop chan bool - addr string mutex sync.Mutex // Protecting the below + listeners map[string]net.Listener calls map[string]struct{} conns map[tcpInfo](chan struct{}) } @@ -52,7 +51,7 @@ type tcpInfo struct { } // Wrapper function to set additional options for specific connection types. -func (iface *tcpInterface) setExtraOptions(c net.Conn) { +func (t *tcp) setExtraOptions(c net.Conn) { switch sock := c.(type) { case *net.TCPConn: sock.SetNoDelay(true) @@ -62,62 +61,81 @@ func (iface *tcpInterface) setExtraOptions(c net.Conn) { } // Returns the address of the listener. -func (iface *tcpInterface) getAddr() *net.TCPAddr { - return iface.serv.Addr().(*net.TCPAddr) +func (t *tcp) getAddr() *net.TCPAddr { + for _, listener := range t.listeners { + return listener.Addr().(*net.TCPAddr) + } + return nil } // Attempts to initiate a connection to the provided address. -func (iface *tcpInterface) connect(addr string, intf string) { - iface.call(addr, nil, intf) +func (t *tcp) connect(addr string, intf string) { + t.call(addr, nil, intf) } // Attempst to initiate a connection to the provided address, viathe provided socks proxy address. -func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { - iface.call(peeraddr, &socksaddr, "") +func (t *tcp) connectSOCKS(socksaddr, peeraddr string) { + t.call(peeraddr, &socksaddr, "") } // Initializes the struct. -func (iface *tcpInterface) init(l *link) (err error) { - iface.link = l - iface.stop = make(chan bool, 1) - iface.reconfigure = make(chan chan error, 1) +func (t *tcp) init(l *link) error { + t.link = l + t.stop = make(chan bool, 1) + t.reconfigure = make(chan chan error, 1) + go func() { for { - e := <-iface.reconfigure - iface.link.core.configMutex.RLock() - updated := iface.link.core.config.Listen != iface.link.core.configOld.Listen - iface.link.core.configMutex.RUnlock() + e := <-t.reconfigure + t.link.core.configMutex.RLock() + //updated := t.link.core.config.Listen != t.link.core.configOld.Listen + updated := false + t.link.core.configMutex.RUnlock() if updated { - iface.stop <- true - iface.serv.Close() - e <- iface.listen() + /* t.stop <- true + for _, listener := range t.listeners { + listener.Close() + } + e <- t.listen() */ } else { e <- nil } } }() - return iface.listen() + t.mutex.Lock() + t.calls = make(map[string]struct{}) + t.conns = make(map[tcpInfo](chan struct{})) + t.listeners = make(map[string]net.Listener) + t.mutex.Unlock() + + t.link.core.configMutex.RLock() + defer t.link.core.configMutex.RUnlock() + for _, listenaddr := range t.link.core.config.Listen { + if listenaddr[:6] != "tcp://" { + continue + } + if err := t.listen(listenaddr[6:]); err != nil { + return err + } + } + + return nil } -func (iface *tcpInterface) listen() error { +func (t *tcp) listen(listenaddr string) error { var err error - iface.link.core.configMutex.RLock() - iface.addr = iface.link.core.config.Listen - iface.link.core.configMutex.RUnlock() - ctx := context.Background() lc := net.ListenConfig{ - Control: iface.tcpContext, + Control: t.tcpContext, } - iface.serv, err = lc.Listen(ctx, "tcp", iface.addr) + listener, err := lc.Listen(ctx, "tcp", listenaddr) if err == nil { - iface.mutex.Lock() - iface.calls = make(map[string]struct{}) - iface.conns = make(map[tcpInfo](chan struct{})) - iface.mutex.Unlock() - go iface.listener() + t.mutex.Lock() + t.listeners[listenaddr] = listener + t.mutex.Unlock() + go t.listener(listenaddr) return nil } @@ -125,41 +143,46 @@ func (iface *tcpInterface) listen() error { } // Runs the listener, which spawns off goroutines for incoming connections. -func (iface *tcpInterface) listener() { - defer iface.serv.Close() - iface.link.core.log.Infoln("Listening for TCP on:", iface.serv.Addr().String()) +func (t *tcp) listener(listenaddr string) { + listener, ok := t.listeners[listenaddr] + if !ok { + t.link.core.log.Errorln("Tried to start TCP listener for", listenaddr, "which doesn't exist") + return + } + defer listener.Close() + t.link.core.log.Infoln("Listening for TCP on:", listener.Addr().String()) for { - sock, err := iface.serv.Accept() + sock, err := listener.Accept() if err != nil { - iface.link.core.log.Errorln("Failed to accept connection:", err) + t.link.core.log.Errorln("Failed to accept connection:", err) return } select { - case <-iface.stop: - iface.link.core.log.Errorln("Stopping listener") + case <-t.stop: + t.link.core.log.Errorln("Stopping listener") return default: if err != nil { panic(err) } - go iface.handler(sock, true) + go t.handler(sock, true) } } } // Checks if we already have a connection to this node -func (iface *tcpInterface) isAlreadyConnected(info tcpInfo) bool { - iface.mutex.Lock() - defer iface.mutex.Unlock() - _, isIn := iface.conns[info] +func (t *tcp) isAlreadyConnected(info tcpInfo) bool { + t.mutex.Lock() + defer t.mutex.Unlock() + _, isIn := t.conns[info] return isIn } // Checks if we already are calling this address -func (iface *tcpInterface) isAlreadyCalling(saddr string) bool { - iface.mutex.Lock() - defer iface.mutex.Unlock() - _, isIn := iface.calls[saddr] +func (t *tcp) isAlreadyCalling(saddr string) bool { + t.mutex.Lock() + defer t.mutex.Unlock() + _, isIn := t.calls[saddr] return isIn } @@ -168,25 +191,25 @@ func (iface *tcpInterface) isAlreadyCalling(saddr string) bool { // If the dial is successful, it launches the handler. // When finished, it removes the outgoing call, so reconnection attempts can be made later. // This all happens in a separate goroutine that it spawns. -func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { +func (t *tcp) call(saddr string, socksaddr *string, sintf string) { go func() { callname := saddr if sintf != "" { callname = fmt.Sprintf("%s/%s", saddr, sintf) } - if iface.isAlreadyCalling(callname) { + if t.isAlreadyCalling(callname) { return } - iface.mutex.Lock() - iface.calls[callname] = struct{}{} - iface.mutex.Unlock() + t.mutex.Lock() + t.calls[callname] = struct{}{} + t.mutex.Unlock() defer func() { // Block new calls for a little while, to mitigate livelock scenarios time.Sleep(default_timeout) time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) - iface.mutex.Lock() - delete(iface.calls, callname) - iface.mutex.Unlock() + t.mutex.Lock() + delete(t.calls, callname) + t.mutex.Unlock() }() var conn net.Conn var err error @@ -212,7 +235,7 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { } } else { dialer := net.Dialer{ - Control: iface.tcpContext, + Control: t.tcpContext, } if sintf != "" { ief, err := net.InterfaceByName(sintf) @@ -267,25 +290,25 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { return } } - iface.handler(conn, false) + t.handler(conn, false) }() } -func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { +func (t *tcp) handler(sock net.Conn, incoming bool) { defer sock.Close() - iface.setExtraOptions(sock) + t.setExtraOptions(sock) stream := stream{} stream.init(sock) local, _, _ := net.SplitHostPort(sock.LocalAddr().String()) remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) remotelinklocal := net.ParseIP(remote).IsLinkLocalUnicast() name := "tcp://" + sock.RemoteAddr().String() - link, err := iface.link.core.link.create(&stream, name, "tcp", local, remote, incoming, remotelinklocal) + link, err := t.link.core.link.create(&stream, name, "tcp", local, remote, incoming, remotelinklocal) if err != nil { - iface.link.core.log.Println(err) + t.link.core.log.Println(err) panic(err) } - iface.link.core.log.Debugln("DEBUG: starting handler for", name) + t.link.core.log.Debugln("DEBUG: starting handler for", name) err = link.handler() - iface.link.core.log.Debugln("DEBUG: stopped handler for", name, err) + t.link.core.log.Debugln("DEBUG: stopped handler for", name, err) } diff --git a/src/yggdrasil/tcp_darwin.go b/src/yggdrasil/tcp_darwin.go index 6483ef86..9d55a1db 100644 --- a/src/yggdrasil/tcp_darwin.go +++ b/src/yggdrasil/tcp_darwin.go @@ -10,7 +10,7 @@ import ( // WARNING: This context is used both by net.Dialer and net.Listen in tcp.go -func (iface *tcpInterface) tcpContext(network, address string, c syscall.RawConn) error { +func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error { var control error var recvanyif error diff --git a/src/yggdrasil/tcp_other.go b/src/yggdrasil/tcp_other.go index 5d62b530..47bd772c 100644 --- a/src/yggdrasil/tcp_other.go +++ b/src/yggdrasil/tcp_other.go @@ -8,6 +8,6 @@ import ( // WARNING: This context is used both by net.Dialer and net.Listen in tcp.go -func (iface *tcpInterface) tcpContext(network, address string, c syscall.RawConn) error { +func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error { return nil }