2024-08-14 16:57:50 +00:00
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
// package ws has functionality to parse 'kubectl exec' sessions streamed using
// WebSocket protocol.
package ws
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"sync"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/remotecommand"
"tailscale.com/k8s-operator/sessionrecording/tsrecorder"
"tailscale.com/sessionrecording"
"tailscale.com/util/multierr"
)
// New wraps the provided network connection and returns a connection whose reads and writes will get triggered as data is received on the hijacked connection.
// The connection must be a hijacked connection for a 'kubectl exec' session using WebSocket protocol and a *.channel.k8s.io subprotocol.
// The hijacked connection is used to transmit *.channel.k8s.io streams between Kubernetes client ('kubectl') and the destination proxy controlled by Kubernetes.
// Data read from the underlying network connection is data sent via one of the streams from the client to the container.
// Data written to the underlying connection is data sent from the container to the client.
2024-09-03 17:42:02 +00:00
// We parse the data and send everything for the stdout/stderr streams to the configured tsrecorder as an asciinema recording with the provided header.
2024-08-14 16:57:50 +00:00
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#proposal-new-remotecommand-sub-protocol-version---v5channelk8sio
2024-09-03 17:42:02 +00:00
func New ( c net . Conn , rec * tsrecorder . Client , ch sessionrecording . CastHeader , hasTerm bool , log * zap . SugaredLogger ) net . Conn {
2024-08-14 16:57:50 +00:00
return & conn {
2024-09-03 17:42:02 +00:00
Conn : c ,
rec : rec ,
ch : ch ,
hasTerm : hasTerm ,
log : log ,
initialTermSizeSet : make ( chan struct { } , 1 ) ,
2024-08-14 16:57:50 +00:00
}
}
// conn is a wrapper around net.Conn. It reads the bytestream
// for a 'kubectl exec' session, sends session recording data to the configured
// recorder and forwards the raw bytes to the original destination.
// A new conn is created per session.
// conn only knows to how to read a 'kubectl exec' session that is streamed using WebSocket protocol.
// https://www.rfc-editor.org/rfc/rfc6455
type conn struct {
net . Conn
// rec knows how to send data to a tsrecorder instance.
rec * tsrecorder . Client
2024-09-03 17:42:02 +00:00
// The following fields are related to sending asciinema CastHeader.
// CastHeader must be sent before any payload. If the session has a
// terminal attached, the CastHeader must have '.Width' and '.Height'
// fields set for the tsrecorder UI to be able to play the recording.
// For 'kubectl exec' sessions, terminal width and height are sent as a
// resize message on resize stream from the client when the session
// starts as well as at any time the client detects a terminal change.
// We can intercept the resize message on Read calls. As there is no
// guarantee that the resize message from client will be intercepted
// before server writes stdout messages that we must record, we need to
// ensure that parsing stdout/stderr messages written to the connection
// waits till a resize message has been received and a CastHeader with
// correct terminal dimensions can be written.
// ch is asciinema CastHeader for the current session.
// https://docs.asciinema.org/manual/asciicast/v2/#header
ch sessionrecording . CastHeader
// writeCastHeaderOnce is used to ensure CastHeader gets sent to tsrecorder once.
writeCastHeaderOnce sync . Once
hasTerm bool // whether the session has TTY attached
// initialTermSizeSet channel gets sent a value once, when the Read has
// received a resize message and set the initial terminal size. It must
// be set to a buffered channel to prevent Reads being blocked on the
// first stdout/stderr write reading from the channel.
initialTermSizeSet chan struct { }
// sendInitialTermSizeSetOnce is used to ensure that a value is sent to
// initialTermSizeSet channel only once, when the initial resize message
// is received.
sendInitialTermSizeSetOnce sync . Once
2024-08-14 16:57:50 +00:00
log * zap . SugaredLogger
rmu sync . Mutex // sequences reads
// currentReadMsg contains parsed contents of a websocket binary data message that
// is currently being read from the underlying net.Conn.
currentReadMsg * message
// readBuf contains bytes for a currently parsed binary data message
// read from the underlying conn. If the message is masked, it is
// unmasked in place, so having this buffer allows us to avoid modifying
// the original byte array.
readBuf bytes . Buffer
2024-09-03 17:42:02 +00:00
wmu sync . Mutex // sequences writes
closed bool // connection is closed
2024-08-14 16:57:50 +00:00
// writeBuf contains bytes for a currently parsed binary data message
// being written to the underlying conn. If the message is masked, it is
// unmasked in place, so having this buffer allows us to avoid modifying
// the original byte array.
writeBuf bytes . Buffer
// currentWriteMsg contains parsed contents of a websocket binary data message that
// is currently being written to the underlying net.Conn.
currentWriteMsg * message
}
// Read reads bytes from the original connection and parses them as websocket
// message fragments.
// Bytes read from the original connection are the bytes sent from the Kubernetes client (kubectl) to the destination container via kubelet.
// If the message is for the resize stream, sets the width
// and height of the CastHeader for this connection.
// The fragment can be incomplete.
func ( c * conn ) Read ( b [ ] byte ) ( int , error ) {
c . rmu . Lock ( )
defer c . rmu . Unlock ( )
n , err := c . Conn . Read ( b )
if err != nil {
// It seems that we sometimes get a wrapped io.EOF, but the
// caller checks for io.EOF with ==.
if errors . Is ( err , io . EOF ) {
err = io . EOF
}
return 0 , err
}
if n == 0 {
c . log . Debug ( "[unexpected] Read called for 0 length bytes" )
return 0 , nil
}
typ := messageType ( opcode ( b ) )
if ( typ == noOpcode && c . readMsgIsIncomplete ( ) ) || c . readBufHasIncompleteFragment ( ) { // subsequent fragment
if typ , err = c . curReadMsgType ( ) ; err != nil {
return 0 , err
}
}
// A control message can not be fragmented and we are not interested in
// these messages. Just return.
if isControlMessage ( typ ) {
return n , nil
}
// The only data message type that Kubernetes supports is binary message.
// If we received another message type, return and let the API server close the connection.
// https://github.com/kubernetes/client-go/blob/release-1.30/tools/remotecommand/websocket.go#L281
if typ != binaryMessage {
c . log . Infof ( "[unexpected] received a data message with a type that is not binary message type %v" , typ )
return n , nil
}
readMsg := & message { typ : typ } // start a new message...
// ... or pick up an already started one if the previous fragment was not final.
if c . readMsgIsIncomplete ( ) || c . readBufHasIncompleteFragment ( ) {
readMsg = c . currentReadMsg
}
if _ , err := c . readBuf . Write ( b [ : n ] ) ; err != nil {
return 0 , fmt . Errorf ( "[unexpected] error writing message contents to read buffer: %w" , err )
}
ok , err := readMsg . Parse ( c . readBuf . Bytes ( ) , c . log )
if err != nil {
return 0 , fmt . Errorf ( "error parsing message: %v" , err )
}
if ! ok { // incomplete fragment
return n , nil
}
c . readBuf . Next ( len ( readMsg . raw ) )
2024-09-03 17:42:02 +00:00
if readMsg . isFinalized && ! c . readMsgIsIncomplete ( ) {
2024-08-14 16:57:50 +00:00
// Stream IDs for websocket streams are static.
// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218
if readMsg . streamID . Load ( ) == remotecommand . StreamResize {
var msg tsrecorder . ResizeMsg
if err = json . Unmarshal ( readMsg . payload , & msg ) ; err != nil {
return 0 , fmt . Errorf ( "error umarshalling resize message: %w" , err )
}
2024-09-03 17:42:02 +00:00
2024-08-14 16:57:50 +00:00
c . ch . Width = msg . Width
c . ch . Height = msg . Height
2024-09-03 17:42:02 +00:00
// If this is initial resize message, the width and
// height will be sent in the CastHeader. If this is a
// subsequent resize message, we need to send asciinema
// resize message.
var isInitialResize bool
c . sendInitialTermSizeSetOnce . Do ( func ( ) {
isInitialResize = true
close ( c . initialTermSizeSet ) // unblock sending of CastHeader
} )
if ! isInitialResize {
if err := c . rec . WriteResize ( c . ch . Height , c . ch . Width ) ; err != nil {
return 0 , fmt . Errorf ( "error writing resize message: %w" , err )
}
}
2024-08-14 16:57:50 +00:00
}
}
c . currentReadMsg = readMsg
return n , nil
}
// Write parses the written bytes as WebSocket message fragment. If the message
// is for stdout or stderr streams, it is written to the configured tsrecorder.
// A message fragment can be incomplete.
func ( c * conn ) Write ( b [ ] byte ) ( int , error ) {
c . wmu . Lock ( )
defer c . wmu . Unlock ( )
if len ( b ) == 0 {
c . log . Debug ( "[unexpected] Write called with 0 bytes" )
return 0 , nil
}
typ := messageType ( opcode ( b ) )
// If we are in process of parsing a message fragment, the received
// bytes are not structured as a message fragment and can not be used to
// determine a message fragment.
if c . writeBufHasIncompleteFragment ( ) { // buffer contains previous incomplete fragment
var err error
if typ , err = c . curWriteMsgType ( ) ; err != nil {
return 0 , err
}
}
if isControlMessage ( typ ) {
return c . Conn . Write ( b )
}
writeMsg := & message { typ : typ } // start a new message...
// ... or continue the existing one if it has not been finalized.
if c . writeMsgIsIncomplete ( ) || c . writeBufHasIncompleteFragment ( ) {
writeMsg = c . currentWriteMsg
}
if _ , err := c . writeBuf . Write ( b ) ; err != nil {
c . log . Errorf ( "write: error writing to write buf: %v" , err )
return 0 , fmt . Errorf ( "[unexpected] error writing to internal write buffer: %w" , err )
}
ok , err := writeMsg . Parse ( c . writeBuf . Bytes ( ) , c . log )
if err != nil {
c . log . Errorf ( "write: parsing a message errored: %v" , err )
return 0 , fmt . Errorf ( "write: error parsing message: %v" , err )
}
c . currentWriteMsg = writeMsg
if ! ok { // incomplete fragment
return len ( b ) , nil
}
c . writeBuf . Next ( len ( writeMsg . raw ) ) // advance frame
if len ( writeMsg . payload ) != 0 && writeMsg . isFinalized {
if writeMsg . streamID . Load ( ) == remotecommand . StreamStdOut || writeMsg . streamID . Load ( ) == remotecommand . StreamStdErr {
var err error
c . writeCastHeaderOnce . Do ( func ( ) {
2024-09-03 17:42:02 +00:00
// If this is a session with a terminal attached,
// we must wait for the terminal width and
// height to be parsed from a resize message
// before sending CastHeader, else tsrecorder
// will not be able to play this recording.
if c . hasTerm {
c . log . Debug ( "waiting for terminal size to be set before starting to send recorded data" )
<- c . initialTermSizeSet
2024-08-14 16:57:50 +00:00
}
2024-09-03 17:42:02 +00:00
err = c . rec . WriteCastHeader ( c . ch )
2024-08-14 16:57:50 +00:00
} )
if err != nil {
return 0 , fmt . Errorf ( "error writing CastHeader: %w" , err )
}
2024-09-03 17:42:02 +00:00
if err := c . rec . WriteOutput ( writeMsg . payload ) ; err != nil {
2024-08-14 16:57:50 +00:00
return 0 , fmt . Errorf ( "error writing message to recorder: %v" , err )
}
}
}
_ , err = c . Conn . Write ( c . currentWriteMsg . raw )
if err != nil {
c . log . Errorf ( "write: error writing to conn: %v" , err )
}
return len ( b ) , nil
}
func ( c * conn ) Close ( ) error {
c . wmu . Lock ( )
defer c . wmu . Unlock ( )
if c . closed {
return nil
}
c . closed = true
connCloseErr := c . Conn . Close ( )
recCloseErr := c . rec . Close ( )
return multierr . New ( connCloseErr , recCloseErr )
}
// writeBufHasIncompleteFragment returns true if the latest data message
// fragment written to the connection was incomplete and the following write
// must be the remaining payload bytes of that fragment.
func ( c * conn ) writeBufHasIncompleteFragment ( ) bool {
return c . writeBuf . Len ( ) != 0
}
// readBufHasIncompleteFragment returns true if the latest data message
// fragment read from the connection was incomplete and the following read
// must be the remaining payload bytes of that fragment.
func ( c * conn ) readBufHasIncompleteFragment ( ) bool {
return c . readBuf . Len ( ) != 0
}
// writeMsgIsIncomplete returns true if the latest WebSocket message written to
// the connection was fragmented and the next data message fragment written to
// the connection must be a fragment of that message.
// https://www.rfc-editor.org/rfc/rfc6455#section-5.4
func ( c * conn ) writeMsgIsIncomplete ( ) bool {
return c . currentWriteMsg != nil && ! c . currentWriteMsg . isFinalized
}
// readMsgIsIncomplete returns true if the latest WebSocket message written to
// the connection was fragmented and the next data message fragment written to
// the connection must be a fragment of that message.
// https://www.rfc-editor.org/rfc/rfc6455#section-5.4
func ( c * conn ) readMsgIsIncomplete ( ) bool {
return c . currentReadMsg != nil && ! c . currentReadMsg . isFinalized
}
func ( c * conn ) curReadMsgType ( ) ( messageType , error ) {
if c . currentReadMsg != nil {
return c . currentReadMsg . typ , nil
}
return 0 , errors . New ( "[unexpected] attempted to determine type for nil message" )
}
func ( c * conn ) curWriteMsgType ( ) ( messageType , error ) {
if c . currentWriteMsg != nil {
return c . currentWriteMsg . typ , nil
}
return 0 , errors . New ( "[unexpected] attempted to determine type for nil message" )
}
// opcode reads the websocket message opcode that denotes the message type.
// opcode is contained in bits [4-8] of the message.
// https://www.rfc-editor.org/rfc/rfc6455#section-5.2
func opcode ( b [ ] byte ) int {
// 0xf = 00001111; b & 00001111 zeroes out bits [0 - 3] of b
var mask byte = 0xf
return int ( b [ 0 ] & mask )
}