Txq handling+exit handling&internal pm comms tweak

This commit is contained in:
0x1a8510f2 2022-10-06 05:18:45 +01:00
parent bcf9f648c1
commit 908beb5e38
Signed by: 0x1a8510f2
GPG Key ID: 1C692E355D76775D
4 changed files with 111 additions and 55 deletions

11
go.mod
View File

@ -1,11 +1,12 @@
module git.0x1a8510f2.space/wraith-labs/wraith-module-pinecomms
go 1.18
go 1.19
require (
github.com/cristalhq/jwt/v4 v4.0.2
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c
github.com/matrix-org/pinecone v0.0.0-20221003135901-d05e7fbb5d8f
)
require (
@ -20,10 +21,10 @@ require (
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
go.uber.org/atomic v1.10.0 // indirect
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect
golang.org/x/exp v0.0.0-20221002003631-540bb7301a08 // indirect
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b // indirect
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220930213112-107f3e3c3b0b // indirect
golang.org/x/net v0.0.0-20221004154528-8021a29435af // indirect
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect
golang.org/x/tools v0.1.12 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect

18
go.sum
View File

@ -23,6 +23,8 @@ github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wX
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/jwt/v4 v4.0.2 h1:g/AD3h0VicDamtlM70GWGElp8kssQEv+5wYd7L9WOhU=
github.com/cristalhq/jwt/v4 v4.0.2/go.mod h1:HnYraSNKDRag1DZP92rYHyrjyQHnVEHPNqesmzs+miQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -129,8 +131,8 @@ github.com/marten-seemann/qtls-go1-18 v0.1.2/go.mod h1:mJttiymBAByA49mhlNZZGrH5u
github.com/marten-seemann/qtls-go1-19 v0.1.0-beta.1/go.mod h1:5HTDWtVudo/WFsHKRNuOhWlbdjrfs5JHrYb0wIJqGpI=
github.com/marten-seemann/qtls-go1-19 v0.1.0 h1:rLFKD/9mp/uq1SYGYuVZhm83wkmU95pK5df3GufyYYU=
github.com/marten-seemann/qtls-go1-19 v0.1.0/go.mod h1:5HTDWtVudo/WFsHKRNuOhWlbdjrfs5JHrYb0wIJqGpI=
github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c h1:iCHLYwwlPsf4TYFrvhKdhQoAM2lXzcmDZYqwBNWcnVk=
github.com/matrix-org/pinecone v0.0.0-20220929155234-2ce51dd4a42c/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
github.com/matrix-org/pinecone v0.0.0-20221003135901-d05e7fbb5d8f h1:iVDoBz5mVAqTetcIY0lxtDrYcHPgjZb30wlhptjRlvg=
github.com/matrix-org/pinecone v0.0.0-20221003135901-d05e7fbb5d8f/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattomatic/dijkstra v0.0.0-20130617153013-6f6d134eb237/go.mod h1:UOnLAUmVG5paym8pD3C4B9BQylUDC2vXFJJpT7JrlEA=
@ -230,12 +232,12 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A=
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b h1:huxqepDufQpLLIRXiVkTvnxrzJlpwmIWAObmcCcUFr0=
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4=
golang.org/x/exp v0.0.0-20221002003631-540bb7301a08 h1:LtBIgSqNhkuC9gA3BFjGy5obHQT1lnmNsMDFSqWzQ5w=
golang.org/x/exp v0.0.0-20221002003631-540bb7301a08/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741 h1:fGZugkZk2UgYBxtpKmvub51Yno1LJDeEsRp2xGD+0gY=
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@ -271,8 +273,8 @@ golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220728211354-c7608f3a8462/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20220930213112-107f3e3c3b0b h1:uKO3Js8lXGjpjdc4J3rqs0/Ex5yDKUGfk43tTYWVLas=
golang.org/x/net v0.0.0-20220930213112-107f3e3c3b0b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20221004154528-8021a29435af h1:wv66FM3rLZGPdxpYL+ApnDe2HzHcTFta3z5nsc13wI4=
golang.org/x/net v0.0.0-20221004154528-8021a29435af/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=

View File

@ -9,11 +9,14 @@ import (
"log"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
"git.0x1a8510f2.space/wraith-labs/wraith-module-pinecomms/internal/misc"
"github.com/cristalhq/jwt/v4"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
pineconeConnections "github.com/matrix-org/pinecone/connections"
@ -31,8 +34,8 @@ type manager struct {
// Internal communication channels.
reqExit chan struct{}
ackExit chan struct{}
tx chan struct{}
rx chan struct{}
txq chan packet
rxq chan packet
// A struct of config options for the manager with a lock to make it thread-safe.
conf config
@ -48,17 +51,9 @@ func (pm *manager) Start() {
// Only execute this once at a time.
pm.startOnce.Do(func() {
// Acknowledge the exit request that caused the manager to exit.
// This MUST be the first defer as that means it gets executed last.
defer func() {
// Catch and re-raise panics as otherwise they could possibly
// block on the channel send below.
if err := recover(); err != nil {
panic(err)
}
pm.ackExit <- struct{}{}
}()
// Init some internal communication channels.
managerInstance.reqExit = make(chan struct{})
managerInstance.ackExit = make(chan struct{})
// Grab a snapshot of the config (this ensures the
// config is never in an inconsistent state when values
@ -72,7 +67,7 @@ func (pm *manager) Start() {
ctx, ctxCancel := context.WithCancel(context.Background())
//
// Main pinecone stuff
// Main pinecone stuff.
//
// Set up pinecone components.
@ -239,6 +234,52 @@ func (pm *manager) Start() {
}*/
}
// Manage tx queue.
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
// Prepare JWT signing helpers.
signer, err := jwt.NewSignerEdDSA(c.pineconeIdentity)
if err != nil {
panic(err)
}
builder := jwt.NewBuilder(signer)
for {
select {
case p := <-pm.txq:
// Build a JWT with the payload from the queue element.
claims := map[string]any{}
data, err := builder.Build(claims)
if err != nil {
pm.conf.logger.Printf("failed to build token for tx queue element due to error: %e", err)
continue
}
// Set up request to peer.
req := http.Request{
Method: p.Method,
URL: &url.URL{
Scheme: "http",
Host: p.Peer,
Path: "/" + fmt.Sprint(p.Route),
},
Cancel: ctx.Done(),
Body: io.NopCloser(strings.NewReader(data.String())),
}
// Send request to peer.
pHTTP.Client().Do(&req)
case <-ctx.Done():
// If the context is closed, exit.
return
}
}
}(ctx)
// Wait until exit is requested.
<-pm.reqExit
@ -266,6 +307,9 @@ func (pm *manager) Start() {
// Wait for all the goroutines we started to exit.
wg.Wait()
// Acknowledge the exit request.
close(pm.ackExit)
})
}
@ -288,7 +332,7 @@ func (pm *manager) Stop() {
// function can run at a time. The guarantee could be made stronger
// with locks but this isn't really worth the added complexity.
if pm.IsRunning() {
pm.reqExit <- struct{}{}
close(pm.reqExit)
// Wait for the exit request to be acknowledged.
<-pm.ackExit
@ -311,19 +355,28 @@ func (pm *manager) Restart() {
}
// Send a given packet to a specific peer.
func (pm *manager) Send(peer string, packet packetOuter) error {
return nil
func (pm *manager) Send(ctx context.Context, p packet) error {
select {
case pm.txq <- p:
return nil
case <-pm.ackExit:
return fmt.Errorf("manager exited while trying to send packet")
case <-ctx.Done():
return fmt.Errorf("context cancelled while trying to send packet (%e)", ctx.Err())
}
}
// Ping a specific peer.
func (pm *manager) Ping(peer string) error {
return nil
}
// Poll for incoming packets. Blocks until either a packet is received or
// Receive incoming packets. Blocks until either a packet is received or
// the provided context expires.
func (pm *manager) Poll(ctx context.Context) (packetInner, error) {
return packetInner{}, nil
func (pm *manager) Recv(ctx context.Context) (packet, error) {
select {
case p := <-pm.rxq:
return p, nil
case <-pm.ackExit:
return packet{}, fmt.Errorf("manager exited while trying to send packet")
case <-ctx.Done():
return packet{}, fmt.Errorf("context cancelled while trying to receive packet (%e)", ctx.Err())
}
}
// Check whether the pinecone manager is currently running.
@ -368,8 +421,8 @@ func GetInstance() *manager {
managerInstance.conf.configSnapshot = defaults
// Init communication channels.
managerInstance.reqExit = make(chan struct{})
managerInstance.ackExit = make(chan struct{})
managerInstance.txq = make(chan packet)
managerInstance.rxq = make(chan packet)
})
return managerInstance

View File

@ -31,24 +31,24 @@ const (
ROUTE_SEND
)
type packetOuter struct {
// The pinecone peer this packet came from or is going to.
Peer string
// The HTTP route this packet was received on or is being sent to.
Route route
// The HTTP method this packet was received or is to be sent with.
Method string
// The data included with the packet encoded as pmanager-flavoured JWT.
Data string
}
type packetInner struct {
type packetData struct {
// Wraith shm cells to read from.
R []string
// Wraith shm cells to write to and data to write.
W map[string]any
}
type packet struct {
// The pinecone peer this packet came from or is going to.
Peer string
// The HTTP method this packet was received or is to be sent with.
Method string
// The HTTP route this packet was received on or is being sent to.
Route route
// The data included with the packet encoded as pmanager-flavoured JWT.
Data packetData
}