Misc work on pineconemanager

This commit is contained in:
0x1a8510f2 2022-06-13 02:43:52 +01:00
parent a355d77829
commit 5b2b380749
Signed by: 0x1a8510f2
GPG Key ID: 1C692E355D76775D
3 changed files with 244 additions and 2 deletions

10
internal/misc/noerror.go Normal file
View File

@ -0,0 +1,10 @@
package misc
// Simple function which gets rid of error return values from functions when
// we know they definitely won't error. If they error anyway, panic.
func NoError[T any](value T, err error) T {
if err != nil {
panic("cannot discard non-nil error")
}
return value
}

View File

@ -3,11 +3,24 @@ package pineconemanager
import (
"context"
"crypto/ed25519"
"crypto/tls"
"encoding/hex"
"fmt"
"io"
"log"
"net"
"net/http"
"sync"
"time"
"git.0x1a8510f2.space/wraith-labs/wraith-module-pinecomms/internal/misc"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
"github.com/sirupsen/logrus"
)
type pineconeManagerConfOption int
@ -184,8 +197,119 @@ func (pm *pineconeManager) Start() {
// Main pinecone stuff
//
logger, _ := pm.ConfGet(CONF_LOGGER)
privkey, _ := pm.ConfGet(CONF_PINECONE_IDENTITY)
// Grab a snapshot of the config (this makes access to config
// values easier and ensures the config is never in an inconsistent
// state when values need to be read multiple times).
privkey := misc.NoError(pm.ConfGet(CONF_PINECONE_IDENTITY)).(ed25519.PrivateKey)
logger := misc.NoError(pm.ConfGet(CONF_LOGGER)).(*log.Logger)
inboundAddr := misc.NoError(pm.ConfGet(CONF_INBOUND_ADDR)).(string)
webserverAddr := misc.NoError(pm.ConfGet(CONF_WEBSERVER_ADDR)).(string)
webserverDebugPath := misc.NoError(pm.ConfGet(CONF_WEBSERVER_DEBUG_PATH)).(string)
useMulticast := misc.NoError(pm.ConfGet(CONF_USE_MULTICAST)).(bool)
protos := misc.NoError(pm.ConfGet(CONF_WRAPPED_PROTOS)).([]string)
staticPeers := misc.NoError(pm.ConfGet(CONF_STATIC_PEERS)).([]string)
webserverHandlers := misc.NoError(pm.ConfGet(CONF_WEBSERVER_HANDLERS)).(map[string]http.Handler)
pRouter := pineconeRouter.NewRouter(logger, privkey, false)
pQUIC := pineconeSessions.NewSessions(logger, pRouter, protos)
pMulticast := pineconeMulticast.NewMulticast(logger, pRouter)
pManager := pineconeConnections.NewConnectionManager(pRouter, nil)
if useMulticast {
pMulticast.Start()
}
for _, peer := range staticPeers {
pManager.AddPeer(peer)
}
if inboundAddr != "" {
}
///////////////////////////////
go func() {
listener, err := net.Listen("tcp", *instanceListen)
if err != nil {
panic(err)
}
fmt.Println("Listening on", listener.Addr())
for {
conn, err := listener.Accept()
if err != nil {
logrus.WithError(err).Error("listener.Accept failed")
continue
}
port, err := pRouter.Connect(
conn,
pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote),
)
if err != nil {
logrus.WithError(err).Error("pSwitch.Connect failed")
continue
}
fmt.Println("Inbound connection", conn.RemoteAddr(), "is connected to port", port)
}
}()
wsUpgrader := websocket.Upgrader{
CheckOrigin: func(_ *http.Request) bool {
return true
},
}
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
httpRouter.PathPrefix("/test").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "HeloWorld Function") })
httpRouter.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
c, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
logrus.WithError(err).Error("Failed to upgrade WebSocket connection")
return
}
conn := wrapWebSocketConn(c)
if _, err = pRouter.Connect(
conn,
pineconeRouter.ConnectionZone("websocket"),
pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote),
); err != nil {
logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch")
}
})
httpRouter.HandleFunc("/pinecone", pRouter.ManholeHandler)
pMux := mux.NewRouter().SkipClean(true).UseEncodedPath()
pMux.PathPrefix("/ptest").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "HeloWorld Function") })
pHTTP := pQUIC.Protocol("matrix").HTTP()
pHTTP.Mux().Handle("/ptest", pMux)
// Build both ends of a HTTP multiplex.
httpServer := &http.Server{
Addr: ":0",
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
BaseContext: func(_ net.Listener) context.Context {
return context.Background()
},
Handler: pMux,
}
go func() {
pubkey := pRouter.PublicKey()
logrus.Info("Listening on ", hex.EncodeToString(pubkey[:]))
logrus.Fatal(httpServer.Serve(pQUIC.Protocol("matrix")))
}()
go func() {
httpBindAddr := fmt.Sprintf(":%d", *instancePort)
logrus.Info("Listening on ", httpBindAddr)
logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter))
}()
///////////////////////////////
for {
select {
@ -193,6 +317,10 @@ func (pm *pineconeManager) Start() {
break
}
}
if useMulticast {
pMulticast.Stop()
}
})
}

View File

@ -0,0 +1,104 @@
package pineconemanager
/*
Stolen from https://github.com/matrix-org/pinecone/blob/c05f24e907e9eb0f84384bfa226dad174f5ea2ad/util/websocket.go and hence:
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Modifications:
- Change wrapWebSocketConn and webSocketConn to private
*/
import (
"io"
"net"
"time"
"github.com/gorilla/websocket"
)
func wrapWebSocketConn(c *websocket.Conn) *webSocketConn {
return &webSocketConn{c: c}
}
type webSocketConn struct {
r io.Reader
c *websocket.Conn
}
func (c *webSocketConn) Write(p []byte) (int, error) {
err := c.c.WriteMessage(websocket.BinaryMessage, p)
if err != nil {
return 0, err
}
return len(p), nil
}
func (c *webSocketConn) Read(p []byte) (int, error) {
for {
if c.r == nil {
// Advance to next message.
var err error
_, c.r, err = c.c.NextReader()
if err != nil {
return 0, err
}
}
n, err := c.r.Read(p)
if err == io.EOF {
// At end of message.
c.r = nil
if n > 0 {
return n, nil
} else {
// No data read, continue to next message.
continue
}
}
return n, err
}
}
func (c *webSocketConn) Close() error {
return c.c.Close()
}
func (c *webSocketConn) LocalAddr() net.Addr {
return c.c.LocalAddr()
}
func (c *webSocketConn) RemoteAddr() net.Addr {
return c.c.RemoteAddr()
}
func (c *webSocketConn) SetDeadline(t time.Time) error {
if err := c.SetReadDeadline(t); err != nil {
return err
}
if err := c.SetWriteDeadline(t); err != nil {
return err
}
return nil
}
func (c *webSocketConn) SetReadDeadline(t time.Time) error {
return c.c.SetReadDeadline(t)
}
func (c *webSocketConn) SetWriteDeadline(t time.Time) error {
return c.c.SetWriteDeadline(t)
}