mirror of
https://github.com/tailscale/tailscale.git
synced 2025-07-02 13:08:40 +00:00
k8s-operator: adding first fix for terminal size race
Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
This commit is contained in:
parent
0a64e86a0d
commit
a754341169
@ -168,6 +168,8 @@ func runAPIServerProxy(ts *tsnet.Server, rt http.RoundTripper, log *zap.SugaredL
|
||||
mux.HandleFunc("/", ap.serveDefault)
|
||||
mux.HandleFunc("POST /api/v1/namespaces/{namespace}/pods/{pod}/exec", ap.serveExecSPDY)
|
||||
mux.HandleFunc("GET /api/v1/namespaces/{namespace}/pods/{pod}/exec", ap.serveExecWS)
|
||||
mux.HandleFunc("POST /api/v1/namespaces/{namespace}/pods/{pod}/attach", ap.serveAttachSPDY)
|
||||
mux.HandleFunc("GET /api/v1/namespaces/{namespace}/pods/{pod}/attach", ap.serveAttachWS)
|
||||
|
||||
hs := &http.Server{
|
||||
// Kubernetes uses SPDY for exec and port-forward, however SPDY is
|
||||
@ -211,16 +213,28 @@ func (ap *apiserverProxy) serveDefault(w http.ResponseWriter, r *http.Request) {
|
||||
// serveExecSPDY serves 'kubectl exec' requests for sessions streamed over SPDY,
|
||||
// optionally configuring the kubectl exec sessions to be recorded.
|
||||
func (ap *apiserverProxy) serveExecSPDY(w http.ResponseWriter, r *http.Request) {
|
||||
ap.execForProto(w, r, ksr.SPDYProtocol)
|
||||
ap.subForProto(w, r, ksr.ExecSubcommand, ksr.SPDYProtocol)
|
||||
}
|
||||
|
||||
// serveExecWS serves 'kubectl exec' requests for sessions streamed over WebSocket,
|
||||
// optionally configuring the kubectl exec sessions to be recorded.
|
||||
func (ap *apiserverProxy) serveExecWS(w http.ResponseWriter, r *http.Request) {
|
||||
ap.execForProto(w, r, ksr.WSProtocol)
|
||||
ap.subForProto(w, r, ksr.ExecSubcommand, ksr.WSProtocol)
|
||||
}
|
||||
|
||||
func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, proto ksr.Protocol) {
|
||||
// serveExecSPDY serves 'kubectl attach' requests for sessions streamed over SPDY,
|
||||
// optionally configuring the kubectl exec sessions to be recorded.
|
||||
func (ap *apiserverProxy) serveAttachSPDY(w http.ResponseWriter, r *http.Request) {
|
||||
ap.subForProto(w, r, ksr.AttachSubcommand, ksr.SPDYProtocol)
|
||||
}
|
||||
|
||||
// serveExecWS serves 'kubectl attach' requests for sessions streamed over WebSocket,
|
||||
// optionally configuring the kubectl exec sessions to be recorded.
|
||||
func (ap *apiserverProxy) serveAttachWS(w http.ResponseWriter, r *http.Request) {
|
||||
ap.subForProto(w, r, ksr.AttachSubcommand, ksr.WSProtocol)
|
||||
}
|
||||
|
||||
func (ap *apiserverProxy) subForProto(w http.ResponseWriter, r *http.Request, subcommand string, proto ksr.Protocol) {
|
||||
const (
|
||||
podNameKey = "pod"
|
||||
namespaceNameKey = "namespace"
|
||||
@ -235,7 +249,7 @@ func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, p
|
||||
counterNumRequestsProxied.Add(1)
|
||||
failOpen, addrs, err := determineRecorderConfig(who)
|
||||
if err != nil {
|
||||
ap.log.Errorf("error trying to determine whether the 'kubectl exec' session needs to be recorded: %v", err)
|
||||
ap.log.Errorf("error trying to determine whether the 'kubectl %s' session needs to be recorded: %v", subcommand, err)
|
||||
return
|
||||
}
|
||||
if failOpen && len(addrs) == 0 { // will not record
|
||||
@ -244,8 +258,8 @@ func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, p
|
||||
}
|
||||
ksr.CounterSessionRecordingsAttempted.Add(1) // at this point we know that users intended for this session to be recorded
|
||||
if !failOpen && len(addrs) == 0 {
|
||||
msg := "forbidden: 'kubectl exec' session must be recorded, but no recorders are available."
|
||||
ap.log.Error(msg)
|
||||
msg := "forbidden: 'kubectl %s' session must be recorded, but no recorders are available."
|
||||
ap.log.Error(fmt.Sprintf(msg, subcommand))
|
||||
http.Error(w, msg, http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
@ -266,16 +280,17 @@ func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, p
|
||||
}
|
||||
|
||||
opts := ksr.HijackerOpts{
|
||||
Req: r,
|
||||
W: w,
|
||||
Proto: proto,
|
||||
TS: ap.ts,
|
||||
Who: who,
|
||||
Addrs: addrs,
|
||||
FailOpen: failOpen,
|
||||
Pod: r.PathValue(podNameKey),
|
||||
Namespace: r.PathValue(namespaceNameKey),
|
||||
Log: ap.log,
|
||||
Req: r,
|
||||
W: w,
|
||||
Proto: proto,
|
||||
Subcommand: subcommand,
|
||||
TS: ap.ts,
|
||||
Who: who,
|
||||
Addrs: addrs,
|
||||
FailOpen: failOpen,
|
||||
Pod: r.PathValue(podNameKey),
|
||||
Namespace: r.PathValue(namespaceNameKey),
|
||||
Log: ap.log,
|
||||
}
|
||||
h := ksr.New(opts)
|
||||
|
||||
|
@ -4,7 +4,7 @@
|
||||
//go:build !plan9
|
||||
|
||||
// Package sessionrecording contains functionality for recording Kubernetes API
|
||||
// server proxy 'kubectl exec' sessions.
|
||||
// server proxy 'kubectl exec/attach' sessions.
|
||||
package sessionrecording
|
||||
|
||||
import (
|
||||
@ -35,8 +35,10 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
SPDYProtocol Protocol = "SPDY"
|
||||
WSProtocol Protocol = "WebSocket"
|
||||
SPDYProtocol Protocol = "SPDY"
|
||||
WSProtocol Protocol = "WebSocket"
|
||||
ExecSubcommand = "exec"
|
||||
AttachSubcommand = "attach"
|
||||
)
|
||||
|
||||
// Protocol is the streaming protocol of the hijacked session. Supported
|
||||
@ -68,20 +70,21 @@ func New(opts HijackerOpts) *Hijacker {
|
||||
}
|
||||
|
||||
type HijackerOpts struct {
|
||||
TS *tsnet.Server
|
||||
Req *http.Request
|
||||
W http.ResponseWriter
|
||||
Who *apitype.WhoIsResponse
|
||||
Addrs []netip.AddrPort
|
||||
Log *zap.SugaredLogger
|
||||
Pod string
|
||||
Namespace string
|
||||
FailOpen bool
|
||||
Proto Protocol
|
||||
TS *tsnet.Server
|
||||
Req *http.Request
|
||||
W http.ResponseWriter
|
||||
Who *apitype.WhoIsResponse
|
||||
Addrs []netip.AddrPort
|
||||
Log *zap.SugaredLogger
|
||||
Pod string
|
||||
Namespace string
|
||||
FailOpen bool
|
||||
Proto Protocol
|
||||
Subcommand string
|
||||
}
|
||||
|
||||
// Hijacker implements [net/http.Hijacker] interface.
|
||||
// It must be configured with an http request for a 'kubectl exec' session that
|
||||
// It must be configured with an http request for a 'kubectl exec/attach' session that
|
||||
// needs to be recorded. It knows how to hijack the connection and configure for
|
||||
// the session contents to be sent to a tsrecorder instance.
|
||||
type Hijacker struct {
|
||||
@ -90,12 +93,13 @@ type Hijacker struct {
|
||||
req *http.Request
|
||||
who *apitype.WhoIsResponse
|
||||
log *zap.SugaredLogger
|
||||
pod string // pod being exec-d
|
||||
ns string // namespace of the pod being exec-d
|
||||
pod string // pod being exec/attach-d
|
||||
ns string // namespace of the pod being exec/attach-d
|
||||
addrs []netip.AddrPort // tsrecorder addresses
|
||||
failOpen bool // whether to fail open if recording fails
|
||||
connectToRecorder RecorderDialFn
|
||||
proto Protocol // streaming protocol
|
||||
subcommand string // subcommand, e.g., "exec, attach"
|
||||
}
|
||||
|
||||
// RecorderDialFn dials the specified netip.AddrPorts that should be tsrecorder
|
||||
@ -105,7 +109,7 @@ type Hijacker struct {
|
||||
// after having been established, an error is sent down the channel.
|
||||
type RecorderDialFn func(context.Context, []netip.AddrPort, netx.DialFunc) (io.WriteCloser, []*tailcfg.SSHRecordingAttempt, <-chan error, error)
|
||||
|
||||
// Hijack hijacks a 'kubectl exec' session and configures for the session
|
||||
// Hijack hijacks a 'kubectl exec/attach' session and configures for the session
|
||||
// contents to be sent to a recorder.
|
||||
func (h *Hijacker) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||
h.log.Infof("recorder addrs: %v, failOpen: %v", h.addrs, h.failOpen)
|
||||
@ -138,7 +142,7 @@ func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn,
|
||||
err error
|
||||
errChan <-chan error
|
||||
)
|
||||
h.log.Infof("kubectl exec session will be recorded, recorders: %v, fail open policy: %t", h.addrs, h.failOpen)
|
||||
h.log.Infof("kubectl %s session will be recorded, recorders: %v, fail open policy: %t", h.subcommand, h.addrs, h.failOpen)
|
||||
qp := h.req.URL.Query()
|
||||
container := strings.Join(qp[containerKey], "")
|
||||
var recorderAddr net.Addr
|
||||
@ -161,7 +165,7 @@ func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn,
|
||||
}
|
||||
return nil, errors.New(msg)
|
||||
} else {
|
||||
h.log.Infof("exec session to container %q in Pod %q namespace %q will be recorded, the recording will be sent to a tsrecorder instance at %q", container, h.pod, h.ns, recorderAddr)
|
||||
h.log.Infof("%s session to container %q in Pod %q namespace %q will be recorded, the recording will be sent to a tsrecorder instance at %q", h.subcommand, container, h.pod, h.ns, recorderAddr)
|
||||
}
|
||||
|
||||
cl := tstime.DefaultClock{}
|
||||
|
@ -4,7 +4,7 @@
|
||||
//go:build !plan9
|
||||
|
||||
// Package spdy contains functionality for parsing SPDY streaming sessions. This
|
||||
// is used for 'kubectl exec' session recording.
|
||||
// is used for 'kubectl exec/attach' session recording.
|
||||
package spdy
|
||||
|
||||
import (
|
||||
@ -24,7 +24,7 @@ import (
|
||||
)
|
||||
|
||||
// New wraps the provided network connection and returns a connection whose reads and writes will get triggered as data is received on the hijacked connection.
|
||||
// The connection must be a hijacked connection for a 'kubectl exec' session using SPDY.
|
||||
// The connection must be a hijacked connection for a 'kubectl exec/attach' session using SPDY.
|
||||
// The hijacked connection is used to transmit SPDY streams between Kubernetes client ('kubectl') and the destination container.
|
||||
// Data read from the underlying network connection is data sent via one of the SPDY streams from the client to the container.
|
||||
// Data written to the underlying connection is data sent from the container to the client.
|
||||
@ -32,17 +32,18 @@ import (
|
||||
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#background-remotecommand-subprotocol
|
||||
func New(nc net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, hasTerm bool, log *zap.SugaredLogger) net.Conn {
|
||||
return &conn{
|
||||
Conn: nc,
|
||||
rec: rec,
|
||||
ch: ch,
|
||||
log: log,
|
||||
hasTerm: hasTerm,
|
||||
initialTermSizeSet: make(chan struct{}),
|
||||
Conn: nc,
|
||||
rec: rec,
|
||||
ch: ch,
|
||||
log: log,
|
||||
hasTerm: hasTerm,
|
||||
initialTermSizeSet: make(chan struct{}),
|
||||
initialCastHeaderSent: false,
|
||||
}
|
||||
}
|
||||
|
||||
// conn is a wrapper around net.Conn. It reads the bytestream for a 'kubectl
|
||||
// exec' session streamed using SPDY protocol, sends session recording data to
|
||||
// exec/attach' session streamed using SPDY protocol, sends session recording data to
|
||||
// the configured recorder and forwards the raw bytes to the original
|
||||
// destination.
|
||||
type conn struct {
|
||||
@ -63,7 +64,7 @@ type conn struct {
|
||||
// CastHeader must be sent before any payload. If the session has a
|
||||
// terminal attached, the CastHeader must have '.Width' and '.Height'
|
||||
// fields set for the tsrecorder UI to be able to play the recording.
|
||||
// For 'kubectl exec' sessions, terminal width and height are sent as a
|
||||
// For 'kubectl exec/attach' sessions, terminal width and height are sent as a
|
||||
// resize message on resize stream from the client when the session
|
||||
// starts as well as at any time the client detects a terminal change.
|
||||
// We can intercept the resize message on Read calls. As there is no
|
||||
@ -84,6 +85,10 @@ type conn struct {
|
||||
// be set to a buffered channel to prevent Reads being blocked on the
|
||||
// first stdout/stderr write reading from the channel.
|
||||
initialTermSizeSet chan struct{}
|
||||
// initialCastHeaderSent is a boolean that is set to ensure that the cast
|
||||
// header is the first thing that is streamed to the session recorder.
|
||||
// Otherwise the stream will fail.
|
||||
initialCastHeaderSent bool
|
||||
// sendInitialTermSizeSetOnce is used to ensure that a value is sent to
|
||||
// initialTermSizeSet channel only once, when the initial resize message
|
||||
// is received.
|
||||
@ -144,7 +149,7 @@ func (c *conn) Read(b []byte) (int, error) {
|
||||
isInitialResize = true
|
||||
close(c.initialTermSizeSet) // unblock sending of CastHeader
|
||||
})
|
||||
if !isInitialResize {
|
||||
if !isInitialResize && c.initialCastHeaderSent {
|
||||
if err := c.rec.WriteResize(c.ch.Height, c.ch.Width); err != nil {
|
||||
return 0, fmt.Errorf("error writing resize message: %w", err)
|
||||
}
|
||||
@ -206,6 +211,7 @@ func (c *conn) Write(b []byte) (int, error) {
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error writing CastHeader: %w", err)
|
||||
}
|
||||
c.initialCastHeaderSent = true
|
||||
if err := c.rec.WriteOutput(sf.Payload); err != nil {
|
||||
return 0, fmt.Errorf("error sending payload to session recorder: %w", err)
|
||||
}
|
||||
|
@ -3,7 +3,7 @@
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
// package ws has functionality to parse 'kubectl exec' sessions streamed using
|
||||
// package ws has functionality to parse 'kubectl exec/attach' sessions streamed using
|
||||
// WebSocket protocol.
|
||||
package ws
|
||||
|
||||
@ -24,7 +24,7 @@ import (
|
||||
)
|
||||
|
||||
// New wraps the provided network connection and returns a connection whose reads and writes will get triggered as data is received on the hijacked connection.
|
||||
// The connection must be a hijacked connection for a 'kubectl exec' session using WebSocket protocol and a *.channel.k8s.io subprotocol.
|
||||
// The connection must be a hijacked connection for a 'kubectl exec/attach' session using WebSocket protocol and a *.channel.k8s.io subprotocol.
|
||||
// The hijacked connection is used to transmit *.channel.k8s.io streams between Kubernetes client ('kubectl') and the destination proxy controlled by Kubernetes.
|
||||
// Data read from the underlying network connection is data sent via one of the streams from the client to the container.
|
||||
// Data written to the underlying connection is data sent from the container to the client.
|
||||
@ -32,20 +32,21 @@ import (
|
||||
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#proposal-new-remotecommand-sub-protocol-version---v5channelk8sio
|
||||
func New(c net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, hasTerm bool, log *zap.SugaredLogger) net.Conn {
|
||||
return &conn{
|
||||
Conn: c,
|
||||
rec: rec,
|
||||
ch: ch,
|
||||
hasTerm: hasTerm,
|
||||
log: log,
|
||||
initialTermSizeSet: make(chan struct{}, 1),
|
||||
Conn: c,
|
||||
rec: rec,
|
||||
ch: ch,
|
||||
hasTerm: hasTerm,
|
||||
log: log,
|
||||
initialTermSizeSet: make(chan struct{}, 1),
|
||||
initialCastHeaderSent: false,
|
||||
}
|
||||
}
|
||||
|
||||
// conn is a wrapper around net.Conn. It reads the bytestream
|
||||
// for a 'kubectl exec' session, sends session recording data to the configured
|
||||
// for a 'kubectl exec/attach' session, sends session recording data to the configured
|
||||
// recorder and forwards the raw bytes to the original destination.
|
||||
// A new conn is created per session.
|
||||
// conn only knows to how to read a 'kubectl exec' session that is streamed using WebSocket protocol.
|
||||
// conn only knows to how to read a 'kubectl exec/attach' session that is streamed using WebSocket protocol.
|
||||
// https://www.rfc-editor.org/rfc/rfc6455
|
||||
type conn struct {
|
||||
net.Conn
|
||||
@ -56,7 +57,7 @@ type conn struct {
|
||||
// CastHeader must be sent before any payload. If the session has a
|
||||
// terminal attached, the CastHeader must have '.Width' and '.Height'
|
||||
// fields set for the tsrecorder UI to be able to play the recording.
|
||||
// For 'kubectl exec' sessions, terminal width and height are sent as a
|
||||
// For 'kubectl exec/attach' sessions, terminal width and height are sent as a
|
||||
// resize message on resize stream from the client when the session
|
||||
// starts as well as at any time the client detects a terminal change.
|
||||
// We can intercept the resize message on Read calls. As there is no
|
||||
@ -77,6 +78,10 @@ type conn struct {
|
||||
// be set to a buffered channel to prevent Reads being blocked on the
|
||||
// first stdout/stderr write reading from the channel.
|
||||
initialTermSizeSet chan struct{}
|
||||
// initialCastHeaderSent is a boolean that is set to ensure that the cast
|
||||
// header is the first thing that is streamed to the session recorder.
|
||||
// Otherwise the stream will fail.
|
||||
initialCastHeaderSent bool
|
||||
// sendInitialTermSizeSetOnce is used to ensure that a value is sent to
|
||||
// initialTermSizeSet channel only once, when the initial resize message
|
||||
// is received.
|
||||
@ -191,7 +196,7 @@ func (c *conn) Read(b []byte) (int, error) {
|
||||
isInitialResize = true
|
||||
close(c.initialTermSizeSet) // unblock sending of CastHeader
|
||||
})
|
||||
if !isInitialResize {
|
||||
if !isInitialResize && c.initialCastHeaderSent {
|
||||
if err := c.rec.WriteResize(c.ch.Height, c.ch.Width); err != nil {
|
||||
return 0, fmt.Errorf("error writing resize message: %w", err)
|
||||
}
|
||||
@ -268,6 +273,7 @@ func (c *conn) Write(b []byte) (int, error) {
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error writing CastHeader: %w", err)
|
||||
}
|
||||
c.initialCastHeaderSent = true
|
||||
if err := c.rec.WriteOutput(writeMsg.payload); err != nil {
|
||||
return 0, fmt.Errorf("error writing message to recorder: %v", err)
|
||||
}
|
||||
@ -321,6 +327,7 @@ func (c *conn) writeMsgIsIncomplete() bool {
|
||||
func (c *conn) readMsgIsIncomplete() bool {
|
||||
return c.currentReadMsg != nil && !c.currentReadMsg.isFinalized
|
||||
}
|
||||
|
||||
func (c *conn) curReadMsgType() (messageType, error) {
|
||||
if c.currentReadMsg != nil {
|
||||
return c.currentReadMsg.typ, nil
|
||||
|
@ -66,7 +66,7 @@ type CastHeader struct {
|
||||
Kubernetes *Kubernetes `json:"kubernetes,omitempty"`
|
||||
}
|
||||
|
||||
// Kubernetes contains 'kubectl exec' session specific information for
|
||||
// Kubernetes contains 'kubectl exec/attach' session specific information for
|
||||
// tsrecorder.
|
||||
type Kubernetes struct {
|
||||
// PodName is the name of the Pod being exec-ed.
|
||||
@ -75,4 +75,6 @@ type Kubernetes struct {
|
||||
Namespace string
|
||||
// Container is the container being exec-ed.
|
||||
Container string
|
||||
// Subcommand is the subcommand that was executed (e.g., exec, attach)
|
||||
Subcommand string
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user