cmd/tailscale/cli, ipn/ipnlocal: [funnel] add stream mode

Adds ability to start Funnel in the foreground and stream incoming
connections. When foreground process is stopped, Funnel is turned
back off for the port.

Exampe usage:
```
TAILSCALE_FUNNEL_V2=on tailscale funnel 8080
```

Updates #8489

Signed-off-by: Marwan Sulaiman <marwan@tailscale.com>
This commit is contained in:
Marwan Sulaiman
2023-08-17 11:47:35 -04:00
committed by Marwan Sulaiman
parent cb4a61f951
commit 35ff5bf5a6
11 changed files with 399 additions and 2 deletions

View File

@@ -244,6 +244,9 @@ type LocalBackend struct {
serveListeners map[netip.AddrPort]*serveListener // addrPort => serveListener
serveProxyHandlers sync.Map // string (HTTPHandler.Proxy) => *httputil.ReverseProxy
// serveStreamers is a map for those running Funnel in the foreground
// and streaming incoming requests.
serveStreamers map[uint16]map[uint32]func(ipn.FunnelRequestLog) // serve port => map of stream loggers (key is UUID)
// statusLock must be held before calling statusChanged.Wait() or
// statusChanged.Broadcast().

View File

@@ -23,6 +23,7 @@ import (
"sync"
"time"
"github.com/google/uuid"
"tailscale.com/ipn"
"tailscale.com/logtail/backoff"
"tailscale.com/net/netutil"
@@ -257,6 +258,165 @@ func (b *LocalBackend) ServeConfig() ipn.ServeConfigView {
return b.serveConfig
}
// StreamServe opens a stream to write any incoming connections made
// to the given HostPort out to the listening io.Writer.
//
// If Serve and Funnel were not already enabled for the HostPort in the ServeConfig,
// the backend enables it for the duration of the context's lifespan and
// then turns it back off once the context is closed. If either are already enabled,
// then they remain that way but logs are still streamed
func (b *LocalBackend) StreamServe(ctx context.Context, w io.Writer, req ipn.ServeStreamRequest) (err error) {
f, ok := w.(http.Flusher)
if !ok {
return errors.New("writer not a flusher")
}
f.Flush()
port, err := req.HostPort.Port()
if err != nil {
return err
}
// Turn on Funnel for the given HostPort.
sc := b.ServeConfig().AsStruct()
if sc == nil {
sc = &ipn.ServeConfig{}
}
setHandler(sc, req)
if err := b.SetServeConfig(sc); err != nil {
return fmt.Errorf("errro setting serve config: %w", err)
}
// Defer turning off Funnel once stream ends.
defer func() {
sc := b.ServeConfig().AsStruct()
deleteHandler(sc, req, port)
err = errors.Join(err, b.SetServeConfig(sc))
}()
var writeErrs []error
writeToStream := func(log ipn.FunnelRequestLog) {
jsonLog, err := json.Marshal(log)
if err != nil {
writeErrs = append(writeErrs, err)
return
}
if _, err := fmt.Fprintf(w, "%s\n", jsonLog); err != nil {
writeErrs = append(writeErrs, err)
return
}
f.Flush()
}
// Hook up connections stream.
b.mu.Lock()
mak.NonNilMapForJSON(&b.serveStreamers)
if b.serveStreamers[port] == nil {
b.serveStreamers[port] = make(map[uint32]func(ipn.FunnelRequestLog))
}
id := uuid.New().ID()
b.serveStreamers[port][id] = writeToStream
b.mu.Unlock()
// Clean up streamer when done.
defer func() {
b.mu.Lock()
mak.NonNilMapForJSON(&b.serveStreamers)
delete(b.serveStreamers[port], id)
b.mu.Unlock()
}()
select {
case <-ctx.Done():
// Triggered by foreground `tailscale funnel` process
// (the streamer) getting closed, or by turning off Tailscale.
}
return errors.Join(writeErrs...)
}
func setHandler(sc *ipn.ServeConfig, req ipn.ServeStreamRequest) {
if sc.TCP == nil {
sc.TCP = make(map[uint16]*ipn.TCPPortHandler)
}
if _, ok := sc.TCP[443]; !ok {
sc.TCP[443] = &ipn.TCPPortHandler{
HTTPS: true,
}
}
if sc.Web == nil {
sc.Web = make(map[ipn.HostPort]*ipn.WebServerConfig)
}
wsc, ok := sc.Web[req.HostPort]
if !ok {
wsc = &ipn.WebServerConfig{}
sc.Web[req.HostPort] = wsc
}
if wsc.Handlers == nil {
wsc.Handlers = make(map[string]*ipn.HTTPHandler)
}
wsc.Handlers[req.MountPoint] = &ipn.HTTPHandler{
Proxy: req.Source,
}
if sc.AllowFunnel == nil {
sc.AllowFunnel = make(map[ipn.HostPort]bool)
}
sc.AllowFunnel[req.HostPort] = true
}
func deleteHandler(sc *ipn.ServeConfig, req ipn.ServeStreamRequest, port uint16) {
delete(sc.AllowFunnel, req.HostPort)
if sc.TCP != nil {
delete(sc.TCP, port)
}
if sc.Web == nil {
return
}
if sc.Web[req.HostPort] == nil {
return
}
wsc, ok := sc.Web[req.HostPort]
if !ok {
return
}
if wsc.Handlers == nil {
return
}
if _, ok := wsc.Handlers[req.MountPoint]; !ok {
return
}
delete(wsc.Handlers, req.MountPoint)
if len(wsc.Handlers) == 0 {
delete(sc.Web, req.HostPort)
}
}
func (b *LocalBackend) maybeLogServeConnection(destPort uint16, srcAddr netip.AddrPort) {
b.mu.Lock()
streamers := b.serveStreamers[destPort]
b.mu.Unlock()
if len(streamers) == 0 {
return
}
var log ipn.FunnelRequestLog
log.SrcAddr = srcAddr
log.Time = time.Now() // TODO: use a different clock somewhere?
if node, user, ok := b.WhoIs(srcAddr); ok {
log.NodeName = node.ComputedName()
if node.IsTagged() {
log.NodeTags = node.Tags().AsSlice()
} else {
log.UserLoginName = user.LoginName
log.UserDisplayName = user.DisplayName
}
}
for _, stream := range streamers {
stream(log)
}
}
func (b *LocalBackend) HandleIngressTCPConn(ingressPeer tailcfg.NodeView, target ipn.HostPort, srcAddr netip.AddrPort, getConnOrReset func() (net.Conn, bool), sendRST func()) {
b.mu.Lock()
sc := b.serveConfig
@@ -359,6 +519,7 @@ func (b *LocalBackend) tcpHandlerForServe(dport uint16, srcAddr netip.AddrPort)
if backDst := tcph.TCPForward(); backDst != "" {
return func(conn net.Conn) error {
defer conn.Close()
b.maybeLogServeConnection(dport, srcAddr)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
backConn, err := b.dialer.SystemDial(ctx, "tcp", backDst)
cancel()
@@ -527,6 +688,9 @@ func (b *LocalBackend) serveWebHandler(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
return
}
if c, ok := getServeHTTPContext(r); ok {
b.maybeLogServeConnection(c.DestPort, c.SrcAddr)
}
if s := h.Text(); s != "" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
io.WriteString(w, s)