From 493208fb378b76760c8cf6b7625ccbe0b60185d3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 3 Sep 2022 11:42:05 +0100 Subject: [PATCH] Refactor multicast setup (isolated config, etc) --- cmd/yggdrasil/main.go | 76 +++++++++------ contrib/mobile/mobile.go | 75 +++++++++------ src/core/api.go | 10 +- src/core/core.go | 8 +- src/core/core_test.go | 5 +- src/core/options.go | 16 ++-- src/multicast/multicast.go | 131 ++++++++++++-------------- src/multicast/multicast_darwin_cgo.go | 2 +- src/multicast/options.go | 28 ++++++ src/util/util.go | 14 +++ 10 files changed, 215 insertions(+), 150 deletions(-) create mode 100644 src/multicast/options.go diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index 826df007..71453aaa 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -12,6 +12,7 @@ import ( "net" "os" "os/signal" + "regexp" "strings" "syscall" @@ -325,40 +326,59 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) { default: } - // Setup the Yggdrasil node itself. The node{} type includes a Core, so we - // don't need to create this manually. - sk, err := hex.DecodeString(cfg.PrivateKey) - if err != nil { - panic(err) - } - options := []core.SetupOption{ - core.IfName(cfg.IfName), - core.IfMTU(cfg.IfMTU), - } - for _, peer := range cfg.Peers { - options = append(options, core.Peer{URI: peer}) - } - for intf, peers := range cfg.InterfacePeers { - for _, peer := range peers { - options = append(options, core.Peer{URI: peer, SourceInterface: intf}) - } - } - for _, allowed := range cfg.AllowedPublicKeys { - k, err := hex.DecodeString(allowed) + n := &node{config: cfg} + + // Setup the Yggdrasil node itself. + { + sk, err := hex.DecodeString(cfg.PrivateKey) + if err != nil { + panic(err) + } + options := []core.SetupOption{ + core.IfName(cfg.IfName), + core.IfMTU(cfg.IfMTU), + } + for _, peer := range cfg.Peers { + options = append(options, core.Peer{URI: peer}) + } + for intf, peers := range cfg.InterfacePeers { + for _, peer := range peers { + options = append(options, core.Peer{URI: peer, SourceInterface: intf}) + } + } + for _, allowed := range cfg.AllowedPublicKeys { + k, err := hex.DecodeString(allowed) + if err != nil { + panic(err) + } + options = append(options, core.AllowedPublicKey(k[:])) + } + n.core, err = core.New(sk[:], logger, options...) if err != nil { panic(err) } - options = append(options, core.AllowedPublicKey(k[:])) } - n := node{config: cfg} - n.core, err = core.New(sk[:], options...) - if err != nil { - panic(err) + + // Setup the multicast module. + { + options := []multicast.SetupOption{} + for _, intf := range cfg.MulticastInterfaces { + options = append(options, multicast.MulticastInterface{ + Regex: regexp.MustCompile(intf.Regex), + Beacon: intf.Beacon, + Listen: intf.Listen, + Port: intf.Port, + }) + } + n.multicast, err = multicast.New(n.core, logger, options...) + if err != nil { + panic(err) + } } + // Register the session firewall gatekeeper function // Allocate our modules n.admin = &admin.AdminSocket{} - n.multicast = &multicast.Multicast{} n.tuntap = &tuntap.TunAdapter{} // Start the admin socket if err := n.admin.Init(n.core, cfg, logger, nil); err != nil { @@ -368,10 +388,8 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) { } n.admin.SetupAdminHandlers() // Start the multicast interface - if err := n.multicast.Init(n.core, cfg, logger, nil); err != nil { + if n.multicast, err = multicast.New(n.core, logger, nil); err != nil { logger.Errorln("An error occurred initialising multicast:", err) - } else if err := n.multicast.Start(); err != nil { - logger.Errorln("An error occurred starting multicast:", err) } n.multicast.SetupAdminHandlers(n.admin) // Start the TUN/TAP interface diff --git a/contrib/mobile/mobile.go b/contrib/mobile/mobile.go index 85535ee8..d7da03de 100644 --- a/contrib/mobile/mobile.go +++ b/contrib/mobile/mobile.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net" + "regexp" "github.com/gologme/log" @@ -28,7 +29,7 @@ type Yggdrasil struct { core *core.Core iprwc *ipv6rwc.ReadWriteCloser config *config.NodeConfig - multicast multicast.Multicast + multicast *multicast.Multicast log MobileLogger } @@ -49,46 +50,60 @@ func (m *Yggdrasil) StartJSON(configjson []byte) error { return err } // Setup the Yggdrasil node itself. - sk, err := hex.DecodeString(m.config.PrivateKey) - if err != nil { - panic(err) - } - options := []core.SetupOption{ - core.IfName("none"), - core.IfMTU(m.config.IfMTU), - } - for _, peer := range m.config.Peers { - options = append(options, core.Peer{URI: peer}) - } - for intf, peers := range m.config.InterfacePeers { - for _, peer := range peers { - options = append(options, core.Peer{URI: peer, SourceInterface: intf}) - } - } - for _, allowed := range m.config.AllowedPublicKeys { - k, err := hex.DecodeString(allowed) + { + sk, err := hex.DecodeString(m.config.PrivateKey) + if err != nil { + panic(err) + } + options := []core.SetupOption{ + core.IfName("none"), + core.IfMTU(m.config.IfMTU), + } + for _, peer := range m.config.Peers { + options = append(options, core.Peer{URI: peer}) + } + for intf, peers := range m.config.InterfacePeers { + for _, peer := range peers { + options = append(options, core.Peer{URI: peer, SourceInterface: intf}) + } + } + for _, allowed := range m.config.AllowedPublicKeys { + k, err := hex.DecodeString(allowed) + if err != nil { + panic(err) + } + options = append(options, core.AllowedPublicKey(k[:])) + } + m.core, err = core.New(sk[:], logger, options...) if err != nil { panic(err) } - options = append(options, core.AllowedPublicKey(k[:])) } - m.core, err = core.New(sk[:], options...) - if err != nil { - panic(err) + + // Setup the multicast module. + if len(m.config.MulticastInterfaces) > 0 { + var err error + options := []multicast.SetupOption{} + for _, intf := range m.config.MulticastInterfaces { + options = append(options, multicast.MulticastInterface{ + Regex: regexp.MustCompile(intf.Regex), + Beacon: intf.Beacon, + Listen: intf.Listen, + Port: intf.Port, + }) + } + m.multicast, err = multicast.New(m.core, logger, options...) + if err != nil { + panic(err) + } } + mtu := m.config.IfMTU m.iprwc = ipv6rwc.NewReadWriteCloser(m.core) if m.iprwc.MaxMTU() < mtu { mtu = m.iprwc.MaxMTU() } m.iprwc.SetMTU(mtu) - if len(m.config.MulticastInterfaces) > 0 { - if err := m.multicast.Init(m.core, m.config, logger, nil); err != nil { - logger.Errorln("An error occurred initialising multicast:", err) - } else if err := m.multicast.Start(); err != nil { - logger.Errorln("An error occurred starting multicast:", err) - } - } return nil } diff --git a/src/core/api.go b/src/core/api.go index 3ab26ee5..679d43d5 100644 --- a/src/core/api.go +++ b/src/core/api.go @@ -15,8 +15,8 @@ import ( //"sort" //"time" - "github.com/gologme/log" "github.com/yggdrasil-network/yggdrasil-go/src/address" + "github.com/yggdrasil-network/yggdrasil-go/src/util" //"github.com/yggdrasil-network/yggdrasil-go/src/crypto" //"github.com/Arceliar/phony" ) @@ -159,7 +159,7 @@ func (c *Core) Subnet() net.IPNet { // may be useful if you want to redirect the output later. Note that this // expects a Logger from the github.com/gologme/log package and not from Go's // built-in log package. -func (c *Core) SetLogger(log *log.Logger) { +func (c *Core) SetLogger(log util.Logger) { c.log = log } @@ -239,8 +239,10 @@ func (c *Core) RemovePeer(addr string, sintf string) error { // CallPeer calls a peer once. This should be specified in the peer URI format, // e.g.: -// tcp://a.b.c.d:e -// socks://a.b.c.d:e/f.g.h.i:j +// +// tcp://a.b.c.d:e +// socks://a.b.c.d:e/f.g.h.i:j +// // This does not add the peer to the peer list, so if the connection drops, the // peer will not be called again automatically. func (c *Core) CallPeer(u *url.URL, sintf string) error { diff --git a/src/core/core.go b/src/core/core.go index bc7ac831..47389a83 100644 --- a/src/core/core.go +++ b/src/core/core.go @@ -7,7 +7,6 @@ import ( "io" "net" "net/url" - "os" "time" iwe "github.com/Arceliar/ironwood/encrypted" @@ -15,6 +14,7 @@ import ( "github.com/Arceliar/phony" "github.com/gologme/log" + "github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/yggdrasil-network/yggdrasil-go/src/version" //"github.com/yggdrasil-network/yggdrasil-go/src/crypto" ) @@ -33,7 +33,7 @@ type Core struct { public ed25519.PublicKey links links proto protoHandler - log *log.Logger + log util.Logger addPeerTimer *time.Timer config struct { _peers map[Peer]struct{} // configurable after startup @@ -46,14 +46,14 @@ type Core struct { } } -func New(secret ed25519.PrivateKey, opts ...SetupOption) (*Core, error) { +func New(secret ed25519.PrivateKey, logger util.Logger, opts ...SetupOption) (*Core, error) { if len(secret) != ed25519.PrivateKeySize { return nil, fmt.Errorf("private key is incorrect length") } c := &Core{ secret: secret, public: secret.Public().(ed25519.PublicKey), - log: log.New(os.Stdout, "", 0), // TODO: not this + log: logger, } c.ctx, c.cancel = context.WithCancel(context.Background()) var err error diff --git a/src/core/core_test.go b/src/core/core_test.go index 823e5e95..ed5b4255 100644 --- a/src/core/core_test.go +++ b/src/core/core_test.go @@ -36,10 +36,11 @@ func CreateAndConnectTwo(t testing.TB, verbose bool) (nodeA *Core, nodeB *Core) if _, skB, err = ed25519.GenerateKey(nil); err != nil { t.Fatal(err) } - if nodeA, err = New(skA, ListenAddress("tcp://127.0.0.1:0"), IfName("none")); err != nil { + logger := GetLoggerWithPrefix("", false) + if nodeA, err = New(skA, logger, ListenAddress("tcp://127.0.0.1:0"), IfName("none")); err != nil { t.Fatal(err) } - if nodeB, err = New(skB, ListenAddress("tcp://127.0.0.1:0"), IfName("none")); err != nil { + if nodeB, err = New(skB, logger, ListenAddress("tcp://127.0.0.1:0"), IfName("none")); err != nil { t.Fatal(err) } diff --git a/src/core/options.go b/src/core/options.go index d46c9def..b3d06c63 100644 --- a/src/core/options.go +++ b/src/core/options.go @@ -30,7 +30,6 @@ type SetupOption interface { } type ListenAddress string -type AdminListenAddress string type Peer struct { URI string SourceInterface string @@ -41,11 +40,10 @@ type IfName string type IfMTU uint16 type AllowedPublicKey ed25519.PublicKey -func (a ListenAddress) isSetupOption() {} -func (a AdminListenAddress) isSetupOption() {} -func (a Peer) isSetupOption() {} -func (a NodeInfo) isSetupOption() {} -func (a NodeInfoPrivacy) isSetupOption() {} -func (a IfName) isSetupOption() {} -func (a IfMTU) isSetupOption() {} -func (a AllowedPublicKey) isSetupOption() {} +func (a ListenAddress) isSetupOption() {} +func (a Peer) isSetupOption() {} +func (a NodeInfo) isSetupOption() {} +func (a NodeInfoPrivacy) isSetupOption() {} +func (a IfName) isSetupOption() {} +func (a IfMTU) isSetupOption() {} +func (a AllowedPublicKey) isSetupOption() {} diff --git a/src/multicast/multicast.go b/src/multicast/multicast.go index 9093e4cf..b6d290e1 100644 --- a/src/multicast/multicast.go +++ b/src/multicast/multicast.go @@ -9,13 +9,11 @@ import ( "fmt" "net" "net/url" - "regexp" "time" "github.com/Arceliar/phony" "github.com/gologme/log" - "github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/core" "golang.org/x/net/ipv6" ) @@ -27,13 +25,15 @@ import ( type Multicast struct { phony.Inbox core *core.Core - config *config.NodeConfig log *log.Logger sock *ipv6.PacketConn - groupAddr string - listeners map[string]*listenerInfo - isOpen bool - _interfaces map[string]interfaceInfo + _isOpen bool + _listeners map[string]*listenerInfo + _interfaces map[string]*interfaceInfo + config struct { + _groupAddr GroupAddress + _interfaces map[MulticastInterface]struct{} + } } type interfaceInfo struct { @@ -51,40 +51,38 @@ type listenerInfo struct { port uint16 } -// Init prepares the multicast interface for use. -func (m *Multicast) Init(core *core.Core, nc *config.NodeConfig, log *log.Logger, options interface{}) error { - m.core = core - m.config = nc - m.log = log - m.listeners = make(map[string]*listenerInfo) - m._interfaces = make(map[string]interfaceInfo) - m.groupAddr = "[ff02::114]:9001" - return nil -} - // Start starts the multicast interface. This launches goroutines which will // listen for multicast beacons from other hosts and will advertise multicast // beacons out to the network. -func (m *Multicast) Start() error { +func New(core *core.Core, log *log.Logger, opts ...SetupOption) (*Multicast, error) { + m := &Multicast{ + core: core, + log: log, + _listeners: make(map[string]*listenerInfo), + _interfaces: make(map[string]*interfaceInfo), + } + m.config._interfaces = map[MulticastInterface]struct{}{} + m.config._groupAddr = GroupAddress("[ff02::114]:9001") + for _, opt := range opts { + m._applyOption(opt) + } var err error phony.Block(m, func() { err = m._start() }) - m.log.Debugln("Started multicast module") - return err + return m, err } func (m *Multicast) _start() error { - if m.isOpen { + if m._isOpen { return fmt.Errorf("multicast module is already started") } - m.config.RLock() - defer m.config.RUnlock() - if len(m.config.MulticastInterfaces) == 0 { + if len(m.config._interfaces) == 0 { return nil } m.log.Infoln("Starting multicast module") - addr, err := net.ResolveUDPAddr("udp", m.groupAddr) + defer m.log.Infoln("Started multicast module") + addr, err := net.ResolveUDPAddr("udp", string(m.config._groupAddr)) if err != nil { return err } @@ -101,7 +99,7 @@ func (m *Multicast) _start() error { // Windows can't set this flag, so we need to handle it in other ways } - m.isOpen = true + m._isOpen = true go m.listen() m.Act(nil, m._multicastStarted) m.Act(nil, m._announce) @@ -113,7 +111,7 @@ func (m *Multicast) _start() error { func (m *Multicast) IsStarted() bool { var isOpen bool phony.Block(m, func() { - isOpen = m.isOpen + isOpen = m._isOpen }) return isOpen } @@ -130,7 +128,7 @@ func (m *Multicast) Stop() error { func (m *Multicast) _stop() error { m.log.Infoln("Stopping multicast module") - m.isOpen = false + m._isOpen = false if m.sock != nil { m.sock.Close() } @@ -138,7 +136,7 @@ func (m *Multicast) _stop() error { } func (m *Multicast) _updateInterfaces() { - interfaces := m.getAllowedInterfaces() + interfaces := m._getAllowedInterfaces() for name, info := range interfaces { addrs, err := info.iface.Addrs() if err != nil { @@ -163,10 +161,8 @@ func (m *Multicast) Interfaces() map[string]net.Interface { } // getAllowedInterfaces returns the currently known/enabled multicast interfaces. -func (m *Multicast) getAllowedInterfaces() map[string]interfaceInfo { - interfaces := make(map[string]interfaceInfo) - // Get interface expressions from config - ifcfgs := m.config.MulticastInterfaces +func (m *Multicast) _getAllowedInterfaces() map[string]*interfaceInfo { + interfaces := make(map[string]*interfaceInfo) // Ask the system for network interfaces allifaces, err := net.Interfaces() if err != nil { @@ -176,62 +172,55 @@ func (m *Multicast) getAllowedInterfaces() map[string]interfaceInfo { } // Work out which interfaces to announce on for _, iface := range allifaces { - if iface.Flags&net.FlagUp == 0 { - // Ignore interfaces that are down - continue + switch { + case iface.Flags&net.FlagUp == 0: + continue // Ignore interfaces that are down + case iface.Flags&net.FlagMulticast == 0: + continue // Ignore non-multicast interfaces + case iface.Flags&net.FlagPointToPoint != 0: + continue // Ignore point-to-point interfaces } - if iface.Flags&net.FlagMulticast == 0 { - // Ignore non-multicast interfaces - continue - } - if iface.Flags&net.FlagPointToPoint != 0 { - // Ignore point-to-point interfaces - continue - } - for _, ifcfg := range ifcfgs { + for ifcfg := range m.config._interfaces { // Compile each regular expression - e, err := regexp.Compile(ifcfg.Regex) - if err != nil { - panic(err) - } // Does the interface match the regular expression? Store it if so - if e.MatchString(iface.Name) { - if ifcfg.Beacon || ifcfg.Listen { - info := interfaceInfo{ - iface: iface, - beacon: ifcfg.Beacon, - listen: ifcfg.Listen, - port: ifcfg.Port, - } - interfaces[iface.Name] = info - } - break + if !ifcfg.Beacon && !ifcfg.Listen { + continue } + if !ifcfg.Regex.MatchString(iface.Name) { + continue + } + interfaces[iface.Name] = &interfaceInfo{ + iface: iface, + beacon: ifcfg.Beacon, + listen: ifcfg.Listen, + port: ifcfg.Port, + } + break } } return interfaces } func (m *Multicast) _announce() { - if !m.isOpen { + if !m._isOpen { return } m._updateInterfaces() - groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) + groupAddr, err := net.ResolveUDPAddr("udp6", string(m.config._groupAddr)) if err != nil { panic(err) } - destAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) + destAddr, err := net.ResolveUDPAddr("udp6", string(m.config._groupAddr)) if err != nil { panic(err) } // There might be interfaces that we configured listeners for but are no // longer up - if that's the case then we should stop the listeners - for name, info := range m.listeners { + for name, info := range m._listeners { // Prepare our stop function! stop := func() { info.listener.Stop() - delete(m.listeners, name) + delete(m._listeners, name) m.log.Debugln("No longer multicasting on", name) } // If the interface is no longer visible on the system then stop the @@ -290,7 +279,7 @@ func (m *Multicast) _announce() { } // Try and see if we already have a TCP listener for this interface var linfo *listenerInfo - if nfo, ok := m.listeners[iface.Name]; !ok || nfo.listener.Listener == nil { + if nfo, ok := m._listeners[iface.Name]; !ok || nfo.listener.Listener == nil { // No listener was found - let's create one urlString := fmt.Sprintf("tls://[%s]:%d", addrIP, info.port) u, err := url.Parse(urlString) @@ -301,13 +290,13 @@ func (m *Multicast) _announce() { m.log.Debugln("Started multicasting on", iface.Name) // Store the listener so that we can stop it later if needed linfo = &listenerInfo{listener: li, time: time.Now(), port: info.port} - m.listeners[iface.Name] = linfo + m._listeners[iface.Name] = linfo } else { m.log.Warnln("Not multicasting on", iface.Name, "due to error:", err) } } else { // An existing listener was found - linfo = m.listeners[iface.Name] + linfo = m._listeners[iface.Name] } // Make sure nothing above failed for some reason if linfo == nil { @@ -340,7 +329,7 @@ func (m *Multicast) _announce() { } func (m *Multicast) listen() { - groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) + groupAddr, err := net.ResolveUDPAddr("udp6", string(m.config._groupAddr)) if err != nil { panic(err) } @@ -388,7 +377,7 @@ func (m *Multicast) listen() { if !from.IP.Equal(addr.IP) { continue } - var interfaces map[string]interfaceInfo + var interfaces map[string]*interfaceInfo phony.Block(m, func() { interfaces = m._interfaces }) diff --git a/src/multicast/multicast_darwin_cgo.go b/src/multicast/multicast_darwin_cgo.go index b7d7358c..5c2af7ab 100644 --- a/src/multicast/multicast_darwin_cgo.go +++ b/src/multicast/multicast_darwin_cgo.go @@ -31,7 +31,7 @@ import ( ) func (m *Multicast) _multicastStarted() { - if !m.isOpen { + if !m._isOpen { return } C.StopAWDLBrowsing() diff --git a/src/multicast/options.go b/src/multicast/options.go new file mode 100644 index 00000000..a03b0677 --- /dev/null +++ b/src/multicast/options.go @@ -0,0 +1,28 @@ +package multicast + +import "regexp" + +func (m *Multicast) _applyOption(opt SetupOption) { + switch v := opt.(type) { + case MulticastInterface: + m.config._interfaces[v] = struct{}{} + case GroupAddress: + m.config._groupAddr = v + } +} + +type SetupOption interface { + isSetupOption() +} + +type MulticastInterface struct { + Regex *regexp.Regexp + Beacon bool + Listen bool + Port uint16 +} + +type GroupAddress string + +func (a MulticastInterface) isSetupOption() {} +func (a GroupAddress) isSetupOption() {} diff --git a/src/util/util.go b/src/util/util.go index 507426d0..e2e21464 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -8,6 +8,20 @@ import ( "time" ) +// Any logger that satisfies this interface is suitable for Yggdrasil. +type Logger interface { + Printf(string, ...interface{}) + Println(...interface{}) + Infof(string, ...interface{}) + Infoln(...interface{}) + Warnf(string, ...interface{}) + Warnln(...interface{}) + Errorf(string, ...interface{}) + Errorln(...interface{}) + Debugf(string, ...interface{}) + Debugln(...interface{}) +} + // TimerStop stops a timer and makes sure the channel is drained, returns true if the timer was stopped before firing. func TimerStop(t *time.Timer) bool { stopped := t.Stop()