More misc work on pineconemanager

I think we might be nearly there for this part!
This commit is contained in:
0x1a8510f2 2022-07-04 06:13:50 +01:00
parent 5b2b380749
commit 78510267d2
Signed by: 0x1a8510f2
GPG Key ID: 1C692E355D76775D
4 changed files with 322 additions and 191 deletions

View File

@ -0,0 +1,132 @@
package misc
import (
"sync"
"sync/atomic"
)
// The following code has been near-directly copied from the Go standard
// library and, as such, the below copyright notice and license apply
// only to the code between the following tags:
// <not-my-code>
/*
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
// Once is an object that will perform exactly one action.
//
// A Once must not be copied after first use.
type CheckableOnce struct {
// done indicates whether the action has been performed.
// It is first in the struct because it is used in the hot path.
// The hot path is inlined at every call site.
// Placing done first allows more compact instructions on some architectures (amd64/386),
// and fewer instructions (to calculate offset) on other architectures.
done uint32
m sync.Mutex
}
// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
// var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
// config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *CheckableOnce) Do(f func()) {
// Note: Here is an incorrect implementation of Do:
//
// if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
// f()
// }
//
// Do guarantees that when it returns, f has finished.
// This implementation would not implement that guarantee:
// given two simultaneous calls, the winner of the cas would
// call f, and the second would return immediately, without
// waiting for the first's call to f to complete.
// This is why the slow path falls back to a mutex, and why
// the atomic.StoreUint32 must be delayed until after f returns.
if atomic.LoadUint32(&o.done) == 0 {
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
}
func (co *CheckableOnce) doSlow(f func()) {
co.m.Lock()
defer co.m.Unlock()
if co.done == 0 {
defer atomic.StoreUint32(&co.done, 1)
f()
}
}
// </not-my-code>
// True if Do() was called and finished executing. False if Do()
// has not been called. Blocks if Do() is currently executing.
func (co *CheckableOnce) Done() bool {
co.m.Lock()
defer co.m.Unlock()
return atomic.LoadUint32(&co.done) == 1
}
// True if Do() is currently in progress. False otherwise.
func (o *CheckableOnce) Doing() bool {
var locked bool
if locked = o.m.TryLock(); locked {
defer o.m.Unlock()
}
return !locked
}
// True if Do() was called. False otherwise.
func (o *CheckableOnce) DoingOrDone() bool {
if o.m.TryLock() {
defer o.m.Unlock()
return atomic.LoadUint32(&o.done) == 1
}
return false
}

View File

@ -1,10 +0,0 @@
package misc
// Simple function which gets rid of error return values from functions when
// we know they definitely won't error. If they error anyway, panic.
func NoError[T any](value T, err error) T {
if err != nil {
panic("cannot discard non-nil error")
}
return value
}

View File

@ -0,0 +1,52 @@
package pineconemanager
type pineconeManagerConfOption int
const (
// The private key for this pinecone peer; effectively its "identity".
CONF_PINECONE_IDENTITY pineconeManagerConfOption = iota
// A logger instance which is passed on to pinecone.
CONF_LOGGER
// The address to listen on for incoming pinecone connections. If this
// is an empty string, the node does not listen for connections and
// multicast is also disabled (so the node can only connect to peers
// outbound and cannot receive peer connections).
CONF_INBOUND_ADDR
// The address to listen on for inbound HTTP. This allows peers to connect
// to this node over websockets and exposes a debugging endpoint if enabled
// via `WebserverDebugPath`. Additional routes can be configured via
// `WebserverHandlers`. The webserver is disabled if this option is an empty
// string.
CONF_WEBSERVER_ADDR
// A path on the webserver to expose debugging information at. If this is an
// empty string, the node does not expose debugging information. This setting
// depends on the webserver being enabled.
CONF_WEBSERVER_DEBUG_PATH
// Whether to advertise this peer on the local network via multicast. This allows
// for peers to find each other locally but may require modifications to firewall
// rules. This option is always disabled if `InboundAddr` is not set.
CONF_USE_MULTICAST
// A list of protocols to advertise as supported by this node over pinecone.
CONF_WRAPPED_PROTOS
// A list of pinecone nodes with known addresses which this node can connect to
// for a more stable connection to the network.
CONF_STATIC_PEERS
// Additional handlers added to the webserver. This option exists mainly for
// efficiency, to allow nodes which also need to run a regular webserver to
// use the one used by pinecone for websockets. This saves allocating another
// port and other system resources.
CONF_WEBSERVER_HANDLERS
// Because this is at the bottom, it will automatically hold the value representing
// the number of config options available. This is useful to create an array for
// config options.
conf_option_count
)

View File

@ -4,7 +4,6 @@ import (
"context"
"crypto/ed25519"
"crypto/tls"
"encoding/hex"
"fmt"
"io"
"log"
@ -23,67 +22,15 @@ import (
"github.com/sirupsen/logrus"
)
type pineconeManagerConfOption int
const (
// The private key for this pinecone peer; effectively its "identity".
CONF_PINECONE_IDENTITY pineconeManagerConfOption = iota
// A logger instance which is passed on to pinecone.
CONF_LOGGER
// The address to listen on for incoming pinecone connections. If this
// is an empty string, the node does not listen for connections and
// multicast is also disabled (so the node can only connect to peers
// outbound and cannot receive peer connections).
CONF_INBOUND_ADDR
// The address to listen on for inbound HTTP. This allows peers to connect
// to this node over websockets and exposes a debugging endpoint if enabled
// via `WebserverDebugPath`. Additional routes can be configured via
// `WebserverHandlers`. The webserver is disabled if this option is an empty
// string.
CONF_WEBSERVER_ADDR
// A path on the webserver to expose debugging information at. If this is an
// empty string, the node does not expose debugging information. This setting
// depends on the webserver being enabled.
CONF_WEBSERVER_DEBUG_PATH
// Whether to advertise this peer on the local network via multicast. This allows
// for peers to find each other locally but may require modifications to firewall
// rules. This option is always disabled if `InboundAddr` is not set.
CONF_USE_MULTICAST
// A list of protocols to advertise as supported by this node over pinecone.
CONF_WRAPPED_PROTOS
// A list of pinecone nodes with known addresses which this node can connect to
// for a more stable connection to the network.
CONF_STATIC_PEERS
// Additional handlers added to the webserver. This option exists mainly for
// efficiency, to allow nodes which also need to run a regular webserver to
// use the one used by pinecone for websockets. This saves allocating another
// port and other system resources.
CONF_WEBSERVER_HANDLERS
// Because this is at the bottom, it will automatically hold the value representing
// the number of config options available. This is useful to create an array for
// config options.
conf_option_count
)
type pineconeManager struct {
// Once instances ensuring that each method is only executed once at a given time.
startOnce sync.Once
stopOnce sync.Once
restartOnce sync.Once
startOnce misc.CheckableOnce
stopOnce misc.CheckableOnce
restartOnce misc.CheckableOnce
// A context and related fields which control the lifetime of the pinecone manager.
ctx context.Context
ctxCancel context.CancelFunc
ctxLock sync.RWMutex
// Communication channels.
reqExit chan struct{}
ackExit chan struct{}
// An array of config options for the manager and a lock to make it thread-safe.
conf [conf_option_count]any
@ -91,17 +38,11 @@ type pineconeManager struct {
}
// Read a config option of the pinecone manager. This is thread-safe.
func (pm *pineconeManager) ConfGet(confId pineconeManagerConfOption) (any, error) {
func (pm *pineconeManager) ConfGet(confId pineconeManagerConfOption) any {
defer pm.confLock.RUnlock()
pm.confLock.RLock()
// Make sure we're not writing out-of-bounds (though this should never really
// happen unless we did something wrong in this module specifically).
if confId > conf_option_count-1 {
return nil, fmt.Errorf("config option %d does not exist", confId)
}
return pm.conf[confId], nil
return pm.conf[confId]
}
// Set a config option of the pinecone manager. This is thead-safe. Note that the
@ -119,6 +60,7 @@ func (pm *pineconeManager) ConfSet(confId pineconeManagerConfOption, confVal any
invalidTypeErr := fmt.Errorf("invalid type for config value %d", confId)
// Validate config values before writing them.
// TODO: Add extra validation where possible.
switch confId {
case CONF_PINECONE_IDENTITY:
if _, ok := confVal.(ed25519.PrivateKey); !ok {
@ -128,7 +70,11 @@ func (pm *pineconeManager) ConfSet(confId pineconeManagerConfOption, confVal any
if _, ok := confVal.(*log.Logger); !ok {
return invalidTypeErr
}
case CONF_INBOUND_ADDR, CONF_WEBSERVER_ADDR, CONF_WEBSERVER_DEBUG_PATH:
case CONF_INBOUND_ADDR:
if _, ok := confVal.(*net.TCPAddr); !ok {
return invalidTypeErr
}
case CONF_WEBSERVER_ADDR, CONF_WEBSERVER_DEBUG_PATH:
if _, ok := confVal.(string); !ok {
return invalidTypeErr
}
@ -146,115 +92,106 @@ func (pm *pineconeManager) ConfSet(confId pineconeManagerConfOption, confVal any
}
}
// Write the config
// Update the config.
pm.conf[confId] = confVal
return nil
}
func (pm *pineconeManager) ConfSetDefaults() {
_, randomPineconeIdentity, randomPineconeIdentityErr := ed25519.GenerateKey(nil)
if randomPineconeIdentityErr != nil {
panic(fmt.Errorf("fatal error while generating pinecone identity for pineconeManager defaults: %e", randomPineconeIdentityErr))
}
defaults := map[pineconeManagerConfOption]any{
CONF_PINECONE_IDENTITY: randomPineconeIdentity,
CONF_LOGGER: log.New(io.Discard, "", 0),
CONF_INBOUND_ADDR: ":0",
CONF_WEBSERVER_ADDR: ":0",
CONF_WEBSERVER_DEBUG_PATH: "",
CONF_USE_MULTICAST: false,
CONF_WRAPPED_PROTOS: []string{},
CONF_STATIC_PEERS: []string{},
CONF_WEBSERVER_HANDLERS: map[string]http.Handler{},
}
for key, value := range defaults {
err := pm.ConfSet(key, value)
if err != nil {
panic(fmt.Errorf("fatal error while setting pineconeManager defaults: %e", err))
}
}
}
// Start the pinecone manager as configured. This blocks while the
// manager is running but can be started in a goroutine.
func (pm *pineconeManager) Start() {
// Only execute this once at a time.
pm.startOnce.Do(func() {
// Reset startOnce when this function exits.
// Acknowledge the exit request that caused the manager to exit.
// This MUST be the first defer as that means it gets executed last.
defer func(pm *pineconeManager) {
pm.startOnce = sync.Once{}
pm.ackExit <- struct{}{}
}(pm)
// Set up the context to allow stopping the pinecone manager.
pm.ctxLock.Lock()
pm.ctx, pm.ctxCancel = context.WithCancel(context.Background())
pm.ctxLock.Unlock()
// Reset startOnce when this function exits.
defer func(pm *pineconeManager) {
pm.startOnce = misc.CheckableOnce{}
}(pm)
// Grab a snapshot of the config (this makes access to config
// values easier and ensures the config is never in an inconsistent
// state when values need to be read multiple times).
privkey := pm.ConfGet(CONF_PINECONE_IDENTITY).(ed25519.PrivateKey)
logger := pm.ConfGet(CONF_LOGGER).(*log.Logger)
inboundAddr := pm.ConfGet(CONF_INBOUND_ADDR).(*net.TCPAddr)
webserverAddr := pm.ConfGet(CONF_WEBSERVER_ADDR).(string)
webserverDebugPath := pm.ConfGet(CONF_WEBSERVER_DEBUG_PATH).(string)
useMulticast := pm.ConfGet(CONF_USE_MULTICAST).(bool)
protos := pm.ConfGet(CONF_WRAPPED_PROTOS).([]string)
staticPeers := pm.ConfGet(CONF_STATIC_PEERS).([]string)
webserverHandlers := pm.ConfGet(CONF_WEBSERVER_HANDLERS).(map[string]http.Handler)
// Keep track of any goroutines we start.
var wg sync.WaitGroup
// Create a context to kill any goroutines we start.
ctx, ctxCancel := context.WithCancel(context.Background())
//
// Main pinecone stuff
//
// Grab a snapshot of the config (this makes access to config
// values easier and ensures the config is never in an inconsistent
// state when values need to be read multiple times).
privkey := misc.NoError(pm.ConfGet(CONF_PINECONE_IDENTITY)).(ed25519.PrivateKey)
logger := misc.NoError(pm.ConfGet(CONF_LOGGER)).(*log.Logger)
inboundAddr := misc.NoError(pm.ConfGet(CONF_INBOUND_ADDR)).(string)
webserverAddr := misc.NoError(pm.ConfGet(CONF_WEBSERVER_ADDR)).(string)
webserverDebugPath := misc.NoError(pm.ConfGet(CONF_WEBSERVER_DEBUG_PATH)).(string)
useMulticast := misc.NoError(pm.ConfGet(CONF_USE_MULTICAST)).(bool)
protos := misc.NoError(pm.ConfGet(CONF_WRAPPED_PROTOS)).([]string)
staticPeers := misc.NoError(pm.ConfGet(CONF_STATIC_PEERS)).([]string)
webserverHandlers := misc.NoError(pm.ConfGet(CONF_WEBSERVER_HANDLERS)).(map[string]http.Handler)
// Set up pinecone components.
pRouter := pineconeRouter.NewRouter(logger, privkey, false)
pQUIC := pineconeSessions.NewSessions(logger, pRouter, protos)
pMulticast := pineconeMulticast.NewMulticast(logger, pRouter)
pManager := pineconeConnections.NewConnectionManager(pRouter, nil)
if useMulticast {
wg.Add(1)
pMulticast.Start()
}
// Connect to any static peers we were given.
for _, peer := range staticPeers {
pManager.AddPeer(peer)
}
if inboundAddr != "" {
// Listen for inbound connections if a listener was configured.
if inboundAddr != nil {
wg.Add(1)
go func(ctx context.Context, wg sync.WaitGroup) {
listener, err := net.ListenTCP("tcp", inboundAddr)
if err != nil {
// TODO: Handle this?
panic(fmt.Errorf("error while setting up inbound pinecone listener: %e", err))
}
for ctx.Err() == nil {
// Don't block indefinitely on listener.Accept() so we can exit the
// goroutine when the context in cancelled.
listener.SetDeadline(time.Now().Add(time.Second))
// Accept incoming connections. In case of error, drop connection but
// otherwise ignore.
conn, err := listener.Accept()
if err != nil {
continue
}
// Set up pinecone over the newly created connection.
_, err = pRouter.Connect(
conn,
pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote),
)
// If pinecone setup failed, drop the connection.
if err != nil {
conn.Close()
continue
}
}
wg.Done()
}(ctx, wg)
}
///////////////////////////////
go func() {
listener, err := net.Listen("tcp", *instanceListen)
if err != nil {
panic(err)
}
fmt.Println("Listening on", listener.Addr())
for {
conn, err := listener.Accept()
if err != nil {
logrus.WithError(err).Error("listener.Accept failed")
continue
}
port, err := pRouter.Connect(
conn,
pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote),
)
if err != nil {
logrus.WithError(err).Error("pSwitch.Connect failed")
continue
}
fmt.Println("Inbound connection", conn.RemoteAddr(), "is connected to port", port)
}
}()
wsUpgrader := websocket.Upgrader{
CheckOrigin: func(_ *http.Request) bool {
@ -262,7 +199,6 @@ func (pm *pineconeManager) Start() {
},
}
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
httpRouter.PathPrefix("/test").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "HeloWorld Function") })
httpRouter.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
c, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
@ -278,7 +214,9 @@ func (pm *pineconeManager) Start() {
logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch")
}
})
httpRouter.HandleFunc("/pinecone", pRouter.ManholeHandler)
if webserverDebugPath != "" {
httpRouter.HandleFunc(webserverDebugPath, pRouter.ManholeHandler)
}
pMux := mux.NewRouter().SkipClean(true).UseEncodedPath()
pMux.PathPrefix("/ptest").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "HeloWorld Function") })
@ -300,27 +238,36 @@ func (pm *pineconeManager) Start() {
}
go func() {
pubkey := pRouter.PublicKey()
logrus.Info("Listening on ", hex.EncodeToString(pubkey[:]))
logrus.Fatal(httpServer.Serve(pQUIC.Protocol("matrix")))
httpServer.Serve(pQUIC.Protocol("matrix"))
}()
go func() {
httpBindAddr := fmt.Sprintf(":%d", *instancePort)
logrus.Info("Listening on ", httpBindAddr)
logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter))
httpBindAddr := fmt.Sprintf(":%d", webserverAddr)
http.ListenAndServe(httpBindAddr, httpRouter)
}()
///////////////////////////////
// Wait until exit is requested.
for {
select {
case <-pm.ctx.Done():
case <-pm.reqExit:
break
}
}
// Kill all goroutines we spawned.
ctxCancel()
// Tear down pinecone.
if useMulticast {
pMulticast.Stop()
wg.Done()
}
pManager.RemovePeers()
pQUIC.Close()
pRouter.Close()
// Wait for all the goroutines we started to exit.
wg.Wait()
})
}
@ -330,21 +277,23 @@ func (pm *pineconeManager) Stop() {
pm.stopOnce.Do(func() {
// Reset stopOnce when this function exits.
defer func(pm *pineconeManager) {
pm.stopOnce = sync.Once{}
pm.stopOnce = misc.CheckableOnce{}
}(pm)
// Only actually do anything if the manager is running.
// Note: This does not guarantee that the context is not cancelled
// between the call to pm.IsRunning() and pm.ctxCancel(). A goroutine
// could cancel the context after we check, which theoretically creates
// a race condition. However, as a context CancelFunc is a no-op when
// called multiple times, this is okay. The main reason for this check
// is to prevent panics if the cancel func is nil which, it will be
// before the manager's first run. As long as we know the manager
// ran at some point (which this check guarantees), there won't be
// issues.
// Only actually do anything if the manager is running, otherwise
// we'll block forever because nothing would read from the channel.
//
// Theoretically the manager could exit after our check but before
// we write to the channel which causes a race and results in a
// deadlock. In practice this should be impossible as the manager
// can only exit when this function is called and only one of this
// function can run at a time. The guarantee could be made stronger
// with locks but this isn't really worth the added complexity.
if pm.IsRunning() {
pm.ctxCancel()
pm.reqExit <- struct{}{}
// Wait for the exit request to be acknowledged.
<-pm.ackExit
}
})
}
@ -355,7 +304,7 @@ func (pm *pineconeManager) Restart() {
pm.restartOnce.Do(func() {
// Reset restartOnce when this function exits.
defer func(pm *pineconeManager) {
pm.restartOnce = sync.Once{}
pm.restartOnce = misc.CheckableOnce{}
}(pm)
pm.Stop()
@ -365,22 +314,7 @@ func (pm *pineconeManager) Restart() {
// Check whether the pinecone manager is currently running.
func (pm *pineconeManager) IsRunning() bool {
// Make sure the context isn't modified while we're checking it.
defer pm.ctxLock.RUnlock()
pm.ctxLock.RLock()
// If the context is nil, we're definitely not running.
if pm.ctx == nil {
return false
}
// If the context is not nil, we need to check if context.Err()
// is nil to determine if the pm is running.
if pm.ctx.Err() != nil {
return false
}
return true
return pm.startOnce.Doing()
}
var initonce sync.Once
@ -393,10 +327,33 @@ func GetInstance() *pineconeManager {
initonce.Do(func() {
pineconeManagerInstance = &pineconeManager{}
// Generate some default options.
_, randomPineconeIdentity, randomPineconeIdentityErr := ed25519.GenerateKey(nil)
if randomPineconeIdentityErr != nil {
panic(fmt.Errorf("fatal error while generating pinecone identity for pineconeManager defaults: %e", randomPineconeIdentityErr))
}
defaults := [conf_option_count]any{
CONF_PINECONE_IDENTITY: randomPineconeIdentity,
CONF_LOGGER: log.New(io.Discard, "", 0),
CONF_INBOUND_ADDR: ":0",
CONF_WEBSERVER_ADDR: ":0",
CONF_WEBSERVER_DEBUG_PATH: "",
CONF_USE_MULTICAST: false,
CONF_WRAPPED_PROTOS: []string{},
CONF_STATIC_PEERS: []string{},
CONF_WEBSERVER_HANDLERS: map[string]http.Handler{},
}
// Set default config values to ensure that the config is never
// in an unusable state and allow for sane options without setting
// everything manually.
pineconeManagerInstance.ConfSetDefaults()
pineconeManagerInstance.conf = defaults
// Init communication channels.
pineconeManagerInstance.reqExit = make(chan struct{})
pineconeManagerInstance.ackExit = make(chan struct{})
})
return pineconeManagerInstance