net/netmon: publish events to event bus

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
This commit is contained in:
David Anderson 2025-03-19 10:47:25 -07:00 committed by M. J. Fromberger
parent b0aca9d066
commit 1f3ca21ca1
41 changed files with 196 additions and 56 deletions

View File

@ -155,6 +155,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depawa
💣 tailscale.com/util/deephash from tailscale.com/util/syspolicy/setting 💣 tailscale.com/util/deephash from tailscale.com/util/syspolicy/setting
L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics
tailscale.com/util/dnsname from tailscale.com/hostinfo+ tailscale.com/util/dnsname from tailscale.com/hostinfo+
tailscale.com/util/eventbus from tailscale.com/net/netmon
💣 tailscale.com/util/hashx from tailscale.com/util/deephash 💣 tailscale.com/util/hashx from tailscale.com/util/deephash
tailscale.com/util/httpm from tailscale.com/client/tailscale tailscale.com/util/httpm from tailscale.com/client/tailscale
tailscale.com/util/lineiter from tailscale.com/hostinfo+ tailscale.com/util/lineiter from tailscale.com/hostinfo+
@ -308,7 +309,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depawa
hash/fnv from google.golang.org/protobuf/internal/detrand hash/fnv from google.golang.org/protobuf/internal/detrand
hash/maphash from go4.org/mem hash/maphash from go4.org/mem
html from net/http/pprof+ html from net/http/pprof+
html/template from tailscale.com/cmd/derper html/template from tailscale.com/cmd/derper+
internal/abi from crypto/x509/internal/macos+ internal/abi from crypto/x509/internal/macos+
internal/asan from internal/runtime/maps+ internal/asan from internal/runtime/maps+
internal/bisect from internal/godebug internal/bisect from internal/godebug

View File

@ -43,6 +43,7 @@ import (
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/key" "tailscale.com/types/key"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/eventbus"
"tailscale.com/util/must" "tailscale.com/util/must"
) )
@ -956,7 +957,10 @@ func runTS2021(ctx context.Context, args []string) error {
logf = log.Printf logf = log.Printf
} }
netMon, err := netmon.New(logger.WithPrefix(logf, "netmon: ")) bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(logf, "netmon: "))
if err != nil { if err != nil {
return fmt.Errorf("creating netmon: %w", err) return fmt.Errorf("creating netmon: %w", err)
} }

View File

@ -24,6 +24,7 @@ import (
"tailscale.com/net/tlsdial" "tailscale.com/net/tlsdial"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/eventbus"
) )
var netcheckCmd = &ffcli.Command{ var netcheckCmd = &ffcli.Command{
@ -48,7 +49,9 @@ var netcheckArgs struct {
func runNetcheck(ctx context.Context, args []string) error { func runNetcheck(ctx context.Context, args []string) error {
logf := logger.WithPrefix(log.Printf, "portmap: ") logf := logger.WithPrefix(log.Printf, "portmap: ")
netMon, err := netmon.New(logf) bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logf)
if err != nil { if err != nil {
return err return err
} }

View File

@ -162,7 +162,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
💣 tailscale.com/util/deephash from tailscale.com/util/syspolicy/setting 💣 tailscale.com/util/deephash from tailscale.com/util/syspolicy/setting
L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics
tailscale.com/util/dnsname from tailscale.com/cmd/tailscale/cli+ tailscale.com/util/dnsname from tailscale.com/cmd/tailscale/cli+
tailscale.com/util/eventbus from tailscale.com/net/portmapper tailscale.com/util/eventbus from tailscale.com/net/portmapper+
tailscale.com/util/groupmember from tailscale.com/client/web tailscale.com/util/groupmember from tailscale.com/client/web
💣 tailscale.com/util/hashx from tailscale.com/util/deephash 💣 tailscale.com/util/hashx from tailscale.com/util/deephash
tailscale.com/util/httpm from tailscale.com/client/tailscale+ tailscale.com/util/httpm from tailscale.com/client/tailscale+

View File

@ -27,6 +27,7 @@ import (
"tailscale.com/net/tshttpproxy" "tailscale.com/net/tshttpproxy"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/key" "tailscale.com/types/key"
"tailscale.com/util/eventbus"
) )
var debugArgs struct { var debugArgs struct {
@ -72,11 +73,14 @@ func debugMode(args []string) error {
} }
func runMonitor(ctx context.Context, loop bool) error { func runMonitor(ctx context.Context, loop bool) error {
b := eventbus.New()
defer b.Close()
dump := func(st *netmon.State) { dump := func(st *netmon.State) {
j, _ := json.MarshalIndent(st, "", " ") j, _ := json.MarshalIndent(st, "", " ")
os.Stderr.Write(j) os.Stderr.Write(j)
} }
mon, err := netmon.New(log.Printf) mon, err := netmon.New(b, log.Printf)
if err != nil { if err != nil {
return err return err
} }

View File

@ -356,7 +356,7 @@ func run() (err error) {
var netMon *netmon.Monitor var netMon *netmon.Monitor
isWinSvc := isWindowsService() isWinSvc := isWindowsService()
if !isWinSvc { if !isWinSvc {
netMon, err = netmon.New(logf) netMon, err = netmon.New(sys.Bus.Get(), logf)
if err != nil { if err != nil {
return fmt.Errorf("netmon.New: %w", err) return fmt.Errorf("netmon.New: %w", err)
} }

View File

@ -328,7 +328,7 @@ func beWindowsSubprocess() bool {
} }
sys := tsd.NewSystem() sys := tsd.NewSystem()
netMon, err := netmon.New(log.Printf) netMon, err := netmon.New(sys.Bus.Get(), log.Printf)
if err != nil { if err != nil {
log.Fatalf("Could not create netMon: %v", err) log.Fatalf("Could not create netMon: %v", err)
} }

View File

@ -448,7 +448,7 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend {
sys.Set(new(mem.Store)) sys.Set(new(mem.Store))
} }
if _, ok := sys.Engine.GetOK(); !ok { if _, ok := sys.Engine.GetOK(); !ok {
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil { if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err) t.Fatalf("NewFakeUserspaceEngine: %v", err)
} }
@ -4414,7 +4414,7 @@ func newLocalBackendWithTestControl(t *testing.T, enableLogging bool, newControl
sys := tsd.NewSystem() sys := tsd.NewSystem()
store := new(mem.Store) store := new(mem.Store)
sys.Set(store) sys.Set(store)
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil { if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err) t.Fatalf("NewFakeUserspaceEngine: %v", err)
} }

View File

@ -50,7 +50,7 @@ func TestLocalLogLines(t *testing.T) {
sys := tsd.NewSystem() sys := tsd.NewSystem()
store := new(mem.Store) store := new(mem.Store)
sys.Set(store) sys.Set(store)
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -34,6 +34,7 @@ import (
"tailscale.com/tstest" "tailscale.com/tstest"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/types/netmap" "tailscale.com/types/netmap"
"tailscale.com/util/eventbus"
"tailscale.com/util/must" "tailscale.com/util/must"
"tailscale.com/util/usermetric" "tailscale.com/util/usermetric"
"tailscale.com/wgengine" "tailscale.com/wgengine"
@ -643,9 +644,12 @@ func TestPeerAPIReplyToDNSQueries(t *testing.T) {
h.isSelf = false h.isSelf = false
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker) ht := new(health.Tracker)
reg := new(usermetric.Registry) reg := new(usermetric.Registry)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
h.ps = &peerAPIServer{ h.ps = &peerAPIServer{
b: &LocalBackend{ b: &LocalBackend{
@ -695,9 +699,12 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) {
var h peerAPIHandler var h peerAPIHandler
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker) ht := new(health.Tracker)
reg := new(usermetric.Registry) reg := new(usermetric.Registry)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
var a *appc.AppConnector var a *appc.AppConnector
if shouldStore { if shouldStore {
@ -768,10 +775,12 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) {
var h peerAPIHandler var h peerAPIHandler
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
bus := eventbus.New()
defer bus.Close()
rc := &appctest.RouteCollector{} rc := &appctest.RouteCollector{}
ht := new(health.Tracker) ht := new(health.Tracker)
reg := new(usermetric.Registry) reg := new(usermetric.Registry)
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
var a *appc.AppConnector var a *appc.AppConnector
if shouldStore { if shouldStore {
@ -833,10 +842,12 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) {
var h peerAPIHandler var h peerAPIHandler
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker) ht := new(health.Tracker)
reg := new(usermetric.Registry) reg := new(usermetric.Registry)
rc := &appctest.RouteCollector{} rc := &appctest.RouteCollector{}
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
var a *appc.AppConnector var a *appc.AppConnector
if shouldStore { if shouldStore {

View File

@ -882,6 +882,7 @@ func newTestBackend(t *testing.T) *LocalBackend {
SetSubsystem: sys.Set, SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(), HealthTracker: sys.HealthTracker(),
Metrics: sys.UserMetricsRegistry(), Metrics: sys.UserMetricsRegistry(),
EventBus: sys.Bus.Get(),
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -298,7 +298,7 @@ func TestStateMachine(t *testing.T) {
sys := tsd.NewSystem() sys := tsd.NewSystem()
store := new(testStateStorage) store := new(testStateStorage)
sys.Set(store) sys.Set(store)
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil { if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err) t.Fatalf("NewFakeUserspaceEngine: %v", err)
} }
@ -936,7 +936,7 @@ func TestEditPrefsHasNoKeys(t *testing.T) {
logf := tstest.WhileTestRunningLogger(t) logf := tstest.WhileTestRunningLogger(t)
sys := tsd.NewSystem() sys := tsd.NewSystem()
sys.Set(new(mem.Store)) sys.Set(new(mem.Store))
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil { if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err) t.Fatalf("NewFakeUserspaceEngine: %v", err)
} }
@ -1017,7 +1017,7 @@ func TestWGEngineStatusRace(t *testing.T) {
sys := tsd.NewSystem() sys := tsd.NewSystem()
sys.Set(new(mem.Store)) sys.Set(new(mem.Store))
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set) eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.Bus.Get())
c.Assert(err, qt.IsNil) c.Assert(err, qt.IsNil)
t.Cleanup(eng.Close) t.Cleanup(eng.Close)
sys.Set(eng) sys.Set(eng)

View File

@ -522,7 +522,7 @@ func newLocalBackendWithTestControl(tb testing.TB, newControl newControlClientFn
sys.Set(store) sys.Set(store)
logf := testLogger(tb, enableLogging) logf := testLogger(tb, enableLogging)
e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil { if err != nil {
tb.Fatalf("NewFakeUserspaceEngine: %v", err) tb.Fatalf("NewFakeUserspaceEngine: %v", err)
} }

View File

@ -56,6 +56,7 @@ import (
"tailscale.com/types/ptr" "tailscale.com/types/ptr"
"tailscale.com/types/tkatype" "tailscale.com/types/tkatype"
"tailscale.com/util/clientmetric" "tailscale.com/util/clientmetric"
"tailscale.com/util/eventbus"
"tailscale.com/util/httphdr" "tailscale.com/util/httphdr"
"tailscale.com/util/httpm" "tailscale.com/util/httpm"
"tailscale.com/util/mak" "tailscale.com/util/mak"
@ -840,7 +841,9 @@ func (h *Handler) serveDebugPortmap(w http.ResponseWriter, r *http.Request) {
}) })
defer c.Close() defer c.Close()
netMon, err := netmon.New(logger.WithPrefix(logf, "monitor: ")) bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(logf, "monitor: "))
if err != nil { if err != nil {
logf("error creating monitor: %v", err) logf("error creating monitor: %v", err)
return return

View File

@ -339,7 +339,7 @@ func newTestLocalBackend(t testing.TB) *ipnlocal.LocalBackend {
sys := tsd.NewSystem() sys := tsd.NewSystem()
store := new(mem.Store) store := new(mem.Store)
sys.Set(store) sys.Set(store)
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil { if err != nil {
t.Fatalf("NewFakeUserspaceEngine: %v", err) t.Fatalf("NewFakeUserspaceEngine: %v", err)
} }

View File

@ -29,6 +29,7 @@ import (
"tailscale.com/net/tsdial" "tailscale.com/net/tsdial"
"tailscale.com/tstest" "tailscale.com/tstest"
"tailscale.com/types/dnstype" "tailscale.com/types/dnstype"
"tailscale.com/util/eventbus"
) )
func (rr resolverAndDelay) String() string { func (rr resolverAndDelay) String() string {
@ -454,7 +455,9 @@ func makeLargeResponse(tb testing.TB, domain string) (request, response []byte)
func runTestQuery(tb testing.TB, request []byte, modify func(*forwarder), ports ...uint16) ([]byte, error) { func runTestQuery(tb testing.TB, request []byte, modify func(*forwarder), ports ...uint16) ([]byte, error) {
logf := tstest.WhileTestRunningLogger(tb) logf := tstest.WhileTestRunningLogger(tb)
netMon, err := netmon.New(logf) bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logf)
if err != nil { if err != nil {
tb.Fatal(err) tb.Fatal(err)
} }

View File

@ -31,6 +31,7 @@ import (
"tailscale.com/types/dnstype" "tailscale.com/types/dnstype"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/dnsname" "tailscale.com/util/dnsname"
"tailscale.com/util/eventbus"
) )
var ( var (
@ -1059,7 +1060,10 @@ func TestForwardLinkSelection(t *testing.T) {
// routes differently. // routes differently.
specialIP := netaddr.IPv4(1, 2, 3, 4) specialIP := netaddr.IPv4(1, 2, 3, 4)
netMon, err := netmon.New(logger.WithPrefix(t.Logf, ".... netmon: ")) bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, ".... netmon: "))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -15,6 +15,7 @@ import (
"tailscale.com/net/netmon" "tailscale.com/net/netmon"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/eventbus"
) )
func TestGetDERPMap(t *testing.T) { func TestGetDERPMap(t *testing.T) {
@ -185,7 +186,10 @@ func TestLookup(t *testing.T) {
logf, closeLogf := logger.LogfCloser(t.Logf) logf, closeLogf := logger.LogfCloser(t.Logf)
defer closeLogf() defer closeLogf()
netMon, err := netmon.New(logf) bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -7,10 +7,14 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"testing" "testing"
"tailscale.com/util/eventbus"
) )
func TestLinkChangeLogLimiter(t *testing.T) { func TestLinkChangeLogLimiter(t *testing.T) {
mon, err := New(t.Logf) bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -16,6 +16,7 @@ import (
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/clientmetric" "tailscale.com/util/clientmetric"
"tailscale.com/util/eventbus"
"tailscale.com/util/set" "tailscale.com/util/set"
) )
@ -50,7 +51,10 @@ type osMon interface {
// Monitor represents a monitoring instance. // Monitor represents a monitoring instance.
type Monitor struct { type Monitor struct {
logf logger.Logf logf logger.Logf
b *eventbus.Client
changed *eventbus.Publisher[*ChangeDelta]
om osMon // nil means not supported on this platform om osMon // nil means not supported on this platform
change chan bool // send false to wake poller, true to also force ChangeDeltas be sent change chan bool // send false to wake poller, true to also force ChangeDeltas be sent
stop chan struct{} // closed on Stop stop chan struct{} // closed on Stop
@ -114,21 +118,23 @@ type ChangeDelta struct {
// New instantiates and starts a monitoring instance. // New instantiates and starts a monitoring instance.
// The returned monitor is inactive until it's started by the Start method. // The returned monitor is inactive until it's started by the Start method.
// Use RegisterChangeCallback to get notified of network changes. // Use RegisterChangeCallback to get notified of network changes.
func New(logf logger.Logf) (*Monitor, error) { func New(bus *eventbus.Bus, logf logger.Logf) (*Monitor, error) {
logf = logger.WithPrefix(logf, "monitor: ") logf = logger.WithPrefix(logf, "monitor: ")
m := &Monitor{ m := &Monitor{
logf: logf, logf: logf,
b: bus.Client("netmon"),
change: make(chan bool, 1), change: make(chan bool, 1),
stop: make(chan struct{}), stop: make(chan struct{}),
lastWall: wallTime(), lastWall: wallTime(),
} }
m.changed = eventbus.Publish[*ChangeDelta](m.b)
st, err := m.interfaceStateUncached() st, err := m.interfaceStateUncached()
if err != nil { if err != nil {
return nil, err return nil, err
} }
m.ifState = st m.ifState = st
m.om, err = newOSMon(logf, m) m.om, err = newOSMon(bus, logf, m)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -465,6 +471,7 @@ func (m *Monitor) handlePotentialChange(newState *State, forceCallbacks bool) {
if delta.TimeJumped { if delta.TimeJumped {
metricChangeTimeJump.Add(1) metricChangeTimeJump.Add(1)
} }
m.changed.Publish(delta)
for _, cb := range m.cbs { for _, cb := range m.cbs {
go cb(delta) go cb(delta)
} }

View File

@ -13,6 +13,7 @@ import (
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"tailscale.com/net/netaddr" "tailscale.com/net/netaddr"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/eventbus"
) )
const debugRouteMessages = false const debugRouteMessages = false
@ -24,7 +25,7 @@ type unspecifiedMessage struct{}
func (unspecifiedMessage) ignore() bool { return false } func (unspecifiedMessage) ignore() bool { return false }
func newOSMon(logf logger.Logf, _ *Monitor) (osMon, error) { func newOSMon(_ *eventbus.Bus, logf logger.Logf, _ *Monitor) (osMon, error) {
fd, err := unix.Socket(unix.AF_ROUTE, unix.SOCK_RAW, 0) fd, err := unix.Socket(unix.AF_ROUTE, unix.SOCK_RAW, 0)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -10,6 +10,7 @@ import (
"strings" "strings"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/eventbus"
) )
// unspecifiedMessage is a minimal message implementation that should not // unspecifiedMessage is a minimal message implementation that should not
@ -24,7 +25,7 @@ type devdConn struct {
conn net.Conn conn net.Conn
} }
func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) { func newOSMon(_ *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) {
conn, err := net.Dial("unixpacket", "/var/run/devd.seqpacket.pipe") conn, err := net.Dial("unixpacket", "/var/run/devd.seqpacket.pipe")
if err != nil { if err != nil {
logf("devd dial error: %v, falling back to polling method", err) logf("devd dial error: %v, falling back to polling method", err)

View File

@ -16,6 +16,7 @@ import (
"tailscale.com/envknob" "tailscale.com/envknob"
"tailscale.com/net/tsaddr" "tailscale.com/net/tsaddr"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/eventbus"
) )
var debugNetlinkMessages = envknob.RegisterBool("TS_DEBUG_NETLINK") var debugNetlinkMessages = envknob.RegisterBool("TS_DEBUG_NETLINK")
@ -27,15 +28,26 @@ type unspecifiedMessage struct{}
func (unspecifiedMessage) ignore() bool { return false } func (unspecifiedMessage) ignore() bool { return false }
// RuleDeleted reports that one of Tailscale's policy routing rules
// was deleted.
type RuleDeleted struct {
// Table is the table number that the deleted rule referenced.
Table uint8
// Priority is the lookup priority of the deleted rule.
Priority uint32
}
// nlConn wraps a *netlink.Conn and returns a monitor.Message // nlConn wraps a *netlink.Conn and returns a monitor.Message
// instead of a netlink.Message. Currently, messages are discarded, // instead of a netlink.Message. Currently, messages are discarded,
// but down the line, when messages trigger different logic depending // but down the line, when messages trigger different logic depending
// on the type of event, this provides the capability of handling // on the type of event, this provides the capability of handling
// each architecture-specific message in a generic fashion. // each architecture-specific message in a generic fashion.
type nlConn struct { type nlConn struct {
logf logger.Logf busClient *eventbus.Client
conn *netlink.Conn rulesDeleted *eventbus.Publisher[RuleDeleted]
buffered []netlink.Message logf logger.Logf
conn *netlink.Conn
buffered []netlink.Message
// addrCache maps interface indices to a set of addresses, and is // addrCache maps interface indices to a set of addresses, and is
// used to suppress duplicate RTM_NEWADDR messages. It is populated // used to suppress duplicate RTM_NEWADDR messages. It is populated
@ -44,7 +56,7 @@ type nlConn struct {
addrCache map[uint32]map[netip.Addr]bool addrCache map[uint32]map[netip.Addr]bool
} }
func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) { func newOSMon(bus *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) {
conn, err := netlink.Dial(unix.NETLINK_ROUTE, &netlink.Config{ conn, err := netlink.Dial(unix.NETLINK_ROUTE, &netlink.Config{
// Routes get us most of the events of interest, but we need // Routes get us most of the events of interest, but we need
// address as well to cover things like DHCP deciding to give // address as well to cover things like DHCP deciding to give
@ -59,12 +71,22 @@ func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) {
logf("monitor_linux: AF_NETLINK RTMGRP failed, falling back to polling") logf("monitor_linux: AF_NETLINK RTMGRP failed, falling back to polling")
return newPollingMon(logf, m) return newPollingMon(logf, m)
} }
return &nlConn{logf: logf, conn: conn, addrCache: make(map[uint32]map[netip.Addr]bool)}, nil client := bus.Client("netmon-iprules")
return &nlConn{
busClient: client,
rulesDeleted: eventbus.Publish[RuleDeleted](client),
logf: logf,
conn: conn,
addrCache: make(map[uint32]map[netip.Addr]bool),
}, nil
} }
func (c *nlConn) IsInterestingInterface(iface string) bool { return true } func (c *nlConn) IsInterestingInterface(iface string) bool { return true }
func (c *nlConn) Close() error { return c.conn.Close() } func (c *nlConn) Close() error {
c.busClient.Close()
return c.conn.Close()
}
func (c *nlConn) Receive() (message, error) { func (c *nlConn) Receive() (message, error) {
if len(c.buffered) == 0 { if len(c.buffered) == 0 {
@ -219,6 +241,10 @@ func (c *nlConn) Receive() (message, error) {
// On `ip -4 rule del pref 5210 table main`, logs: // On `ip -4 rule del pref 5210 table main`, logs:
// monitor: ip rule deleted: {Family:2 DstLength:0 SrcLength:0 Tos:0 Table:254 Protocol:0 Scope:0 Type:1 Flags:0 Attributes:{Dst:<nil> Src:<nil> Gateway:<nil> OutIface:0 Priority:5210 Table:254 Mark:4294967295 Expires:<nil> Metrics:<nil> Multipath:[]}} // monitor: ip rule deleted: {Family:2 DstLength:0 SrcLength:0 Tos:0 Table:254 Protocol:0 Scope:0 Type:1 Flags:0 Attributes:{Dst:<nil> Src:<nil> Gateway:<nil> OutIface:0 Priority:5210 Table:254 Mark:4294967295 Expires:<nil> Metrics:<nil> Multipath:[]}}
} }
c.rulesDeleted.Publish(RuleDeleted{
Table: rmsg.Table,
Priority: rmsg.Attributes.Priority,
})
rdm := ipRuleDeletedMessage{ rdm := ipRuleDeletedMessage{
table: rmsg.Table, table: rmsg.Table,
priority: rmsg.Attributes.Priority, priority: rmsg.Attributes.Priority,

View File

@ -7,9 +7,10 @@ package netmon
import ( import (
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/eventbus"
) )
func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) { func newOSMon(_ *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) {
return newPollingMon(logf, m) return newPollingMon(logf, m)
} }

View File

@ -11,11 +11,15 @@ import (
"testing" "testing"
"time" "time"
"tailscale.com/util/eventbus"
"tailscale.com/util/mak" "tailscale.com/util/mak"
) )
func TestMonitorStartClose(t *testing.T) { func TestMonitorStartClose(t *testing.T) {
mon, err := New(t.Logf) bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -26,7 +30,10 @@ func TestMonitorStartClose(t *testing.T) {
} }
func TestMonitorJustClose(t *testing.T) { func TestMonitorJustClose(t *testing.T) {
mon, err := New(t.Logf) bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -36,7 +43,10 @@ func TestMonitorJustClose(t *testing.T) {
} }
func TestMonitorInjectEvent(t *testing.T) { func TestMonitorInjectEvent(t *testing.T) {
mon, err := New(t.Logf) bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -71,7 +81,11 @@ func TestMonitorMode(t *testing.T) {
default: default:
t.Skipf(`invalid --monitor value: must be "raw" or "callback"`) t.Skipf(`invalid --monitor value: must be "raw" or "callback"`)
} }
mon, err := New(t.Logf)
bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -13,6 +13,7 @@ import (
"golang.zx2c4.com/wireguard/windows/tunnel/winipcfg" "golang.zx2c4.com/wireguard/windows/tunnel/winipcfg"
"tailscale.com/net/tsaddr" "tailscale.com/net/tsaddr"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/eventbus"
) )
var ( var (
@ -45,7 +46,7 @@ type winMon struct {
noDeadlockTicker *time.Ticker noDeadlockTicker *time.Ticker
} }
func newOSMon(logf logger.Logf, pm *Monitor) (osMon, error) { func newOSMon(_ *eventbus.Bus, logf logger.Logf, pm *Monitor) (osMon, error) {
m := &winMon{ m := &winMon{
logf: logf, logf: logf,
isActive: pm.isActive, isActive: pm.isActive,

View File

@ -10,6 +10,7 @@ import (
"testing" "testing"
"tailscale.com/net/netmon" "tailscale.com/net/netmon"
"tailscale.com/util/eventbus"
) )
type conn struct { type conn struct {
@ -72,7 +73,10 @@ func TestCheckReversePathFiltering(t *testing.T) {
if runtime.GOOS != "linux" { if runtime.GOOS != "linux" {
t.Skipf("skipping on %s", runtime.GOOS) t.Skipf("skipping on %s", runtime.GOOS)
} }
netMon, err := netmon.New(t.Logf) bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, t.Logf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -1038,7 +1038,7 @@ func TestSSHAuthFlow(t *testing.T) {
func TestSSH(t *testing.T) { func TestSSH(t *testing.T) {
var logf logger.Logf = t.Logf var logf logger.Logf = t.Logf
sys := tsd.NewSystem() sys := tsd.NewSystem()
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -562,7 +562,7 @@ func (s *Server) start() (reterr error) {
return err return err
} }
s.netMon, err = netmon.New(tsLogf) s.netMon, err = netmon.New(sys.Bus.Get(), tsLogf)
if err != nil { if err != nil {
return err return err
} }

View File

@ -48,6 +48,7 @@ import (
_ "tailscale.com/types/logger" _ "tailscale.com/types/logger"
_ "tailscale.com/types/logid" _ "tailscale.com/types/logid"
_ "tailscale.com/util/clientmetric" _ "tailscale.com/util/clientmetric"
_ "tailscale.com/util/eventbus"
_ "tailscale.com/util/multierr" _ "tailscale.com/util/multierr"
_ "tailscale.com/util/osshare" _ "tailscale.com/util/osshare"
_ "tailscale.com/version" _ "tailscale.com/version"

View File

@ -48,6 +48,7 @@ import (
_ "tailscale.com/types/logger" _ "tailscale.com/types/logger"
_ "tailscale.com/types/logid" _ "tailscale.com/types/logid"
_ "tailscale.com/util/clientmetric" _ "tailscale.com/util/clientmetric"
_ "tailscale.com/util/eventbus"
_ "tailscale.com/util/multierr" _ "tailscale.com/util/multierr"
_ "tailscale.com/util/osshare" _ "tailscale.com/util/osshare"
_ "tailscale.com/version" _ "tailscale.com/version"

View File

@ -48,6 +48,7 @@ import (
_ "tailscale.com/types/logger" _ "tailscale.com/types/logger"
_ "tailscale.com/types/logid" _ "tailscale.com/types/logid"
_ "tailscale.com/util/clientmetric" _ "tailscale.com/util/clientmetric"
_ "tailscale.com/util/eventbus"
_ "tailscale.com/util/multierr" _ "tailscale.com/util/multierr"
_ "tailscale.com/util/osshare" _ "tailscale.com/util/osshare"
_ "tailscale.com/version" _ "tailscale.com/version"

View File

@ -48,6 +48,7 @@ import (
_ "tailscale.com/types/logger" _ "tailscale.com/types/logger"
_ "tailscale.com/types/logid" _ "tailscale.com/types/logid"
_ "tailscale.com/util/clientmetric" _ "tailscale.com/util/clientmetric"
_ "tailscale.com/util/eventbus"
_ "tailscale.com/util/multierr" _ "tailscale.com/util/multierr"
_ "tailscale.com/util/osshare" _ "tailscale.com/util/osshare"
_ "tailscale.com/version" _ "tailscale.com/version"

View File

@ -56,6 +56,7 @@ import (
_ "tailscale.com/types/logger" _ "tailscale.com/types/logger"
_ "tailscale.com/types/logid" _ "tailscale.com/types/logid"
_ "tailscale.com/util/clientmetric" _ "tailscale.com/util/clientmetric"
_ "tailscale.com/util/eventbus"
_ "tailscale.com/util/multierr" _ "tailscale.com/util/multierr"
_ "tailscale.com/util/osdiag" _ "tailscale.com/util/osdiag"
_ "tailscale.com/util/osshare" _ "tailscale.com/util/osshare"

View File

@ -62,6 +62,7 @@ import (
"tailscale.com/types/nettype" "tailscale.com/types/nettype"
"tailscale.com/types/ptr" "tailscale.com/types/ptr"
"tailscale.com/util/cibuild" "tailscale.com/util/cibuild"
"tailscale.com/util/eventbus"
"tailscale.com/util/must" "tailscale.com/util/must"
"tailscale.com/util/racebuild" "tailscale.com/util/racebuild"
"tailscale.com/util/set" "tailscale.com/util/set"
@ -173,7 +174,10 @@ func newMagicStack(t testing.TB, logf logger.Logf, l nettype.PacketListener, der
func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListener, derpMap *tailcfg.DERPMap, privateKey key.NodePrivate) *magicStack { func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListener, derpMap *tailcfg.DERPMap, privateKey key.NodePrivate) *magicStack {
t.Helper() t.Helper()
netMon, err := netmon.New(logf) bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logf)
if err != nil { if err != nil {
t.Fatalf("netmon.New: %v", err) t.Fatalf("netmon.New: %v", err)
} }
@ -390,7 +394,10 @@ func TestNewConn(t *testing.T) {
} }
} }
netMon, err := netmon.New(logger.WithPrefix(t.Logf, "... netmon: ")) bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
if err != nil { if err != nil {
t.Fatalf("netmon.New: %v", err) t.Fatalf("netmon.New: %v", err)
} }
@ -523,7 +530,10 @@ func TestDeviceStartStop(t *testing.T) {
tstest.PanicOnLog() tstest.PanicOnLog()
tstest.ResourceCheck(t) tstest.ResourceCheck(t)
netMon, err := netmon.New(logger.WithPrefix(t.Logf, "... netmon: ")) bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
if err != nil { if err != nil {
t.Fatalf("netmon.New: %v", err) t.Fatalf("netmon.New: %v", err)
} }
@ -1362,7 +1372,10 @@ func newTestConn(t testing.TB) *Conn {
t.Helper() t.Helper()
port := pickPort(t) port := pickPort(t)
netMon, err := netmon.New(logger.WithPrefix(t.Logf, "... netmon: ")) bus := eventbus.New()
defer bus.Close()
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
if err != nil { if err != nil {
t.Fatalf("netmon.New: %v", err) t.Fatalf("netmon.New: %v", err)
} }
@ -3117,7 +3130,10 @@ func TestMaybeRebindOnError(t *testing.T) {
} }
func TestNetworkDownSendErrors(t *testing.T) { func TestNetworkDownSendErrors(t *testing.T) {
netMon := must.Get(netmon.New(t.Logf)) bus := eventbus.New()
defer bus.Close()
netMon := must.Get(netmon.New(bus, t.Logf))
defer netMon.Close() defer netMon.Close()
reg := new(usermetric.Registry) reg := new(usermetric.Registry)

View File

@ -51,6 +51,7 @@ func TestInjectInboundLeak(t *testing.T) {
SetSubsystem: sys.Set, SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(), HealthTracker: sys.HealthTracker(),
Metrics: sys.UserMetricsRegistry(), Metrics: sys.UserMetricsRegistry(),
EventBus: sys.Bus.Get(),
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -110,6 +111,7 @@ func makeNetstack(tb testing.TB, config func(*Impl)) *Impl {
SetSubsystem: sys.Set, SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(), HealthTracker: sys.HealthTracker(),
Metrics: sys.UserMetricsRegistry(), Metrics: sys.UserMetricsRegistry(),
EventBus: sys.Bus.Get(),
}) })
if err != nil { if err != nil {
tb.Fatal(err) tb.Fatal(err)

View File

@ -27,6 +27,7 @@ import (
"tailscale.com/net/tsaddr" "tailscale.com/net/tsaddr"
"tailscale.com/tstest" "tailscale.com/tstest"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/eventbus"
"tailscale.com/util/linuxfw" "tailscale.com/util/linuxfw"
"tailscale.com/version/distro" "tailscale.com/version/distro"
) )
@ -363,7 +364,9 @@ ip route add throw 192.168.0.0/24 table 52` + basic,
}, },
} }
mon, err := netmon.New(logger.Discard) bus := eventbus.New()
defer bus.Close()
mon, err := netmon.New(bus, logger.Discard)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -973,7 +976,10 @@ func newLinuxRootTest(t *testing.T) *linuxTest {
logf := lt.logOutput.Logf logf := lt.logOutput.Logf
mon, err := netmon.New(logger.Discard) bus := eventbus.New()
defer bus.Close()
mon, err := netmon.New(bus, logger.Discard)
if err != nil { if err != nil {
lt.Close() lt.Close()
t.Fatal(err) t.Fatal(err)

View File

@ -363,7 +363,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
if conf.NetMon != nil { if conf.NetMon != nil {
e.netMon = conf.NetMon e.netMon = conf.NetMon
} else { } else {
mon, err := netmon.New(logf) mon, err := netmon.New(conf.EventBus, logf)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -23,6 +23,7 @@ func TestIsNetstack(t *testing.T) {
SetSubsystem: sys.Set, SetSubsystem: sys.Set,
HealthTracker: sys.HealthTracker(), HealthTracker: sys.HealthTracker(),
Metrics: sys.UserMetricsRegistry(), Metrics: sys.UserMetricsRegistry(),
EventBus: sys.Bus.Get(),
}, },
) )
if err != nil { if err != nil {
@ -74,6 +75,7 @@ func TestIsNetstackRouter(t *testing.T) {
conf.SetSubsystem = sys.Set conf.SetSubsystem = sys.Set
conf.HealthTracker = sys.HealthTracker() conf.HealthTracker = sys.HealthTracker()
conf.Metrics = sys.UserMetricsRegistry() conf.Metrics = sys.UserMetricsRegistry()
conf.EventBus = sys.Bus.Get()
e, err := wgengine.NewUserspaceEngine(logger.Discard, conf) e, err := wgengine.NewUserspaceEngine(logger.Discard, conf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -25,6 +25,7 @@ import (
"tailscale.com/types/key" "tailscale.com/types/key"
"tailscale.com/types/netmap" "tailscale.com/types/netmap"
"tailscale.com/types/opt" "tailscale.com/types/opt"
"tailscale.com/util/eventbus"
"tailscale.com/util/usermetric" "tailscale.com/util/usermetric"
"tailscale.com/wgengine/router" "tailscale.com/wgengine/router"
"tailscale.com/wgengine/wgcfg" "tailscale.com/wgengine/wgcfg"
@ -100,9 +101,12 @@ func nodeViews(v []*tailcfg.Node) []tailcfg.NodeView {
} }
func TestUserspaceEngineReconfig(t *testing.T) { func TestUserspaceEngineReconfig(t *testing.T) {
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker) ht := new(health.Tracker)
reg := new(usermetric.Registry) reg := new(usermetric.Registry)
e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg) e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg, bus)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -166,13 +170,16 @@ func TestUserspaceEnginePortReconfig(t *testing.T) {
var knobs controlknobs.Knobs var knobs controlknobs.Knobs
bus := eventbus.New()
defer bus.Close()
// Keep making a wgengine until we find an unused port // Keep making a wgengine until we find an unused port
var ue *userspaceEngine var ue *userspaceEngine
ht := new(health.Tracker) ht := new(health.Tracker)
reg := new(usermetric.Registry) reg := new(usermetric.Registry)
for i := range 100 { for i := range 100 {
attempt := uint16(defaultPort + i) attempt := uint16(defaultPort + i)
e, err := NewFakeUserspaceEngine(t.Logf, attempt, &knobs, ht, reg) e, err := NewFakeUserspaceEngine(t.Logf, attempt, &knobs, ht, reg, bus)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -251,9 +258,11 @@ func TestUserspaceEnginePeerMTUReconfig(t *testing.T) {
var knobs controlknobs.Knobs var knobs controlknobs.Knobs
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker) ht := new(health.Tracker)
reg := new(usermetric.Registry) reg := new(usermetric.Registry)
e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht, reg) e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht, reg, bus)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"tailscale.com/health" "tailscale.com/health"
"tailscale.com/util/eventbus"
"tailscale.com/util/usermetric" "tailscale.com/util/usermetric"
) )
@ -24,9 +25,11 @@ func TestWatchdog(t *testing.T) {
t.Run("default watchdog does not fire", func(t *testing.T) { t.Run("default watchdog does not fire", func(t *testing.T) {
t.Parallel() t.Parallel()
bus := eventbus.New()
defer bus.Close()
ht := new(health.Tracker) ht := new(health.Tracker)
reg := new(usermetric.Registry) reg := new(usermetric.Registry)
e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg) e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg, bus)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }