From 6010812f0c033caed665ef66daca7d103d8399af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Claus=20Lensb=C3=B8l?= Date: Wed, 11 Jun 2025 14:22:30 -0400 Subject: [PATCH] ipn/localapi,client/local: add debug watcher for bus events (#16239) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Updates: #15160 Signed-off-by: Claus Lensbøl --- client/local/local.go | 20 +++++ cmd/tailscale/cli/debug.go | 24 ++++++ cmd/tailscaled/tailscaled.go | 1 + ipn/localapi/localapi.go | 137 +++++++++++++++++++++++++---------- util/eventbus/debug.go | 9 +++ 5 files changed, 154 insertions(+), 37 deletions(-) diff --git a/client/local/local.go b/client/local/local.go index 7a3a4b703..bc643ad79 100644 --- a/client/local/local.go +++ b/client/local/local.go @@ -414,6 +414,26 @@ func (lc *Client) TailDaemonLogs(ctx context.Context) (io.Reader, error) { return res.Body, nil } +// StreamBusEvents returns a stream of the Tailscale bus events as they arrive. +// Close the context to stop the stream. +// Expected response from the server is newline-delimited JSON. +// The caller must close the reader when it is finished reading. +func (lc *Client) StreamBusEvents(ctx context.Context) (io.ReadCloser, error) { + req, err := http.NewRequestWithContext(ctx, "GET", + "http://"+apitype.LocalAPIHost+"/localapi/v0/debug-bus-events", nil) + if err != nil { + return nil, err + } + res, err := lc.doLocalRequestNiceError(req) + if err != nil { + return nil, err + } + if res.StatusCode != http.StatusOK { + return nil, errors.New(res.Status) + } + return res.Body, nil +} + // Pprof returns a pprof profile of the Tailscale daemon. func (lc *Client) Pprof(ctx context.Context, pprofType string, sec int) ([]byte, error) { var secArg string diff --git a/cmd/tailscale/cli/debug.go b/cmd/tailscale/cli/debug.go index 213a0166e..025382ca9 100644 --- a/cmd/tailscale/cli/debug.go +++ b/cmd/tailscale/cli/debug.go @@ -102,6 +102,12 @@ func debugCmd() *ffcli.Command { return fs })(), }, + { + Name: "daemon-bus-events", + ShortUsage: "tailscale debug daemon-bus-events", + Exec: runDaemonBusEvents, + ShortHelp: "Watch events on the tailscaled bus", + }, { Name: "metrics", ShortUsage: "tailscale debug metrics", @@ -784,6 +790,24 @@ func runDaemonLogs(ctx context.Context, args []string) error { } } +func runDaemonBusEvents(ctx context.Context, args []string) error { + logs, err := localClient.StreamBusEvents(ctx) + if err != nil { + return err + } + defer logs.Close() + d := json.NewDecoder(bufio.NewReader(logs)) + for { + var line eventbus.DebugEvent + err := d.Decode(&line) + if err != nil { + return err + } + fmt.Printf("[%d][%q][from: %q][to: %q] %s\n", line.Count, line.Type, + line.From, line.To, line.Event) + } +} + var metricsArgs struct { watch bool } diff --git a/cmd/tailscaled/tailscaled.go b/cmd/tailscaled/tailscaled.go index 87750bc5d..61b811c12 100644 --- a/cmd/tailscaled/tailscaled.go +++ b/cmd/tailscaled/tailscaled.go @@ -719,6 +719,7 @@ func tryEngine(logf logger.Logf, sys *tsd.System, name string) (onlyNetstack boo Dialer: sys.Dialer.Get(), SetSubsystem: sys.Set, ControlKnobs: sys.ControlKnobs(), + EventBus: sys.Bus.Get(), DriveForLocal: driveimpl.NewFileSystemForLocal(logf), } diff --git a/ipn/localapi/localapi.go b/ipn/localapi/localapi.go index 78f95b2b1..6344da42d 100644 --- a/ipn/localapi/localapi.go +++ b/ipn/localapi/localapi.go @@ -20,6 +20,7 @@ import ( "net/url" "os" "path" + "reflect" "runtime" "slices" "strconv" @@ -91,6 +92,7 @@ var handler = map[string]LocalAPIHandler{ "check-udp-gro-forwarding": (*Handler).serveCheckUDPGROForwarding, "component-debug-logging": (*Handler).serveComponentDebugLogging, "debug": (*Handler).serveDebug, + "debug-bus-events": (*Handler).serveDebugBusEvents, "debug-derp-region": (*Handler).serveDebugDERPRegion, "debug-dial-types": (*Handler).serveDebugDialTypes, "debug-log": (*Handler).serveDebugLog, @@ -332,7 +334,7 @@ func (h *Handler) serveIDToken(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - httpReq, err := http.NewRequest("POST", "https://unused/machine/id-token", bytes.NewReader(b)) + httpReq, err := http.NewRequest(httpm.POST, "https://unused/machine/id-token", bytes.NewReader(b)) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -355,7 +357,7 @@ func (h *Handler) serveBugReport(w http.ResponseWriter, r *http.Request) { http.Error(w, "bugreport access denied", http.StatusForbidden) return } - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "only POST allowed", http.StatusMethodNotAllowed) return } @@ -482,7 +484,7 @@ func (h *Handler) serveSetDeviceAttrs(w http.ResponseWriter, r *http.Request) { http.Error(w, "set-device-attrs access denied", http.StatusForbidden) return } - if r.Method != "PATCH" { + if r.Method != httpm.PATCH { http.Error(w, "only PATCH allowed", http.StatusMethodNotAllowed) return } @@ -587,7 +589,7 @@ func (h *Handler) serveLogTap(w http.ResponseWriter, r *http.Request) { http.Error(w, "logtap access denied", http.StatusForbidden) return } - if r.Method != "GET" { + if r.Method != httpm.GET { http.Error(w, "GET required", http.StatusMethodNotAllowed) return } @@ -639,7 +641,7 @@ func (h *Handler) serveDebug(w http.ResponseWriter, r *http.Request) { http.Error(w, "debug access denied", http.StatusForbidden) return } - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "POST required", http.StatusMethodNotAllowed) return } @@ -712,7 +714,7 @@ func (h *Handler) serveDevSetStateStore(w http.ResponseWriter, r *http.Request) http.Error(w, "debug access denied", http.StatusForbidden) return } - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "POST required", http.StatusMethodNotAllowed) return } @@ -917,6 +919,68 @@ func (h *Handler) serveDebugPortmap(w http.ResponseWriter, r *http.Request) { } } +// serveDebugBusEvents taps into the tailscaled/utils/eventbus and streams +// events to the client. +func (h *Handler) serveDebugBusEvents(w http.ResponseWriter, r *http.Request) { + // Require write access (~root) as the logs could contain something + // sensitive. + if !h.PermitWrite { + http.Error(w, "event bus access denied", http.StatusForbidden) + return + } + if r.Method != httpm.GET { + http.Error(w, "GET required", http.StatusMethodNotAllowed) + return + } + + bus, ok := h.LocalBackend().Sys().Bus.GetOK() + if !ok { + http.Error(w, "event bus not running", http.StatusNoContent) + return + } + + f, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + + io.WriteString(w, `{"Event":"[event listener connected]\n"}`+"\n") + f.Flush() + + mon := bus.Debugger().WatchBus() + defer mon.Close() + + i := 0 + for { + select { + case <-r.Context().Done(): + fmt.Fprintf(w, `{"Event":"[event listener closed]\n"}`) + return + case <-mon.Done(): + return + case event := <-mon.Events(): + data := eventbus.DebugEvent{ + Count: i, + Type: reflect.TypeOf(event.Event).String(), + Event: event.Event, + From: event.From.Name(), + } + for _, client := range event.To { + data.To = append(data.To, client.Name()) + } + + if msg, err := json.Marshal(data); err != nil { + fmt.Fprintf(w, `{"Event":"[ERROR] failed to marshal JSON for %T"}\n`, event.Event) + } else { + w.Write(msg) + } + f.Flush() + i++ + } + } +} + func (h *Handler) serveComponentDebugLogging(w http.ResponseWriter, r *http.Request) { if !h.PermitWrite { http.Error(w, "debug access denied", http.StatusForbidden) @@ -1078,7 +1142,7 @@ func (h *Handler) serveResetAuth(w http.ResponseWriter, r *http.Request) { func (h *Handler) serveServeConfig(w http.ResponseWriter, r *http.Request) { switch r.Method { - case "GET": + case httpm.GET: if !h.PermitRead { http.Error(w, "serve config denied", http.StatusForbidden) return @@ -1094,7 +1158,7 @@ func (h *Handler) serveServeConfig(w http.ResponseWriter, r *http.Request) { w.Header().Set("Etag", etag) w.Header().Set("Content-Type", "application/json") w.Write(bts) - case "POST": + case httpm.POST: if !h.PermitWrite { http.Error(w, "serve config denied", http.StatusForbidden) return @@ -1157,7 +1221,6 @@ func authorizeServeConfigForGOOSAndUserContext(goos string, configIn *ipn.ServeC // should never happen. panic("unreachable") } - } func (h *Handler) serveCheckIPForwarding(w http.ResponseWriter, r *http.Request) { @@ -1291,7 +1354,7 @@ func (h *Handler) serveDebugPeerEndpointChanges(w http.ResponseWriter, r *http.R // (in ipnserver.Server) provides the blocking until the connection is no longer // in use. func InUseOtherUserIPNStream(w http.ResponseWriter, r *http.Request, err error) (handled bool) { - if r.Method != "GET" || r.URL.Path != "/localapi/v0/watch-ipn-bus" { + if r.Method != httpm.GET || r.URL.Path != "/localapi/v0/watch-ipn-bus" { return false } js, err := json.Marshal(&ipn.Notify{ @@ -1356,7 +1419,7 @@ func (h *Handler) serveLoginInteractive(w http.ResponseWriter, r *http.Request) http.Error(w, "login access denied", http.StatusForbidden) return } - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "want POST", http.StatusBadRequest) return } @@ -1370,7 +1433,7 @@ func (h *Handler) serveStart(w http.ResponseWriter, r *http.Request) { http.Error(w, "access denied", http.StatusForbidden) return } - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "want POST", http.StatusBadRequest) return } @@ -1393,7 +1456,7 @@ func (h *Handler) serveLogout(w http.ResponseWriter, r *http.Request) { http.Error(w, "logout access denied", http.StatusForbidden) return } - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "want POST", http.StatusBadRequest) return } @@ -1412,7 +1475,7 @@ func (h *Handler) servePrefs(w http.ResponseWriter, r *http.Request) { } var prefs ipn.PrefsView switch r.Method { - case "PATCH": + case httpm.PATCH: if !h.PermitWrite { http.Error(w, "prefs write access denied", http.StatusForbidden) return @@ -1436,7 +1499,7 @@ func (h *Handler) servePrefs(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(resJSON{Error: err.Error()}) return } - case "GET", "HEAD": + case httpm.GET, httpm.HEAD: prefs = h.b.Prefs() default: http.Error(w, "unsupported method", http.StatusMethodNotAllowed) @@ -1476,9 +1539,9 @@ func (h *Handler) servePolicy(w http.ResponseWriter, r *http.Request) { var effectivePolicy *setting.Snapshot switch r.Method { - case "GET": + case httpm.GET: effectivePolicy = policy.Get() - case "POST": + case httpm.POST: effectivePolicy, err = policy.Reload() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -1504,7 +1567,7 @@ func (h *Handler) serveCheckPrefs(w http.ResponseWriter, r *http.Request) { http.Error(w, "checkprefs access denied", http.StatusForbidden) return } - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "unsupported method", http.StatusMethodNotAllowed) return } @@ -1542,7 +1605,7 @@ func (h *Handler) serveSetDNS(w http.ResponseWriter, r *http.Request) { http.Error(w, "access denied", http.StatusForbidden) return } - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "want POST", http.StatusBadRequest) return } @@ -1557,7 +1620,7 @@ func (h *Handler) serveSetDNS(w http.ResponseWriter, r *http.Request) { } func (h *Handler) serveDERPMap(w http.ResponseWriter, r *http.Request) { - if r.Method != "GET" { + if r.Method != httpm.GET { http.Error(w, "want GET", http.StatusBadRequest) return } @@ -1574,7 +1637,7 @@ func (h *Handler) serveSetExpirySooner(w http.ResponseWriter, r *http.Request) { http.Error(w, "access denied", http.StatusForbidden) return } - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "POST required", http.StatusMethodNotAllowed) return } @@ -1602,7 +1665,7 @@ func (h *Handler) serveSetExpirySooner(w http.ResponseWriter, r *http.Request) { func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "want POST", http.StatusBadRequest) return } @@ -1648,7 +1711,7 @@ func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) { } func (h *Handler) serveDial(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "POST required", http.StatusMethodNotAllowed) return } @@ -1711,7 +1774,7 @@ func (h *Handler) serveSetPushDeviceToken(w http.ResponseWriter, r *http.Request http.Error(w, "set push device token access denied", http.StatusForbidden) return } - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "unsupported method", http.StatusMethodNotAllowed) return } @@ -1729,7 +1792,7 @@ func (h *Handler) serveHandlePushMessage(w http.ResponseWriter, r *http.Request) http.Error(w, "handle push message not allowed", http.StatusForbidden) return } - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "unsupported method", http.StatusMethodNotAllowed) return } @@ -1746,7 +1809,7 @@ func (h *Handler) serveHandlePushMessage(w http.ResponseWriter, r *http.Request) } func (h *Handler) serveUploadClientMetrics(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "unsupported method", http.StatusMethodNotAllowed) return } @@ -2337,7 +2400,7 @@ func (h *Handler) serveQueryFeature(w http.ResponseWriter, r *http.Request) { } req, err := http.NewRequestWithContext(r.Context(), - "POST", "https://unused/machine/feature/query", bytes.NewReader(b)) + httpm.POST, "https://unused/machine/feature/query", bytes.NewReader(b)) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -2416,7 +2479,7 @@ func (h *Handler) serveDebugLog(w http.ResponseWriter, r *http.Request) { // Effectively, it tells us whether serveUpdateInstall will be able to install // an update for us. func (h *Handler) serveUpdateCheck(w http.ResponseWriter, r *http.Request) { - if r.Method != "GET" { + if r.Method != httpm.GET { http.Error(w, "only GET allowed", http.StatusMethodNotAllowed) return } @@ -2445,7 +2508,7 @@ func (h *Handler) serveUpdateCheck(w http.ResponseWriter, r *http.Request) { // serveUpdateProgress after pinging this endpoint to check how the update is // going. func (h *Handler) serveUpdateInstall(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { + if r.Method != httpm.POST { http.Error(w, "only POST allowed", http.StatusMethodNotAllowed) return } @@ -2460,7 +2523,7 @@ func (h *Handler) serveUpdateInstall(w http.ResponseWriter, r *http.Request) { // log messages in order from oldest to newest. If an update is not in progress, // the returned slice will be empty. func (h *Handler) serveUpdateProgress(w http.ResponseWriter, r *http.Request) { - if r.Method != "GET" { + if r.Method != httpm.GET { http.Error(w, "only GET allowed", http.StatusMethodNotAllowed) return } @@ -2516,7 +2579,7 @@ func (h *Handler) serveDNSOSConfig(w http.ResponseWriter, r *http.Request) { // // The response if successful is a DNSQueryResponse JSON object. func (h *Handler) serveDNSQuery(w http.ResponseWriter, r *http.Request) { - if r.Method != "GET" { + if r.Method != httpm.GET { http.Error(w, "only GET allowed", http.StatusMethodNotAllowed) return } @@ -2553,7 +2616,7 @@ func (h *Handler) serveDNSQuery(w http.ResponseWriter, r *http.Request) { // serveDriveServerAddr handles updates of the Taildrive file server address. func (h *Handler) serveDriveServerAddr(w http.ResponseWriter, r *http.Request) { - if r.Method != "PUT" { + if r.Method != httpm.PUT { http.Error(w, "only PUT allowed", http.StatusMethodNotAllowed) return } @@ -2580,7 +2643,7 @@ func (h *Handler) serveShares(w http.ResponseWriter, r *http.Request) { return } switch r.Method { - case "PUT": + case httpm.PUT: var share drive.Share err := json.NewDecoder(r.Body).Decode(&share) if err != nil { @@ -2616,7 +2679,7 @@ func (h *Handler) serveShares(w http.ResponseWriter, r *http.Request) { return } w.WriteHeader(http.StatusCreated) - case "DELETE": + case httpm.DELETE: b, err := io.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -2632,7 +2695,7 @@ func (h *Handler) serveShares(w http.ResponseWriter, r *http.Request) { return } w.WriteHeader(http.StatusNoContent) - case "POST": + case httpm.POST: var names [2]string err := json.NewDecoder(r.Body).Decode(&names) if err != nil { @@ -2657,7 +2720,7 @@ func (h *Handler) serveShares(w http.ResponseWriter, r *http.Request) { return } w.WriteHeader(http.StatusNoContent) - case "GET": + case httpm.GET: shares := h.b.DriveGetShares() err := json.NewEncoder(w).Encode(shares) if err != nil { @@ -2671,7 +2734,7 @@ func (h *Handler) serveShares(w http.ResponseWriter, r *http.Request) { // serveSuggestExitNode serves a POST endpoint for returning a suggested exit node. func (h *Handler) serveSuggestExitNode(w http.ResponseWriter, r *http.Request) { - if r.Method != "GET" { + if r.Method != httpm.GET { http.Error(w, "only GET allowed", http.StatusMethodNotAllowed) return } diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go index 832d72ac0..b6264f82f 100644 --- a/util/eventbus/debug.go +++ b/util/eventbus/debug.go @@ -186,3 +186,12 @@ type hookFn[T any] struct { ID uint64 Fn func(T) } + +// DebugEvent is a representation of an event used for debug clients. +type DebugEvent struct { + Count int + Type string + From string + To []string + Event any +}