cmd/k8s-operator,k8s-operator/sessionrecording: ensure recording header contains terminal size for terminal sessions (#12965)

* cmd/k8s-operator,k8s-operator/sessonrecording: ensure CastHeader contains terminal size

For tsrecorder to be able to play session recordings, the recording's
CastHeader must have '.Width' and '.Height' fields set to non-zero.
Kubectl (or whoever is the client that initiates the 'kubectl exec'
session recording) sends the terminal dimensions in a resize message that
the API server proxy can intercept, however that races with the first server
message that we need to record.
This PR ensures we wait for the terminal dimensions to be processed from
the first resize message before any other data is sent, so that for all
sessions with terminal attached, the header of the session recording
contains the terminal dimensions and the recording can be played by tsrecorder.

Updates tailscale/tailscale#19821

Signed-off-by: Irbe Krumina <irbe@tailscale.com>
This commit is contained in:
Irbe Krumina
2024-09-03 20:42:02 +03:00
committed by GitHub
parent 1c972bc7cb
commit 8e1c00f841
7 changed files with 265 additions and 95 deletions

View File

@@ -31,11 +31,10 @@ import (
"tailscale.com/util/set" "tailscale.com/util/set"
) )
var whoIsKey = ctxkey.New("", (*apitype.WhoIsResponse)(nil))
var ( var (
// counterNumRequestsproxies counts the number of API server requests proxied via this proxy. // counterNumRequestsproxies counts the number of API server requests proxied via this proxy.
counterNumRequestsProxied = clientmetric.NewCounter("k8s_auth_proxy_requests_proxied") counterNumRequestsProxied = clientmetric.NewCounter("k8s_auth_proxy_requests_proxied")
whoIsKey = ctxkey.New("", (*apitype.WhoIsResponse)(nil))
) )
type apiServerProxyMode int type apiServerProxyMode int
@@ -222,6 +221,12 @@ func (ap *apiserverProxy) serveExecWS(w http.ResponseWriter, r *http.Request) {
} }
func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, proto ksr.Protocol) { func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, proto ksr.Protocol) {
const (
podNameKey = "pod"
namespaceNameKey = "namespace"
upgradeHeaderKey = "Upgrade"
)
who, err := ap.whoIs(r) who, err := ap.whoIs(r)
if err != nil { if err != nil {
ap.authError(w, err) ap.authError(w, err)
@@ -246,7 +251,7 @@ func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, p
} }
wantsHeader := upgradeHeaderForProto[proto] wantsHeader := upgradeHeaderForProto[proto]
if h := r.Header.Get("Upgrade"); h != wantsHeader { if h := r.Header.Get(upgradeHeaderKey); h != wantsHeader {
msg := fmt.Sprintf("[unexpected] unable to verify that streaming protocol is %s, wants Upgrade header %q, got: %q", proto, wantsHeader, h) msg := fmt.Sprintf("[unexpected] unable to verify that streaming protocol is %s, wants Upgrade header %q, got: %q", proto, wantsHeader, h)
if failOpen { if failOpen {
msg = msg + "; failure mode is 'fail open'; continuing session without recording." msg = msg + "; failure mode is 'fail open'; continuing session without recording."
@@ -268,8 +273,8 @@ func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, p
Who: who, Who: who,
Addrs: addrs, Addrs: addrs,
FailOpen: failOpen, FailOpen: failOpen,
Pod: r.PathValue("pod"), Pod: r.PathValue(podNameKey),
Namespace: r.PathValue("namespace"), Namespace: r.PathValue(namespaceNameKey),
Log: ap.log, Log: ap.log,
} }
h := ksr.New(opts) h := ksr.New(opts)
@@ -309,9 +314,11 @@ func (h *apiserverProxy) addImpersonationHeadersAsRequired(r *http.Request) {
log.Printf("failed to add impersonation headers: " + err.Error()) log.Printf("failed to add impersonation headers: " + err.Error())
} }
} }
func (ap *apiserverProxy) whoIs(r *http.Request) (*apitype.WhoIsResponse, error) { func (ap *apiserverProxy) whoIs(r *http.Request) (*apitype.WhoIsResponse, error) {
return ap.lc.WhoIs(r.Context(), r.RemoteAddr) return ap.lc.WhoIs(r.Context(), r.RemoteAddr)
} }
func (ap *apiserverProxy) authError(w http.ResponseWriter, err error) { func (ap *apiserverProxy) authError(w http.ResponseWriter, err error) {
ap.log.Errorf("failed to authenticate caller: %v", err) ap.log.Errorf("failed to authenticate caller: %v", err)
http.Error(w, "failed to authenticate caller", http.StatusInternalServerError) http.Error(w, "failed to authenticate caller", http.StatusInternalServerError)

View File

@@ -126,7 +126,10 @@ func (h *Hijacker) Hijack() (net.Conn, *bufio.ReadWriter, error) {
func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn, error) { func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn, error) {
const ( const (
// https://docs.asciinema.org/manual/asciicast/v2/ // https://docs.asciinema.org/manual/asciicast/v2/
asciicastv2 = 2 asciicastv2 = 2
ttyKey = "tty"
commandKey = "command"
containerKey = "container"
) )
var ( var (
wc io.WriteCloser wc io.WriteCloser
@@ -153,18 +156,20 @@ func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn,
// TODO (irbekrm): log which recorder // TODO (irbekrm): log which recorder
h.log.Info("successfully connected to a session recorder") h.log.Info("successfully connected to a session recorder")
cl := tstime.DefaultClock{} cl := tstime.DefaultClock{}
rec := tsrecorder.New(wc, cl, cl.Now(), h.failOpen) rec := tsrecorder.New(wc, cl, cl.Now(), h.failOpen, h.log)
qp := h.req.URL.Query() qp := h.req.URL.Query()
tty := strings.Join(qp[ttyKey], "")
hasTerm := (tty == "true") // session has terminal attached
ch := sessionrecording.CastHeader{ ch := sessionrecording.CastHeader{
Version: asciicastv2, Version: asciicastv2,
Timestamp: cl.Now().Unix(), Timestamp: cl.Now().Unix(),
Command: strings.Join(qp["command"], " "), Command: strings.Join(qp[commandKey], " "),
SrcNode: strings.TrimSuffix(h.who.Node.Name, "."), SrcNode: strings.TrimSuffix(h.who.Node.Name, "."),
SrcNodeID: h.who.Node.StableID, SrcNodeID: h.who.Node.StableID,
Kubernetes: &sessionrecording.Kubernetes{ Kubernetes: &sessionrecording.Kubernetes{
PodName: h.pod, PodName: h.pod,
Namespace: h.ns, Namespace: h.ns,
Container: strings.Join(qp["container"], " "), Container: strings.Join(qp[containerKey], " "),
}, },
} }
if !h.who.Node.IsTagged() { if !h.who.Node.IsTagged() {
@@ -177,9 +182,9 @@ func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn,
var lc net.Conn var lc net.Conn
switch h.proto { switch h.proto {
case SPDYProtocol: case SPDYProtocol:
lc = spdy.New(conn, rec, ch, h.log) lc = spdy.New(conn, rec, ch, hasTerm, h.log)
case WSProtocol: case WSProtocol:
lc = ws.New(conn, rec, ch, h.log) lc = ws.New(conn, rec, ch, hasTerm, h.log)
default: default:
return nil, fmt.Errorf("unknown protocol: %s", h.proto) return nil, fmt.Errorf("unknown protocol: %s", h.proto)
} }

View File

@@ -28,14 +28,16 @@ import (
// The hijacked connection is used to transmit SPDY streams between Kubernetes client ('kubectl') and the destination container. // The hijacked connection is used to transmit SPDY streams between Kubernetes client ('kubectl') and the destination container.
// Data read from the underlying network connection is data sent via one of the SPDY streams from the client to the container. // Data read from the underlying network connection is data sent via one of the SPDY streams from the client to the container.
// Data written to the underlying connection is data sent from the container to the client. // Data written to the underlying connection is data sent from the container to the client.
// We parse the data and send everything for the STDOUT/STDERR streams to the configured tsrecorder as an asciinema recording with the provided header. // We parse the data and send everything for the stdout/stderr streams to the configured tsrecorder as an asciinema recording with the provided header.
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#background-remotecommand-subprotocol // https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#background-remotecommand-subprotocol
func New(nc net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, log *zap.SugaredLogger) net.Conn { func New(nc net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, hasTerm bool, log *zap.SugaredLogger) net.Conn {
return &conn{ return &conn{
Conn: nc, Conn: nc,
rec: rec, rec: rec,
ch: ch, ch: ch,
log: log, log: log,
hasTerm: hasTerm,
initialTermSizeSet: make(chan struct{}),
} }
} }
@@ -47,7 +49,6 @@ type conn struct {
net.Conn net.Conn
// rec knows how to send data written to it to a tsrecorder instance. // rec knows how to send data written to it to a tsrecorder instance.
rec *tsrecorder.Client rec *tsrecorder.Client
ch sessionrecording.CastHeader
stdoutStreamID atomic.Uint32 stdoutStreamID atomic.Uint32
stderrStreamID atomic.Uint32 stderrStreamID atomic.Uint32
@@ -56,8 +57,37 @@ type conn struct {
wmu sync.Mutex // sequences writes wmu sync.Mutex // sequences writes
closed bool closed bool
rmu sync.Mutex // sequences reads rmu sync.Mutex // sequences reads
// The following fields are related to sending asciinema CastHeader.
// CastHeader must be sent before any payload. If the session has a
// terminal attached, the CastHeader must have '.Width' and '.Height'
// fields set for the tsrecorder UI to be able to play the recording.
// For 'kubectl exec' sessions, terminal width and height are sent as a
// resize message on resize stream from the client when the session
// starts as well as at any time the client detects a terminal change.
// We can intercept the resize message on Read calls. As there is no
// guarantee that the resize message from client will be intercepted
// before server writes stdout messages that we must record, we need to
// ensure that parsing stdout/stderr messages written to the connection
// waits till a resize message has been received and a CastHeader with
// correct terminal dimensions can be written.
// ch is the asciinema CastHeader for the current session.
// https://docs.asciinema.org/manual/asciicast/v2/#header
ch sessionrecording.CastHeader
// writeCastHeaderOnce is used to ensure CastHeader gets sent to tsrecorder once.
writeCastHeaderOnce sync.Once writeCastHeaderOnce sync.Once
hasTerm bool // whether the session had TTY attached
// initialTermSizeSet channel gets sent a value once, when the Read has
// received a resize message and set the initial terminal size. It must
// be set to a buffered channel to prevent Reads being blocked on the
// first stdout/stderr write reading from the channel.
initialTermSizeSet chan struct{}
// sendInitialTermSizeSetOnce is used to ensure that a value is sent to
// initialTermSizeSet channel only once, when the initial resize message
// is received.
sendinitialTermSizeSetOnce sync.Once
zlibReqReader zlibReader zlibReqReader zlibReader
// writeBuf is used to store data written to the connection that has not // writeBuf is used to store data written to the connection that has not
@@ -97,13 +127,28 @@ func (c *conn) Read(b []byte) (int, error) {
if !sf.Ctrl { // data frame if !sf.Ctrl { // data frame
switch sf.StreamID { switch sf.StreamID {
case c.resizeStreamID.Load(): case c.resizeStreamID.Load():
var err error
var msg spdyResizeMsg var msg spdyResizeMsg
if err = json.Unmarshal(sf.Payload, &msg); err != nil { if err = json.Unmarshal(sf.Payload, &msg); err != nil {
return 0, fmt.Errorf("error umarshalling resize msg: %w", err) return 0, fmt.Errorf("error umarshalling resize msg: %w", err)
} }
c.ch.Width = msg.Width c.ch.Width = msg.Width
c.ch.Height = msg.Height c.ch.Height = msg.Height
// If this is initial resize message, the width and
// height will be sent in the CastHeader. If this is a
// subsequent resize message, we need to send asciinema
// resize message.
var isInitialResize bool
c.sendinitialTermSizeSetOnce.Do(func() {
isInitialResize = true
close(c.initialTermSizeSet) // unblock sending of CastHeader
})
if !isInitialResize {
if err := c.rec.WriteResize(c.ch.Height, c.ch.Width); err != nil {
return 0, fmt.Errorf("error writing resize message: %w", err)
}
}
} }
return n, nil return n, nil
} }
@@ -147,21 +192,21 @@ func (c *conn) Write(b []byte) (int, error) {
case c.stdoutStreamID.Load(), c.stderrStreamID.Load(): case c.stdoutStreamID.Load(), c.stderrStreamID.Load():
var err error var err error
c.writeCastHeaderOnce.Do(func() { c.writeCastHeaderOnce.Do(func() {
var j []byte // If this is a session with a terminal attached,
j, err = json.Marshal(c.ch) // we must wait for the terminal width and
if err != nil { // height to be parsed from a resize message
return // before sending CastHeader, else tsrecorder
} // will not be able to play this recording.
j = append(j, '\n') if c.hasTerm {
err = c.rec.WriteCastLine(j) c.log.Debugf("write: waiting for the initial terminal size to be set before proceeding with sending the first payload")
if err != nil { <-c.initialTermSizeSet
c.log.Errorf("received error from recorder: %v", err)
} }
err = c.rec.WriteCastHeader(c.ch)
}) })
if err != nil { if err != nil {
return 0, fmt.Errorf("error writing CastHeader: %w", err) return 0, fmt.Errorf("error writing CastHeader: %w", err)
} }
if err := c.rec.Write(sf.Payload); err != nil { if err := c.rec.WriteOutput(sf.Payload); err != nil {
return 0, fmt.Errorf("error sending payload to session recorder: %w", err) return 0, fmt.Errorf("error sending payload to session recorder: %w", err)
} }
} }

View File

@@ -29,13 +29,15 @@ func Test_Writes(t *testing.T) {
} }
cl := tstest.NewClock(tstest.ClockOpts{}) cl := tstest.NewClock(tstest.ClockOpts{})
tests := []struct { tests := []struct {
name string name string
inputs [][]byte inputs [][]byte
wantForwarded []byte wantForwarded []byte
wantRecorded []byte wantRecorded []byte
firstWrite bool firstWrite bool
width int width int
height int height int
sendInitialResize bool
hasTerm bool
}{ }{
{ {
name: "single_write_control_frame_with_payload", name: "single_write_control_frame_with_payload",
@@ -76,7 +78,18 @@ func Test_Writes(t *testing.T) {
wantRecorded: fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl), wantRecorded: fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl),
}, },
{ {
name: "single_first_write_stdout_data_frame_with_payload", name: "single_first_write_stdout_data_frame_with_payload_sess_has_terminal",
inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}},
wantForwarded: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5},
wantRecorded: append(fakes.AsciinemaResizeMsg(t, 10, 20), fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl)...),
width: 10,
height: 20,
hasTerm: true,
firstWrite: true,
sendInitialResize: true,
},
{
name: "single_first_write_stdout_data_frame_with_payload_sess_does_not_have_terminal",
inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}}, inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}},
wantForwarded: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}, wantForwarded: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5},
wantRecorded: append(fakes.AsciinemaResizeMsg(t, 10, 20), fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl)...), wantRecorded: append(fakes.AsciinemaResizeMsg(t, 10, 20), fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl)...),
@@ -89,7 +102,7 @@ func Test_Writes(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
tc := &fakes.TestConn{} tc := &fakes.TestConn{}
sr := &fakes.TestSessionRecorder{} sr := &fakes.TestSessionRecorder{}
rec := tsrecorder.New(sr, cl, cl.Now(), true) rec := tsrecorder.New(sr, cl, cl.Now(), true, zl.Sugar())
c := &conn{ c := &conn{
Conn: tc, Conn: tc,
@@ -99,15 +112,21 @@ func Test_Writes(t *testing.T) {
Width: tt.width, Width: tt.width,
Height: tt.height, Height: tt.height,
}, },
initialTermSizeSet: make(chan struct{}),
hasTerm: tt.hasTerm,
} }
if !tt.firstWrite { if !tt.firstWrite {
// this test case does not intend to test that cast header gets written once // this test case does not intend to test that cast header gets written once
c.writeCastHeaderOnce.Do(func() {}) c.writeCastHeaderOnce.Do(func() {})
} }
if tt.sendInitialResize {
close(c.initialTermSizeSet)
}
c.stdoutStreamID.Store(stdoutStreamID) c.stdoutStreamID.Store(stdoutStreamID)
c.stderrStreamID.Store(stderrStreamID) c.stderrStreamID.Store(stderrStreamID)
for i, input := range tt.inputs { for i, input := range tt.inputs {
c.hasTerm = tt.hasTerm
if _, err := c.Write(input); err != nil { if _, err := c.Write(input); err != nil {
t.Errorf("[%d] spdyRemoteConnRecorder.Write() unexpected error %v", i, err) t.Errorf("[%d] spdyRemoteConnRecorder.Write() unexpected error %v", i, err)
} }
@@ -195,11 +214,12 @@ func Test_Reads(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
tc := &fakes.TestConn{} tc := &fakes.TestConn{}
sr := &fakes.TestSessionRecorder{} sr := &fakes.TestSessionRecorder{}
rec := tsrecorder.New(sr, cl, cl.Now(), true) rec := tsrecorder.New(sr, cl, cl.Now(), true, zl.Sugar())
c := &conn{ c := &conn{
Conn: tc, Conn: tc,
log: zl.Sugar(), log: zl.Sugar(),
rec: rec, rec: rec,
initialTermSizeSet: make(chan struct{}),
} }
c.resizeStreamID.Store(tt.resizeStreamIDBeforeRead) c.resizeStreamID.Store(tt.resizeStreamIDBeforeRead)

View File

@@ -14,10 +14,12 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"go.uber.org/zap"
"tailscale.com/sessionrecording"
"tailscale.com/tstime" "tailscale.com/tstime"
) )
func New(conn io.WriteCloser, clock tstime.Clock, start time.Time, failOpen bool) *Client { func New(conn io.WriteCloser, clock tstime.Clock, start time.Time, failOpen bool, logger *zap.SugaredLogger) *Client {
return &Client{ return &Client{
start: start, start: start,
clock: clock, clock: clock,
@@ -35,38 +37,66 @@ type Client struct {
// failOpen specifies whether the session should be allowed to // failOpen specifies whether the session should be allowed to
// continue if writing to the recording fails. // continue if writing to the recording fails.
failOpen bool failOpen bool
// failedOpen is set to true if the recording of this session failed and
// we should not attempt to send any more data.
failedOpen bool
// backOff is set to true if we've failed open and should stop logger *zap.SugaredLogger
// attempting to write to tsrecorder.
backOff bool
mu sync.Mutex // guards writes to conn mu sync.Mutex // guards writes to conn
conn io.WriteCloser // connection to a tsrecorder instance conn io.WriteCloser // connection to a tsrecorder instance
} }
// Write appends timestamp to the provided bytes and sends them to the // WriteOutput sends terminal stdout and stderr to the tsrecorder.
// configured tsrecorder. // https://docs.asciinema.org/manual/asciicast/v2/#o-output-data-written-to-a-terminal
func (rec *Client) Write(p []byte) (err error) { func (rec *Client) WriteOutput(p []byte) (err error) {
const outputEventCode = "o"
if len(p) == 0 { if len(p) == 0 {
return nil return nil
} }
if rec.backOff { return rec.write([]any{
rec.clock.Now().Sub(rec.start).Seconds(),
outputEventCode,
string(p)})
}
// WriteResize writes an asciinema resize message. This can be called if
// terminal size has changed.
// https://docs.asciinema.org/manual/asciicast/v2/#r-resize
func (rec *Client) WriteResize(height, width int) (err error) {
const resizeEventCode = "r"
p := fmt.Sprintf("%dx%d", height, width)
return rec.write([]any{
rec.clock.Now().Sub(rec.start).Seconds(),
resizeEventCode,
string(p)})
}
// WriteCastHeaders writes asciinema CastHeader. This must be called once,
// before any payload is sent to the tsrecorder.
// https://docs.asciinema.org/manual/asciicast/v2/#header
func (rec *Client) WriteCastHeader(ch sessionrecording.CastHeader) error {
return rec.write(ch)
}
// write writes the data to session recorder. If recording fails and policy is
// 'fail open', sets the state to failed and does not attempt to write any more
// data during this session.
func (rec *Client) write(data any) error {
if rec.failedOpen {
return nil return nil
} }
j, err := json.Marshal([]any{ j, err := json.Marshal(data)
rec.clock.Now().Sub(rec.start).Seconds(),
"o",
string(p),
})
if err != nil { if err != nil {
return fmt.Errorf("error marhalling payload: %w", err) return fmt.Errorf("error marshalling data as json: %v", err)
} }
j = append(j, '\n') j = append(j, '\n')
if err := rec.WriteCastLine(j); err != nil { if err := rec.writeCastLine(j); err != nil {
if !rec.failOpen { if !rec.failOpen {
return fmt.Errorf("error writing payload to recorder: %w", err) return fmt.Errorf("error writing payload to recorder: %w", err)
} }
rec.backOff = true rec.logger.Infof("error writing to tsrecorder: %v. Failure policy is to fail open, so rest of session contents will not be recorded.", err)
rec.failedOpen = true
} }
return nil return nil
} }
@@ -82,9 +112,9 @@ func (rec *Client) Close() error {
return err return err
} }
// writeCastLine sends bytes to the tsrecorder. The bytes should be in // writeToRecorder sends bytes to the tsrecorder. The bytes should be in
// asciinema format. // asciinema format.
func (c *Client) WriteCastLine(j []byte) error { func (c *Client) writeCastLine(j []byte) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if c.conn == nil { if c.conn == nil {

View File

@@ -28,14 +28,16 @@ import (
// The hijacked connection is used to transmit *.channel.k8s.io streams between Kubernetes client ('kubectl') and the destination proxy controlled by Kubernetes. // The hijacked connection is used to transmit *.channel.k8s.io streams between Kubernetes client ('kubectl') and the destination proxy controlled by Kubernetes.
// Data read from the underlying network connection is data sent via one of the streams from the client to the container. // Data read from the underlying network connection is data sent via one of the streams from the client to the container.
// Data written to the underlying connection is data sent from the container to the client. // Data written to the underlying connection is data sent from the container to the client.
// We parse the data and send everything for the STDOUT/STDERR streams to the configured tsrecorder as an asciinema recording with the provided header. // We parse the data and send everything for the stdout/stderr streams to the configured tsrecorder as an asciinema recording with the provided header.
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#proposal-new-remotecommand-sub-protocol-version---v5channelk8sio // https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#proposal-new-remotecommand-sub-protocol-version---v5channelk8sio
func New(c net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, log *zap.SugaredLogger) net.Conn { func New(c net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, hasTerm bool, log *zap.SugaredLogger) net.Conn {
return &conn{ return &conn{
Conn: c, Conn: c,
rec: rec, rec: rec,
ch: ch, ch: ch,
log: log, hasTerm: hasTerm,
log: log,
initialTermSizeSet: make(chan struct{}, 1),
} }
} }
@@ -49,8 +51,37 @@ type conn struct {
net.Conn net.Conn
// rec knows how to send data to a tsrecorder instance. // rec knows how to send data to a tsrecorder instance.
rec *tsrecorder.Client rec *tsrecorder.Client
// ch is the asiinema CastHeader for a session.
ch sessionrecording.CastHeader // The following fields are related to sending asciinema CastHeader.
// CastHeader must be sent before any payload. If the session has a
// terminal attached, the CastHeader must have '.Width' and '.Height'
// fields set for the tsrecorder UI to be able to play the recording.
// For 'kubectl exec' sessions, terminal width and height are sent as a
// resize message on resize stream from the client when the session
// starts as well as at any time the client detects a terminal change.
// We can intercept the resize message on Read calls. As there is no
// guarantee that the resize message from client will be intercepted
// before server writes stdout messages that we must record, we need to
// ensure that parsing stdout/stderr messages written to the connection
// waits till a resize message has been received and a CastHeader with
// correct terminal dimensions can be written.
// ch is asciinema CastHeader for the current session.
// https://docs.asciinema.org/manual/asciicast/v2/#header
ch sessionrecording.CastHeader
// writeCastHeaderOnce is used to ensure CastHeader gets sent to tsrecorder once.
writeCastHeaderOnce sync.Once
hasTerm bool // whether the session has TTY attached
// initialTermSizeSet channel gets sent a value once, when the Read has
// received a resize message and set the initial terminal size. It must
// be set to a buffered channel to prevent Reads being blocked on the
// first stdout/stderr write reading from the channel.
initialTermSizeSet chan struct{}
// sendInitialTermSizeSetOnce is used to ensure that a value is sent to
// initialTermSizeSet channel only once, when the initial resize message
// is received.
sendInitialTermSizeSetOnce sync.Once
log *zap.SugaredLogger log *zap.SugaredLogger
rmu sync.Mutex // sequences reads rmu sync.Mutex // sequences reads
@@ -63,9 +94,8 @@ type conn struct {
// the original byte array. // the original byte array.
readBuf bytes.Buffer readBuf bytes.Buffer
wmu sync.Mutex // sequences writes wmu sync.Mutex // sequences writes
writeCastHeaderOnce sync.Once closed bool // connection is closed
closed bool // connection is closed
// writeBuf contains bytes for a currently parsed binary data message // writeBuf contains bytes for a currently parsed binary data message
// being written to the underlying conn. If the message is masked, it is // being written to the underlying conn. If the message is masked, it is
// unmasked in place, so having this buffer allows us to avoid modifying // unmasked in place, so having this buffer allows us to avoid modifying
@@ -140,17 +170,32 @@ func (c *conn) Read(b []byte) (int, error) {
} }
c.readBuf.Next(len(readMsg.raw)) c.readBuf.Next(len(readMsg.raw))
if readMsg.isFinalized { if readMsg.isFinalized && !c.readMsgIsIncomplete() {
// Stream IDs for websocket streams are static. // Stream IDs for websocket streams are static.
// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218 // https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218
if readMsg.streamID.Load() == remotecommand.StreamResize { if readMsg.streamID.Load() == remotecommand.StreamResize {
var err error
var msg tsrecorder.ResizeMsg var msg tsrecorder.ResizeMsg
if err = json.Unmarshal(readMsg.payload, &msg); err != nil { if err = json.Unmarshal(readMsg.payload, &msg); err != nil {
return 0, fmt.Errorf("error umarshalling resize message: %w", err) return 0, fmt.Errorf("error umarshalling resize message: %w", err)
} }
c.ch.Width = msg.Width c.ch.Width = msg.Width
c.ch.Height = msg.Height c.ch.Height = msg.Height
// If this is initial resize message, the width and
// height will be sent in the CastHeader. If this is a
// subsequent resize message, we need to send asciinema
// resize message.
var isInitialResize bool
c.sendInitialTermSizeSetOnce.Do(func() {
isInitialResize = true
close(c.initialTermSizeSet) // unblock sending of CastHeader
})
if !isInitialResize {
if err := c.rec.WriteResize(c.ch.Height, c.ch.Width); err != nil {
return 0, fmt.Errorf("error writing resize message: %w", err)
}
}
} }
} }
c.currentReadMsg = readMsg c.currentReadMsg = readMsg
@@ -209,22 +254,21 @@ func (c *conn) Write(b []byte) (int, error) {
if writeMsg.streamID.Load() == remotecommand.StreamStdOut || writeMsg.streamID.Load() == remotecommand.StreamStdErr { if writeMsg.streamID.Load() == remotecommand.StreamStdOut || writeMsg.streamID.Load() == remotecommand.StreamStdErr {
var err error var err error
c.writeCastHeaderOnce.Do(func() { c.writeCastHeaderOnce.Do(func() {
var j []byte // If this is a session with a terminal attached,
j, err = json.Marshal(c.ch) // we must wait for the terminal width and
if err != nil { // height to be parsed from a resize message
c.log.Errorf("error marhsalling conn: %v", err) // before sending CastHeader, else tsrecorder
return // will not be able to play this recording.
} if c.hasTerm {
j = append(j, '\n') c.log.Debug("waiting for terminal size to be set before starting to send recorded data")
err = c.rec.WriteCastLine(j) <-c.initialTermSizeSet
if err != nil {
c.log.Errorf("received error from recorder: %v", err)
} }
err = c.rec.WriteCastHeader(c.ch)
}) })
if err != nil { if err != nil {
return 0, fmt.Errorf("error writing CastHeader: %w", err) return 0, fmt.Errorf("error writing CastHeader: %w", err)
} }
if err := c.rec.Write(writeMsg.payload); err != nil { if err := c.rec.WriteOutput(writeMsg.payload); err != nil {
return 0, fmt.Errorf("error writing message to recorder: %v", err) return 0, fmt.Errorf("error writing message to recorder: %v", err)
} }
} }

View File

@@ -65,6 +65,7 @@ func Test_conn_Read(t *testing.T) {
log: zl.Sugar(), log: zl.Sugar(),
} }
for i, input := range tt.inputs { for i, input := range tt.inputs {
c.initialTermSizeSet = make(chan struct{})
if err := tc.WriteReadBufBytes(input); err != nil { if err := tc.WriteReadBufBytes(input); err != nil {
t.Fatalf("writing bytes to test conn: %v", err) t.Fatalf("writing bytes to test conn: %v", err)
} }
@@ -93,13 +94,15 @@ func Test_conn_Write(t *testing.T) {
} }
cl := tstest.NewClock(tstest.ClockOpts{}) cl := tstest.NewClock(tstest.ClockOpts{})
tests := []struct { tests := []struct {
name string name string
inputs [][]byte inputs [][]byte
wantForwarded []byte wantForwarded []byte
wantRecorded []byte wantRecorded []byte
firstWrite bool firstWrite bool
width int width int
height int height int
hasTerm bool
sendInitialResize bool
}{ }{
{ {
name: "single_write_control_frame", name: "single_write_control_frame",
@@ -144,12 +147,23 @@ func Test_conn_Write(t *testing.T) {
wantForwarded: []byte{0x2, 0x3, 0x1, 0x7, 0x8, 0x80, 0x6, 0x1, 0x1, 0x2, 0x3, 0x4, 0x5}, wantForwarded: []byte{0x2, 0x3, 0x1, 0x7, 0x8, 0x80, 0x6, 0x1, 0x1, 0x2, 0x3, 0x4, 0x5},
wantRecorded: fakes.CastLine(t, []byte{0x7, 0x8, 0x1, 0x2, 0x3, 0x4, 0x5}, cl), wantRecorded: fakes.CastLine(t, []byte{0x7, 0x8, 0x1, 0x2, 0x3, 0x4, 0x5}, cl),
}, },
{
name: "three_writes_stdout_data_message_with_split_fragment_cast_header_with_terminal",
inputs: [][]byte{{0x2, 0x3, 0x1, 0x7, 0x8}, {0x80, 0x6, 0x1, 0x1, 0x2, 0x3}, {0x4, 0x5}},
wantForwarded: []byte{0x2, 0x3, 0x1, 0x7, 0x8, 0x80, 0x6, 0x1, 0x1, 0x2, 0x3, 0x4, 0x5},
wantRecorded: append(fakes.AsciinemaResizeMsg(t, 10, 20), fakes.CastLine(t, []byte{0x7, 0x8, 0x1, 0x2, 0x3, 0x4, 0x5}, cl)...),
height: 20,
width: 10,
hasTerm: true,
firstWrite: true,
sendInitialResize: true,
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
tc := &fakes.TestConn{} tc := &fakes.TestConn{}
sr := &fakes.TestSessionRecorder{} sr := &fakes.TestSessionRecorder{}
rec := tsrecorder.New(sr, cl, cl.Now(), true) rec := tsrecorder.New(sr, cl, cl.Now(), true, zl.Sugar())
c := &conn{ c := &conn{
Conn: tc, Conn: tc,
log: zl.Sugar(), log: zl.Sugar(),
@@ -157,12 +171,17 @@ func Test_conn_Write(t *testing.T) {
Width: tt.width, Width: tt.width,
Height: tt.height, Height: tt.height,
}, },
rec: rec, rec: rec,
initialTermSizeSet: make(chan struct{}),
hasTerm: tt.hasTerm,
} }
if !tt.firstWrite { if !tt.firstWrite {
// This test case does not intend to test that cast header gets written once. // This test case does not intend to test that cast header gets written once.
c.writeCastHeaderOnce.Do(func() {}) c.writeCastHeaderOnce.Do(func() {})
} }
if tt.sendInitialResize {
close(c.initialTermSizeSet)
}
for i, input := range tt.inputs { for i, input := range tt.inputs {
_, err := c.Write(input) _, err := c.Write(input)
if err != nil { if err != nil {
@@ -221,7 +240,7 @@ func Test_conn_WriteRand(t *testing.T) {
} }
cl := tstest.NewClock(tstest.ClockOpts{}) cl := tstest.NewClock(tstest.ClockOpts{})
sr := &fakes.TestSessionRecorder{} sr := &fakes.TestSessionRecorder{}
rec := tsrecorder.New(sr, cl, cl.Now(), true) rec := tsrecorder.New(sr, cl, cl.Now(), true, zl.Sugar())
for i := range 100 { for i := range 100 {
tc := &fakes.TestConn{} tc := &fakes.TestConn{}
c := &conn{ c := &conn{