2025-02-20 10:50:25 -08:00

149 lines
3.8 KiB
Go

package tsconsensus
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"
)
// joinRequest is the JSON body exchanged on the /join endpoint:
// commandClient.Join marshals and sends it, and the /join handler decodes it
// before passing it to Consensus.handleJoin.
//
// Struct tag values must be double-quoted. With single quotes encoding/json
// cannot parse the tag and silently falls back to the Go field names as keys.
type joinRequest struct {
	// RemoteHost is carried on the wire under the key "remoteAddr".
	RemoteHost string `json:"remoteAddr"`
	// RemoteID is carried on the wire under the key "remoteID".
	RemoteID string `json:"remoteID"`
}
// commandClient sends HTTP requests to the command endpoints ("/join" and
// "/executeCommand") of a remote host.
type commandClient struct {
	// port is the TCP port written into every URL produced by Url.
	port uint16
	// httpClient is the client used to perform the requests.
	httpClient *http.Client
}
// Url returns the plain-HTTP URL addressing path on host at the client's
// port. host and path are inserted verbatim, so path is expected to carry
// its own leading slash.
func (rac *commandClient) Url(host string, path string) string {
	hostPort := fmt.Sprintf("%s:%d", host, rac.port)
	return "http://" + hostPort + path
}
// Join POSTs jr, encoded as JSON, to the /join endpoint on host.
//
// The whole exchange is bounded by a 30 second timeout. Join returns nil only
// when the remote answers 200 OK; any other status is returned as an error
// containing the status code and the response body. Marshalling, request
// construction, transport and body-read errors are returned unchanged.
func (rac *commandClient) Join(host string, jr joinRequest) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	rBs, err := json.Marshal(jr)
	if err != nil {
		return err
	}
	url := rac.Url(host, "/join")
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(rBs))
	if err != nil {
		return err
	}
	resp, err := rac.httpClient.Do(req)
	if err != nil {
		return err
	}
	// The body must be closed on every path, otherwise the underlying
	// connection is leaked instead of being returned to the transport.
	defer resp.Body.Close()
	respBs, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		return errors.New(fmt.Sprintf("remote responded %d: %s", resp.StatusCode, string(respBs)))
	}
	return nil
}
// ExecuteCommand POSTs the raw bytes bs to the /executeCommand endpoint on
// host and decodes the JSON response body into a CommandResult.
//
// The whole exchange is bounded by a 30 second timeout. On any failure
// (request construction, transport, body read, a status other than 200 OK, or
// an undecodable response) it returns the zero CommandResult and a non-nil
// error; for a non-200 status the error contains the status code and the
// response body.
func (rac *commandClient) ExecuteCommand(host string, bs []byte) (CommandResult, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	url := rac.Url(host, "/executeCommand")
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(bs))
	if err != nil {
		return CommandResult{}, err
	}
	resp, err := rac.httpClient.Do(req)
	if err != nil {
		return CommandResult{}, err
	}
	// The body must be closed on every path, otherwise the underlying
	// connection is leaked instead of being returned to the transport.
	defer resp.Body.Close()
	respBs, err := io.ReadAll(resp.Body)
	if err != nil {
		return CommandResult{}, err
	}
	if resp.StatusCode != http.StatusOK {
		return CommandResult{}, errors.New(fmt.Sprintf("remote responded %d: %s", resp.StatusCode, string(respBs)))
	}
	var cr CommandResult
	if err = json.Unmarshal(respBs, &cr); err != nil {
		return CommandResult{}, err
	}
	return cr, nil
}
// authorized wraps fx so that it only runs for requests whose remote address
// is allowed by auth.
//
// For each request the returned handler:
//   - calls auth.refresh with the request context, answering 500 on error;
//   - converts r.RemoteAddr with addrFromServerAddress, answering 500 on error;
//   - answers 403 Forbidden with "peer not allowed" when auth.allowsHost
//     rejects the address;
//   - otherwise delegates to fx.
func authorized(auth *authorization, fx func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		if err := auth.refresh(r.Context()); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		a, err := addrFromServerAddress(r.RemoteAddr)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if !auth.allowsHost(a) {
			// The request itself is well-formed; the peer is simply not
			// permitted, which is what 403 (not 400) communicates.
			http.Error(w, "peer not allowed", http.StatusForbidden)
			return
		}
		fx(w, r)
	}
}
// makeCommandMux returns a ServeMux exposing the two command endpoints, each
// wrapped with authorized(auth, ...):
//
//   - /join decodes a joinRequest from the body, requires both of its fields
//     to be non-empty, and passes it to c.handleJoin.
//   - /executeCommand decodes a Command from the body, runs it with
//     c.executeCommandLocally and writes the result back as JSON.
//
// Both endpoints accept only POST. A wrong method, an undecodable body or a
// missing required field is answered with 400; a failure in handleJoin,
// executeCommandLocally or response encoding is answered with 500.
func (c *Consensus) makeCommandMux(auth *authorization) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/join", authorized(auth, func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "Bad Request", http.StatusBadRequest)
			return
		}
		var jr joinRequest
		if err := json.NewDecoder(r.Body).Decode(&jr); err != nil {
			// An undecodable body is the caller's mistake, not a server fault.
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		if jr.RemoteHost == "" {
			http.Error(w, "Required: remoteAddr", http.StatusBadRequest)
			return
		}
		if jr.RemoteID == "" {
			http.Error(w, "Required: remoteID", http.StatusBadRequest)
			return
		}
		if err := c.handleJoin(jr); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	}))
	mux.HandleFunc("/executeCommand", authorized(auth, func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "Bad Request", http.StatusBadRequest)
			return
		}
		var cmd Command
		if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
			// An undecodable body is the caller's mistake, not a server fault.
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		result, err := c.executeCommandLocally(cmd)
		if err != nil {
			// Report the failure instead of encoding a meaningless result
			// with a 200 status.
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if err := json.NewEncoder(w).Encode(result); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	}))
	return mux
}