mirror of
				https://github.com/tailscale/tailscale.git
				synced 2025-10-25 02:02:51 +00:00 
			
		
		
		
	 a15ff1bade
			
		
	
	a15ff1bade
	
	
	
		
			
			cmd/k8s-operator,k8s-operator/sessionrecording: support recording WebSocket sessions Kubernetes currently supports two streaming protocols, SPDY and WebSockets. WebSockets are replacing SPDY, see https://github.com/kubernetes/enhancements/issues/4006. We were currently only supporting SPDY, erroring out if session was not SPDY and relying on the kube's built-in SPDY fallback. This PR: - adds support for parsing contents of 'kubectl exec' sessions streamed over WebSockets - adds logic to distinguish 'kubectl exec' requests for a SPDY/WebSockets sessions and call the relevant handler Updates tailscale/corp#19821 Signed-off-by: Irbe Krumina <irbe@tailscale.com> Co-authored-by: Tom Proctor <tomhjp@users.noreply.github.com>
		
			
				
	
	
		
			302 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			302 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright (c) Tailscale Inc & AUTHORS
 | |
| // SPDX-License-Identifier: BSD-3-Clause
 | |
| 
 | |
| //go:build !plan9
 | |
| 
 | |
| // package ws has functionality to parse 'kubectl exec' sessions streamed using
 | |
| // WebSocket protocol.
 | |
| package ws
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"net"
 | |
| 	"sync"
 | |
| 
 | |
| 	"go.uber.org/zap"
 | |
| 	"k8s.io/apimachinery/pkg/util/remotecommand"
 | |
| 	"tailscale.com/k8s-operator/sessionrecording/tsrecorder"
 | |
| 	"tailscale.com/sessionrecording"
 | |
| 	"tailscale.com/util/multierr"
 | |
| )
 | |
| 
 | |
| // New wraps the provided network connection and returns a connection whose reads and writes will get triggered as data is received on the hijacked connection.
 | |
| // The connection must be a hijacked connection for a 'kubectl exec' session using WebSocket protocol and a *.channel.k8s.io subprotocol.
 | |
| // The hijacked connection is used to transmit *.channel.k8s.io streams between Kubernetes client ('kubectl') and the destination proxy controlled by Kubernetes.
 | |
| // Data read from the underlying network connection is data sent via one of the streams from the client to the container.
 | |
| // Data written to the underlying connection is data sent from the container to the client.
 | |
| // We parse the data and send everything for the STDOUT/STDERR streams to the configured tsrecorder as an asciinema recording with the provided header.
 | |
| // https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#proposal-new-remotecommand-sub-protocol-version---v5channelk8sio
 | |
| func New(c net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, log *zap.SugaredLogger) net.Conn {
 | |
| 	return &conn{
 | |
| 		Conn: c,
 | |
| 		rec:  rec,
 | |
| 		ch:   ch,
 | |
| 		log:  log,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // conn is a wrapper around net.Conn. It reads the bytestream
 | |
| // for a 'kubectl exec' session, sends session recording data to the configured
 | |
| // recorder and forwards the raw bytes to the original destination.
 | |
| // A new conn is created per session.
 | |
| // conn only knows to how to read a 'kubectl exec' session that is streamed using WebSocket protocol.
 | |
| // https://www.rfc-editor.org/rfc/rfc6455
 | |
| type conn struct {
 | |
| 	net.Conn
 | |
| 	// rec knows how to send data to a tsrecorder instance.
 | |
| 	rec *tsrecorder.Client
 | |
| 	// ch is the asiinema CastHeader for a session.
 | |
| 	ch  sessionrecording.CastHeader
 | |
| 	log *zap.SugaredLogger
 | |
| 
 | |
| 	rmu sync.Mutex // sequences reads
 | |
| 	// currentReadMsg contains parsed contents of a websocket binary data message that
 | |
| 	// is currently being read from the underlying net.Conn.
 | |
| 	currentReadMsg *message
 | |
| 	// readBuf contains bytes for a currently parsed binary data message
 | |
| 	// read from the underlying conn. If the message is masked, it is
 | |
| 	// unmasked in place, so having this buffer allows us to avoid modifying
 | |
| 	// the original byte array.
 | |
| 	readBuf bytes.Buffer
 | |
| 
 | |
| 	wmu                 sync.Mutex // sequences writes
 | |
| 	writeCastHeaderOnce sync.Once
 | |
| 	closed              bool // connection is closed
 | |
| 	// writeBuf contains bytes for a currently parsed binary data message
 | |
| 	// being written to the underlying conn. If the message is masked, it is
 | |
| 	// unmasked in place, so having this buffer allows us to avoid modifying
 | |
| 	// the original byte array.
 | |
| 	writeBuf bytes.Buffer
 | |
| 	// currentWriteMsg contains parsed contents of a websocket binary data message that
 | |
| 	// is currently being written to the underlying net.Conn.
 | |
| 	currentWriteMsg *message
 | |
| }
 | |
| 
 | |
| // Read reads bytes from the original connection and parses them as websocket
 | |
| // message fragments.
 | |
| // Bytes read from the original connection are the bytes sent from the Kubernetes client (kubectl) to the destination container via kubelet.
 | |
| 
 | |
| // If the message is for the resize stream, sets the width
 | |
| // and height of the CastHeader for this connection.
 | |
| // The fragment can be incomplete.
 | |
| func (c *conn) Read(b []byte) (int, error) {
 | |
| 	c.rmu.Lock()
 | |
| 	defer c.rmu.Unlock()
 | |
| 	n, err := c.Conn.Read(b)
 | |
| 	if err != nil {
 | |
| 		// It seems that we sometimes get a wrapped io.EOF, but the
 | |
| 		// caller checks for io.EOF with ==.
 | |
| 		if errors.Is(err, io.EOF) {
 | |
| 			err = io.EOF
 | |
| 		}
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	if n == 0 {
 | |
| 		c.log.Debug("[unexpected] Read called for 0 length bytes")
 | |
| 		return 0, nil
 | |
| 	}
 | |
| 
 | |
| 	typ := messageType(opcode(b))
 | |
| 	if (typ == noOpcode && c.readMsgIsIncomplete()) || c.readBufHasIncompleteFragment() { // subsequent fragment
 | |
| 		if typ, err = c.curReadMsgType(); err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// A control message can not be fragmented and we are not interested in
 | |
| 	// these messages. Just return.
 | |
| 	if isControlMessage(typ) {
 | |
| 		return n, nil
 | |
| 	}
 | |
| 
 | |
| 	// The only data message type that Kubernetes supports is binary message.
 | |
| 	// If we received another message type, return and let the API server close the connection.
 | |
| 	// https://github.com/kubernetes/client-go/blob/release-1.30/tools/remotecommand/websocket.go#L281
 | |
| 	if typ != binaryMessage {
 | |
| 		c.log.Infof("[unexpected] received a data message with a type that is not binary message type %v", typ)
 | |
| 		return n, nil
 | |
| 	}
 | |
| 
 | |
| 	readMsg := &message{typ: typ} // start a new message...
 | |
| 	// ... or pick up an already started one if the previous fragment was not final.
 | |
| 	if c.readMsgIsIncomplete() || c.readBufHasIncompleteFragment() {
 | |
| 		readMsg = c.currentReadMsg
 | |
| 	}
 | |
| 
 | |
| 	if _, err := c.readBuf.Write(b[:n]); err != nil {
 | |
| 		return 0, fmt.Errorf("[unexpected] error writing message contents to read buffer: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	ok, err := readMsg.Parse(c.readBuf.Bytes(), c.log)
 | |
| 	if err != nil {
 | |
| 		return 0, fmt.Errorf("error parsing message: %v", err)
 | |
| 	}
 | |
| 	if !ok { // incomplete fragment
 | |
| 		return n, nil
 | |
| 	}
 | |
| 	c.readBuf.Next(len(readMsg.raw))
 | |
| 
 | |
| 	if readMsg.isFinalized {
 | |
| 		// Stream IDs for websocket streams are static.
 | |
| 		// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218
 | |
| 		if readMsg.streamID.Load() == remotecommand.StreamResize {
 | |
| 			var err error
 | |
| 			var msg tsrecorder.ResizeMsg
 | |
| 			if err = json.Unmarshal(readMsg.payload, &msg); err != nil {
 | |
| 				return 0, fmt.Errorf("error umarshalling resize message: %w", err)
 | |
| 			}
 | |
| 			c.ch.Width = msg.Width
 | |
| 			c.ch.Height = msg.Height
 | |
| 		}
 | |
| 	}
 | |
| 	c.currentReadMsg = readMsg
 | |
| 	return n, nil
 | |
| }
 | |
| 
 | |
| // Write parses the written bytes as WebSocket message fragment. If the message
 | |
| // is for stdout or stderr streams, it is written to the configured tsrecorder.
 | |
| // A message fragment can be incomplete.
 | |
| func (c *conn) Write(b []byte) (int, error) {
 | |
| 	c.wmu.Lock()
 | |
| 	defer c.wmu.Unlock()
 | |
| 	if len(b) == 0 {
 | |
| 		c.log.Debug("[unexpected] Write called with 0 bytes")
 | |
| 		return 0, nil
 | |
| 	}
 | |
| 
 | |
| 	typ := messageType(opcode(b))
 | |
| 	// If we are in process of parsing a message fragment, the received
 | |
| 	// bytes are not structured as a message fragment and can not be used to
 | |
| 	// determine a message fragment.
 | |
| 	if c.writeBufHasIncompleteFragment() { // buffer contains previous incomplete fragment
 | |
| 		var err error
 | |
| 		if typ, err = c.curWriteMsgType(); err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if isControlMessage(typ) {
 | |
| 		return c.Conn.Write(b)
 | |
| 	}
 | |
| 
 | |
| 	writeMsg := &message{typ: typ} // start a new message...
 | |
| 	// ... or continue the existing one if it has not been finalized.
 | |
| 	if c.writeMsgIsIncomplete() || c.writeBufHasIncompleteFragment() {
 | |
| 		writeMsg = c.currentWriteMsg
 | |
| 	}
 | |
| 
 | |
| 	if _, err := c.writeBuf.Write(b); err != nil {
 | |
| 		c.log.Errorf("write: error writing to write buf: %v", err)
 | |
| 		return 0, fmt.Errorf("[unexpected] error writing to internal write buffer: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	ok, err := writeMsg.Parse(c.writeBuf.Bytes(), c.log)
 | |
| 	if err != nil {
 | |
| 		c.log.Errorf("write: parsing a message errored: %v", err)
 | |
| 		return 0, fmt.Errorf("write: error parsing message: %v", err)
 | |
| 	}
 | |
| 	c.currentWriteMsg = writeMsg
 | |
| 	if !ok { // incomplete fragment
 | |
| 		return len(b), nil
 | |
| 	}
 | |
| 	c.writeBuf.Next(len(writeMsg.raw)) // advance frame
 | |
| 
 | |
| 	if len(writeMsg.payload) != 0 && writeMsg.isFinalized {
 | |
| 		if writeMsg.streamID.Load() == remotecommand.StreamStdOut || writeMsg.streamID.Load() == remotecommand.StreamStdErr {
 | |
| 			var err error
 | |
| 			c.writeCastHeaderOnce.Do(func() {
 | |
| 				var j []byte
 | |
| 				j, err = json.Marshal(c.ch)
 | |
| 				if err != nil {
 | |
| 					c.log.Errorf("error marhsalling conn: %v", err)
 | |
| 					return
 | |
| 				}
 | |
| 				j = append(j, '\n')
 | |
| 				err = c.rec.WriteCastLine(j)
 | |
| 				if err != nil {
 | |
| 					c.log.Errorf("received error from recorder: %v", err)
 | |
| 				}
 | |
| 			})
 | |
| 			if err != nil {
 | |
| 				return 0, fmt.Errorf("error writing CastHeader: %w", err)
 | |
| 			}
 | |
| 			if err := c.rec.Write(writeMsg.payload); err != nil {
 | |
| 				return 0, fmt.Errorf("error writing message to recorder: %v", err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	_, err = c.Conn.Write(c.currentWriteMsg.raw)
 | |
| 	if err != nil {
 | |
| 		c.log.Errorf("write: error writing to conn: %v", err)
 | |
| 	}
 | |
| 	return len(b), nil
 | |
| }
 | |
| 
 | |
| func (c *conn) Close() error {
 | |
| 	c.wmu.Lock()
 | |
| 	defer c.wmu.Unlock()
 | |
| 	if c.closed {
 | |
| 		return nil
 | |
| 	}
 | |
| 	c.closed = true
 | |
| 	connCloseErr := c.Conn.Close()
 | |
| 	recCloseErr := c.rec.Close()
 | |
| 	return multierr.New(connCloseErr, recCloseErr)
 | |
| }
 | |
| 
 | |
| // writeBufHasIncompleteFragment returns true if the latest data message
 | |
| // fragment written to the connection was incomplete and the following write
 | |
| // must be the remaining payload bytes of that fragment.
 | |
| func (c *conn) writeBufHasIncompleteFragment() bool {
 | |
| 	return c.writeBuf.Len() != 0
 | |
| }
 | |
| 
 | |
| // readBufHasIncompleteFragment returns true if the latest data message
 | |
| // fragment read from the connection was incomplete and the following read
 | |
| // must be the remaining payload bytes of that fragment.
 | |
| func (c *conn) readBufHasIncompleteFragment() bool {
 | |
| 	return c.readBuf.Len() != 0
 | |
| }
 | |
| 
 | |
| // writeMsgIsIncomplete returns true if the latest WebSocket message written to
 | |
| // the connection was fragmented and the next data message fragment written to
 | |
| // the connection must be a fragment of that message.
 | |
| // https://www.rfc-editor.org/rfc/rfc6455#section-5.4
 | |
| func (c *conn) writeMsgIsIncomplete() bool {
 | |
| 	return c.currentWriteMsg != nil && !c.currentWriteMsg.isFinalized
 | |
| }
 | |
| 
 | |
| // readMsgIsIncomplete returns true if the latest WebSocket message written to
 | |
| // the connection was fragmented and the next data message fragment written to
 | |
| // the connection must be a fragment of that message.
 | |
| // https://www.rfc-editor.org/rfc/rfc6455#section-5.4
 | |
| func (c *conn) readMsgIsIncomplete() bool {
 | |
| 	return c.currentReadMsg != nil && !c.currentReadMsg.isFinalized
 | |
| }
 | |
| func (c *conn) curReadMsgType() (messageType, error) {
 | |
| 	if c.currentReadMsg != nil {
 | |
| 		return c.currentReadMsg.typ, nil
 | |
| 	}
 | |
| 	return 0, errors.New("[unexpected] attempted to determine type for nil message")
 | |
| }
 | |
| 
 | |
| func (c *conn) curWriteMsgType() (messageType, error) {
 | |
| 	if c.currentWriteMsg != nil {
 | |
| 		return c.currentWriteMsg.typ, nil
 | |
| 	}
 | |
| 	return 0, errors.New("[unexpected] attempted to determine type for nil message")
 | |
| }
 | |
| 
 | |
| // opcode reads the websocket message opcode that denotes the message type.
 | |
| // opcode is contained in bits [4-8] of the message.
 | |
| // https://www.rfc-editor.org/rfc/rfc6455#section-5.2
 | |
| func opcode(b []byte) int {
 | |
| 	// 0xf = 00001111; b & 00001111 zeroes out bits [0 - 3] of b
 | |
| 	var mask byte = 0xf
 | |
| 	return int(b[0] & mask)
 | |
| }
 |