client/local: use an iterator to stream bus events (#16269)

This means the caller does not have to remember to close the reader, and avoids
having to duplicate the logic to decode JSON into events.

Updates #15160

Change-Id: I20186fabb02f72522f61d5908c4cc80b86b8936b
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger 2025-06-13 15:47:35 -07:00 committed by GitHub
parent 6a4d92ecef
commit fe391d5694
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 41 additions and 29 deletions

View File

@ -1,12 +1,11 @@
// Copyright (c) Tailscale Inc & AUTHORS // Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause // SPDX-License-Identifier: BSD-3-Clause
//go:build go1.22
// Package local contains a Go client for the Tailscale LocalAPI. // Package local contains a Go client for the Tailscale LocalAPI.
package local package local
import ( import (
"bufio"
"bytes" "bytes"
"cmp" "cmp"
"context" "context"
@ -16,6 +15,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"iter"
"net" "net"
"net/http" "net/http"
"net/http/httptrace" "net/http/httptrace"
@ -42,6 +42,7 @@ import (
"tailscale.com/types/dnstype" "tailscale.com/types/dnstype"
"tailscale.com/types/key" "tailscale.com/types/key"
"tailscale.com/types/tkatype" "tailscale.com/types/tkatype"
"tailscale.com/util/eventbus"
"tailscale.com/util/syspolicy/setting" "tailscale.com/util/syspolicy/setting"
) )
@ -414,24 +415,42 @@ func (lc *Client) TailDaemonLogs(ctx context.Context) (io.Reader, error) {
return res.Body, nil return res.Body, nil
} }
// StreamBusEvents returns a stream of the Tailscale bus events as they arrive. // StreamBusEvents returns an iterator of Tailscale bus events as they arrive.
// Close the context to stop the stream. // Each pair is a valid event and a nil error, or a zero event a non-nil error.
// Expected response from the server is newline-delimited JSON. // In case of error, the iterator ends after the pair reporting the error.
// The caller must close the reader when it is finished reading. // Iteration stops if ctx ends.
func (lc *Client) StreamBusEvents(ctx context.Context) (io.ReadCloser, error) { func (lc *Client) StreamBusEvents(ctx context.Context) iter.Seq2[eventbus.DebugEvent, error] {
req, err := http.NewRequestWithContext(ctx, "GET", return func(yield func(eventbus.DebugEvent, error) bool) {
"http://"+apitype.LocalAPIHost+"/localapi/v0/debug-bus-events", nil) req, err := http.NewRequestWithContext(ctx, "GET",
if err != nil { "http://"+apitype.LocalAPIHost+"/localapi/v0/debug-bus-events", nil)
return nil, err if err != nil {
yield(eventbus.DebugEvent{}, err)
return
}
res, err := lc.doLocalRequestNiceError(req)
if err != nil {
yield(eventbus.DebugEvent{}, err)
return
}
if res.StatusCode != http.StatusOK {
yield(eventbus.DebugEvent{}, errors.New(res.Status))
return
}
defer res.Body.Close()
dec := json.NewDecoder(bufio.NewReader(res.Body))
for {
var evt eventbus.DebugEvent
if err := dec.Decode(&evt); err == io.EOF {
return
} else if err != nil {
yield(eventbus.DebugEvent{}, err)
return
}
if !yield(evt, nil) {
return
}
}
} }
res, err := lc.doLocalRequestNiceError(req)
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusOK {
return nil, errors.New(res.Status)
}
return res.Body, nil
} }
// Pprof returns a pprof profile of the Tailscale daemon. // Pprof returns a pprof profile of the Tailscale daemon.

View File

@ -157,7 +157,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depawa
💣 tailscale.com/util/deephash from tailscale.com/util/syspolicy/setting 💣 tailscale.com/util/deephash from tailscale.com/util/syspolicy/setting
L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics
tailscale.com/util/dnsname from tailscale.com/hostinfo+ tailscale.com/util/dnsname from tailscale.com/hostinfo+
tailscale.com/util/eventbus from tailscale.com/net/netmon tailscale.com/util/eventbus from tailscale.com/net/netmon+
💣 tailscale.com/util/hashx from tailscale.com/util/deephash 💣 tailscale.com/util/hashx from tailscale.com/util/deephash
tailscale.com/util/httpm from tailscale.com/client/tailscale tailscale.com/util/httpm from tailscale.com/client/tailscale
tailscale.com/util/lineiter from tailscale.com/hostinfo+ tailscale.com/util/lineiter from tailscale.com/hostinfo+

View File

@ -791,21 +791,14 @@ func runDaemonLogs(ctx context.Context, args []string) error {
} }
func runDaemonBusEvents(ctx context.Context, args []string) error { func runDaemonBusEvents(ctx context.Context, args []string) error {
logs, err := localClient.StreamBusEvents(ctx) for line, err := range localClient.StreamBusEvents(ctx) {
if err != nil {
return err
}
defer logs.Close()
d := json.NewDecoder(bufio.NewReader(logs))
for {
var line eventbus.DebugEvent
err := d.Decode(&line)
if err != nil { if err != nil {
return err return err
} }
fmt.Printf("[%d][%q][from: %q][to: %q] %s\n", line.Count, line.Type, fmt.Printf("[%d][%q][from: %q][to: %q] %s\n", line.Count, line.Type,
line.From, line.To, line.Event) line.From, line.To, line.Event)
} }
return nil
} }
var metricsArgs struct { var metricsArgs struct {