2024-08-14 19:57:50 +03:00
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
2025-07-14 15:17:20 +01:00
// package ws has functionality to parse 'kubectl exec/attach' sessions streamed using
2024-08-14 19:57:50 +03:00
// WebSocket protocol.
package ws
import (
"bytes"
2025-07-14 15:17:20 +01:00
"context"
2024-08-14 19:57:50 +03:00
"encoding/json"
"errors"
"fmt"
"io"
"net"
"sync"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/remotecommand"
"tailscale.com/k8s-operator/sessionrecording/tsrecorder"
"tailscale.com/sessionrecording"
"tailscale.com/util/multierr"
)
// New wraps the provided network connection and returns a connection whose reads and writes will get triggered as data is received on the hijacked connection.
2025-07-14 15:17:20 +01:00
// The connection must be a hijacked connection for a 'kubectl exec/attach' session using WebSocket protocol and a *.channel.k8s.io subprotocol.
2024-08-14 19:57:50 +03:00
// The hijacked connection is used to transmit *.channel.k8s.io streams between Kubernetes client ('kubectl') and the destination proxy controlled by Kubernetes.
// Data read from the underlying network connection is data sent via one of the streams from the client to the container.
// Data written to the underlying connection is data sent from the container to the client.
2024-09-03 20:42:02 +03:00
// We parse the data and send everything for the stdout/stderr streams to the configured tsrecorder as an asciinema recording with the provided header.
2024-08-14 19:57:50 +03:00
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#proposal-new-remotecommand-sub-protocol-version---v5channelk8sio
2025-07-14 15:17:20 +01:00
func New ( ctx context . Context , c net . Conn , rec * tsrecorder . Client , ch sessionrecording . CastHeader , hasTerm bool , log * zap . SugaredLogger ) ( net . Conn , error ) {
lc := & conn {
Conn : c ,
ctx : ctx ,
rec : rec ,
ch : ch ,
hasTerm : hasTerm ,
log : log ,
initialCastHeaderSent : make ( chan struct { } , 1 ) ,
2024-08-14 19:57:50 +03:00
}
2025-07-14 15:17:20 +01:00
// if there is no term, we don't need to wait for a resize message
if ! hasTerm {
var err error
lc . writeCastHeaderOnce . Do ( func ( ) {
// If this is a session with a terminal attached,
// we must wait for the terminal width and
// height to be parsed from a resize message
// before sending CastHeader, else tsrecorder
// will not be able to play this recording.
err = lc . rec . WriteCastHeader ( ch )
close ( lc . initialCastHeaderSent )
} )
if err != nil {
return nil , fmt . Errorf ( "error writing CastHeader: %w" , err )
}
}
return lc , nil
2024-08-14 19:57:50 +03:00
}
// conn is a wrapper around net.Conn. It reads the bytestream
2025-07-14 15:17:20 +01:00
// for a 'kubectl exec/attach' session, sends session recording data to the configured
2024-08-14 19:57:50 +03:00
// recorder and forwards the raw bytes to the original destination.
// A new conn is created per session.
2025-07-14 15:17:20 +01:00
// conn only knows to how to read a 'kubectl exec/attach' session that is streamed using WebSocket protocol.
2024-08-14 19:57:50 +03:00
// https://www.rfc-editor.org/rfc/rfc6455
type conn struct {
net . Conn
2025-07-14 15:17:20 +01:00
ctx context . Context
2024-08-14 19:57:50 +03:00
// rec knows how to send data to a tsrecorder instance.
rec * tsrecorder . Client
2024-09-03 20:42:02 +03:00
// The following fields are related to sending asciinema CastHeader.
// CastHeader must be sent before any payload. If the session has a
// terminal attached, the CastHeader must have '.Width' and '.Height'
// fields set for the tsrecorder UI to be able to play the recording.
2025-07-14 15:17:20 +01:00
// For 'kubectl exec/attach' sessions, terminal width and height are sent as a
2024-09-03 20:42:02 +03:00
// resize message on resize stream from the client when the session
// starts as well as at any time the client detects a terminal change.
// We can intercept the resize message on Read calls. As there is no
// guarantee that the resize message from client will be intercepted
// before server writes stdout messages that we must record, we need to
// ensure that parsing stdout/stderr messages written to the connection
// waits till a resize message has been received and a CastHeader with
// correct terminal dimensions can be written.
// ch is asciinema CastHeader for the current session.
// https://docs.asciinema.org/manual/asciicast/v2/#header
ch sessionrecording . CastHeader
// writeCastHeaderOnce is used to ensure CastHeader gets sent to tsrecorder once.
writeCastHeaderOnce sync . Once
hasTerm bool // whether the session has TTY attached
2025-07-14 15:17:20 +01:00
// initialCastHeaderSent is a boolean that is set to ensure that the cast
// header is the first thing that is streamed to the session recorder.
// Otherwise the stream will fail.
initialCastHeaderSent chan struct { }
2024-09-03 20:42:02 +03:00
2024-08-14 19:57:50 +03:00
log * zap . SugaredLogger
rmu sync . Mutex // sequences reads
// currentReadMsg contains parsed contents of a websocket binary data message that
// is currently being read from the underlying net.Conn.
currentReadMsg * message
// readBuf contains bytes for a currently parsed binary data message
// read from the underlying conn. If the message is masked, it is
// unmasked in place, so having this buffer allows us to avoid modifying
// the original byte array.
readBuf bytes . Buffer
2024-09-03 20:42:02 +03:00
wmu sync . Mutex // sequences writes
closed bool // connection is closed
2024-08-14 19:57:50 +03:00
// writeBuf contains bytes for a currently parsed binary data message
// being written to the underlying conn. If the message is masked, it is
// unmasked in place, so having this buffer allows us to avoid modifying
// the original byte array.
writeBuf bytes . Buffer
// currentWriteMsg contains parsed contents of a websocket binary data message that
// is currently being written to the underlying net.Conn.
currentWriteMsg * message
}
// Read reads bytes from the original connection and parses them as websocket
// message fragments.
// Bytes read from the original connection are the bytes sent from the Kubernetes client (kubectl) to the destination container via kubelet.
// If the message is for the resize stream, sets the width
// and height of the CastHeader for this connection.
// The fragment can be incomplete.
func ( c * conn ) Read ( b [ ] byte ) ( int , error ) {
c . rmu . Lock ( )
defer c . rmu . Unlock ( )
n , err := c . Conn . Read ( b )
if err != nil {
// It seems that we sometimes get a wrapped io.EOF, but the
// caller checks for io.EOF with ==.
if errors . Is ( err , io . EOF ) {
2025-07-25 13:55:25 +01:00
c . log . Infof ( "FOOOOOOOOOOOO: got %v from read, returning EOF" , err )
2024-08-14 19:57:50 +03:00
err = io . EOF
}
2025-07-25 13:55:25 +01:00
c . log . Infof ( "FOOOOOOOOOOO: Read errored: %v" , err )
return n , err
2024-08-14 19:57:50 +03:00
}
if n == 0 {
2025-07-25 13:55:25 +01:00
c . log . Info ( "[unexpected] Read called for 0 length bytes" )
2024-08-14 19:57:50 +03:00
return 0 , nil
}
typ := messageType ( opcode ( b ) )
if ( typ == noOpcode && c . readMsgIsIncomplete ( ) ) || c . readBufHasIncompleteFragment ( ) { // subsequent fragment
if typ , err = c . curReadMsgType ( ) ; err != nil {
2025-07-25 13:55:25 +01:00
c . log . Infof ( "FOOOOOOOOOOOO: got %v from curReadMsgType" , err )
2024-08-14 19:57:50 +03:00
return 0 , err
}
}
// A control message can not be fragmented and we are not interested in
// these messages. Just return.
if isControlMessage ( typ ) {
return n , nil
}
// The only data message type that Kubernetes supports is binary message.
// If we received another message type, return and let the API server close the connection.
// https://github.com/kubernetes/client-go/blob/release-1.30/tools/remotecommand/websocket.go#L281
if typ != binaryMessage {
c . log . Infof ( "[unexpected] received a data message with a type that is not binary message type %v" , typ )
return n , nil
}
2025-07-25 19:10:53 +01:00
c . log . Infof ( "FOOOOOOOOOOOO: Writing %d bytes to readBuf" , n )
2024-08-14 19:57:50 +03:00
if _ , err := c . readBuf . Write ( b [ : n ] ) ; err != nil {
2025-07-25 13:55:25 +01:00
c . log . Infof ( "FOOOOOOOOOOOO: got %v from readBuf" , err )
2024-08-14 19:57:50 +03:00
return 0 , fmt . Errorf ( "[unexpected] error writing message contents to read buffer: %w" , err )
}
2025-07-25 19:10:53 +01:00
for c . readBuf . Len ( ) != 0 {
readMsg := & message { typ : typ } // start a new message...
// ... or pick up an already started one if the previous fragment was not final.
if c . readMsgIsIncomplete ( ) {
c . log . Infof ( "FOOOOOOOOOOOO: continuing a message fragment with type %v, incompleteFragment: %v, len: %d" , c . currentReadMsg . typ , c . readBufHasIncompleteFragment ( ) , c . readBuf . Len ( ) )
readMsg = c . currentReadMsg
}
2024-08-14 19:57:50 +03:00
2025-07-25 19:10:53 +01:00
c . log . Infof ( "FOOOOOOOOOOOO: readMsg buffer has length %d, contents: %v" , c . readBuf . Len ( ) , c . readBuf . Bytes ( ) )
ok , err := readMsg . Parse ( c . readBuf . Bytes ( ) , c . log )
if err != nil {
c . log . Infof ( "FOOOOOOOOOOOO: got %v from readMsg.Parse" , err )
return 0 , fmt . Errorf ( "error parsing message: %v" , err )
}
if ! ok { // incomplete fragment
return n , nil
}
c . log . Infof ( "Popping %d bytes from readBuf" , len ( readMsg . raw ) )
c . readBuf . Next ( len ( readMsg . raw ) )
if readMsg . isFinalized && ! c . readMsgIsIncomplete ( ) {
// we want to send stream resize messages for terminal sessions
// Stream IDs for websocket streams are static.
// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218
if readMsg . streamID . Load ( ) == remotecommand . StreamResize && c . hasTerm {
var msg tsrecorder . ResizeMsg
c . log . Infof ( "FOOOOOOOOOOOO: unmarshalling payload: %s" , string ( readMsg . payload ) )
d := json . NewDecoder ( bytes . NewReader ( readMsg . payload ) )
var count int
for d . More ( ) {
msg = tsrecorder . ResizeMsg { }
if err = d . Decode ( & msg ) ; err != nil {
c . log . Infof ( "FOOOOOOOOOOOO: got %v from json.Unmarshal, payload: %s" , err , string ( readMsg . payload ) )
return 0 , fmt . Errorf ( "error umarshalling resize message: %w" , err )
}
c . log . Infof ( "FOOOOOOOOOOOO: got resize message: %v" , msg )
count ++
}
if count == 0 {
c . log . Infof ( "FOOOOOOOOOOOO: unexpectedly no resize messages, payload: %s" , string ( readMsg . payload ) )
return 0 , fmt . Errorf ( "no resize messages in payload: %s" , string ( readMsg . payload ) )
2025-07-25 13:55:25 +01:00
}
2024-09-03 20:42:02 +03:00
2025-07-25 19:10:53 +01:00
c . ch . Width = msg . Width
c . ch . Height = msg . Height
2024-09-03 20:42:02 +03:00
2025-07-25 19:10:53 +01:00
var isInitialResize bool
c . writeCastHeaderOnce . Do ( func ( ) {
isInitialResize = true
// If this is a session with a terminal attached,
// we must wait for the terminal width and
// height to be parsed from a resize message
// before sending CastHeader, else tsrecorder
// will not be able to play this recording.
err = c . rec . WriteCastHeader ( c . ch )
close ( c . initialCastHeaderSent )
} )
if err != nil {
c . log . Infof ( "FOOOOOOOOOOOO: got %v from rec.WriteCastHeader" , err )
return 0 , fmt . Errorf ( "error writing CastHeader: %w" , err )
}
2025-07-14 15:17:20 +01:00
2025-07-25 19:10:53 +01:00
if ! isInitialResize {
if err := c . rec . WriteResize ( msg . Height , msg . Width ) ; err != nil {
c . log . Infof ( "FOOOOOOOOOOOO: got %v from rec.WriteResize" , err )
return 0 , fmt . Errorf ( "error writing resize message: %w" , err )
}
2024-09-03 20:42:02 +03:00
}
}
2024-08-14 19:57:50 +03:00
}
2025-07-25 19:10:53 +01:00
c . currentReadMsg = readMsg
2024-08-14 19:57:50 +03:00
}
2025-07-14 15:17:20 +01:00
2024-08-14 19:57:50 +03:00
return n , nil
}
// Write parses the written bytes as WebSocket message fragment. If the message
// is for stdout or stderr streams, it is written to the configured tsrecorder.
// A message fragment can be incomplete.
func ( c * conn ) Write ( b [ ] byte ) ( int , error ) {
c . wmu . Lock ( )
defer c . wmu . Unlock ( )
if len ( b ) == 0 {
c . log . Debug ( "[unexpected] Write called with 0 bytes" )
return 0 , nil
}
typ := messageType ( opcode ( b ) )
// If we are in process of parsing a message fragment, the received
// bytes are not structured as a message fragment and can not be used to
// determine a message fragment.
if c . writeBufHasIncompleteFragment ( ) { // buffer contains previous incomplete fragment
var err error
if typ , err = c . curWriteMsgType ( ) ; err != nil {
return 0 , err
}
}
if isControlMessage ( typ ) {
return c . Conn . Write ( b )
}
writeMsg := & message { typ : typ } // start a new message...
// ... or continue the existing one if it has not been finalized.
if c . writeMsgIsIncomplete ( ) || c . writeBufHasIncompleteFragment ( ) {
writeMsg = c . currentWriteMsg
}
if _ , err := c . writeBuf . Write ( b ) ; err != nil {
c . log . Errorf ( "write: error writing to write buf: %v" , err )
return 0 , fmt . Errorf ( "[unexpected] error writing to internal write buffer: %w" , err )
}
ok , err := writeMsg . Parse ( c . writeBuf . Bytes ( ) , c . log )
if err != nil {
c . log . Errorf ( "write: parsing a message errored: %v" , err )
return 0 , fmt . Errorf ( "write: error parsing message: %v" , err )
}
2025-07-14 15:17:20 +01:00
2024-08-14 19:57:50 +03:00
c . currentWriteMsg = writeMsg
if ! ok { // incomplete fragment
return len ( b ) , nil
}
2025-07-14 15:17:20 +01:00
2024-08-14 19:57:50 +03:00
c . writeBuf . Next ( len ( writeMsg . raw ) ) // advance frame
if len ( writeMsg . payload ) != 0 && writeMsg . isFinalized {
if writeMsg . streamID . Load ( ) == remotecommand . StreamStdOut || writeMsg . streamID . Load ( ) == remotecommand . StreamStdErr {
2025-07-14 15:17:20 +01:00
// we must wait for confirmation that the initial cast header was sent before proceeding with any more writes
select {
case <- c . ctx . Done ( ) :
return 0 , c . ctx . Err ( )
case <- c . initialCastHeaderSent :
if err := c . rec . WriteOutput ( writeMsg . payload ) ; err != nil {
return 0 , fmt . Errorf ( "error writing message to recorder: %w" , err )
2024-08-14 19:57:50 +03:00
}
}
}
}
2025-07-14 15:17:20 +01:00
2024-08-14 19:57:50 +03:00
_ , err = c . Conn . Write ( c . currentWriteMsg . raw )
if err != nil {
c . log . Errorf ( "write: error writing to conn: %v" , err )
}
2025-07-14 15:17:20 +01:00
2024-08-14 19:57:50 +03:00
return len ( b ) , nil
}
func ( c * conn ) Close ( ) error {
c . wmu . Lock ( )
defer c . wmu . Unlock ( )
if c . closed {
return nil
}
c . closed = true
connCloseErr := c . Conn . Close ( )
recCloseErr := c . rec . Close ( )
return multierr . New ( connCloseErr , recCloseErr )
}
// writeBufHasIncompleteFragment returns true if the latest data message
// fragment written to the connection was incomplete and the following write
// must be the remaining payload bytes of that fragment.
func ( c * conn ) writeBufHasIncompleteFragment ( ) bool {
return c . writeBuf . Len ( ) != 0
}
// readBufHasIncompleteFragment returns true if the latest data message
// fragment read from the connection was incomplete and the following read
// must be the remaining payload bytes of that fragment.
func ( c * conn ) readBufHasIncompleteFragment ( ) bool {
return c . readBuf . Len ( ) != 0
}
// writeMsgIsIncomplete returns true if the latest WebSocket message written to
// the connection was fragmented and the next data message fragment written to
// the connection must be a fragment of that message.
// https://www.rfc-editor.org/rfc/rfc6455#section-5.4
func ( c * conn ) writeMsgIsIncomplete ( ) bool {
return c . currentWriteMsg != nil && ! c . currentWriteMsg . isFinalized
}
// readMsgIsIncomplete returns true if the latest WebSocket message written to
// the connection was fragmented and the next data message fragment written to
// the connection must be a fragment of that message.
// https://www.rfc-editor.org/rfc/rfc6455#section-5.4
func ( c * conn ) readMsgIsIncomplete ( ) bool {
return c . currentReadMsg != nil && ! c . currentReadMsg . isFinalized
}
2025-07-14 15:17:20 +01:00
2024-08-14 19:57:50 +03:00
func ( c * conn ) curReadMsgType ( ) ( messageType , error ) {
if c . currentReadMsg != nil {
return c . currentReadMsg . typ , nil
}
return 0 , errors . New ( "[unexpected] attempted to determine type for nil message" )
}
func ( c * conn ) curWriteMsgType ( ) ( messageType , error ) {
if c . currentWriteMsg != nil {
return c . currentWriteMsg . typ , nil
}
return 0 , errors . New ( "[unexpected] attempted to determine type for nil message" )
}
// opcode reads the websocket message opcode that denotes the message type.
// opcode is contained in bits [4-8] of the message.
// https://www.rfc-editor.org/rfc/rfc6455#section-5.2
func opcode ( b [ ] byte ) int {
// 0xf = 00001111; b & 00001111 zeroes out bits [0 - 3] of b
var mask byte = 0xf
return int ( b [ 0 ] & mask )
}