// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

//go:build !plan9

package ws
import (
"encoding/binary"
"fmt"
"sync/atomic"
"go.uber.org/zap"
"golang.org/x/net/websocket"
)
// messageType is the type of a websocket data or control message, as
// identified by the opcode of its frame.
// https://www.rfc-editor.org/rfc/rfc6455#section-5.2
//
// Known types of control messages are close, ping and pong.
// https://www.rfc-editor.org/rfc/rfc6455#section-5.5
//
// The only data message type supported by Kubernetes is binary message.
// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L281
type messageType int

const (
	// noOpcode is the opcode of a continuation frame of a fragmented
	// message.
	noOpcode = messageType(0)
	// binaryMessage is the opcode of a binary data message.
	binaryMessage = messageType(2)
)
// message is a parsed Websocket Message.
type message struct {
	// payload is the contents of the so far parsed Websocket
	// data Message payload, potentially from multiple fragments written by
	// multiple invocations of Parse. The leading stream ID byte of each
	// fragment is not included. As per RFC 6455 we can assume that the
	// fragments will always arrive in order and data messages will not be
	// interleaved.
	payload []byte
	// isFinalized is set to true if payload contains the full contents of
	// the message (the final fragment has been received).
	isFinalized bool
	// streamID is the stream to which the message belongs, i.e. stdin,
	// stdout etc. It is one of the stream IDs defined in
	// https://github.com/kubernetes/apimachinery/blob/73d12d09c5be8703587b5127416eb83dc3b7e182/pkg/util/httpstream/wsstream/doc.go#L23-L36
	streamID atomic.Uint32
	// typ is the type of a WebsocketMessage as defined by its opcode
	// https://www.rfc-editor.org/rfc/rfc6455#section-5.2
	typ messageType
	// raw is a copy of the most recently parsed complete fragment exactly
	// as it was passed to Parse: header, masking key (if any) and the
	// payload before unmasking.
	raw []byte
}
// Parse accepts a websocket message fragment as a byte slice and parses its contents.
// It returns true if the fragment is complete, false if the fragment is incomplete.
// If the fragment is incomplete, Parse will be called again with the same fragment + more bytes when those are received.
// If the fragment is complete, it will be parsed into msg.
// A complete fragment can be:
// - a fragment that consists of a whole message
// - an initial fragment for a message for which we expect more fragments
// - a subsequent fragment for a message that we are currently parsing and whose so-far parsed contents are stored in msg.
// Parse must not be called with bytes that don't contain fragment header (so, no less than 2 bytes).
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-------+-+-------------+-------------------------------+
// |F|R|R|R| opcode|M| Payload len | Extended payload length |
// |I|S|S|S| (4) |A| (7) | (16/64) |
// |N|V|V|V| |S| | (if payload len==126/127) |
// | |1|2|3| |K| | |
// +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
// | Extended payload length continued, if payload len == 127 |
// + - - - - - - - - - - - - - - - +-------------------------------+
// | |Masking-key, if MASK set to 1 |
// +-------------------------------+-------------------------------+
// | Masking-key (continued) | Payload Data |
// +-------------------------------- - - - - - - - - - - - - - - - +
// : Payload Data continued ... :
// + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
// | Payload Data continued ... |
// +---------------------------------------------------------------+
// https://www.rfc-editor.org/rfc/rfc6455#section-5.2
//
// Fragmentation rules:
// An unfragmented message consists of a single frame with the FIN
// bit set (Section 5.2) and an opcode other than 0.
// A fragmented message consists of a single frame with the FIN bit
// clear and an opcode other than 0, followed by zero or more frames
// with the FIN bit clear and the opcode set to 0, and terminated by
// a single frame with the FIN bit set and an opcode of 0.
// https://www.rfc-editor.org/rfc/rfc6455#section-5.4
func ( msg * message ) Parse ( b [ ] byte , log * zap . SugaredLogger ) ( bool , error ) {
if len ( b ) < 2 {
return false , fmt . Errorf ( "[unexpected] Parse should not be called with less than 2 bytes, got %d bytes" , len ( b ) )
}
if msg . typ != binaryMessage {
return false , fmt . Errorf ( "[unexpected] internal error: attempted to parse a message with type %d" , msg . typ )
}
isInitialFragment := len ( msg . raw ) == 0
msg . isFinalized = isFinalFragment ( b )
maskSet := isMasked ( b )
payloadLength , payloadOffset , maskOffset , err := fragmentDimensions ( b , maskSet )
if err != nil {
return false , fmt . Errorf ( "error determining payload length: %w" , err )
}
2025-07-25 19:10:53 +01:00
log . Infof ( "parse: parsing a message fragment with payload length: %d payload offset: %d maskOffset: %d mask set: %t, is finalized: %t, is initial fragment: %t" , payloadLength , payloadOffset , maskOffset , maskSet , msg . isFinalized , isInitialFragment )
2024-08-14 19:57:50 +03:00
if len ( b ) < int ( payloadOffset + payloadLength ) { // incomplete fragment
return false , nil
}
// TODO (irbekrm): perhaps only do this extra allocation if we know we
// will need to unmask?
msg . raw = make ( [ ] byte , int ( payloadOffset ) + int ( payloadLength ) )
copy ( msg . raw , b [ : payloadOffset + payloadLength ] )
// Extract the payload.
msgPayload := b [ payloadOffset : payloadOffset + payloadLength ]
// Unmask the payload if needed.
// TODO (irbekrm): instead of unmasking all of the payload each time,
// determine if the payload is for a resize message early and skip
// unmasking the remaining bytes if not.
if maskSet {
m := b [ maskOffset : payloadOffset ]
var mask [ 4 ] byte
copy ( mask [ : ] , m )
maskBytes ( mask , msgPayload )
}
// Determine what stream the message is for. Stream ID of a Kubernetes
// streaming session is a 32bit integer, stored in the first byte of the
// message payload.
// https://github.com/kubernetes/apimachinery/commit/73d12d09c5be8703587b5127416eb83dc3b7e182#diff-291f96e8632d04d2d20f5fb00f6b323492670570d65434e8eac90c7a442d13bdR23-R36
if len ( msgPayload ) == 0 {
2025-07-25 19:10:53 +01:00
// if !msg.isFinalized {
// return false, nil
// }
return false , fmt . Errorf ( "received a message fragment with no stream ID, initial fragment: %v, is finalised: %v" , isInitialFragment , msg . isFinalized )
2024-08-14 19:57:50 +03:00
}
2025-07-25 19:10:53 +01:00
// Stream ID will be one of the constants from:
// https://github.com/kubernetes/kubernetes/blob/f9ed14bf9b1119a2e091f4b487a3b54930661034/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L57-L64
2024-08-14 19:57:50 +03:00
streamID := uint32 ( msgPayload [ 0 ] )
if ! isInitialFragment && msg . streamID . Load ( ) != streamID {
2025-07-25 19:10:53 +01:00
return false , fmt . Errorf ( "[unexpected] received message fragments with mismatched streamIDs %d and %d, payload: %s" , msg . streamID . Load ( ) , streamID , string ( append ( msg . payload , msgPayload ... ) ) )
2024-08-14 19:57:50 +03:00
}
msg . streamID . Store ( streamID )
// This is normal, Kubernetes seem to send a couple data messages with
// no payloads at the start.
if len ( msgPayload ) < 2 {
return true , nil
}
msgPayload = msgPayload [ 1 : ] // remove the stream ID byte
msg . payload = append ( msg . payload , msgPayload ... )
return true , nil
}
// maskBytes XORs each byte of b, in place, with the byte of the 4-byte
// masking key found at the same position modulo 4. Applying the same key
// twice restores the original bytes.
// https://www.rfc-editor.org/rfc/rfc6455#section-5.3
func maskBytes(key [4]byte, b []byte) {
	for pos, v := range b {
		// pos&3 cycles through 0..3, i.e. pos modulo the key length.
		b[pos] = v ^ key[pos&3]
	}
}
// isControlMessage reports whether t is one of the known control frame
// message types: close (8), ping (9) or pong (10).
// https://www.rfc-editor.org/rfc/rfc6455#section-5.5
func isControlMessage(t messageType) bool {
	const (
		closeMessage messageType = 8
		pingMessage  messageType = 9
		pongMessage  messageType = 10
	)
	switch t {
	case closeMessage, pingMessage, pongMessage:
		return true
	default:
		return false
	}
}
// isFinalFragment reports whether the websocket message fragment b is the
// final fragment of a websocket message, i.e. whether the FIN bit (the most
// significant bit of the first byte) is set.
// b must contain at least one byte.
func isFinalFragment(b []byte) bool {
	const finBit = 0x80
	return b[0]&finBit != 0
}
// isMasked reports whether the payload of the websocket message fragment b
// is masked, i.e. whether the MASK bit (the most significant bit of the
// second byte) is set.
// b must contain at least two bytes.
// https://www.rfc-editor.org/rfc/rfc6455#section-5.3
func isMasked(b []byte) bool {
	const maskBit = 0x80
	return b[1]&maskBit != 0
}
// extractFirstBit returns b with every bit except the first (most
// significant) one cleared, so the result is either 0x00 or 0x80.
func extractFirstBit(b byte) byte {
	return b &^ 0x7f
}
// zeroFirstBit returns b with its first (most significant) bit cleared and
// the remaining seven bits unchanged.
func zeroFirstBit(b byte) byte {
	return b &^ 0x80
}
// fragmentDimensions reads the header of the websocket message fragment b and
// returns:
//   - payloadLength: the length of the fragment's payload in bytes
//   - payloadOffset: the index in b at which the payload starts
//   - maskOffset: the index in b at which the masking key starts if the
//     fragment is masked; this is also the length of the header without the
//     masking key
//
// maskSet tells whether the fragment carries a 4 byte masking key between the
// length fields and the payload.
//
// An error is returned if b is shorter than the 2 byte fragment header, if b
// does not yet contain all of the extended payload length bytes announced by
// the length indicator, or if the payload length exceeds
// websocket.DefaultMaxPayloadBytes.
func fragmentDimensions(b []byte, maskSet bool) (payloadLength, payloadOffset, maskOffset uint64, _ error) {
	// The length indicator lives in the second header byte; without it
	// nothing can be determined. Return an error rather than panicking on
	// the index below.
	if len(b) < 2 {
		return 0, 0, 0, fmt.Errorf("invalid message fragment- fragment header is 2 bytes long, but message length is only %d", len(b))
	}

	// payload length can be stored either in bits [9-15] or in bytes 2, 3
	// or in bytes 2, 3, 4, 5, 6, 7, 8, 9.
	// https://www.rfc-editor.org/rfc/rfc6455#section-5.2
	//
	//	 0                   1                   2                   3
	//	 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
	//	+-+-+-+-+-------+-+-------------+-------------------------------+
	//	|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
	//	|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
	//	|N|V|V|V|       |S|             |   (if payload len==126/127)   |
	//	| |1|2|3|       |K|             |                               |
	//	+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
	//	|     Extended payload length continued, if payload len == 127  |
	//	+ - - - - - - - - - - - - - - - +-------------------------------+
	//	|                               |Masking-key, if MASK set to 1  |
	//	+-------------------------------+-------------------------------+
	switch payloadLengthIndicator := zeroFirstBit(b[1]); {
	case payloadLengthIndicator < 126:
		// The indicator itself is the payload length.
		maskOffset = 2
		payloadLength = uint64(payloadLengthIndicator)
	case payloadLengthIndicator == 126:
		// The payload length is a 16 bit integer in bytes 2 and 3.
		maskOffset = 4
		if len(b) < int(maskOffset) {
			return 0, 0, 0, fmt.Errorf("invalid message fragment- length indicator suggests that length is stored in bytes 2:4, but message length is only %d", len(b))
		}
		payloadLength = uint64(binary.BigEndian.Uint16(b[2:4]))
	case payloadLengthIndicator == 127:
		// The payload length is a 64 bit integer in bytes 2 to 9.
		maskOffset = 10
		if len(b) < int(maskOffset) {
			return 0, 0, 0, fmt.Errorf("invalid message fragment- length indicator suggests that length is stored in bytes 2:10, but message length is only %d", len(b))
		}
		payloadLength = binary.BigEndian.Uint64(b[2:10])
	default:
		// Defensive: not expected for a 7 bit indicator.
		return 0, 0, 0, fmt.Errorf("unexpected payload length indicator value: %v", payloadLengthIndicator)
	}

	// Ensure that a rogue or broken client doesn't cause us attempt to
	// allocate a huge array by setting a high payload size.
	// websocket.DefaultMaxPayloadBytes is the maximum payload size accepted
	// by server side of this connection, so we can safely reject messages
	// with larger payload size.
	if payloadLength > websocket.DefaultMaxPayloadBytes {
		return 0, 0, 0, fmt.Errorf("[unexpected]: too large payload size: %v", payloadLength)
	}

	// Masking key can take up 0 or 4 bytes- we need to take that into
	// account when determining payload offset.
	//
	//	 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
	//	 ....
	//	+ - - - - - - - - - - - - - - - +-------------------------------+
	//	|                               |Masking-key, if MASK set to 1  |
	//	+-------------------------------+-------------------------------+
	//	| Masking-key (continued)       |          Payload Data         |
	//	+ - - - - - - - - - - - - - - - +-------------------------------+
	//	 ...
	payloadOffset = maskOffset
	if maskSet {
		payloadOffset += 4
	}
	return payloadLength, payloadOffset, maskOffset, nil
}