mirror of
https://github.com/tailscale/tailscale.git
synced 2025-08-11 05:07:33 +00:00
cmd/k8s-operator,ssh/tailssh,tsnet: optionally record 'kubectl exec' sessions via Kubernetes operator's API server proxy (#12274)
cmd/k8s-operator,ssh/tailssh,tsnet: optionally record kubectl exec sessions The Kubernetes operator's API server proxy, when it receives a request for 'kubectl exec' session now reads 'RecorderAddrs', 'EnforceRecorder' fields from tailcfg.KubernetesCapRule. If 'RecorderAddrs' is set to one or more addresses (of a tsrecorder instance(s)), it attempts to connect to those and sends the session contents to the recorder before forwarding the request to the kube API server. If connection cannot be established or fails midway, it is only allowed if 'EnforceRecorder' is not true (fail open). Updates tailscale/corp#19821 Signed-off-by: Irbe Krumina <irbe@tailscale.com> Co-authored-by: Maisem Ali <maisem@tailscale.com>
This commit is contained in:
@@ -17,7 +17,6 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptrace"
|
||||
"net/netip"
|
||||
"net/url"
|
||||
"os"
|
||||
@@ -45,7 +44,6 @@ import (
|
||||
"tailscale.com/util/clientmetric"
|
||||
"tailscale.com/util/httpm"
|
||||
"tailscale.com/util/mak"
|
||||
"tailscale.com/util/multierr"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -1485,128 +1483,6 @@ type CastHeader struct {
|
||||
ConnectionID string `json:"connectionID"`
|
||||
}
|
||||
|
||||
// sessionRecordingClient returns an http.Client that uses srv.lb.Dialer() to
|
||||
// dial connections. This is used to make requests to the session recording
|
||||
// server to upload session recordings.
|
||||
// It uses the provided dialCtx to dial connections, and limits a single dial
|
||||
// to 5 seconds.
|
||||
func (ss *sshSession) sessionRecordingClient(dialCtx context.Context) (*http.Client, error) {
|
||||
dialer := ss.conn.srv.lb.Dialer()
|
||||
if dialer == nil {
|
||||
return nil, errors.New("no peer API transport")
|
||||
}
|
||||
tr := dialer.PeerAPITransport().Clone()
|
||||
dialContextFn := tr.DialContext
|
||||
|
||||
tr.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
perAttemptCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
go func() {
|
||||
select {
|
||||
case <-perAttemptCtx.Done():
|
||||
case <-dialCtx.Done():
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
return dialContextFn(perAttemptCtx, network, addr)
|
||||
}
|
||||
return &http.Client{
|
||||
Transport: tr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// connectToRecorder connects to the recorder at any of the provided addresses.
|
||||
// It returns the first successful response, or a multierr if all attempts fail.
|
||||
//
|
||||
// On success, it returns a WriteCloser that can be used to upload the
|
||||
// recording, and a channel that will be sent an error (or nil) when the upload
|
||||
// fails or completes.
|
||||
//
|
||||
// In both cases, a slice of SSHRecordingAttempts is returned which detail the
|
||||
// attempted recorder IP and the error message, if the attempt failed. The
|
||||
// attempts are in order the recorder(s) was attempted. If successful a
|
||||
// successful connection is made, the last attempt in the slice is the
|
||||
// attempt for connected recorder.
|
||||
func (ss *sshSession) connectToRecorder(ctx context.Context, recs []netip.AddrPort) (io.WriteCloser, []*tailcfg.SSHRecordingAttempt, <-chan error, error) {
|
||||
if len(recs) == 0 {
|
||||
return nil, nil, nil, errors.New("no recorders configured")
|
||||
}
|
||||
// We use a special context for dialing the recorder, so that we can
|
||||
// limit the time we spend dialing to 30 seconds and still have an
|
||||
// unbounded context for the upload.
|
||||
dialCtx, dialCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer dialCancel()
|
||||
hc, err := ss.sessionRecordingClient(dialCtx)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
var errs []error
|
||||
var attempts []*tailcfg.SSHRecordingAttempt
|
||||
for _, ap := range recs {
|
||||
attempt := &tailcfg.SSHRecordingAttempt{
|
||||
Recorder: ap,
|
||||
}
|
||||
attempts = append(attempts, attempt)
|
||||
|
||||
// We dial the recorder and wait for it to send a 100-continue
|
||||
// response before returning from this function. This ensures that
|
||||
// the recorder is ready to accept the recording.
|
||||
|
||||
// got100 is closed when we receive the 100-continue response.
|
||||
got100 := make(chan struct{})
|
||||
ctx = httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
|
||||
Got100Continue: func() {
|
||||
close(got100)
|
||||
},
|
||||
})
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://%s:%d/record", ap.Addr(), ap.Port()), pr)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("recording: error starting recording: %w", err)
|
||||
attempt.FailureMessage = err.Error()
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
// We set the Expect header to 100-continue, so that the recorder
|
||||
// will send a 100-continue response before it starts reading the
|
||||
// request body.
|
||||
req.Header.Set("Expect", "100-continue")
|
||||
|
||||
// errChan is used to indicate the result of the request.
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("recording: error starting recording: %w", err)
|
||||
return
|
||||
}
|
||||
if resp.StatusCode != 200 {
|
||||
errChan <- fmt.Errorf("recording: unexpected status: %v", resp.Status)
|
||||
return
|
||||
}
|
||||
errChan <- nil
|
||||
}()
|
||||
select {
|
||||
case <-got100:
|
||||
case err := <-errChan:
|
||||
// If we get an error before we get the 100-continue response,
|
||||
// we need to try another recorder.
|
||||
if err == nil {
|
||||
// If the error is nil, we got a 200 response, which
|
||||
// is unexpected as we haven't sent any data yet.
|
||||
err = errors.New("recording: unexpected EOF")
|
||||
}
|
||||
attempt.FailureMessage = err.Error()
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
return pw, attempts, errChan, nil
|
||||
}
|
||||
return nil, attempts, nil, multierr.New(errs...)
|
||||
}
|
||||
|
||||
func (ss *sshSession) openFileForRecording(now time.Time) (_ io.WriteCloser, err error) {
|
||||
varRoot := ss.conn.srv.lb.TailscaleVarRoot()
|
||||
if varRoot == "" {
|
||||
@@ -1672,7 +1548,7 @@ func (ss *sshSession) startNewRecording() (_ *recording, err error) {
|
||||
} else {
|
||||
var errChan <-chan error
|
||||
var attempts []*tailcfg.SSHRecordingAttempt
|
||||
rec.out, attempts, errChan, err = ss.connectToRecorder(ctx, recorders)
|
||||
rec.out, attempts, errChan, err = ConnectToRecorder(ctx, recorders, ss.conn.srv.lb.Dialer().UserDial)
|
||||
if err != nil {
|
||||
if onFailure != nil && onFailure.NotifyURL != "" && len(attempts) > 0 {
|
||||
eventType := tailcfg.SSHSessionRecordingFailed
|
||||
|
Reference in New Issue
Block a user