mirror of
https://github.com/tailscale/tailscale.git
synced 2025-08-13 22:47:30 +00:00
cmd/tsrecorder: adds sending api level logging to tsrecorder
Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
This commit is contained in:
@@ -6,8 +6,10 @@
|
||||
package apiproxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
@@ -19,6 +21,8 @@ import (
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/transport"
|
||||
"tailscale.com/client/local"
|
||||
@@ -185,6 +189,105 @@ type APIServerProxy struct {
|
||||
upstreamURL *url.URL
|
||||
}
|
||||
|
||||
func (ap *APIServerProxy) processRequest(ctx context.Context, who *apitype.WhoIsResponse, req *http.Request) error {
|
||||
factory := &request.RequestInfoFactory{
|
||||
APIPrefixes: sets.NewString("api", "apis"),
|
||||
GrouplessAPIPrefixes: sets.NewString("api"),
|
||||
}
|
||||
|
||||
reqInfo, err := factory.NewRequestInfo(req)
|
||||
if err != nil {
|
||||
ap.log.Errorf("Error parsing request %s %s: %v\n", req.Method, req.URL.Path, err)
|
||||
return err
|
||||
}
|
||||
|
||||
failOpen, addrs, err := determineRecorderConfig(who)
|
||||
if err != nil {
|
||||
ap.log.Errorf("error trying to determine whether the 'kubectl' session needs to be recorded: %v", err)
|
||||
return err
|
||||
}
|
||||
if failOpen && len(addrs) == 0 { // will not send event
|
||||
return err
|
||||
}
|
||||
|
||||
if !failOpen && len(addrs) == 0 {
|
||||
ap.log.Errorf("forbidden: 'kubectl' event must be recorded, but no recorders are available.")
|
||||
return err
|
||||
}
|
||||
|
||||
event := &APIRequest{
|
||||
Timestamp: time.Now(),
|
||||
IsResourceRequest: reqInfo.IsResourceRequest,
|
||||
Verb: reqInfo.Verb,
|
||||
APIGroup: reqInfo.APIGroup,
|
||||
Resource: reqInfo.Resource,
|
||||
Subresource: reqInfo.Subresource,
|
||||
Namespace: reqInfo.Namespace,
|
||||
Name: reqInfo.Name,
|
||||
Method: req.Method,
|
||||
Path: reqInfo.Path,
|
||||
User: who.UserProfile.LoginName,
|
||||
}
|
||||
|
||||
for _, ad := range addrs {
|
||||
eventJSON, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
ap.log.Errorf("Error marshaling request event: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
data := bytes.NewBuffer(eventJSON)
|
||||
|
||||
ap.log.Infof("sending data: %s", data.String())
|
||||
|
||||
// NOTE: was erroring out for ipv6 so just added this here for now
|
||||
if ad.Addr().Is4() {
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://%s/event", ad), bytes.NewBuffer(eventJSON))
|
||||
if err != nil {
|
||||
ap.log.Warnf("Error creating request: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
ap.log.Errorf("Error sending request: %v", err)
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
err := fmt.Errorf("server returned non-OK status: %s", resp.Status)
|
||||
ap.log.Errorf(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
resp.Body.Close()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// APIRequest represents the relevant information from a Kubernetes API request
|
||||
// for logging purposes.
|
||||
type APIRequest struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Method string `json:"method"`
|
||||
Path string `json:"path"`
|
||||
IsResourceRequest bool `json:"isResourceRequest"`
|
||||
Verb string `json:"verb"`
|
||||
APIGroup string `json:"apiGroup"`
|
||||
APIVersion string `json:"apiVersion"`
|
||||
Resource string `json:"resource"`
|
||||
Subresource string `json:"subresource"`
|
||||
Namespace string `json:"namespace"`
|
||||
Name string `json:"name"`
|
||||
User string `json:"user"`
|
||||
}
|
||||
|
||||
// serveDefault is the default handler for Kubernetes API server requests.
|
||||
func (ap *APIServerProxy) serveDefault(w http.ResponseWriter, r *http.Request) {
|
||||
who, err := ap.whoIs(r)
|
||||
@@ -192,7 +295,14 @@ func (ap *APIServerProxy) serveDefault(w http.ResponseWriter, r *http.Request) {
|
||||
ap.authError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
counterNumRequestsProxied.Add(1)
|
||||
|
||||
err = ap.processRequest(r.Context(), who, r)
|
||||
if err != nil {
|
||||
ap.log.Errorf("failed to process request: %v", err)
|
||||
}
|
||||
|
||||
ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
|
||||
}
|
||||
|
||||
@@ -232,6 +342,12 @@ func (ap *APIServerProxy) sessionForProto(w http.ResponseWriter, r *http.Request
|
||||
ap.authError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = ap.processRequest(r.Context(), who, r)
|
||||
if err != nil {
|
||||
ap.log.Errorf("failed to process request: %v", err)
|
||||
}
|
||||
|
||||
counterNumRequestsProxied.Add(1)
|
||||
failOpen, addrs, err := determineRecorderConfig(who)
|
||||
if err != nil {
|
||||
|
Reference in New Issue
Block a user