mirror of
https://github.com/tailscale/tailscale.git
synced 2025-07-29 15:23:45 +00:00
k8s-operator: handle multiple WebSocket frames per read (#16678)
When kubectl starts an interactive attach session, it sends 2 resize messages in quick succession. It seems that particularly in HTTP mode, we often receive both of these WebSocket frames from the underlying connection in a single read. However, our parser currently assumes 0-1 frames per read, and leaves the second frame in the read buffer until the next read from the underlying connection. It doesn't take long after that before we end up failing to skip a control message as we normally should, and then we parse a control message as though it will have a stream ID (part of the Kubernetes protocol) and error out. Instead, we should keep parsing frames from the read buffer for as long as we're able to parse complete frames, so this commit refactors the messages parsing logic into a loop based on the contents of the read buffer being non-empty. k/k staging/src/k8s.io/kubectl/pkg/cmd/attach/attach.go for full details of the resize messages. There are at least a couple more multiple-frame read edge cases we should handle, but this commit is very conservatively fixing a single observed issue to make it a low-risk candidate for cherry picking. Updates #13358 Change-Id: Iafb91ad1cbeed9c5231a1525d4563164fc1f002f Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
This commit is contained in:
parent
5731869565
commit
02084629e2
@ -114,8 +114,9 @@ func (ap *APIServerProxy) Run(ctx context.Context) error {
|
||||
mux.HandleFunc("GET /api/v1/namespaces/{namespace}/pods/{pod}/attach", ap.serveAttachWS)
|
||||
|
||||
ap.hs = &http.Server{
|
||||
Handler: mux,
|
||||
ErrorLog: zap.NewStdLog(ap.log.Desugar()),
|
||||
Handler: mux,
|
||||
ErrorLog: zap.NewStdLog(ap.log.Desugar()),
|
||||
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
|
||||
}
|
||||
|
||||
mode := "noauth"
|
||||
@ -140,7 +141,6 @@ func (ap *APIServerProxy) Run(ctx context.Context) error {
|
||||
GetCertificate: ap.lc.GetCertificate,
|
||||
NextProtos: []string{"http/1.1"},
|
||||
}
|
||||
ap.hs.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler))
|
||||
} else {
|
||||
var err error
|
||||
tsLn, err = ap.ts.Listen("tcp", ":80")
|
||||
|
@ -237,7 +237,6 @@ func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn,
|
||||
if err := lc.Close(); err != nil {
|
||||
h.log.Infof("error closing recorder connections: %v", err)
|
||||
}
|
||||
return
|
||||
}()
|
||||
return lc, nil
|
||||
}
|
||||
|
@ -148,6 +148,8 @@ func (c *conn) Read(b []byte) (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// TODO(tomhjp): If we get multiple frames in a single Read with different
|
||||
// types, we may parse the second frame with the wrong type.
|
||||
typ := messageType(opcode(b))
|
||||
if (typ == noOpcode && c.readMsgIsIncomplete()) || c.readBufHasIncompleteFragment() { // subsequent fragment
|
||||
if typ, err = c.curReadMsgType(); err != nil {
|
||||
@ -157,6 +159,8 @@ func (c *conn) Read(b []byte) (int, error) {
|
||||
|
||||
// A control message can not be fragmented and we are not interested in
|
||||
// these messages. Just return.
|
||||
// TODO(tomhjp): If we get multiple frames in a single Read, we may skip
|
||||
// some non-control messages.
|
||||
if isControlMessage(typ) {
|
||||
return n, nil
|
||||
}
|
||||
@ -169,62 +173,65 @@ func (c *conn) Read(b []byte) (int, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
readMsg := &message{typ: typ} // start a new message...
|
||||
// ... or pick up an already started one if the previous fragment was not final.
|
||||
if c.readMsgIsIncomplete() || c.readBufHasIncompleteFragment() {
|
||||
readMsg = c.currentReadMsg
|
||||
}
|
||||
|
||||
if _, err := c.readBuf.Write(b[:n]); err != nil {
|
||||
return 0, fmt.Errorf("[unexpected] error writing message contents to read buffer: %w", err)
|
||||
}
|
||||
|
||||
ok, err := readMsg.Parse(c.readBuf.Bytes(), c.log)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error parsing message: %v", err)
|
||||
}
|
||||
if !ok { // incomplete fragment
|
||||
return n, nil
|
||||
}
|
||||
c.readBuf.Next(len(readMsg.raw))
|
||||
for c.readBuf.Len() != 0 {
|
||||
readMsg := &message{typ: typ} // start a new message...
|
||||
// ... or pick up an already started one if the previous fragment was not final.
|
||||
if c.readMsgIsIncomplete() {
|
||||
readMsg = c.currentReadMsg
|
||||
}
|
||||
|
||||
if readMsg.isFinalized && !c.readMsgIsIncomplete() {
|
||||
// we want to send stream resize messages for terminal sessions
|
||||
// Stream IDs for websocket streams are static.
|
||||
// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218
|
||||
if readMsg.streamID.Load() == remotecommand.StreamResize && c.hasTerm {
|
||||
var msg tsrecorder.ResizeMsg
|
||||
if err = json.Unmarshal(readMsg.payload, &msg); err != nil {
|
||||
return 0, fmt.Errorf("error umarshalling resize message: %w", err)
|
||||
}
|
||||
ok, err := readMsg.Parse(c.readBuf.Bytes(), c.log)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error parsing message: %v", err)
|
||||
}
|
||||
if !ok { // incomplete fragment
|
||||
return n, nil
|
||||
}
|
||||
c.readBuf.Next(len(readMsg.raw))
|
||||
|
||||
c.ch.Width = msg.Width
|
||||
c.ch.Height = msg.Height
|
||||
if readMsg.isFinalized && !c.readMsgIsIncomplete() {
|
||||
// we want to send stream resize messages for terminal sessions
|
||||
// Stream IDs for websocket streams are static.
|
||||
// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218
|
||||
if readMsg.streamID.Load() == remotecommand.StreamResize && c.hasTerm {
|
||||
var msg tsrecorder.ResizeMsg
|
||||
if err = json.Unmarshal(readMsg.payload, &msg); err != nil {
|
||||
return 0, fmt.Errorf("error umarshalling resize message: %w", err)
|
||||
}
|
||||
|
||||
var isInitialResize bool
|
||||
c.writeCastHeaderOnce.Do(func() {
|
||||
isInitialResize = true
|
||||
// If this is a session with a terminal attached,
|
||||
// we must wait for the terminal width and
|
||||
// height to be parsed from a resize message
|
||||
// before sending CastHeader, else tsrecorder
|
||||
// will not be able to play this recording.
|
||||
err = c.rec.WriteCastHeader(c.ch)
|
||||
close(c.initialCastHeaderSent)
|
||||
})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error writing CastHeader: %w", err)
|
||||
}
|
||||
c.ch.Width = msg.Width
|
||||
c.ch.Height = msg.Height
|
||||
|
||||
if !isInitialResize {
|
||||
if err := c.rec.WriteResize(msg.Height, msg.Width); err != nil {
|
||||
return 0, fmt.Errorf("error writing resize message: %w", err)
|
||||
var isInitialResize bool
|
||||
c.writeCastHeaderOnce.Do(func() {
|
||||
isInitialResize = true
|
||||
// If this is a session with a terminal attached,
|
||||
// we must wait for the terminal width and
|
||||
// height to be parsed from a resize message
|
||||
// before sending CastHeader, else tsrecorder
|
||||
// will not be able to play this recording.
|
||||
err = c.rec.WriteCastHeader(c.ch)
|
||||
close(c.initialCastHeaderSent)
|
||||
})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error writing CastHeader: %w", err)
|
||||
}
|
||||
|
||||
if !isInitialResize {
|
||||
if err := c.rec.WriteResize(msg.Height, msg.Width); err != nil {
|
||||
return 0, fmt.Errorf("error writing resize message: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.currentReadMsg = readMsg
|
||||
}
|
||||
|
||||
c.currentReadMsg = readMsg
|
||||
return n, nil
|
||||
}
|
||||
|
||||
|
@ -58,15 +58,39 @@ func Test_conn_Read(t *testing.T) {
|
||||
wantCastHeaderHeight: 20,
|
||||
},
|
||||
{
|
||||
name: "two_reads_resize_message",
|
||||
inputs: [][]byte{{0x2, 0x9, 0x4, 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22}, {0x80, 0x11, 0x4, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x22, 0x3a, 0x32, 0x30, 0x7d}},
|
||||
name: "resize_data_frame_two_in_one_read",
|
||||
inputs: [][]byte{
|
||||
fmt.Appendf(nil, "%s%s",
|
||||
append([]byte{0x82, lenResizeMsgPayload}, testResizeMsg...),
|
||||
append([]byte{0x82, lenResizeMsgPayload}, testResizeMsg...),
|
||||
),
|
||||
},
|
||||
wantRecorded: append(fakes.AsciinemaCastHeaderMsg(t, 10, 20), fakes.AsciinemaCastResizeMsg(t, 10, 20)...),
|
||||
wantCastHeaderWidth: 10,
|
||||
wantCastHeaderHeight: 20,
|
||||
},
|
||||
{
|
||||
name: "two_reads_resize_message",
|
||||
inputs: [][]byte{
|
||||
// op, len, stream ID, `{"width`
|
||||
{0x2, 0x9, 0x4, 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22},
|
||||
// op, len, stream ID, `:10,"height":20}`
|
||||
{0x80, 0x11, 0x4, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x22, 0x3a, 0x32, 0x30, 0x7d},
|
||||
},
|
||||
wantCastHeaderWidth: 10,
|
||||
wantCastHeaderHeight: 20,
|
||||
wantRecorded: fakes.AsciinemaCastHeaderMsg(t, 10, 20),
|
||||
},
|
||||
{
|
||||
name: "three_reads_resize_message_with_split_fragment",
|
||||
inputs: [][]byte{{0x2, 0x9, 0x4, 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22}, {0x80, 0x11, 0x4, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74}, {0x22, 0x3a, 0x32, 0x30, 0x7d}},
|
||||
name: "three_reads_resize_message_with_split_fragment",
|
||||
inputs: [][]byte{
|
||||
// op, len, stream ID, `{"width"`
|
||||
{0x2, 0x9, 0x4, 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22},
|
||||
// op, len, stream ID, `:10,"height`
|
||||
{0x00, 0x0c, 0x4, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74},
|
||||
// op, len, stream ID, `":20}`
|
||||
{0x80, 0x06, 0x4, 0x22, 0x3a, 0x32, 0x30, 0x7d},
|
||||
},
|
||||
wantCastHeaderWidth: 10,
|
||||
wantCastHeaderHeight: 20,
|
||||
wantRecorded: fakes.AsciinemaCastHeaderMsg(t, 10, 20),
|
||||
|
@ -7,10 +7,10 @@ package ws
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"golang.org/x/net/websocket"
|
||||
@ -139,6 +139,8 @@ func (msg *message) Parse(b []byte, log *zap.SugaredLogger) (bool, error) {
|
||||
return false, errors.New("[unexpected] received a message fragment with no stream ID")
|
||||
}
|
||||
|
||||
// Stream ID will be one of the constants from:
|
||||
// https://github.com/kubernetes/kubernetes/blob/f9ed14bf9b1119a2e091f4b487a3b54930661034/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L57-L64
|
||||
streamID := uint32(msgPayload[0])
|
||||
if !isInitialFragment && msg.streamID.Load() != streamID {
|
||||
return false, fmt.Errorf("[unexpected] received message fragments with mismatched streamIDs %d and %d", msg.streamID.Load(), streamID)
|
||||
|
Loading…
x
Reference in New Issue
Block a user