From 1f3ca21ca18f00ad8d0a9984315fdc6582cb0b70 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Wed, 19 Mar 2025 10:47:25 -0700 Subject: [PATCH] net/netmon: publish events to event bus Updates #15160 Signed-off-by: David Anderson --- cmd/derper/depaware.txt | 3 +- cmd/tailscale/cli/debug.go | 6 ++- cmd/tailscale/cli/netcheck.go | 5 ++- cmd/tailscale/depaware.txt | 2 +- cmd/tailscaled/debug.go | 6 ++- cmd/tailscaled/tailscaled.go | 2 +- cmd/tailscaled/tailscaled_windows.go | 2 +- ipn/ipnlocal/local_test.go | 4 +- ipn/ipnlocal/loglines_test.go | 2 +- ipn/ipnlocal/peerapi_test.go | 19 ++++++++-- ipn/ipnlocal/serve_test.go | 1 + ipn/ipnlocal/state_test.go | 6 +-- ipn/ipnserver/server_test.go | 2 +- ipn/localapi/localapi.go | 5 ++- ipn/localapi/localapi_test.go | 2 +- net/dns/resolver/forwarder_test.go | 5 ++- net/dns/resolver/tsdns_test.go | 6 ++- net/dnsfallback/dnsfallback_test.go | 6 ++- net/netmon/loghelper_test.go | 6 ++- net/netmon/netmon.go | 13 +++++-- net/netmon/netmon_darwin.go | 3 +- net/netmon/netmon_freebsd.go | 3 +- net/netmon/netmon_linux.go | 38 ++++++++++++++++--- net/netmon/netmon_polling.go | 3 +- net/netmon/netmon_test.go | 22 +++++++++-- net/netmon/netmon_windows.go | 3 +- net/netutil/netutil_test.go | 6 ++- ssh/tailssh/tailssh_test.go | 2 +- tsnet/tsnet.go | 2 +- .../tailscaled_deps_test_darwin.go | 1 + .../tailscaled_deps_test_freebsd.go | 1 + .../integration/tailscaled_deps_test_linux.go | 1 + .../tailscaled_deps_test_openbsd.go | 1 + .../tailscaled_deps_test_windows.go | 1 + wgengine/magicsock/magicsock_test.go | 26 ++++++++++--- wgengine/netstack/netstack_test.go | 2 + wgengine/router/router_linux_test.go | 10 ++++- wgengine/userspace.go | 2 +- wgengine/userspace_ext_test.go | 2 + wgengine/userspace_test.go | 15 ++++++-- wgengine/watchdog_test.go | 5 ++- 41 files changed, 196 insertions(+), 56 deletions(-) diff --git a/cmd/derper/depaware.txt b/cmd/derper/depaware.txt index 5d375a515..7dd6eb565 100644 --- a/cmd/derper/depaware.txt +++ b/cmd/derper/depaware.txt @@ -155,6 +155,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depawa 💣 tailscale.com/util/deephash from tailscale.com/util/syspolicy/setting L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics tailscale.com/util/dnsname from tailscale.com/hostinfo+ + tailscale.com/util/eventbus from tailscale.com/net/netmon 💣 tailscale.com/util/hashx from tailscale.com/util/deephash tailscale.com/util/httpm from tailscale.com/client/tailscale tailscale.com/util/lineiter from tailscale.com/hostinfo+ @@ -308,7 +309,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depawa hash/fnv from google.golang.org/protobuf/internal/detrand hash/maphash from go4.org/mem html from net/http/pprof+ - html/template from tailscale.com/cmd/derper + html/template from tailscale.com/cmd/derper+ internal/abi from crypto/x509/internal/macos+ internal/asan from internal/runtime/maps+ internal/bisect from internal/godebug diff --git a/cmd/tailscale/cli/debug.go b/cmd/tailscale/cli/debug.go index 9c77570d5..213a0166e 100644 --- a/cmd/tailscale/cli/debug.go +++ b/cmd/tailscale/cli/debug.go @@ -43,6 +43,7 @@ import ( "tailscale.com/tailcfg" "tailscale.com/types/key" "tailscale.com/types/logger" + "tailscale.com/util/eventbus" "tailscale.com/util/must" ) @@ -956,7 +957,10 @@ func runTS2021(ctx context.Context, args []string) error { logf = log.Printf } - netMon, err := netmon.New(logger.WithPrefix(logf, "netmon: ")) + bus := eventbus.New() + defer bus.Close() + + netMon, err := netmon.New(bus, logger.WithPrefix(logf, "netmon: ")) if err != nil { return fmt.Errorf("creating netmon: %w", err) } diff --git a/cmd/tailscale/cli/netcheck.go b/cmd/tailscale/cli/netcheck.go index 14e337b89..3cf05a3b7 100644 --- a/cmd/tailscale/cli/netcheck.go +++ b/cmd/tailscale/cli/netcheck.go @@ -24,6 +24,7 @@ import ( "tailscale.com/net/tlsdial" "tailscale.com/tailcfg" "tailscale.com/types/logger" + "tailscale.com/util/eventbus" ) var netcheckCmd = &ffcli.Command{ @@ -48,7 +49,9 @@ var netcheckArgs struct { func runNetcheck(ctx context.Context, args []string) error { logf := logger.WithPrefix(log.Printf, "portmap: ") - netMon, err := netmon.New(logf) + bus := eventbus.New() + defer bus.Close() + netMon, err := netmon.New(bus, logf) if err != nil { return err } diff --git a/cmd/tailscale/depaware.txt b/cmd/tailscale/depaware.txt index be8f53a29..968f0c5f1 100644 --- a/cmd/tailscale/depaware.txt +++ b/cmd/tailscale/depaware.txt @@ -162,7 +162,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep 💣 tailscale.com/util/deephash from tailscale.com/util/syspolicy/setting L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics tailscale.com/util/dnsname from tailscale.com/cmd/tailscale/cli+ - tailscale.com/util/eventbus from tailscale.com/net/portmapper + tailscale.com/util/eventbus from tailscale.com/net/portmapper+ tailscale.com/util/groupmember from tailscale.com/client/web 💣 tailscale.com/util/hashx from tailscale.com/util/deephash tailscale.com/util/httpm from tailscale.com/client/tailscale+ diff --git a/cmd/tailscaled/debug.go b/cmd/tailscaled/debug.go index b41604d29..2f469a0d1 100644 --- a/cmd/tailscaled/debug.go +++ b/cmd/tailscaled/debug.go @@ -27,6 +27,7 @@ import ( "tailscale.com/net/tshttpproxy" "tailscale.com/tailcfg" "tailscale.com/types/key" + "tailscale.com/util/eventbus" ) var debugArgs struct { @@ -72,11 +73,14 @@ func debugMode(args []string) error { } func runMonitor(ctx context.Context, loop bool) error { + b := eventbus.New() + defer b.Close() + dump := func(st *netmon.State) { j, _ := json.MarshalIndent(st, "", " ") os.Stderr.Write(j) } - mon, err := netmon.New(log.Printf) + mon, err := netmon.New(b, log.Printf) if err != nil { return err } diff --git a/cmd/tailscaled/tailscaled.go b/cmd/tailscaled/tailscaled.go index af40f647a..4473d9af9 100644 --- a/cmd/tailscaled/tailscaled.go +++ b/cmd/tailscaled/tailscaled.go @@ -356,7 +356,7 @@ func run() (err error) { var netMon *netmon.Monitor isWinSvc := isWindowsService() if !isWinSvc { - netMon, err = netmon.New(logf) + netMon, err = netmon.New(sys.Bus.Get(), logf) if err != nil { return fmt.Errorf("netmon.New: %w", err) } diff --git a/cmd/tailscaled/tailscaled_windows.go b/cmd/tailscaled/tailscaled_windows.go index 0af434107..ccb3455a1 100644 --- a/cmd/tailscaled/tailscaled_windows.go +++ b/cmd/tailscaled/tailscaled_windows.go @@ -328,7 +328,7 @@ func beWindowsSubprocess() bool { } sys := tsd.NewSystem() - netMon, err := netmon.New(log.Printf) + netMon, err := netmon.New(sys.Bus.Get(), log.Printf) if err != nil { log.Fatalf("Could not create netMon: %v", err) } diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go index 69f58ac2d..7175b3d55 100644 --- a/ipn/ipnlocal/local_test.go +++ b/ipn/ipnlocal/local_test.go @@ -448,7 +448,7 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend { sys.Set(new(mem.Store)) } if _, ok := sys.Engine.GetOK(); !ok { - eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) + eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } @@ -4414,7 +4414,7 @@ func newLocalBackendWithTestControl(t *testing.T, enableLogging bool, newControl sys := tsd.NewSystem() store := new(mem.Store) sys.Set(store) - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } diff --git a/ipn/ipnlocal/loglines_test.go b/ipn/ipnlocal/loglines_test.go index f4a77824e..5bea6cabc 100644 --- a/ipn/ipnlocal/loglines_test.go +++ b/ipn/ipnlocal/loglines_test.go @@ -50,7 +50,7 @@ func TestLocalLogLines(t *testing.T) { sys := tsd.NewSystem() store := new(mem.Store) sys.Set(store) - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatal(err) } diff --git a/ipn/ipnlocal/peerapi_test.go b/ipn/ipnlocal/peerapi_test.go index ff9b62769..7a3f05a9c 100644 --- a/ipn/ipnlocal/peerapi_test.go +++ b/ipn/ipnlocal/peerapi_test.go @@ -34,6 +34,7 @@ import ( "tailscale.com/tstest" "tailscale.com/types/logger" "tailscale.com/types/netmap" + "tailscale.com/util/eventbus" "tailscale.com/util/must" "tailscale.com/util/usermetric" "tailscale.com/wgengine" @@ -643,9 +644,12 @@ func TestPeerAPIReplyToDNSQueries(t *testing.T) { h.isSelf = false h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") + bus := eventbus.New() + defer bus.Close() + ht := new(health.Tracker) reg := new(usermetric.Registry) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) h.ps = &peerAPIServer{ b: &LocalBackend{ @@ -695,9 +699,12 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") + bus := eventbus.New() + defer bus.Close() + ht := new(health.Tracker) reg := new(usermetric.Registry) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) var a *appc.AppConnector if shouldStore { @@ -768,10 +775,12 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") + bus := eventbus.New() + defer bus.Close() rc := &appctest.RouteCollector{} ht := new(health.Tracker) reg := new(usermetric.Registry) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) var a *appc.AppConnector if shouldStore { @@ -833,10 +842,12 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") + bus := eventbus.New() + defer bus.Close() ht := new(health.Tracker) reg := new(usermetric.Registry) rc := &appctest.RouteCollector{} - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) var a *appc.AppConnector if shouldStore { diff --git a/ipn/ipnlocal/serve_test.go b/ipn/ipnlocal/serve_test.go index 5e148a8a4..0279ea9be 100644 --- a/ipn/ipnlocal/serve_test.go +++ b/ipn/ipnlocal/serve_test.go @@ -882,6 +882,7 @@ func newTestBackend(t *testing.T) *LocalBackend { SetSubsystem: sys.Set, HealthTracker: sys.HealthTracker(), Metrics: sys.UserMetricsRegistry(), + EventBus: sys.Bus.Get(), }) if err != nil { t.Fatal(err) diff --git a/ipn/ipnlocal/state_test.go b/ipn/ipnlocal/state_test.go index 3a851cc23..a39f78cff 100644 --- a/ipn/ipnlocal/state_test.go +++ b/ipn/ipnlocal/state_test.go @@ -298,7 +298,7 @@ func TestStateMachine(t *testing.T) { sys := tsd.NewSystem() store := new(testStateStorage) sys.Set(store) - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } @@ -936,7 +936,7 @@ func TestEditPrefsHasNoKeys(t *testing.T) { logf := tstest.WhileTestRunningLogger(t) sys := tsd.NewSystem() sys.Set(new(mem.Store)) - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } @@ -1017,7 +1017,7 @@ func TestWGEngineStatusRace(t *testing.T) { sys := tsd.NewSystem() sys.Set(new(mem.Store)) - eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set) + eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.Bus.Get()) c.Assert(err, qt.IsNil) t.Cleanup(eng.Close) sys.Set(eng) diff --git a/ipn/ipnserver/server_test.go b/ipn/ipnserver/server_test.go index e34172ff9..9340fd1c6 100644 --- a/ipn/ipnserver/server_test.go +++ b/ipn/ipnserver/server_test.go @@ -522,7 +522,7 @@ func newLocalBackendWithTestControl(tb testing.TB, newControl newControlClientFn sys.Set(store) logf := testLogger(tb, enableLogging) - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { tb.Fatalf("NewFakeUserspaceEngine: %v", err) } diff --git a/ipn/localapi/localapi.go b/ipn/localapi/localapi.go index 5901855e3..40e3b7586 100644 --- a/ipn/localapi/localapi.go +++ b/ipn/localapi/localapi.go @@ -56,6 +56,7 @@ import ( "tailscale.com/types/ptr" "tailscale.com/types/tkatype" "tailscale.com/util/clientmetric" + "tailscale.com/util/eventbus" "tailscale.com/util/httphdr" "tailscale.com/util/httpm" "tailscale.com/util/mak" @@ -840,7 +841,9 @@ func (h *Handler) serveDebugPortmap(w http.ResponseWriter, r *http.Request) { }) defer c.Close() - netMon, err := netmon.New(logger.WithPrefix(logf, "monitor: ")) + bus := eventbus.New() + defer bus.Close() + netMon, err := netmon.New(bus, logger.WithPrefix(logf, "monitor: ")) if err != nil { logf("error creating monitor: %v", err) return diff --git a/ipn/localapi/localapi_test.go b/ipn/localapi/localapi_test.go index 4f304bb1b..970f798d0 100644 --- a/ipn/localapi/localapi_test.go +++ b/ipn/localapi/localapi_test.go @@ -339,7 +339,7 @@ func newTestLocalBackend(t testing.TB) *ipnlocal.LocalBackend { sys := tsd.NewSystem() store := new(mem.Store) sys.Set(store) - eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) + eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } diff --git a/net/dns/resolver/forwarder_test.go b/net/dns/resolver/forwarder_test.go index f3e592d4f..f7cda15f6 100644 --- a/net/dns/resolver/forwarder_test.go +++ b/net/dns/resolver/forwarder_test.go @@ -29,6 +29,7 @@ import ( "tailscale.com/net/tsdial" "tailscale.com/tstest" "tailscale.com/types/dnstype" + "tailscale.com/util/eventbus" ) func (rr resolverAndDelay) String() string { @@ -454,7 +455,9 @@ func makeLargeResponse(tb testing.TB, domain string) (request, response []byte) func runTestQuery(tb testing.TB, request []byte, modify func(*forwarder), ports ...uint16) ([]byte, error) { logf := tstest.WhileTestRunningLogger(tb) - netMon, err := netmon.New(logf) + bus := eventbus.New() + defer bus.Close() + netMon, err := netmon.New(bus, logf) if err != nil { tb.Fatal(err) } diff --git a/net/dns/resolver/tsdns_test.go b/net/dns/resolver/tsdns_test.go index d7b9fb360..de08450d2 100644 --- a/net/dns/resolver/tsdns_test.go +++ b/net/dns/resolver/tsdns_test.go @@ -31,6 +31,7 @@ import ( "tailscale.com/types/dnstype" "tailscale.com/types/logger" "tailscale.com/util/dnsname" + "tailscale.com/util/eventbus" ) var ( @@ -1059,7 +1060,10 @@ func TestForwardLinkSelection(t *testing.T) { // routes differently. specialIP := netaddr.IPv4(1, 2, 3, 4) - netMon, err := netmon.New(logger.WithPrefix(t.Logf, ".... netmon: ")) + bus := eventbus.New() + defer bus.Close() + + netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, ".... netmon: ")) if err != nil { t.Fatal(err) } diff --git a/net/dnsfallback/dnsfallback_test.go b/net/dnsfallback/dnsfallback_test.go index 16f5027d4..7f8810574 100644 --- a/net/dnsfallback/dnsfallback_test.go +++ b/net/dnsfallback/dnsfallback_test.go @@ -15,6 +15,7 @@ import ( "tailscale.com/net/netmon" "tailscale.com/tailcfg" "tailscale.com/types/logger" + "tailscale.com/util/eventbus" ) func TestGetDERPMap(t *testing.T) { @@ -185,7 +186,10 @@ func TestLookup(t *testing.T) { logf, closeLogf := logger.LogfCloser(t.Logf) defer closeLogf() - netMon, err := netmon.New(logf) + bus := eventbus.New() + defer bus.Close() + + netMon, err := netmon.New(bus, logf) if err != nil { t.Fatal(err) } diff --git a/net/netmon/loghelper_test.go b/net/netmon/loghelper_test.go index 31777f4bc..44aa46783 100644 --- a/net/netmon/loghelper_test.go +++ b/net/netmon/loghelper_test.go @@ -7,10 +7,14 @@ import ( "bytes" "fmt" "testing" + + "tailscale.com/util/eventbus" ) func TestLinkChangeLogLimiter(t *testing.T) { - mon, err := New(t.Logf) + bus := eventbus.New() + defer bus.Close() + mon, err := New(bus, t.Logf) if err != nil { t.Fatal(err) } diff --git a/net/netmon/netmon.go b/net/netmon/netmon.go index bd62ab270..015d1d942 100644 --- a/net/netmon/netmon.go +++ b/net/netmon/netmon.go @@ -16,6 +16,7 @@ import ( "tailscale.com/types/logger" "tailscale.com/util/clientmetric" + "tailscale.com/util/eventbus" "tailscale.com/util/set" ) @@ -50,7 +51,10 @@ type osMon interface { // Monitor represents a monitoring instance. type Monitor struct { - logf logger.Logf + logf logger.Logf + b *eventbus.Client + changed *eventbus.Publisher[*ChangeDelta] + om osMon // nil means not supported on this platform change chan bool // send false to wake poller, true to also force ChangeDeltas be sent stop chan struct{} // closed on Stop @@ -114,21 +118,23 @@ type ChangeDelta struct { // New instantiates and starts a monitoring instance. // The returned monitor is inactive until it's started by the Start method. // Use RegisterChangeCallback to get notified of network changes. -func New(logf logger.Logf) (*Monitor, error) { +func New(bus *eventbus.Bus, logf logger.Logf) (*Monitor, error) { logf = logger.WithPrefix(logf, "monitor: ") m := &Monitor{ logf: logf, + b: bus.Client("netmon"), change: make(chan bool, 1), stop: make(chan struct{}), lastWall: wallTime(), } + m.changed = eventbus.Publish[*ChangeDelta](m.b) st, err := m.interfaceStateUncached() if err != nil { return nil, err } m.ifState = st - m.om, err = newOSMon(logf, m) + m.om, err = newOSMon(bus, logf, m) if err != nil { return nil, err } @@ -465,6 +471,7 @@ func (m *Monitor) handlePotentialChange(newState *State, forceCallbacks bool) { if delta.TimeJumped { metricChangeTimeJump.Add(1) } + m.changed.Publish(delta) for _, cb := range m.cbs { go cb(delta) } diff --git a/net/netmon/netmon_darwin.go b/net/netmon/netmon_darwin.go index 8a521919b..9c5e76475 100644 --- a/net/netmon/netmon_darwin.go +++ b/net/netmon/netmon_darwin.go @@ -13,6 +13,7 @@ import ( "golang.org/x/sys/unix" "tailscale.com/net/netaddr" "tailscale.com/types/logger" + "tailscale.com/util/eventbus" ) const debugRouteMessages = false @@ -24,7 +25,7 @@ type unspecifiedMessage struct{} func (unspecifiedMessage) ignore() bool { return false } -func newOSMon(logf logger.Logf, _ *Monitor) (osMon, error) { +func newOSMon(_ *eventbus.Bus, logf logger.Logf, _ *Monitor) (osMon, error) { fd, err := unix.Socket(unix.AF_ROUTE, unix.SOCK_RAW, 0) if err != nil { return nil, err diff --git a/net/netmon/netmon_freebsd.go b/net/netmon/netmon_freebsd.go index 30480a1d3..842cbdb0d 100644 --- a/net/netmon/netmon_freebsd.go +++ b/net/netmon/netmon_freebsd.go @@ -10,6 +10,7 @@ import ( "strings" "tailscale.com/types/logger" + "tailscale.com/util/eventbus" ) // unspecifiedMessage is a minimal message implementation that should not @@ -24,7 +25,7 @@ type devdConn struct { conn net.Conn } -func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) { +func newOSMon(_ *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) { conn, err := net.Dial("unixpacket", "/var/run/devd.seqpacket.pipe") if err != nil { logf("devd dial error: %v, falling back to polling method", err) diff --git a/net/netmon/netmon_linux.go b/net/netmon/netmon_linux.go index dd23dd342..659fcc74b 100644 --- a/net/netmon/netmon_linux.go +++ b/net/netmon/netmon_linux.go @@ -16,6 +16,7 @@ import ( "tailscale.com/envknob" "tailscale.com/net/tsaddr" "tailscale.com/types/logger" + "tailscale.com/util/eventbus" ) var debugNetlinkMessages = envknob.RegisterBool("TS_DEBUG_NETLINK") @@ -27,15 +28,26 @@ type unspecifiedMessage struct{} func (unspecifiedMessage) ignore() bool { return false } +// RuleDeleted reports that one of Tailscale's policy routing rules +// was deleted. +type RuleDeleted struct { + // Table is the table number that the deleted rule referenced. + Table uint8 + // Priority is the lookup priority of the deleted rule. + Priority uint32 +} + // nlConn wraps a *netlink.Conn and returns a monitor.Message // instead of a netlink.Message. Currently, messages are discarded, // but down the line, when messages trigger different logic depending // on the type of event, this provides the capability of handling // each architecture-specific message in a generic fashion. type nlConn struct { - logf logger.Logf - conn *netlink.Conn - buffered []netlink.Message + busClient *eventbus.Client + rulesDeleted *eventbus.Publisher[RuleDeleted] + logf logger.Logf + conn *netlink.Conn + buffered []netlink.Message // addrCache maps interface indices to a set of addresses, and is // used to suppress duplicate RTM_NEWADDR messages. It is populated @@ -44,7 +56,7 @@ type nlConn struct { addrCache map[uint32]map[netip.Addr]bool } -func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) { +func newOSMon(bus *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) { conn, err := netlink.Dial(unix.NETLINK_ROUTE, &netlink.Config{ // Routes get us most of the events of interest, but we need // address as well to cover things like DHCP deciding to give @@ -59,12 +71,22 @@ func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) { logf("monitor_linux: AF_NETLINK RTMGRP failed, falling back to polling") return newPollingMon(logf, m) } - return &nlConn{logf: logf, conn: conn, addrCache: make(map[uint32]map[netip.Addr]bool)}, nil + client := bus.Client("netmon-iprules") + return &nlConn{ + busClient: client, + rulesDeleted: eventbus.Publish[RuleDeleted](client), + logf: logf, + conn: conn, + addrCache: make(map[uint32]map[netip.Addr]bool), + }, nil } func (c *nlConn) IsInterestingInterface(iface string) bool { return true } -func (c *nlConn) Close() error { return c.conn.Close() } +func (c *nlConn) Close() error { + c.busClient.Close() + return c.conn.Close() +} func (c *nlConn) Receive() (message, error) { if len(c.buffered) == 0 { @@ -219,6 +241,10 @@ func (c *nlConn) Receive() (message, error) { // On `ip -4 rule del pref 5210 table main`, logs: // monitor: ip rule deleted: {Family:2 DstLength:0 SrcLength:0 Tos:0 Table:254 Protocol:0 Scope:0 Type:1 Flags:0 Attributes:{Dst: Src: Gateway: OutIface:0 Priority:5210 Table:254 Mark:4294967295 Expires: Metrics: Multipath:[]}} } + c.rulesDeleted.Publish(RuleDeleted{ + Table: rmsg.Table, + Priority: rmsg.Attributes.Priority, + }) rdm := ipRuleDeletedMessage{ table: rmsg.Table, priority: rmsg.Attributes.Priority, diff --git a/net/netmon/netmon_polling.go b/net/netmon/netmon_polling.go index 3d6f94731..3b5ef6fe9 100644 --- a/net/netmon/netmon_polling.go +++ b/net/netmon/netmon_polling.go @@ -7,9 +7,10 @@ package netmon import ( "tailscale.com/types/logger" + "tailscale.com/util/eventbus" ) -func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) { +func newOSMon(_ *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) { return newPollingMon(logf, m) } diff --git a/net/netmon/netmon_test.go b/net/netmon/netmon_test.go index ce55d1946..a9af8fb00 100644 --- a/net/netmon/netmon_test.go +++ b/net/netmon/netmon_test.go @@ -11,11 +11,15 @@ import ( "testing" "time" + "tailscale.com/util/eventbus" "tailscale.com/util/mak" ) func TestMonitorStartClose(t *testing.T) { - mon, err := New(t.Logf) + bus := eventbus.New() + defer bus.Close() + + mon, err := New(bus, t.Logf) if err != nil { t.Fatal(err) } @@ -26,7 +30,10 @@ func TestMonitorStartClose(t *testing.T) { } func TestMonitorJustClose(t *testing.T) { - mon, err := New(t.Logf) + bus := eventbus.New() + defer bus.Close() + + mon, err := New(bus, t.Logf) if err != nil { t.Fatal(err) } @@ -36,7 +43,10 @@ func TestMonitorJustClose(t *testing.T) { } func TestMonitorInjectEvent(t *testing.T) { - mon, err := New(t.Logf) + bus := eventbus.New() + defer bus.Close() + + mon, err := New(bus, t.Logf) if err != nil { t.Fatal(err) } @@ -71,7 +81,11 @@ func TestMonitorMode(t *testing.T) { default: t.Skipf(`invalid --monitor value: must be "raw" or "callback"`) } - mon, err := New(t.Logf) + + bus := eventbus.New() + defer bus.Close() + + mon, err := New(bus, t.Logf) if err != nil { t.Fatal(err) } diff --git a/net/netmon/netmon_windows.go b/net/netmon/netmon_windows.go index ddf13a2e4..718724b6d 100644 --- a/net/netmon/netmon_windows.go +++ b/net/netmon/netmon_windows.go @@ -13,6 +13,7 @@ import ( "golang.zx2c4.com/wireguard/windows/tunnel/winipcfg" "tailscale.com/net/tsaddr" "tailscale.com/types/logger" + "tailscale.com/util/eventbus" ) var ( @@ -45,7 +46,7 @@ type winMon struct { noDeadlockTicker *time.Ticker } -func newOSMon(logf logger.Logf, pm *Monitor) (osMon, error) { +func newOSMon(_ *eventbus.Bus, logf logger.Logf, pm *Monitor) (osMon, error) { m := &winMon{ logf: logf, isActive: pm.isActive, diff --git a/net/netutil/netutil_test.go b/net/netutil/netutil_test.go index fdc26b02f..0523946e6 100644 --- a/net/netutil/netutil_test.go +++ b/net/netutil/netutil_test.go @@ -10,6 +10,7 @@ import ( "testing" "tailscale.com/net/netmon" + "tailscale.com/util/eventbus" ) type conn struct { @@ -72,7 +73,10 @@ func TestCheckReversePathFiltering(t *testing.T) { if runtime.GOOS != "linux" { t.Skipf("skipping on %s", runtime.GOOS) } - netMon, err := netmon.New(t.Logf) + bus := eventbus.New() + defer bus.Close() + + netMon, err := netmon.New(bus, t.Logf) if err != nil { t.Fatal(err) } diff --git a/ssh/tailssh/tailssh_test.go b/ssh/tailssh/tailssh_test.go index 3dbd16047..980c77414 100644 --- a/ssh/tailssh/tailssh_test.go +++ b/ssh/tailssh/tailssh_test.go @@ -1038,7 +1038,7 @@ func TestSSHAuthFlow(t *testing.T) { func TestSSH(t *testing.T) { var logf logger.Logf = t.Logf sys := tsd.NewSystem() - eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) + eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatal(err) } diff --git a/tsnet/tsnet.go b/tsnet/tsnet.go index 596ed1e44..0aecbf567 100644 --- a/tsnet/tsnet.go +++ b/tsnet/tsnet.go @@ -562,7 +562,7 @@ func (s *Server) start() (reterr error) { return err } - s.netMon, err = netmon.New(tsLogf) + s.netMon, err = netmon.New(sys.Bus.Get(), tsLogf) if err != nil { return err } diff --git a/tstest/integration/tailscaled_deps_test_darwin.go b/tstest/integration/tailscaled_deps_test_darwin.go index 470085f5e..321ba2566 100644 --- a/tstest/integration/tailscaled_deps_test_darwin.go +++ b/tstest/integration/tailscaled_deps_test_darwin.go @@ -48,6 +48,7 @@ import ( _ "tailscale.com/types/logger" _ "tailscale.com/types/logid" _ "tailscale.com/util/clientmetric" + _ "tailscale.com/util/eventbus" _ "tailscale.com/util/multierr" _ "tailscale.com/util/osshare" _ "tailscale.com/version" diff --git a/tstest/integration/tailscaled_deps_test_freebsd.go b/tstest/integration/tailscaled_deps_test_freebsd.go index 470085f5e..321ba2566 100644 --- a/tstest/integration/tailscaled_deps_test_freebsd.go +++ b/tstest/integration/tailscaled_deps_test_freebsd.go @@ -48,6 +48,7 @@ import ( _ "tailscale.com/types/logger" _ "tailscale.com/types/logid" _ "tailscale.com/util/clientmetric" + _ "tailscale.com/util/eventbus" _ "tailscale.com/util/multierr" _ "tailscale.com/util/osshare" _ "tailscale.com/version" diff --git a/tstest/integration/tailscaled_deps_test_linux.go b/tstest/integration/tailscaled_deps_test_linux.go index 470085f5e..321ba2566 100644 --- a/tstest/integration/tailscaled_deps_test_linux.go +++ b/tstest/integration/tailscaled_deps_test_linux.go @@ -48,6 +48,7 @@ import ( _ "tailscale.com/types/logger" _ "tailscale.com/types/logid" _ "tailscale.com/util/clientmetric" + _ "tailscale.com/util/eventbus" _ "tailscale.com/util/multierr" _ "tailscale.com/util/osshare" _ "tailscale.com/version" diff --git a/tstest/integration/tailscaled_deps_test_openbsd.go b/tstest/integration/tailscaled_deps_test_openbsd.go index 470085f5e..321ba2566 100644 --- a/tstest/integration/tailscaled_deps_test_openbsd.go +++ b/tstest/integration/tailscaled_deps_test_openbsd.go @@ -48,6 +48,7 @@ import ( _ "tailscale.com/types/logger" _ "tailscale.com/types/logid" _ "tailscale.com/util/clientmetric" + _ "tailscale.com/util/eventbus" _ "tailscale.com/util/multierr" _ "tailscale.com/util/osshare" _ "tailscale.com/version" diff --git a/tstest/integration/tailscaled_deps_test_windows.go b/tstest/integration/tailscaled_deps_test_windows.go index a6df2f9ff..ed118ec05 100644 --- a/tstest/integration/tailscaled_deps_test_windows.go +++ b/tstest/integration/tailscaled_deps_test_windows.go @@ -56,6 +56,7 @@ import ( _ "tailscale.com/types/logger" _ "tailscale.com/types/logid" _ "tailscale.com/util/clientmetric" + _ "tailscale.com/util/eventbus" _ "tailscale.com/util/multierr" _ "tailscale.com/util/osdiag" _ "tailscale.com/util/osshare" diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index 090c1218f..f50f21f56 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -62,6 +62,7 @@ import ( "tailscale.com/types/nettype" "tailscale.com/types/ptr" "tailscale.com/util/cibuild" + "tailscale.com/util/eventbus" "tailscale.com/util/must" "tailscale.com/util/racebuild" "tailscale.com/util/set" @@ -173,7 +174,10 @@ func newMagicStack(t testing.TB, logf logger.Logf, l nettype.PacketListener, der func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListener, derpMap *tailcfg.DERPMap, privateKey key.NodePrivate) *magicStack { t.Helper() - netMon, err := netmon.New(logf) + bus := eventbus.New() + defer bus.Close() + + netMon, err := netmon.New(bus, logf) if err != nil { t.Fatalf("netmon.New: %v", err) } @@ -390,7 +394,10 @@ func TestNewConn(t *testing.T) { } } - netMon, err := netmon.New(logger.WithPrefix(t.Logf, "... netmon: ")) + bus := eventbus.New() + defer bus.Close() + + netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: ")) if err != nil { t.Fatalf("netmon.New: %v", err) } @@ -523,7 +530,10 @@ func TestDeviceStartStop(t *testing.T) { tstest.PanicOnLog() tstest.ResourceCheck(t) - netMon, err := netmon.New(logger.WithPrefix(t.Logf, "... netmon: ")) + bus := eventbus.New() + defer bus.Close() + + netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: ")) if err != nil { t.Fatalf("netmon.New: %v", err) } @@ -1362,7 +1372,10 @@ func newTestConn(t testing.TB) *Conn { t.Helper() port := pickPort(t) - netMon, err := netmon.New(logger.WithPrefix(t.Logf, "... netmon: ")) + bus := eventbus.New() + defer bus.Close() + + netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: ")) if err != nil { t.Fatalf("netmon.New: %v", err) } @@ -3117,7 +3130,10 @@ func TestMaybeRebindOnError(t *testing.T) { } func TestNetworkDownSendErrors(t *testing.T) { - netMon := must.Get(netmon.New(t.Logf)) + bus := eventbus.New() + defer bus.Close() + + netMon := must.Get(netmon.New(bus, t.Logf)) defer netMon.Close() reg := new(usermetric.Registry) diff --git a/wgengine/netstack/netstack_test.go b/wgengine/netstack/netstack_test.go index bc33d829a..157715490 100644 --- a/wgengine/netstack/netstack_test.go +++ b/wgengine/netstack/netstack_test.go @@ -51,6 +51,7 @@ func TestInjectInboundLeak(t *testing.T) { SetSubsystem: sys.Set, HealthTracker: sys.HealthTracker(), Metrics: sys.UserMetricsRegistry(), + EventBus: sys.Bus.Get(), }) if err != nil { t.Fatal(err) @@ -110,6 +111,7 @@ func makeNetstack(tb testing.TB, config func(*Impl)) *Impl { SetSubsystem: sys.Set, HealthTracker: sys.HealthTracker(), Metrics: sys.UserMetricsRegistry(), + EventBus: sys.Bus.Get(), }) if err != nil { tb.Fatal(err) diff --git a/wgengine/router/router_linux_test.go b/wgengine/router/router_linux_test.go index 9a159aea8..7ddd7385d 100644 --- a/wgengine/router/router_linux_test.go +++ b/wgengine/router/router_linux_test.go @@ -27,6 +27,7 @@ import ( "tailscale.com/net/tsaddr" "tailscale.com/tstest" "tailscale.com/types/logger" + "tailscale.com/util/eventbus" "tailscale.com/util/linuxfw" "tailscale.com/version/distro" ) @@ -363,7 +364,9 @@ ip route add throw 192.168.0.0/24 table 52` + basic, }, } - mon, err := netmon.New(logger.Discard) + bus := eventbus.New() + defer bus.Close() + mon, err := netmon.New(bus, logger.Discard) if err != nil { t.Fatal(err) } @@ -973,7 +976,10 @@ func newLinuxRootTest(t *testing.T) *linuxTest { logf := lt.logOutput.Logf - mon, err := netmon.New(logger.Discard) + bus := eventbus.New() + defer bus.Close() + + mon, err := netmon.New(bus, logger.Discard) if err != nil { lt.Close() t.Fatal(err) diff --git a/wgengine/userspace.go b/wgengine/userspace.go index 201d7df51..78f614310 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -363,7 +363,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) if conf.NetMon != nil { e.netMon = conf.NetMon } else { - mon, err := netmon.New(logf) + mon, err := netmon.New(conf.EventBus, logf) if err != nil { return nil, err } diff --git a/wgengine/userspace_ext_test.go b/wgengine/userspace_ext_test.go index b0caffd1e..5e7d1ce6a 100644 --- a/wgengine/userspace_ext_test.go +++ b/wgengine/userspace_ext_test.go @@ -23,6 +23,7 @@ func TestIsNetstack(t *testing.T) { SetSubsystem: sys.Set, HealthTracker: sys.HealthTracker(), Metrics: sys.UserMetricsRegistry(), + EventBus: sys.Bus.Get(), }, ) if err != nil { @@ -74,6 +75,7 @@ func TestIsNetstackRouter(t *testing.T) { conf.SetSubsystem = sys.Set conf.HealthTracker = sys.HealthTracker() conf.Metrics = sys.UserMetricsRegistry() + conf.EventBus = sys.Bus.Get() e, err := wgengine.NewUserspaceEngine(logger.Discard, conf) if err != nil { t.Fatal(err) diff --git a/wgengine/userspace_test.go b/wgengine/userspace_test.go index 051421862..87a36c673 100644 --- a/wgengine/userspace_test.go +++ b/wgengine/userspace_test.go @@ -25,6 +25,7 @@ import ( "tailscale.com/types/key" "tailscale.com/types/netmap" "tailscale.com/types/opt" + "tailscale.com/util/eventbus" "tailscale.com/util/usermetric" "tailscale.com/wgengine/router" "tailscale.com/wgengine/wgcfg" @@ -100,9 +101,12 @@ func nodeViews(v []*tailcfg.Node) []tailcfg.NodeView { } func TestUserspaceEngineReconfig(t *testing.T) { + bus := eventbus.New() + defer bus.Close() + ht := new(health.Tracker) reg := new(usermetric.Registry) - e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg) + e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg, bus) if err != nil { t.Fatal(err) } @@ -166,13 +170,16 @@ func TestUserspaceEnginePortReconfig(t *testing.T) { var knobs controlknobs.Knobs + bus := eventbus.New() + defer bus.Close() + // Keep making a wgengine until we find an unused port var ue *userspaceEngine ht := new(health.Tracker) reg := new(usermetric.Registry) for i := range 100 { attempt := uint16(defaultPort + i) - e, err := NewFakeUserspaceEngine(t.Logf, attempt, &knobs, ht, reg) + e, err := NewFakeUserspaceEngine(t.Logf, attempt, &knobs, ht, reg, bus) if err != nil { t.Fatal(err) } @@ -251,9 +258,11 @@ func TestUserspaceEnginePeerMTUReconfig(t *testing.T) { var knobs controlknobs.Knobs + bus := eventbus.New() + defer bus.Close() ht := new(health.Tracker) reg := new(usermetric.Registry) - e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht, reg) + e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht, reg, bus) if err != nil { t.Fatal(err) } diff --git a/wgengine/watchdog_test.go b/wgengine/watchdog_test.go index b05cd421f..a54a0d3fa 100644 --- a/wgengine/watchdog_test.go +++ b/wgengine/watchdog_test.go @@ -9,6 +9,7 @@ import ( "time" "tailscale.com/health" + "tailscale.com/util/eventbus" "tailscale.com/util/usermetric" ) @@ -24,9 +25,11 @@ func TestWatchdog(t *testing.T) { t.Run("default watchdog does not fire", func(t *testing.T) { t.Parallel() + bus := eventbus.New() + defer bus.Close() ht := new(health.Tracker) reg := new(usermetric.Registry) - e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg) + e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg, bus) if err != nil { t.Fatal(err) }