chore: switch to in-mem sqlite over custom state
This commit is contained in:
parent
e0d92f4b55
commit
1d8ca57faf
|
@ -1,67 +0,0 @@
|
|||
package main
|
||||
|
||||
/*func handleClients(r *http.Request, w http.ResponseWriter, s *state) {
|
||||
// Pull necessary information out of the request.
|
||||
// Get the data from the request body.
|
||||
reqbody, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse the request body.
|
||||
reqdata := clientsRequest{}
|
||||
err = json.Unmarshal(reqbody, &reqdata)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Make sure the requested page does not exceed the max.
|
||||
if reqdata.Limit > MAX_DATA_PAGE_SIZE {
|
||||
reqdata.Limit = MAX_DATA_PAGE_SIZE
|
||||
}
|
||||
|
||||
// Collect necessary information.
|
||||
clients, totalClients := s.GetClients(reqdata.Offset, reqdata.Limit)
|
||||
|
||||
// Build response data.
|
||||
data, err := json.Marshal(map[string]any{
|
||||
"clients": clients,
|
||||
"total": totalClients,
|
||||
})
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("error while generating `clients` API response: %v", err))
|
||||
}
|
||||
|
||||
// Send!
|
||||
w.Write(data)
|
||||
}
|
||||
|
||||
func handleAbout(w http.ResponseWriter) {
|
||||
// Collect necessary information.
|
||||
buildinfo, _ := debug.ReadBuildInfo()
|
||||
|
||||
currentUser, _ := user.Current()
|
||||
binaryPath, _ := os.Executable()
|
||||
systemInfo := map[string]string{
|
||||
"os": runtime.GOOS,
|
||||
"arch": runtime.GOARCH,
|
||||
"currentTime": time.Now().String(),
|
||||
"binaryPath": binaryPath,
|
||||
"runningUserName": currentUser.Username,
|
||||
"runningUserId": currentUser.Uid,
|
||||
}
|
||||
|
||||
// Build response data.
|
||||
data, err := json.Marshal(map[string]any{
|
||||
"build": buildinfo,
|
||||
"system": systemInfo,
|
||||
})
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("error while generating `about` API response: %v", err))
|
||||
}
|
||||
|
||||
// Send!
|
||||
w.Write(data)
|
||||
}*/
|
|
@ -6,135 +6,82 @@ import (
|
|||
|
||||
"dev.l1qu1d.net/wraith-labs/wraith-module-pinecomms/internal/proto"
|
||||
"github.com/google/uuid"
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
)
|
||||
|
||||
func MkState() *state {
|
||||
s := state{
|
||||
clients: clientList{
|
||||
clients: map[string]*client{},
|
||||
},
|
||||
requests: map[string]request{},
|
||||
}
|
||||
return &s
|
||||
type state struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
type client struct {
|
||||
Address string `json:"address"`
|
||||
LastHeartbeatTime time.Time `json:"lastHeartbeatTime"`
|
||||
LastHeartbeat proto.PacketHeartbeat `json:"lastHeartbeat"`
|
||||
ID string `gorm:"primaryKey"`
|
||||
Address string `gorm:"index;not null;unique"`
|
||||
|
||||
prev *client
|
||||
next *client
|
||||
}
|
||||
|
||||
// This data structure allows for storage of clients while allowing for efficient ordering
|
||||
// and therefore pagination, deletion and addition of clients, and accessing a client by ID.
|
||||
type clientList struct {
|
||||
head *client
|
||||
tail *client
|
||||
clients map[string]*client
|
||||
}
|
||||
|
||||
func (l *clientList) Append(id string, c client) {
|
||||
c.prev, c.next = nil, nil
|
||||
if _, exists := l.clients[id]; !exists {
|
||||
if l.head == nil {
|
||||
l.head = &c
|
||||
}
|
||||
if l.tail != nil {
|
||||
l.tail.next = &c
|
||||
c.prev = l.tail
|
||||
}
|
||||
l.tail = &c
|
||||
}
|
||||
l.clients[id] = &c
|
||||
}
|
||||
|
||||
func (l *clientList) Delete(id string) {
|
||||
c, ok := l.clients[id]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if c.prev == nil {
|
||||
// This is the first element. Make the next one first.
|
||||
l.head = c.next
|
||||
} else {
|
||||
c.prev.next = c.next
|
||||
}
|
||||
|
||||
if c.next == nil {
|
||||
// This is the last element. Make the previous one last.
|
||||
l.tail = c.prev
|
||||
} else {
|
||||
c.next.prev = c.prev
|
||||
}
|
||||
|
||||
delete(l.clients, id)
|
||||
}
|
||||
|
||||
func (l *clientList) Get(id string) (*client, bool) {
|
||||
c, ok := l.clients[id]
|
||||
return c, ok
|
||||
}
|
||||
|
||||
func (l *clientList) GetPage(offset, limit int) ([]*client, int) {
|
||||
if totalClients := len(l.clients); offset > totalClients {
|
||||
return []*client{}, totalClients
|
||||
}
|
||||
|
||||
// If the remainder of the client list after the offset is
|
||||
// smaller than the limit, reduce the limit to the size of that
|
||||
// remainder to avoid nulls in the returned data.
|
||||
if maxLimit := len(l.clients) - offset; maxLimit < limit {
|
||||
limit = maxLimit
|
||||
}
|
||||
|
||||
page := make([]*client, limit)
|
||||
current := l.head
|
||||
for i := 0; i < offset+limit; i++ {
|
||||
if current == nil {
|
||||
break
|
||||
}
|
||||
|
||||
if i >= offset && i < offset+limit {
|
||||
page[i-offset] = current
|
||||
}
|
||||
|
||||
current = current.next
|
||||
}
|
||||
return page, len(l.clients)
|
||||
FirstHeartbeatTime time.Time `gorm:"not null"`
|
||||
LastHeartbeatTime time.Time `gorm:"not null"`
|
||||
LastHeartbeat proto.PacketHeartbeat `gorm:"not null;serializer:json;type:json"`
|
||||
}
|
||||
|
||||
type request struct {
|
||||
target string
|
||||
TxId string `gorm:"primaryKey"`
|
||||
Target string `gorm:"index;not null;unique"`
|
||||
|
||||
requestTime time.Time
|
||||
request proto.PacketReq
|
||||
RequestTime time.Time `gorm:"not null"`
|
||||
Request proto.PacketReq `gorm:"not null;serializer:json;type:json"`
|
||||
|
||||
responseTime time.Time
|
||||
response proto.PacketRes
|
||||
ResponseTime time.Time
|
||||
Response proto.PacketRes `gorm:"serializer:json;type:json"`
|
||||
}
|
||||
|
||||
type state struct {
|
||||
// List of "connected" Wraith clients.
|
||||
clients clientList
|
||||
clientsMutex sync.RWMutex
|
||||
func MkState() *state {
|
||||
//db, err := gorm.Open(sqlite.Open("file::memory:"), &gorm.Config{})
|
||||
db, err := gorm.Open(sqlite.Open("./test.db"), &gorm.Config{})
|
||||
if err != nil {
|
||||
panic("failed to open memory db")
|
||||
}
|
||||
|
||||
// List of request/response pairs.
|
||||
requests map[string]request
|
||||
requestsMutex sync.RWMutex
|
||||
db.AutoMigrate(&client{}, &request{})
|
||||
|
||||
return &state{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *state) ClientAppend(c *client) error {
|
||||
result := s.db.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "address"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"last_heartbeat_time", "last_heartbeat"}),
|
||||
}).Create(c)
|
||||
return result.Error
|
||||
}
|
||||
|
||||
func (s *state) ClientDelete(c *client) error {
|
||||
result := s.db.Delete(c)
|
||||
return result.Error
|
||||
}
|
||||
|
||||
func (s *state) ClientGet(id string) (client, error) {
|
||||
c := client{}
|
||||
result := s.db.Take(&c, id)
|
||||
return c, result.Error
|
||||
}
|
||||
|
||||
func (s *state) ClientGetPage(offset, limit int) ([]client, error) {
|
||||
page := make([]client, limit)
|
||||
result := s.db.Order("first_heartbeat_time ASC").Find(&page)
|
||||
return page, result.Error
|
||||
}
|
||||
|
||||
// Save/update a Wraith client entry.
|
||||
func (s *state) Heartbeat(src string, hb proto.PacketHeartbeat) {
|
||||
s.clientsMutex.Lock()
|
||||
defer s.clientsMutex.Unlock()
|
||||
|
||||
s.clients.Append(src, client{
|
||||
Address: src,
|
||||
LastHeartbeatTime: time.Now(),
|
||||
LastHeartbeat: hb,
|
||||
s.ClientAppend(&client{
|
||||
ID: uuid.NewString(),
|
||||
Address: src,
|
||||
FirstHeartbeatTime: time.Now(),
|
||||
LastHeartbeatTime: time.Now(),
|
||||
LastHeartbeat: hb,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -143,28 +90,26 @@ func (s *state) Request(dst string, req proto.PacketReq) proto.PacketReq {
|
|||
reqTxId := uuid.NewString()
|
||||
req.TxId = reqTxId
|
||||
|
||||
s.requestsMutex.Lock()
|
||||
defer s.requestsMutex.Unlock()
|
||||
|
||||
s.requests[reqTxId] = request{
|
||||
target: dst,
|
||||
requestTime: time.Now(),
|
||||
request: req,
|
||||
}
|
||||
s.db.Create(&request{
|
||||
TxId: reqTxId,
|
||||
Target: dst,
|
||||
RequestTime: time.Now(),
|
||||
Request: req,
|
||||
})
|
||||
|
||||
return req
|
||||
}
|
||||
|
||||
// Save a response to a request.
|
||||
func (s *state) Response(src string, res proto.PacketRes) {
|
||||
s.requestsMutex.Lock()
|
||||
defer s.requestsMutex.Unlock()
|
||||
|
||||
if req, ok := s.requests[res.TxId]; ok && src == req.target && req.responseTime.IsZero() {
|
||||
req.responseTime = time.Now()
|
||||
req.response = res
|
||||
s.requests[res.TxId] = req
|
||||
func (s *state) Response(src string, res proto.PacketRes) error {
|
||||
req := request{}
|
||||
result := s.db.First(&req, res.TxId)
|
||||
if result.Error == nil && src == req.Target && req.ResponseTime.IsZero() {
|
||||
req.ResponseTime = time.Now()
|
||||
req.Response = res
|
||||
s.db.Save(req)
|
||||
}
|
||||
return result.Error
|
||||
}
|
||||
|
||||
// Expire timed-out entries in the state.
|
||||
|
@ -175,35 +120,14 @@ func (s *state) Prune() {
|
|||
// Clean up expired client heartbeats.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.clientsMutex.Lock()
|
||||
defer s.clientsMutex.Unlock()
|
||||
|
||||
for id, c := range s.clients.clients {
|
||||
if time.Since(c.LastHeartbeatTime) > proto.HEARTBEAT_MARK_DEAD_DELAY {
|
||||
s.clients.Delete(id)
|
||||
}
|
||||
}
|
||||
s.db.Where("last_heartbeat_time <= ?", time.Now().Add(-1*STATE_CLIENT_EXPIRY_DELAY)).Delete(&client{})
|
||||
}()
|
||||
|
||||
// Clean up expired request-response pairs.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.requestsMutex.Lock()
|
||||
defer s.requestsMutex.Unlock()
|
||||
|
||||
for id, r := range s.requests {
|
||||
if time.Since(r.requestTime) > STATE_REQUEST_EXPIRY_DELAY {
|
||||
delete(s.requests, id)
|
||||
}
|
||||
}
|
||||
s.db.Where("request_time <= ?", time.Now().Add(-1*STATE_REQUEST_EXPIRY_DELAY)).Delete(&request{})
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *state) GetClients(offset, limit int) ([]*client, int) {
|
||||
s.clientsMutex.Lock()
|
||||
defer s.clientsMutex.Unlock()
|
||||
|
||||
return s.clients.GetPage(offset, limit)
|
||||
}
|
||||
|
|
3
cmd/pc3/snippets/screenshot.go
Normal file
3
cmd/pc3/snippets/screenshot.go
Normal file
|
@ -0,0 +1,3 @@
|
|||
package snippets
|
||||
|
||||
const SCREENSHOT = ""
|
4
go.mod
4
go.mod
|
@ -11,6 +11,8 @@ require (
|
|||
github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a
|
||||
github.com/mattn/go-sqlite3 v1.14.17
|
||||
github.com/traefik/yaegi v0.15.1
|
||||
gorm.io/driver/sqlite v1.5.2
|
||||
gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55
|
||||
maunium.net/go/mautrix v0.15.2
|
||||
)
|
||||
|
||||
|
@ -19,6 +21,8 @@ require (
|
|||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
|
||||
github.com/golang/mock v1.6.0 // indirect
|
||||
github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 // indirect
|
||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||
github.com/jinzhu/now v1.1.5 // indirect
|
||||
github.com/klauspost/compress v1.16.5 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.19 // indirect
|
||||
|
|
8
go.sum
8
go.sum
|
@ -47,6 +47,10 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7
|
|||
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
|
||||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
|
||||
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
||||
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
|
||||
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||
github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns=
|
||||
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
|
@ -163,6 +167,10 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
|
|||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gorm.io/driver/sqlite v1.5.2 h1:TpQ+/dqCY4uCigCFyrfnrJnrW9zjpelWVoEVNy5qJkc=
|
||||
gorm.io/driver/sqlite v1.5.2/go.mod h1:qxAuCol+2r6PannQDpOP1FP6ag3mKi4esLnB/jHed+4=
|
||||
gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55 h1:sC1Xj4TYrLqg1n3AN10w871An7wJM0gzgcm8jkIkECQ=
|
||||
gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
|
||||
maunium.net/go/maulogger/v2 v2.4.1 h1:N7zSdd0mZkB2m2JtFUsiGTQQAdP0YeFWT7YMc80yAL8=
|
||||
maunium.net/go/maulogger/v2 v2.4.1/go.mod h1:omPuYwYBILeVQobz8uO3XC8DIRuEb5rXYlQSuqrbCho=
|
||||
maunium.net/go/mautrix v0.15.2 h1:fUiVajeoOR92uJoSShHbCvh7uG6lDY4ZO4Mvt90LbjU=
|
||||
|
|
Loading…
Reference in New Issue
Block a user