Support notifying components for config reload, listen for SIGHUP

This commit is contained in:
Neil Alexander 2018-12-29 18:51:51 +00:00
parent b4a7dab34d
commit 219fb96553
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
10 changed files with 189 additions and 36 deletions

View File

@ -314,7 +314,9 @@ func main() {
logger.Printf("Your IPv6 subnet is %s", subnet.String())
// Catch interrupts from the operating system to exit gracefully.
c := make(chan os.Signal, 1)
r := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
signal.Notify(r, os.Interrupt, syscall.SIGHUP)
// Create a function to capture the service being stopped on Windows.
winTerminate := func() {
c <- os.Interrupt
@ -322,5 +324,13 @@ func main() {
minwinsvc.SetOnExit(winTerminate)
// Wait for the terminate/interrupt signal. Once a signal is received, the
// deferred Stop function above will run which will shut down TUN/TAP.
<-c
for {
select {
case _ = <-r:
n.core.UpdateConfig(cfg)
case _ = <-c:
goto exit
}
}
exit:
}

View File

@ -22,10 +22,11 @@ import (
// TODO: Add authentication
type admin struct {
core *Core
listenaddr string
listener net.Listener
handlers []admin_handlerInfo
core *Core
reconfigure chan bool
listenaddr string
listener net.Listener
handlers []admin_handlerInfo
}
type admin_info map[string]interface{}
@ -53,6 +54,21 @@ func (a *admin) addHandler(name string, args []string, handler func(admin_info)
// init runs the initial admin setup.
func (a *admin) init(c *Core, listenaddr string) {
a.core = c
a.reconfigure = make(chan bool, 1)
go func() {
for {
select {
case _ = <-a.reconfigure:
a.core.configMutex.RLock()
a.core.log.Println("Notified: admin")
if a.core.config.AdminListen != a.core.configOld.AdminListen {
a.core.log.Println("AdminListen has changed!")
}
a.core.configMutex.RUnlock()
continue
}
}
}()
a.listenaddr = listenaddr
a.addHandler("list", []string{}, func(in admin_info) (admin_info, error) {
handlers := make(map[string]interface{})

View File

@ -7,6 +7,7 @@ import (
"log"
"net"
"regexp"
"sync"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/config"
@ -17,14 +18,26 @@ import (
var buildName string
var buildVersion string
type module interface {
init(*config.NodeConfig) error
start() error
}
// The Core object represents the Yggdrasil node. You should create a Core
// object for each Yggdrasil node you plan to run.
type Core struct {
// This is the main data structure that holds everything else for a node
boxPub crypto.BoxPubKey
boxPriv crypto.BoxPrivKey
sigPub crypto.SigPubKey
sigPriv crypto.SigPrivKey
// We're going to keep our own copy of the provided config - that way we can
// guarantee that it will be covered by the mutex
config config.NodeConfig // Active config
configOld config.NodeConfig // Previous config
configMutex sync.RWMutex // Protects both config and configOld
// Core-specific config
boxPub crypto.BoxPubKey
boxPriv crypto.BoxPrivKey
sigPub crypto.SigPubKey
sigPriv crypto.SigPrivKey
// Modules
switchTable switchTable
peers peers
sessions sessions
@ -35,8 +48,9 @@ type Core struct {
multicast multicast
nodeinfo nodeinfo
tcp tcpInterface
log *log.Logger
ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this
// Other bits
log *log.Logger
ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this
}
func (c *Core) init(bpub *crypto.BoxPubKey,
@ -62,8 +76,26 @@ func (c *Core) init(bpub *crypto.BoxPubKey,
c.switchTable.init(c, c.sigPub) // TODO move before peers? before router?
}
// Get the current build name. This is usually injected if built from git,
// or returns "unknown" otherwise.
// UpdateConfig updates the configuration in Core and then signals the
// various module goroutines to reconfigure themselves if needed
func (c *Core) UpdateConfig(config *config.NodeConfig) {
c.configMutex.Lock()
c.configOld = c.config
c.config = *config
c.configMutex.Unlock()
c.admin.reconfigure <- true
c.searches.reconfigure <- true
c.dht.reconfigure <- true
c.sessions.reconfigure <- true
c.multicast.reconfigure <- true
c.peers.reconfigure <- true
c.router.reconfigure <- true
c.switchTable.reconfigure <- true
}
// GetBuildName gets the current build name. This is usually injected if built
// from git, or returns "unknown" otherwise.
func GetBuildName() string {
if buildName == "" {
return "unknown"
@ -96,6 +128,11 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
c.log.Println("Starting up...")
c.configMutex.Lock()
c.config = *nc
c.configOld = c.config
c.configMutex.Unlock()
var boxPub crypto.BoxPubKey
var boxPriv crypto.BoxPrivKey
var sigPub crypto.SigPubKey

View File

@ -65,11 +65,12 @@ type dhtReqKey struct {
// The main DHT struct.
type dht struct {
core *Core
nodeID crypto.NodeID
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks
core *Core
reconfigure chan bool
nodeID crypto.NodeID
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks
// These next two could be replaced by a single linked list or similar...
table map[crypto.NodeID]*dhtInfo
imp []*dhtInfo
@ -78,6 +79,18 @@ type dht struct {
// Initializes the DHT.
func (t *dht) init(c *Core) {
t.core = c
t.reconfigure = make(chan bool, 1)
go func() {
for {
select {
case _ = <-t.reconfigure:
t.core.configMutex.RLock()
t.core.log.Println("Notified: dht")
t.core.configMutex.RUnlock()
continue
}
}
}()
t.nodeID = *t.core.GetNodeID()
t.peers = make(chan *dhtInfo, 1024)
t.callbacks = make(map[dhtReqKey]dht_callbackInfo)

View File

@ -10,13 +10,26 @@ import (
)
type multicast struct {
core *Core
sock *ipv6.PacketConn
groupAddr string
core *Core
reconfigure chan bool
sock *ipv6.PacketConn
groupAddr string
}
func (m *multicast) init(core *Core) {
m.core = core
m.reconfigure = make(chan bool, 1)
go func() {
for {
select {
case _ = <-m.reconfigure:
m.core.configMutex.RLock()
m.core.log.Println("Notified: multicast")
m.core.configMutex.RUnlock()
continue
}
}
}()
m.groupAddr = "[ff02::114]:9001"
// Check if we've been given any expressions
if len(m.core.ifceExpr) == 0 {

View File

@ -19,6 +19,7 @@ import (
// In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch.
type peers struct {
core *Core
reconfigure chan bool
mutex sync.Mutex // Synchronize writes to atomic
ports atomic.Value //map[switchPort]*peer, use CoW semantics
authMutex sync.RWMutex
@ -31,6 +32,18 @@ func (ps *peers) init(c *Core) {
defer ps.mutex.Unlock()
ps.putPorts(make(map[switchPort]*peer))
ps.core = c
ps.reconfigure = make(chan bool, 1)
go func() {
for {
select {
case _ = <-ps.reconfigure:
ps.core.configMutex.RLock()
ps.core.log.Println("Notified: peers")
ps.core.configMutex.RUnlock()
continue
}
}
}()
ps.allowedEncryptionPublicKeys = make(map[crypto.BoxPubKey]struct{})
}

View File

@ -37,19 +37,20 @@ import (
// The router struct has channels to/from the tun/tap device and a self peer (0), which is how messages are passed between this node and the peers/switch layer.
// The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions.
type router struct {
core *Core
addr address.Address
subnet address.Subnet
in <-chan []byte // packets we received from the network, link to peer's "out"
out func([]byte) // packets we're sending to the network, link to peer's "in"
toRecv chan router_recvPacket // packets to handle via recvPacket()
tun tunAdapter // TUN/TAP adapter
adapters []Adapter // Other adapters
recv chan<- []byte // place where the tun pulls received packets from
send <-chan []byte // place where the tun puts outgoing packets
reset chan struct{} // signal that coords changed (re-init sessions/dht)
admin chan func() // pass a lambda for the admin socket to query stuff
cryptokey cryptokey
core *Core
reconfigure chan bool
addr address.Address
subnet address.Subnet
in <-chan []byte // packets we received from the network, link to peer's "out"
out func([]byte) // packets we're sending to the network, link to peer's "in"
toRecv chan router_recvPacket // packets to handle via recvPacket()
tun tunAdapter // TUN/TAP adapter
adapters []Adapter // Other adapters
recv chan<- []byte // place where the tun pulls received packets from
send <-chan []byte // place where the tun puts outgoing packets
reset chan struct{} // signal that coords changed (re-init sessions/dht)
admin chan func() // pass a lambda for the admin socket to query stuff
cryptokey cryptokey
}
// Packet and session info, used to check that the packet matches a valid IP range or CKR prefix before sending to the tun.
@ -61,6 +62,7 @@ type router_recvPacket struct {
// Initializes the router struct, which includes setting up channels to/from the tun/tap.
func (r *router) init(core *Core) {
r.core = core
r.reconfigure = make(chan bool, 1)
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
in := make(chan []byte, 32) // TODO something better than this...
@ -124,6 +126,11 @@ func (r *router) mainLoop() {
}
case f := <-r.admin:
f()
case _ = <-r.reconfigure:
r.core.configMutex.RLock()
r.core.log.Println("Notified: router")
r.core.configMutex.RUnlock()
continue
}
}
}

View File

@ -42,13 +42,26 @@ type searchInfo struct {
// This stores a map of active searches.
type searches struct {
core *Core
searches map[crypto.NodeID]*searchInfo
core *Core
reconfigure chan bool
searches map[crypto.NodeID]*searchInfo
}
// Intializes the searches struct.
func (s *searches) init(core *Core) {
s.core = core
s.reconfigure = make(chan bool, 1)
go func() {
for {
select {
case _ = <-s.reconfigure:
s.core.configMutex.RLock()
s.core.log.Println("Notified: searches")
s.core.configMutex.RUnlock()
continue
}
}
}()
s.searches = make(map[crypto.NodeID]*searchInfo)
}

View File

@ -18,6 +18,7 @@ import (
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
type sessionInfo struct {
core *Core
reconfigure chan bool
theirAddr address.Address
theirSubnet address.Subnet
theirPermPub crypto.BoxPubKey
@ -101,6 +102,7 @@ func (s *sessionInfo) timedout() bool {
// Additionally, stores maps of address/subnet onto keys, and keys onto handles.
type sessions struct {
core *Core
reconfigure chan bool
lastCleanup time.Time
// Maps known permanent keys to their shared key, used by DHT a lot
permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey
@ -124,6 +126,22 @@ type sessions struct {
// Initializes the session struct.
func (ss *sessions) init(core *Core) {
ss.core = core
ss.reconfigure = make(chan bool, 1)
go func() {
for {
select {
case newConfig := <-ss.reconfigure:
ss.core.configMutex.RLock()
ss.core.log.Println("Notified: sessions")
ss.core.configMutex.RUnlock()
for _, sinfo := range ss.sinfos {
sinfo.reconfigure <- newConfig
}
continue
}
}
}()
ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey)
ss.sinfos = make(map[crypto.Handle]*sessionInfo)
ss.byMySes = make(map[crypto.BoxPubKey]*crypto.Handle)
@ -271,6 +289,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
}
sinfo := sessionInfo{}
sinfo.core = ss.core
sinfo.reconfigure = make(chan bool, 1)
sinfo.theirPermPub = *theirPermKey
pub, priv := crypto.NewBoxKeys()
sinfo.mySesPub = *pub
@ -539,6 +558,11 @@ func (sinfo *sessionInfo) doWorker() {
} else {
return
}
case _ = <-sinfo.reconfigure:
sinfo.core.configMutex.RLock()
sinfo.core.log.Println("Notified: sessionInfo")
sinfo.core.configMutex.RUnlock()
continue
}
}
}

View File

@ -162,6 +162,7 @@ type switchData struct {
// All the information stored by the switch.
type switchTable struct {
core *Core
reconfigure chan bool
key crypto.SigPubKey // Our own key
time time.Time // Time when locator.tstamp was last updated
drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root
@ -184,6 +185,7 @@ const SwitchQueueTotalMinSize = 4 * 1024 * 1024
func (t *switchTable) init(core *Core, key crypto.SigPubKey) {
now := time.Now()
t.core = core
t.reconfigure = make(chan bool, 1)
t.key = key
locator := switchLocator{root: key, tstamp: now.Unix()}
peers := make(map[switchPort]peerInfo)
@ -808,6 +810,11 @@ func (t *switchTable) doWorker() {
}
case f := <-t.admin:
f()
case _ = <-t.reconfigure:
t.core.configMutex.RLock()
t.core.log.Println("Notified: switchTable")
t.core.configMutex.RUnlock()
continue
}
}
}