diff --git a/k8s-operator/api-proxy/proxy.go b/k8s-operator/api-proxy/proxy.go index c648e1622..ff0373270 100644 --- a/k8s-operator/api-proxy/proxy.go +++ b/k8s-operator/api-proxy/proxy.go @@ -114,8 +114,9 @@ func (ap *APIServerProxy) Run(ctx context.Context) error { mux.HandleFunc("GET /api/v1/namespaces/{namespace}/pods/{pod}/attach", ap.serveAttachWS) ap.hs = &http.Server{ - Handler: mux, - ErrorLog: zap.NewStdLog(ap.log.Desugar()), + Handler: mux, + ErrorLog: zap.NewStdLog(ap.log.Desugar()), + TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), } mode := "noauth" @@ -140,7 +141,6 @@ func (ap *APIServerProxy) Run(ctx context.Context) error { GetCertificate: ap.lc.GetCertificate, NextProtos: []string{"http/1.1"}, } - ap.hs.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler)) } else { var err error tsLn, err = ap.ts.Listen("tcp", ":80") diff --git a/k8s-operator/sessionrecording/hijacker.go b/k8s-operator/sessionrecording/hijacker.go index 675a9b1dd..f816a8483 100644 --- a/k8s-operator/sessionrecording/hijacker.go +++ b/k8s-operator/sessionrecording/hijacker.go @@ -236,7 +236,6 @@ func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn, if err := lc.Close(); err != nil { h.log.Infof("error closing recorder connections: %v", err) } - return }() return lc, nil } diff --git a/k8s-operator/sessionrecording/ws/conn.go b/k8s-operator/sessionrecording/ws/conn.go index 0d8aefaac..f8a62ace9 100644 --- a/k8s-operator/sessionrecording/ws/conn.go +++ b/k8s-operator/sessionrecording/ws/conn.go @@ -169,62 +169,65 @@ func (c *conn) Read(b []byte) (int, error) { return n, nil } - readMsg := &message{typ: typ} // start a new message... - // ... or pick up an already started one if the previous fragment was not final. - if c.readMsgIsIncomplete() || c.readBufHasIncompleteFragment() { - readMsg = c.currentReadMsg - } - if _, err := c.readBuf.Write(b[:n]); err != nil { return 0, fmt.Errorf("[unexpected] error writing message contents to read buffer: %w", err) } - ok, err := readMsg.Parse(c.readBuf.Bytes(), c.log) - if err != nil { - return 0, fmt.Errorf("error parsing message: %v", err) - } - if !ok { // incomplete fragment - return n, nil - } - c.readBuf.Next(len(readMsg.raw)) + for c.readBuf.Len() != 0 { + readMsg := &message{typ: typ} // start a new message... + // ... or pick up an already started one if the previous fragment was not final. + if c.readMsgIsIncomplete() { + readMsg = c.currentReadMsg + } - if readMsg.isFinalized && !c.readMsgIsIncomplete() { - // we want to send stream resize messages for terminal sessions - // Stream IDs for websocket streams are static. - // https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218 - if readMsg.streamID.Load() == remotecommand.StreamResize && c.hasTerm { - var msg tsrecorder.ResizeMsg - if err = json.Unmarshal(readMsg.payload, &msg); err != nil { - return 0, fmt.Errorf("error umarshalling resize message: %w", err) - } + ok, err := readMsg.Parse(c.readBuf.Bytes(), c.log) + if err != nil { + return 0, fmt.Errorf("error parsing message: %v", err) + } + if !ok { // incomplete fragment + return n, nil + } + c.readBuf.Next(len(readMsg.raw)) - c.ch.Width = msg.Width - c.ch.Height = msg.Height + if readMsg.isFinalized && !c.readMsgIsIncomplete() { + // we want to send stream resize messages for terminal sessions + // Stream IDs for websocket streams are static. + // https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218 + if readMsg.streamID.Load() == remotecommand.StreamResize && c.hasTerm { + var msg tsrecorder.ResizeMsg + if err = json.Unmarshal(readMsg.payload, &msg); err != nil { + return 0, fmt.Errorf("error umarshalling resize message: %w", err) + } - var isInitialResize bool - c.writeCastHeaderOnce.Do(func() { - isInitialResize = true - // If this is a session with a terminal attached, - // we must wait for the terminal width and - // height to be parsed from a resize message - // before sending CastHeader, else tsrecorder - // will not be able to play this recording. - err = c.rec.WriteCastHeader(c.ch) - close(c.initialCastHeaderSent) - }) - if err != nil { - return 0, fmt.Errorf("error writing CastHeader: %w", err) - } + c.ch.Width = msg.Width + c.ch.Height = msg.Height - if !isInitialResize { - if err := c.rec.WriteResize(msg.Height, msg.Width); err != nil { - return 0, fmt.Errorf("error writing resize message: %w", err) + var isInitialResize bool + c.writeCastHeaderOnce.Do(func() { + isInitialResize = true + // If this is a session with a terminal attached, + // we must wait for the terminal width and + // height to be parsed from a resize message + // before sending CastHeader, else tsrecorder + // will not be able to play this recording. + err = c.rec.WriteCastHeader(c.ch) + close(c.initialCastHeaderSent) + }) + if err != nil { + return 0, fmt.Errorf("error writing CastHeader: %w", err) + } + + if !isInitialResize { + if err := c.rec.WriteResize(msg.Height, msg.Width); err != nil { + return 0, fmt.Errorf("error writing resize message: %w", err) + } } } } + + c.currentReadMsg = readMsg } - c.currentReadMsg = readMsg return n, nil } diff --git a/k8s-operator/sessionrecording/ws/message.go b/k8s-operator/sessionrecording/ws/message.go index 713febec7..35667ae21 100644 --- a/k8s-operator/sessionrecording/ws/message.go +++ b/k8s-operator/sessionrecording/ws/message.go @@ -7,10 +7,10 @@ package ws import ( "encoding/binary" + "errors" "fmt" "sync/atomic" - "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/net/websocket" @@ -139,6 +139,8 @@ func (msg *message) Parse(b []byte, log *zap.SugaredLogger) (bool, error) { return false, errors.New("[unexpected] received a message fragment with no stream ID") } + // Stream ID will be one of the constants from: + // https://github.com/kubernetes/kubernetes/blob/f9ed14bf9b1119a2e091f4b487a3b54930661034/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L57-L64 streamID := uint32(msgPayload[0]) if !isInitialFragment && msg.streamID.Load() != streamID { return false, fmt.Errorf("[unexpected] received message fragments with mismatched streamIDs %d and %d", msg.streamID.Load(), streamID)