cmd/tsrecorder: adds sending api level logging to tsrecorder (#16960)

Updates #17141

Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
This commit is contained in:
Tom Meadows
2025-10-08 15:15:12 +01:00
committed by GitHub
parent f25e47cdeb
commit cd2a3425cb
13 changed files with 1014 additions and 21 deletions

View File

@@ -6,10 +6,13 @@
package apiproxy
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
@@ -19,13 +22,16 @@ import (
"time"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"tailscale.com/client/local"
"tailscale.com/client/tailscale/apitype"
"tailscale.com/k8s-operator/sessionrecording"
ksr "tailscale.com/k8s-operator/sessionrecording"
"tailscale.com/kube/kubetypes"
"tailscale.com/net/netx"
"tailscale.com/sessionrecording"
"tailscale.com/tailcfg"
"tailscale.com/tsnet"
"tailscale.com/util/clientmetric"
@@ -83,12 +89,13 @@ func NewAPIServerProxy(zlog *zap.SugaredLogger, restConfig *rest.Config, ts *tsn
}
ap := &APIServerProxy{
log: zlog,
lc: lc,
authMode: mode == kubetypes.APIServerProxyModeAuth,
https: https,
upstreamURL: u,
ts: ts,
log: zlog,
lc: lc,
authMode: mode == kubetypes.APIServerProxyModeAuth,
https: https,
upstreamURL: u,
ts: ts,
sendEventFunc: sessionrecording.SendEvent,
}
ap.rp = &httputil.ReverseProxy{
Rewrite: func(pr *httputil.ProxyRequest) {
@@ -183,6 +190,8 @@ type APIServerProxy struct {
ts *tsnet.Server
hs *http.Server
upstreamURL *url.URL
sendEventFunc func(ap netip.AddrPort, event io.Reader, dial netx.DialFunc) error
}
// serveDefault is the default handler for Kubernetes API server requests.
@@ -192,7 +201,16 @@ func (ap *APIServerProxy) serveDefault(w http.ResponseWriter, r *http.Request) {
ap.authError(w, err)
return
}
if err = ap.recordRequestAsEvent(r, who); err != nil {
msg := fmt.Sprintf("error recording Kubernetes API request: %v", err)
ap.log.Errorf(msg)
http.Error(w, msg, http.StatusBadGateway)
return
}
counterNumRequestsProxied.Add(1)
ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
}
@@ -220,7 +238,7 @@ func (ap *APIServerProxy) serveAttachWS(w http.ResponseWriter, r *http.Request)
ap.sessionForProto(w, r, ksr.AttachSessionType, ksr.WSProtocol)
}
func (ap *APIServerProxy) sessionForProto(w http.ResponseWriter, r *http.Request, sessionType sessionrecording.SessionType, proto ksr.Protocol) {
func (ap *APIServerProxy) sessionForProto(w http.ResponseWriter, r *http.Request, sessionType ksr.SessionType, proto ksr.Protocol) {
const (
podNameKey = "pod"
namespaceNameKey = "namespace"
@@ -232,6 +250,14 @@ func (ap *APIServerProxy) sessionForProto(w http.ResponseWriter, r *http.Request
ap.authError(w, err)
return
}
if err = ap.recordRequestAsEvent(r, who); err != nil {
msg := fmt.Sprintf("error recording Kubernetes API request: %v", err)
ap.log.Errorf(msg)
http.Error(w, msg, http.StatusBadGateway)
return
}
counterNumRequestsProxied.Add(1)
failOpen, addrs, err := determineRecorderConfig(who)
if err != nil {
@@ -283,6 +309,107 @@ func (ap *APIServerProxy) sessionForProto(w http.ResponseWriter, r *http.Request
ap.rp.ServeHTTP(h, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
}
func (ap *APIServerProxy) recordRequestAsEvent(req *http.Request, who *apitype.WhoIsResponse) error {
failOpen, addrs, err := determineRecorderConfig(who)
if err != nil {
return fmt.Errorf("error trying to determine whether the kubernetes api request needs to be recorded: %w", err)
}
if len(addrs) == 0 {
if failOpen {
return nil
} else {
return fmt.Errorf("forbidden: kubernetes api request must be recorded, but no recorders are available")
}
}
factory := &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
reqInfo, err := factory.NewRequestInfo(req)
if err != nil {
return fmt.Errorf("error parsing request %s %s: %w", req.Method, req.URL.Path, err)
}
kubeReqInfo := sessionrecording.KubernetesRequestInfo{
IsResourceRequest: reqInfo.IsResourceRequest,
Path: reqInfo.Path,
Verb: reqInfo.Verb,
APIPrefix: reqInfo.APIPrefix,
APIGroup: reqInfo.APIGroup,
APIVersion: reqInfo.APIVersion,
Namespace: reqInfo.Namespace,
Resource: reqInfo.Resource,
Subresource: reqInfo.Subresource,
Name: reqInfo.Name,
Parts: reqInfo.Parts,
FieldSelector: reqInfo.FieldSelector,
LabelSelector: reqInfo.LabelSelector,
}
event := &sessionrecording.Event{
Timestamp: time.Now().Unix(),
Kubernetes: kubeReqInfo,
Type: sessionrecording.KubernetesAPIEventType,
UserAgent: req.UserAgent(),
Request: sessionrecording.Request{
Method: req.Method,
Path: req.URL.String(),
QueryParameters: req.URL.Query(),
},
Source: sessionrecording.Source{
NodeID: who.Node.StableID,
Node: strings.TrimSuffix(who.Node.Name, "."),
},
}
if !who.Node.IsTagged() {
event.Source.NodeUser = who.UserProfile.LoginName
event.Source.NodeUserID = who.UserProfile.ID
} else {
event.Source.NodeTags = who.Node.Tags
}
bodyBytes, err := io.ReadAll(req.Body)
if err != nil {
return fmt.Errorf("failed to read body: %w", err)
}
req.Body = io.NopCloser(bytes.NewReader(bodyBytes))
event.Request.Body = bodyBytes
var errs []error
// TODO: ChaosInTheCRD ensure that if there are multiple addrs timing out we don't experience slowdown on client waiting for response.
fail := true
for _, addr := range addrs {
data := new(bytes.Buffer)
if err := json.NewEncoder(data).Encode(event); err != nil {
return fmt.Errorf("error marshaling request event: %w", err)
}
if err := ap.sendEventFunc(addr, data, ap.ts.Dial); err != nil {
if apiSupportErr, ok := err.(sessionrecording.EventAPINotSupportedErr); ok {
ap.log.Warnf(apiSupportErr.Error())
fail = false
} else {
err := fmt.Errorf("error sending event to recorder with address %q: %v", addr.String(), err)
errs = append(errs, err)
}
} else {
return nil
}
}
merr := errors.Join(errs...)
if fail && failOpen {
msg := fmt.Sprintf("[unexpected] failed to send event to recorders with errors: %s", merr.Error())
msg = msg + "; failure mode is 'fail open'; continuing request without recording."
ap.log.Warn(msg)
return nil
}
return merr
}
func (ap *APIServerProxy) addImpersonationHeadersAsRequired(r *http.Request) {
r.URL.Scheme = ap.upstreamURL.Scheme
r.URL.Host = ap.upstreamURL.Host