diff --git a/k8s-operator/sessionrecording/ws/conn.go b/k8s-operator/sessionrecording/ws/conn.go index 717c3a32b..17c77f1d0 100644 --- a/k8s-operator/sessionrecording/ws/conn.go +++ b/k8s-operator/sessionrecording/ws/conn.go @@ -172,79 +172,86 @@ func (c *conn) Read(b []byte) (int, error) { return n, nil } - readMsg := &message{typ: typ} // start a new message... - // ... or pick up an already started one if the previous fragment was not final. - if c.readMsgIsIncomplete() || c.readBufHasIncompleteFragment() { - readMsg = c.currentReadMsg - } - + c.log.Infof("FOOOOOOOOOOOO: Writing %d bytes to readBuf", n) if _, err := c.readBuf.Write(b[:n]); err != nil { c.log.Infof("FOOOOOOOOOOOO: got %v from readBuf", err) return 0, fmt.Errorf("[unexpected] error writing message contents to read buffer: %w", err) } - ok, err := readMsg.Parse(c.readBuf.Bytes(), c.log) - if err != nil { - c.log.Infof("FOOOOOOOOOOOO: got %v from readMsg.Parse", err) - return 0, fmt.Errorf("error parsing message: %v", err) - } - if !ok { // incomplete fragment - return n, nil - } - c.readBuf.Next(len(readMsg.raw)) + for c.readBuf.Len() != 0 { + readMsg := &message{typ: typ} // start a new message... + // ... or pick up an already started one if the previous fragment was not final. + if c.readMsgIsIncomplete() { + c.log.Infof("FOOOOOOOOOOOO: continuing a message fragment with type %v, incompleteFragment: %v, len: %d", c.currentReadMsg.typ, c.readBufHasIncompleteFragment(), c.readBuf.Len()) + readMsg = c.currentReadMsg + } - if readMsg.isFinalized && !c.readMsgIsIncomplete() { - // we want to send stream resize messages for terminal sessions - // Stream IDs for websocket streams are static. - // https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218 - if readMsg.streamID.Load() == remotecommand.StreamResize && c.hasTerm { - var msg tsrecorder.ResizeMsg - c.log.Infof("FOOOOOOOOOOOO: unmarshalling payload: %s", string(readMsg.payload)) - d := json.NewDecoder(bytes.NewReader(readMsg.payload)) - var count int - for d.More() { - msg = tsrecorder.ResizeMsg{} - if err = d.Decode(&msg); err != nil { - c.log.Infof("FOOOOOOOOOOOO: got %v from json.Unmarshal, payload: %s", err, string(readMsg.payload)) - return 0, fmt.Errorf("error umarshalling resize message: %w", err) + c.log.Infof("FOOOOOOOOOOOO: readMsg buffer has length %d, contents: %v", c.readBuf.Len(), c.readBuf.Bytes()) + ok, err := readMsg.Parse(c.readBuf.Bytes(), c.log) + if err != nil { + c.log.Infof("FOOOOOOOOOOOO: got %v from readMsg.Parse", err) + return 0, fmt.Errorf("error parsing message: %v", err) + } + if !ok { // incomplete fragment + return n, nil + } + c.log.Infof("Popping %d bytes from readBuf", len(readMsg.raw)) + c.readBuf.Next(len(readMsg.raw)) + + if readMsg.isFinalized && !c.readMsgIsIncomplete() { + // we want to send stream resize messages for terminal sessions + // Stream IDs for websocket streams are static. + // https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218 + if readMsg.streamID.Load() == remotecommand.StreamResize && c.hasTerm { + var msg tsrecorder.ResizeMsg + c.log.Infof("FOOOOOOOOOOOO: unmarshalling payload: %s", string(readMsg.payload)) + d := json.NewDecoder(bytes.NewReader(readMsg.payload)) + var count int + for d.More() { + msg = tsrecorder.ResizeMsg{} + if err = d.Decode(&msg); err != nil { + c.log.Infof("FOOOOOOOOOOOO: got %v from json.Unmarshal, payload: %s", err, string(readMsg.payload)) + return 0, fmt.Errorf("error umarshalling resize message: %w", err) + } + c.log.Infof("FOOOOOOOOOOOO: got resize message: %v", msg) + count++ + } + if count == 0 { + c.log.Infof("FOOOOOOOOOOOO: unexpectedly no resize messages, payload: %s", string(readMsg.payload)) + return 0, fmt.Errorf("no resize messages in payload: %s", string(readMsg.payload)) } - c.log.Infof("FOOOOOOOOOOOO: got resize message: %v", msg) - count++ - } - if count == 0 { - c.log.Infof("FOOOOOOOOOOOO: unexpectedly no resize messages, payload: %s", string(readMsg.payload)) - return 0, fmt.Errorf("no resize messages in payload: %s", string(readMsg.payload)) - } - c.ch.Width = msg.Width - c.ch.Height = msg.Height + c.ch.Width = msg.Width + c.ch.Height = msg.Height - var isInitialResize bool - c.writeCastHeaderOnce.Do(func() { - isInitialResize = true - // If this is a session with a terminal attached, - // we must wait for the terminal width and - // height to be parsed from a resize message - // before sending CastHeader, else tsrecorder - // will not be able to play this recording. - err = c.rec.WriteCastHeader(c.ch) - close(c.initialCastHeaderSent) - }) - if err != nil { - c.log.Infof("FOOOOOOOOOOOO: got %v from rec.WriteCastHeader", err) - return 0, fmt.Errorf("error writing CastHeader: %w", err) - } + var isInitialResize bool + c.writeCastHeaderOnce.Do(func() { + isInitialResize = true + // If this is a session with a terminal attached, + // we must wait for the terminal width and + // height to be parsed from a resize message + // before sending CastHeader, else tsrecorder + // will not be able to play this recording. + err = c.rec.WriteCastHeader(c.ch) + close(c.initialCastHeaderSent) + }) + if err != nil { + c.log.Infof("FOOOOOOOOOOOO: got %v from rec.WriteCastHeader", err) + return 0, fmt.Errorf("error writing CastHeader: %w", err) + } - if !isInitialResize { - if err := c.rec.WriteResize(msg.Height, msg.Width); err != nil { - c.log.Infof("FOOOOOOOOOOOO: got %v from rec.WriteResize", err) - return 0, fmt.Errorf("error writing resize message: %w", err) + if !isInitialResize { + if err := c.rec.WriteResize(msg.Height, msg.Width); err != nil { + c.log.Infof("FOOOOOOOOOOOO: got %v from rec.WriteResize", err) + return 0, fmt.Errorf("error writing resize message: %w", err) + } } } } + + c.currentReadMsg = readMsg } - c.currentReadMsg = readMsg return n, nil } diff --git a/k8s-operator/sessionrecording/ws/message.go b/k8s-operator/sessionrecording/ws/message.go index 713febec7..ba400dbaf 100644 --- a/k8s-operator/sessionrecording/ws/message.go +++ b/k8s-operator/sessionrecording/ws/message.go @@ -10,7 +10,6 @@ import ( "fmt" "sync/atomic" - "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/net/websocket" @@ -107,7 +106,7 @@ func (msg *message) Parse(b []byte, log *zap.SugaredLogger) (bool, error) { if err != nil { return false, fmt.Errorf("error determining payload length: %w", err) } - log.Debugf("parse: parsing a message fragment with payload length: %d payload offset: %d maskOffset: %d mask set: %t, is finalized: %t, is initial fragment: %t", payloadLength, payloadOffset, maskOffset, maskSet, msg.isFinalized, isInitialFragment) + log.Infof("parse: parsing a message fragment with payload length: %d payload offset: %d maskOffset: %d mask set: %t, is finalized: %t, is initial fragment: %t", payloadLength, payloadOffset, maskOffset, maskSet, msg.isFinalized, isInitialFragment) if len(b) < int(payloadOffset+payloadLength) { // incomplete fragment return false, nil @@ -136,12 +135,18 @@ func (msg *message) Parse(b []byte, log *zap.SugaredLogger) (bool, error) { // message payload. // https://github.com/kubernetes/apimachinery/commit/73d12d09c5be8703587b5127416eb83dc3b7e182#diff-291f96e8632d04d2d20f5fb00f6b323492670570d65434e8eac90c7a442d13bdR23-R36 if len(msgPayload) == 0 { - return false, errors.New("[unexpected] received a message fragment with no stream ID") + // if !msg.isFinalized { + // return false, nil + // } + + return false, fmt.Errorf("received a message fragment with no stream ID, initial fragment: %v, is finalised: %v", isInitialFragment, msg.isFinalized) } + // Stream ID will be one of the constants from: + // https://github.com/kubernetes/kubernetes/blob/f9ed14bf9b1119a2e091f4b487a3b54930661034/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L57-L64 streamID := uint32(msgPayload[0]) if !isInitialFragment && msg.streamID.Load() != streamID { - return false, fmt.Errorf("[unexpected] received message fragments with mismatched streamIDs %d and %d", msg.streamID.Load(), streamID) + return false, fmt.Errorf("[unexpected] received message fragments with mismatched streamIDs %d and %d, payload: %s", msg.streamID.Load(), streamID, string(append(msg.payload, msgPayload...))) } msg.streamID.Store(streamID)