From 219fb96553a0f647036657f99fc7ed6cfbe00d18 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 29 Dec 2018 18:51:51 +0000 Subject: [PATCH] Support notifying components for config reload, listen for SIGHUP --- cmd/yggdrasil/main.go | 12 ++++++++- src/yggdrasil/admin.go | 24 ++++++++++++++--- src/yggdrasil/core.go | 53 ++++++++++++++++++++++++++++++++------ src/yggdrasil/dht.go | 23 +++++++++++++---- src/yggdrasil/multicast.go | 19 +++++++++++--- src/yggdrasil/peer.go | 13 ++++++++++ src/yggdrasil/router.go | 33 ++++++++++++++---------- src/yggdrasil/search.go | 17 ++++++++++-- src/yggdrasil/session.go | 24 +++++++++++++++++ src/yggdrasil/switch.go | 7 +++++ 10 files changed, 189 insertions(+), 36 deletions(-) diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index 2b6d2f04..e98e6238 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -314,7 +314,9 @@ func main() { logger.Printf("Your IPv6 subnet is %s", subnet.String()) // Catch interrupts from the operating system to exit gracefully. c := make(chan os.Signal, 1) + r := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) + signal.Notify(r, os.Interrupt, syscall.SIGHUP) // Create a function to capture the service being stopped on Windows. winTerminate := func() { c <- os.Interrupt @@ -322,5 +324,13 @@ func main() { minwinsvc.SetOnExit(winTerminate) // Wait for the terminate/interrupt signal. Once a signal is received, the // deferred Stop function above will run which will shut down TUN/TAP. - <-c + for { + select { + case _ = <-r: + n.core.UpdateConfig(cfg) + case _ = <-c: + goto exit + } + } +exit: } diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index bd3c9051..1c8c80e4 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -22,10 +22,11 @@ import ( // TODO: Add authentication type admin struct { - core *Core - listenaddr string - listener net.Listener - handlers []admin_handlerInfo + core *Core + reconfigure chan bool + listenaddr string + listener net.Listener + handlers []admin_handlerInfo } type admin_info map[string]interface{} @@ -53,6 +54,21 @@ func (a *admin) addHandler(name string, args []string, handler func(admin_info) // init runs the initial admin setup. func (a *admin) init(c *Core, listenaddr string) { a.core = c + a.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case _ = <-a.reconfigure: + a.core.configMutex.RLock() + a.core.log.Println("Notified: admin") + if a.core.config.AdminListen != a.core.configOld.AdminListen { + a.core.log.Println("AdminListen has changed!") + } + a.core.configMutex.RUnlock() + continue + } + } + }() a.listenaddr = listenaddr a.addHandler("list", []string{}, func(in admin_info) (admin_info, error) { handlers := make(map[string]interface{}) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index e38274fa..9e4bb628 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -7,6 +7,7 @@ import ( "log" "net" "regexp" + "sync" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/config" @@ -17,14 +18,26 @@ import ( var buildName string var buildVersion string +type module interface { + init(*config.NodeConfig) error + start() error +} + // The Core object represents the Yggdrasil node. You should create a Core // object for each Yggdrasil node you plan to run. type Core struct { // This is the main data structure that holds everything else for a node - boxPub crypto.BoxPubKey - boxPriv crypto.BoxPrivKey - sigPub crypto.SigPubKey - sigPriv crypto.SigPrivKey + // We're going to keep our own copy of the provided config - that way we can + // guarantee that it will be covered by the mutex + config config.NodeConfig // Active config + configOld config.NodeConfig // Previous config + configMutex sync.RWMutex // Protects both config and configOld + // Core-specific config + boxPub crypto.BoxPubKey + boxPriv crypto.BoxPrivKey + sigPub crypto.SigPubKey + sigPriv crypto.SigPrivKey + // Modules switchTable switchTable peers peers sessions sessions @@ -35,8 +48,9 @@ type Core struct { multicast multicast nodeinfo nodeinfo tcp tcpInterface - log *log.Logger - ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this + // Other bits + log *log.Logger + ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this } func (c *Core) init(bpub *crypto.BoxPubKey, @@ -62,8 +76,26 @@ func (c *Core) init(bpub *crypto.BoxPubKey, c.switchTable.init(c, c.sigPub) // TODO move before peers? before router? } -// Get the current build name. This is usually injected if built from git, -// or returns "unknown" otherwise. +// UpdateConfig updates the configuration in Core and then signals the +// various module goroutines to reconfigure themselves if needed +func (c *Core) UpdateConfig(config *config.NodeConfig) { + c.configMutex.Lock() + c.configOld = c.config + c.config = *config + c.configMutex.Unlock() + + c.admin.reconfigure <- true + c.searches.reconfigure <- true + c.dht.reconfigure <- true + c.sessions.reconfigure <- true + c.multicast.reconfigure <- true + c.peers.reconfigure <- true + c.router.reconfigure <- true + c.switchTable.reconfigure <- true +} + +// GetBuildName gets the current build name. This is usually injected if built +// from git, or returns "unknown" otherwise. func GetBuildName() string { if buildName == "" { return "unknown" @@ -96,6 +128,11 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { c.log.Println("Starting up...") + c.configMutex.Lock() + c.config = *nc + c.configOld = c.config + c.configMutex.Unlock() + var boxPub crypto.BoxPubKey var boxPriv crypto.BoxPrivKey var sigPub crypto.SigPubKey diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index b52a820b..3f2debdb 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -65,11 +65,12 @@ type dhtReqKey struct { // The main DHT struct. type dht struct { - core *Core - nodeID crypto.NodeID - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests - callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks + core *Core + reconfigure chan bool + nodeID crypto.NodeID + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests + callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks // These next two could be replaced by a single linked list or similar... table map[crypto.NodeID]*dhtInfo imp []*dhtInfo @@ -78,6 +79,18 @@ type dht struct { // Initializes the DHT. func (t *dht) init(c *Core) { t.core = c + t.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case _ = <-t.reconfigure: + t.core.configMutex.RLock() + t.core.log.Println("Notified: dht") + t.core.configMutex.RUnlock() + continue + } + } + }() t.nodeID = *t.core.GetNodeID() t.peers = make(chan *dhtInfo, 1024) t.callbacks = make(map[dhtReqKey]dht_callbackInfo) diff --git a/src/yggdrasil/multicast.go b/src/yggdrasil/multicast.go index 749dfcdb..3d73237f 100644 --- a/src/yggdrasil/multicast.go +++ b/src/yggdrasil/multicast.go @@ -10,13 +10,26 @@ import ( ) type multicast struct { - core *Core - sock *ipv6.PacketConn - groupAddr string + core *Core + reconfigure chan bool + sock *ipv6.PacketConn + groupAddr string } func (m *multicast) init(core *Core) { m.core = core + m.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case _ = <-m.reconfigure: + m.core.configMutex.RLock() + m.core.log.Println("Notified: multicast") + m.core.configMutex.RUnlock() + continue + } + } + }() m.groupAddr = "[ff02::114]:9001" // Check if we've been given any expressions if len(m.core.ifceExpr) == 0 { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index a2b94b67..502ea67e 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -19,6 +19,7 @@ import ( // In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. type peers struct { core *Core + reconfigure chan bool mutex sync.Mutex // Synchronize writes to atomic ports atomic.Value //map[switchPort]*peer, use CoW semantics authMutex sync.RWMutex @@ -31,6 +32,18 @@ func (ps *peers) init(c *Core) { defer ps.mutex.Unlock() ps.putPorts(make(map[switchPort]*peer)) ps.core = c + ps.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case _ = <-ps.reconfigure: + ps.core.configMutex.RLock() + ps.core.log.Println("Notified: peers") + ps.core.configMutex.RUnlock() + continue + } + } + }() ps.allowedEncryptionPublicKeys = make(map[crypto.BoxPubKey]struct{}) } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 87da8829..096a9785 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -37,19 +37,20 @@ import ( // The router struct has channels to/from the tun/tap device and a self peer (0), which is how messages are passed between this node and the peers/switch layer. // The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions. type router struct { - core *Core - addr address.Address - subnet address.Subnet - in <-chan []byte // packets we received from the network, link to peer's "out" - out func([]byte) // packets we're sending to the network, link to peer's "in" - toRecv chan router_recvPacket // packets to handle via recvPacket() - tun tunAdapter // TUN/TAP adapter - adapters []Adapter // Other adapters - recv chan<- []byte // place where the tun pulls received packets from - send <-chan []byte // place where the tun puts outgoing packets - reset chan struct{} // signal that coords changed (re-init sessions/dht) - admin chan func() // pass a lambda for the admin socket to query stuff - cryptokey cryptokey + core *Core + reconfigure chan bool + addr address.Address + subnet address.Subnet + in <-chan []byte // packets we received from the network, link to peer's "out" + out func([]byte) // packets we're sending to the network, link to peer's "in" + toRecv chan router_recvPacket // packets to handle via recvPacket() + tun tunAdapter // TUN/TAP adapter + adapters []Adapter // Other adapters + recv chan<- []byte // place where the tun pulls received packets from + send <-chan []byte // place where the tun puts outgoing packets + reset chan struct{} // signal that coords changed (re-init sessions/dht) + admin chan func() // pass a lambda for the admin socket to query stuff + cryptokey cryptokey } // Packet and session info, used to check that the packet matches a valid IP range or CKR prefix before sending to the tun. @@ -61,6 +62,7 @@ type router_recvPacket struct { // Initializes the router struct, which includes setting up channels to/from the tun/tap. func (r *router) init(core *Core) { r.core = core + r.reconfigure = make(chan bool, 1) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) in := make(chan []byte, 32) // TODO something better than this... @@ -124,6 +126,11 @@ func (r *router) mainLoop() { } case f := <-r.admin: f() + case _ = <-r.reconfigure: + r.core.configMutex.RLock() + r.core.log.Println("Notified: router") + r.core.configMutex.RUnlock() + continue } } } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index c85b719c..f522d7b9 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -42,13 +42,26 @@ type searchInfo struct { // This stores a map of active searches. type searches struct { - core *Core - searches map[crypto.NodeID]*searchInfo + core *Core + reconfigure chan bool + searches map[crypto.NodeID]*searchInfo } // Intializes the searches struct. func (s *searches) init(core *Core) { s.core = core + s.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case _ = <-s.reconfigure: + s.core.configMutex.RLock() + s.core.log.Println("Notified: searches") + s.core.configMutex.RUnlock() + continue + } + } + }() s.searches = make(map[crypto.NodeID]*searchInfo) } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 4f395b07..78b36ecd 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -18,6 +18,7 @@ import ( // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { core *Core + reconfigure chan bool theirAddr address.Address theirSubnet address.Subnet theirPermPub crypto.BoxPubKey @@ -101,6 +102,7 @@ func (s *sessionInfo) timedout() bool { // Additionally, stores maps of address/subnet onto keys, and keys onto handles. type sessions struct { core *Core + reconfigure chan bool lastCleanup time.Time // Maps known permanent keys to their shared key, used by DHT a lot permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey @@ -124,6 +126,22 @@ type sessions struct { // Initializes the session struct. func (ss *sessions) init(core *Core) { ss.core = core + ss.reconfigure = make(chan bool, 1) + go func() { + for { + select { + case newConfig := <-ss.reconfigure: + ss.core.configMutex.RLock() + ss.core.log.Println("Notified: sessions") + ss.core.configMutex.RUnlock() + + for _, sinfo := range ss.sinfos { + sinfo.reconfigure <- newConfig + } + continue + } + } + }() ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey) ss.sinfos = make(map[crypto.Handle]*sessionInfo) ss.byMySes = make(map[crypto.BoxPubKey]*crypto.Handle) @@ -271,6 +289,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { } sinfo := sessionInfo{} sinfo.core = ss.core + sinfo.reconfigure = make(chan bool, 1) sinfo.theirPermPub = *theirPermKey pub, priv := crypto.NewBoxKeys() sinfo.mySesPub = *pub @@ -539,6 +558,11 @@ func (sinfo *sessionInfo) doWorker() { } else { return } + case _ = <-sinfo.reconfigure: + sinfo.core.configMutex.RLock() + sinfo.core.log.Println("Notified: sessionInfo") + sinfo.core.configMutex.RUnlock() + continue } } } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 3c1dae61..420392b6 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -162,6 +162,7 @@ type switchData struct { // All the information stored by the switch. type switchTable struct { core *Core + reconfigure chan bool key crypto.SigPubKey // Our own key time time.Time // Time when locator.tstamp was last updated drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root @@ -184,6 +185,7 @@ const SwitchQueueTotalMinSize = 4 * 1024 * 1024 func (t *switchTable) init(core *Core, key crypto.SigPubKey) { now := time.Now() t.core = core + t.reconfigure = make(chan bool, 1) t.key = key locator := switchLocator{root: key, tstamp: now.Unix()} peers := make(map[switchPort]peerInfo) @@ -808,6 +810,11 @@ func (t *switchTable) doWorker() { } case f := <-t.admin: f() + case _ = <-t.reconfigure: + t.core.configMutex.RLock() + t.core.log.Println("Notified: switchTable") + t.core.configMutex.RUnlock() + continue } } }