util/eventbus: add a debug HTTP handler for the bus

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
This commit is contained in:
David Anderson 2025-03-07 08:18:33 -08:00 committed by Dave Anderson
parent 640b2fa3ae
commit d83024a63f
10 changed files with 541 additions and 3 deletions

View File

@ -0,0 +1,6 @@
<li id="monitor" hx-swap-oob="afterbegin">
<details>
<summary>{{.Count}}: {{.Type}} from {{.Event.From.Name}}, {{len .Event.To}} recipients</summary>
{{.Event.Event}}
</details>
</li>

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,97 @@
<!DOCTYPE html>
<html>
<head>
<script src="bus/htmx.min.js"></script>
<script src="bus/htmx-websocket.min.js"></script>
<link rel="stylesheet" href="bus/style.css">
</head>
<body hx-ext="ws">
<h1>Event bus</h1>
<section>
<h2>General</h2>
{{with $.PublishQueue}}
{{len .}} pending
{{end}}
<button hx-post="bus/monitor" hx-swap="outerHTML">Monitor all events</button>
</section>
<section>
<h2>Clients</h2>
<table>
<thead>
<tr>
<th>Name</th>
<th>Publishing</th>
<th>Subscribing</th>
<th>Pending</th>
</tr>
</thead>
{{range .Clients}}
<tr id="{{.Name}}">
<td>{{.Name}}</td>
<td class="list">
<ul>
{{range .Publish}}
<li><a href="#{{.}}">{{.}}</a></li>
{{end}}
</ul>
</td>
<td class="list">
<ul>
{{range .Subscribe}}
<li><a href="#{{.}}">{{.}}</a></li>
{{end}}
</ul>
</td>
<td>
{{len ($.SubscribeQueue .Client)}}
</td>
</tr>
{{end}}
</table>
</section>
<section>
<h2>Types</h2>
{{range .Types}}
<section id="{{.}}">
<h3>{{.Name}}</h3>
<h4>Definition</h4>
<code>{{prettyPrintStruct .}}</code>
<h4>Published by:</h4>
{{if len (.Publish)}}
<ul>
{{range .Publish}}
<li><a href="#{{.Name}}">{{.Name}}</a></li>
{{end}}
</ul>
{{else}}
<ul>
<li>No publishers.</li>
</ul>
{{end}}
<h4>Received by:</h4>
{{if len (.Subscribe)}}
<ul>
{{range .Subscribe}}
<li><a href="#{{.Name}}">{{.Name}}</a></li>
{{end}}
</ul>
{{else}}
<ul>
<li>No subscribers.</li>
</ul>
{{end}}
</section>
{{end}}
</section>
</body>
</html>

View File

@ -0,0 +1,5 @@
<div>
<ul id="monitor" ws-connect="bus/monitor">
</ul>
<button hx-get="bus" hx-target="body">Stop monitoring</button>
</div>

View File

@ -0,0 +1,90 @@
/* CSS reset, thanks Josh Comeau: https://www.joshwcomeau.com/css/custom-css-reset/ */
*, *::before, *::after { box-sizing: border-box; }
* { margin: 0; }
input, button, textarea, select { font: inherit; }
p, h1, h2, h3, h4, h5, h6 { overflow-wrap: break-word; }
p { text-wrap: pretty; }
h1, h2, h3, h4, h5, h6 { text-wrap: balance; }
#root, #__next { isolation: isolate; }
body {
line-height: 1.5;
-webkit-font-smoothing: antialiased;
}
img, picture, video, canvas, svg {
display: block;
max-width: 100%;
}
/* Local styling begins */
body {
padding: 12px;
}
div {
width: 100%;
}
section {
display: flex;
flex-direction: column;
flex-gap: 6px;
align-items: flex-start;
padding: 12px 0;
}
section > * {
margin-left: 24px;
}
section > h2, section > h3 {
margin-left: 0;
padding-bottom: 6px;
padding-top: 12px;
}
details {
padding-bottom: 12px;
}
table {
table-layout: fixed;
width: calc(100% - 48px);
border-collapse: collapse;
border: 1px solid black;
}
th, td {
padding: 12px;
border: 1px solid black;
}
td.list {
vertical-align: top;
}
ul {
list-style: none;
}
td ul {
margin: 0;
padding: 0;
}
code {
padding: 12px;
white-space: pre;
}
#monitor {
width: calc(100% - 48px);
resize: vertical;
padding: 12px;
overflow: scroll;
height: 15lh;
border: 1px inset;
min-height: 1em;
display: flex;
flex-direction: column-reverse;
}

View File

@ -73,8 +73,8 @@ func (b *Bus) Client(name string) *Client {
}
// Debugger returns the debugging facility for the bus.
func (b *Bus) Debugger() Debugger {
return Debugger{b}
func (b *Bus) Debugger() *Debugger {
return &Debugger{b}
}
// Close closes the bus. Implicitly closes all clients, publishers and

View File

@ -4,11 +4,14 @@
package eventbus
import (
"cmp"
"fmt"
"reflect"
"slices"
"sync"
"sync/atomic"
"tailscale.com/tsweb"
)
// A Debugger offers access to a bus's privileged introspection and
@ -29,7 +32,11 @@ type Debugger struct {
// Clients returns a list of all clients attached to the bus.
func (d *Debugger) Clients() []*Client {
return d.bus.listClients()
ret := d.bus.listClients()
slices.SortFunc(ret, func(a, b *Client) int {
return cmp.Compare(a.Name(), b.Name())
})
return ret
}
// PublishQueue returns the contents of the publish queue.
@ -130,6 +137,8 @@ func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type {
return client.subscribeTypes()
}
func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler) { registerHTTPDebugger(d, td) }
// A hook collects hook functions that can be run as a group.
type hook[T any] struct {
sync.Mutex

238
util/eventbus/debughttp.go Normal file
View File

@ -0,0 +1,238 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package eventbus
import (
"bytes"
"cmp"
"embed"
"fmt"
"html/template"
"io"
"io/fs"
"log"
"net/http"
"path/filepath"
"reflect"
"slices"
"strings"
"sync"
"github.com/coder/websocket"
"tailscale.com/tsweb"
)
type httpDebugger struct {
*Debugger
}
func registerHTTPDebugger(d *Debugger, td *tsweb.DebugHandler) {
dh := httpDebugger{d}
td.Handle("bus", "Event bus", dh)
td.HandleSilent("bus/monitor", http.HandlerFunc(dh.serveMonitor))
td.HandleSilent("bus/style.css", serveStatic("style.css"))
td.HandleSilent("bus/htmx.min.js", serveStatic("htmx.min.js.gz"))
td.HandleSilent("bus/htmx-websocket.min.js", serveStatic("htmx-websocket.min.js.gz"))
}
//go:embed assets/*.html
var templatesSrc embed.FS
var templates = sync.OnceValue(func() *template.Template {
d, err := fs.Sub(templatesSrc, "assets")
if err != nil {
panic(fmt.Errorf("getting eventbus debughttp templates subdir: %w", err))
}
ret := template.New("").Funcs(map[string]any{
"prettyPrintStruct": prettyPrintStruct,
})
return template.Must(ret.ParseFS(d, "*"))
})
//go:generate go run fetch-htmx.go
//go:embed assets/*.css assets/*.min.js.gz
var static embed.FS
func serveStatic(name string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case strings.HasSuffix(name, ".css"):
w.Header().Set("Content-Type", "text/css")
case strings.HasSuffix(name, ".min.js.gz"):
w.Header().Set("Content-Type", "text/javascript")
w.Header().Set("Content-Encoding", "gzip")
case strings.HasSuffix(name, ".js"):
w.Header().Set("Content-Type", "text/javascript")
default:
http.Error(w, "not found", http.StatusNotFound)
return
}
f, err := static.Open(filepath.Join("assets", name))
if err != nil {
http.Error(w, fmt.Sprintf("opening asset: %v", err), http.StatusInternalServerError)
return
}
defer f.Close()
if _, err := io.Copy(w, f); err != nil {
http.Error(w, fmt.Sprintf("serving asset: %v", err), http.StatusInternalServerError)
return
}
})
}
func render(w http.ResponseWriter, name string, data any) {
err := templates().ExecuteTemplate(w, name+".html", data)
if err != nil {
err := fmt.Errorf("rendering template: %v", err)
log.Print(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func (h httpDebugger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type clientInfo struct {
*Client
Publish []reflect.Type
Subscribe []reflect.Type
}
type typeInfo struct {
reflect.Type
Publish []*Client
Subscribe []*Client
}
type info struct {
*Debugger
Clients map[string]*clientInfo
Types map[string]*typeInfo
}
data := info{
Debugger: h.Debugger,
Clients: map[string]*clientInfo{},
Types: map[string]*typeInfo{},
}
getTypeInfo := func(t reflect.Type) *typeInfo {
if data.Types[t.Name()] == nil {
data.Types[t.Name()] = &typeInfo{
Type: t,
}
}
return data.Types[t.Name()]
}
for _, c := range h.Clients() {
ci := &clientInfo{
Client: c,
Publish: h.PublishTypes(c),
Subscribe: h.SubscribeTypes(c),
}
slices.SortFunc(ci.Publish, func(a, b reflect.Type) int { return cmp.Compare(a.Name(), b.Name()) })
slices.SortFunc(ci.Subscribe, func(a, b reflect.Type) int { return cmp.Compare(a.Name(), b.Name()) })
data.Clients[c.Name()] = ci
for _, t := range ci.Publish {
ti := getTypeInfo(t)
ti.Publish = append(ti.Publish, c)
}
for _, t := range ci.Subscribe {
ti := getTypeInfo(t)
ti.Subscribe = append(ti.Subscribe, c)
}
}
render(w, "main", data)
}
func (h httpDebugger) serveMonitor(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Upgrade") == "websocket" {
h.serveMonitorStream(w, r)
return
}
render(w, "monitor", nil)
}
func (h httpDebugger) serveMonitorStream(w http.ResponseWriter, r *http.Request) {
conn, err := websocket.Accept(w, r, nil)
if err != nil {
return
}
defer conn.CloseNow()
wsCtx := conn.CloseRead(r.Context())
mon := h.WatchBus()
defer mon.Close()
i := 0
for {
select {
case <-r.Context().Done():
return
case <-wsCtx.Done():
return
case <-mon.Done():
return
case event := <-mon.Events():
msg, err := conn.Writer(r.Context(), websocket.MessageText)
if err != nil {
return
}
data := map[string]any{
"Count": i,
"Type": reflect.TypeOf(event.Event),
"Event": event,
}
i++
if err := templates().ExecuteTemplate(msg, "event.html", data); err != nil {
log.Println(err)
return
}
if err := msg.Close(); err != nil {
return
}
}
}
}
func prettyPrintStruct(t reflect.Type) string {
if t.Kind() != reflect.Struct {
return t.String()
}
var rec func(io.Writer, int, reflect.Type)
rec = func(out io.Writer, indent int, t reflect.Type) {
ind := strings.Repeat(" ", indent)
fmt.Fprintf(out, "%s", t.String())
fs := collectFields(t)
if len(fs) > 0 {
io.WriteString(out, " {\n")
for _, f := range fs {
fmt.Fprintf(out, "%s %s ", ind, f.Name)
if f.Type.Kind() == reflect.Struct {
rec(out, indent+1, f.Type)
} else {
fmt.Fprint(out, f.Type)
}
io.WriteString(out, "\n")
}
fmt.Fprintf(out, "%s}", ind)
}
}
var ret bytes.Buffer
rec(&ret, 0, t)
return ret.String()
}
func collectFields(t reflect.Type) (ret []reflect.StructField) {
for _, f := range reflect.VisibleFields(t) {
if !f.IsExported() {
continue
}
ret = append(ret, f)
}
return ret
}

View File

@ -0,0 +1,93 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build ignore
// Program fetch-htmx fetches and installs local copies of the HTMX
// library and its dependencies, used by the debug UI. It is meant to
// be run via go generate.
package main
import (
"compress/gzip"
"crypto/sha512"
"encoding/base64"
"fmt"
"io"
"log"
"net/http"
"os"
)
func main() {
// Hash from https://htmx.org/docs/#installing
htmx, err := fetchHashed("https://unpkg.com/htmx.org@2.0.4", "HGfztofotfshcF7+8n44JQL2oJmowVChPTg48S+jvZoztPfvwD79OC/LTtG6dMp+")
if err != nil {
log.Fatalf("fetching htmx: %v", err)
}
// Hash SHOULD be from https://htmx.org/extensions/ws/ , but the
// hash is currently incorrect, see
// https://github.com/bigskysoftware/htmx-extensions/issues/153
//
// Until that bug is resolved, hash was obtained by rebuilding the
// extension from git source, and verifying that the hash matches
// what unpkg is serving.
ws, err := fetchHashed("https://unpkg.com/htmx-ext-ws@2.0.2", "932iIqjARv+Gy0+r6RTGrfCkCKS5MsF539Iqf6Vt8L4YmbnnWI2DSFoMD90bvXd0")
if err != nil {
log.Fatalf("fetching htmx-websockets: %v", err)
}
if err := writeGz("assets/htmx.min.js.gz", htmx); err != nil {
log.Fatalf("writing htmx.min.js.gz: %v", err)
}
if err := writeGz("assets/htmx-websocket.min.js.gz", ws); err != nil {
log.Fatalf("writing htmx-websocket.min.js.gz: %v", err)
}
}
func writeGz(path string, bs []byte) error {
f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()
g, err := gzip.NewWriterLevel(f, gzip.BestCompression)
if err != nil {
return err
}
if _, err := g.Write(bs); err != nil {
return err
}
if err := g.Flush(); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
return nil
}
func fetchHashed(url, wantHash string) ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("fetching %q returned error status: %s", url, resp.Status)
}
ret, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("reading file from %q: %v", url, err)
}
h := sha512.Sum384(ret)
got := base64.StdEncoding.EncodeToString(h[:])
if got != wantHash {
return nil, fmt.Errorf("wrong hash for %q: got %q, want %q", url, got, wantHash)
}
return ret, nil
}