From 9ad2928f915bfdb5b42c51a0f0efea19c5388183 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Wed, 16 Jul 2025 08:13:08 -0700 Subject: [PATCH] ipn,net,tsnet,wgengine: make an eventbus mandatory where it is used In the components where an event bus is already plumbed through, remove the exceptions that allow it to be omitted, and update all the tests that relied on those workarounds execute properly. This change applies only to the places where we're already using the bus; it does not enforce the existence of a bus in other components (yet), Updates #15160 Change-Id: Iebb92243caba82b5eb420c49fc3e089a77454f65 Signed-off-by: M. J. Fromberger --- ipn/ipnlocal/dnsconfig_test.go | 2 +- ipn/ipnlocal/local.go | 11 +-- ipn/ipnlocal/local_test.go | 6 ++ ipn/ipnlocal/network-lock_test.go | 26 ++++--- ipn/ipnlocal/peerapi_test.go | 102 +++++++++++++-------------- net/portmapper/igd_test.go | 11 ++- net/portmapper/portmapper.go | 38 +++++----- net/udprelay/server.go | 3 + tsnet/tsnet.go | 5 +- wgengine/magicsock/magicsock.go | 39 +++++----- wgengine/magicsock/magicsock_test.go | 13 ++-- 11 files changed, 133 insertions(+), 123 deletions(-) diff --git a/ipn/ipnlocal/dnsconfig_test.go b/ipn/ipnlocal/dnsconfig_test.go index c0f5b25f3..71f175148 100644 --- a/ipn/ipnlocal/dnsconfig_test.go +++ b/ipn/ipnlocal/dnsconfig_test.go @@ -377,7 +377,7 @@ func peersMap(s []tailcfg.NodeView) map[tailcfg.NodeID]tailcfg.NodeView { } func TestAllowExitNodeDNSProxyToServeName(t *testing.T) { - b := &LocalBackend{} + b := newTestLocalBackend(t) if b.allowExitNodeDNSProxyToServeName("google.com") { t.Fatal("unexpected true on backend with nil NetMap") } diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 7154b942c..bf13b2ac1 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -99,7 +99,6 @@ import ( "tailscale.com/util/clientmetric" "tailscale.com/util/deephash" "tailscale.com/util/dnsname" - "tailscale.com/util/eventbus" "tailscale.com/util/goroutines" "tailscale.com/util/httpm" "tailscale.com/util/mak" @@ -618,15 +617,7 @@ func (b *LocalBackend) currentNode() *nodeBackend { if v := b.currentNodeAtomic.Load(); v != nil || !testenv.InTest() { return v } - // Auto-init [nodeBackend] in tests for LocalBackend created without the - // NewLocalBackend() constructor. Same reasoning for checking b.sys. - var bus *eventbus.Bus - if b.sys == nil { - bus = eventbus.New() - } else { - bus = b.sys.Bus.Get() - } - v := newNodeBackend(cmp.Or(b.ctx, context.Background()), bus) + v := newNodeBackend(cmp.Or(b.ctx, context.Background()), b.sys.Bus.Get()) if b.currentNodeAtomic.CompareAndSwap(nil, v) { v.ready() } diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go index dd2837022..b3853a617 100644 --- a/ipn/ipnlocal/local_test.go +++ b/ipn/ipnlocal/local_test.go @@ -463,6 +463,7 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend { var logf logger.Logf = logger.Discard if _, ok := sys.StateStore.GetOK(); !ok { sys.Set(new(mem.Store)) + t.Log("Added memory store for testing") } if _, ok := sys.Engine.GetOK(); !ok { eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) @@ -471,6 +472,11 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend { } t.Cleanup(eng.Close) sys.Set(eng) + t.Log("Added fake userspace engine for testing") + } + if _, ok := sys.Dialer.GetOK(); !ok { + sys.Set(tsdial.NewDialer(netmon.NewStatic())) + t.Log("Added static dialer for testing") } lb, err := NewLocalBackend(logf, logid.PublicID{}, sys, 0) if err != nil { diff --git a/ipn/ipnlocal/network-lock_test.go b/ipn/ipnlocal/network-lock_test.go index 838f16cb9..443539aec 100644 --- a/ipn/ipnlocal/network-lock_test.go +++ b/ipn/ipnlocal/network-lock_test.go @@ -28,6 +28,7 @@ import ( "tailscale.com/net/tsdial" "tailscale.com/tailcfg" "tailscale.com/tka" + "tailscale.com/tsd" "tailscale.com/types/key" "tailscale.com/types/netmap" "tailscale.com/types/persist" @@ -935,18 +936,21 @@ func TestTKAForceDisable(t *testing.T) { defer ts.Close() cc := fakeControlClient(t, client) - b := LocalBackend{ - varRoot: temp, - cc: cc, - ccAuto: cc, - logf: t.Logf, - tka: &tkaState{ - authority: authority, - storage: chonk, - }, - pm: pm, - store: pm.Store(), + sys := tsd.NewSystem() + sys.Set(pm.Store()) + + b := newTestLocalBackendWithSys(t, sys) + b.SetVarRoot(temp) + b.SetControlClientGetterForTesting(func(controlclient.Options) (controlclient.Client, error) { + return cc, nil + }) + b.mu.Lock() + b.tka = &tkaState{ + authority: authority, + storage: chonk, } + b.pm = pm + b.mu.Unlock() if err := b.NetworkLockForceLocalDisable(); err != nil { t.Fatalf("NetworkLockForceLocalDisable() failed: %v", err) diff --git a/ipn/ipnlocal/peerapi_test.go b/ipn/ipnlocal/peerapi_test.go index d8655afa0..5654cf277 100644 --- a/ipn/ipnlocal/peerapi_test.go +++ b/ipn/ipnlocal/peerapi_test.go @@ -21,10 +21,10 @@ import ( "tailscale.com/ipn" "tailscale.com/ipn/store/mem" "tailscale.com/tailcfg" + "tailscale.com/tsd" "tailscale.com/tstest" "tailscale.com/types/logger" "tailscale.com/types/netmap" - "tailscale.com/util/eventbus" "tailscale.com/util/must" "tailscale.com/util/usermetric" "tailscale.com/wgengine" @@ -156,10 +156,9 @@ func TestHandlePeerAPI(t *testing.T) { selfNode.CapMap = tailcfg.NodeCapMap{tailcfg.CapabilityDebug: nil} } var e peerAPITestEnv - lb := &LocalBackend{ - logf: e.logBuf.Logf, - clock: &tstest.Clock{}, - } + lb := newTestLocalBackend(t) + lb.logf = e.logBuf.Logf + lb.clock = &tstest.Clock{} lb.currentNode().SetNetMap(&netmap.NetworkMap{SelfNode: selfNode.View()}) e.ph = &peerAPIHandler{ isSelf: tt.isSelf, @@ -195,20 +194,20 @@ func TestPeerAPIReplyToDNSQueries(t *testing.T) { h.isSelf = false h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - bus := eventbus.New() - defer bus.Close() + sys := tsd.NewSystem() + t.Cleanup(sys.Bus.Get().Close) ht := new(health.Tracker) - reg := new(usermetric.Registry) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) - h.ps = &peerAPIServer{ - b: &LocalBackend{ - e: eng, - pm: pm, - store: pm.Store(), - }, - } + reg := new(usermetric.Registry) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set) + sys.Set(pm.Store()) + sys.Set(eng) + + b := newTestLocalBackendWithSys(t, sys) + b.pm = pm + + h.ps = &peerAPIServer{b: b} if h.ps.b.OfferingExitNode() { t.Fatal("unexpectedly offering exit node") } @@ -250,12 +249,12 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - bus := eventbus.New() - defer bus.Close() + sys := tsd.NewSystem() + t.Cleanup(sys.Bus.Get().Close) ht := new(health.Tracker) reg := new(usermetric.Registry) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) var a *appc.AppConnector if shouldStore { @@ -263,16 +262,14 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) { } else { a = appc.NewAppConnector(t.Logf, &appctest.RouteCollector{}, nil, nil) } - h.ps = &peerAPIServer{ - b: &LocalBackend{ - e: eng, - pm: pm, - store: pm.Store(), - // configure as an app connector just to enable the API. - appConnector: a, - }, - } + sys.Set(pm.Store()) + sys.Set(eng) + b := newTestLocalBackendWithSys(t, sys) + b.pm = pm + b.appConnector = a // configure as an app connector just to enable the API. + + h.ps = &peerAPIServer{b: b} h.ps.resolver = &fakeResolver{build: func(b *dnsmessage.Builder) { b.CNAMEResource( dnsmessage.ResourceHeader{ @@ -326,27 +323,29 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - bus := eventbus.New() - defer bus.Close() + sys := tsd.NewSystem() + t.Cleanup(sys.Bus.Get().Close) + rc := &appctest.RouteCollector{} ht := new(health.Tracker) - reg := new(usermetric.Registry) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) + + reg := new(usermetric.Registry) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set) var a *appc.AppConnector if shouldStore { a = appc.NewAppConnector(t.Logf, rc, &appc.RouteInfo{}, fakeStoreRoutes) } else { a = appc.NewAppConnector(t.Logf, rc, nil, nil) } - h.ps = &peerAPIServer{ - b: &LocalBackend{ - e: eng, - pm: pm, - store: pm.Store(), - appConnector: a, - }, - } + sys.Set(pm.Store()) + sys.Set(eng) + + b := newTestLocalBackendWithSys(t, sys) + b.pm = pm + b.appConnector = a + + h.ps = &peerAPIServer{b: b} h.ps.b.appConnector.UpdateDomains([]string{"example.com"}) h.ps.b.appConnector.Wait(ctx) @@ -393,12 +392,13 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - bus := eventbus.New() - defer bus.Close() + sys := tsd.NewSystem() + t.Cleanup(sys.Bus.Get().Close) + ht := new(health.Tracker) reg := new(usermetric.Registry) rc := &appctest.RouteCollector{} - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) var a *appc.AppConnector if shouldStore { @@ -406,14 +406,14 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) { } else { a = appc.NewAppConnector(t.Logf, rc, nil, nil) } - h.ps = &peerAPIServer{ - b: &LocalBackend{ - e: eng, - pm: pm, - store: pm.Store(), - appConnector: a, - }, - } + sys.Set(pm.Store()) + sys.Set(eng) + + b := newTestLocalBackendWithSys(t, sys) + b.pm = pm + b.appConnector = a + + h.ps = &peerAPIServer{b: b} h.ps.b.appConnector.UpdateDomains([]string{"www.example.com"}) h.ps.b.appConnector.Wait(ctx) diff --git a/net/portmapper/igd_test.go b/net/portmapper/igd_test.go index 3ef7989a3..cca87e0b8 100644 --- a/net/portmapper/igd_test.go +++ b/net/portmapper/igd_test.go @@ -263,16 +263,21 @@ func (d *TestIGD) handlePCPQuery(pkt []byte, src netip.AddrPort) { } // newTestClient configures a new test client connected to igd for mapping updates. -// If bus != nil, update events are published to it. -// A cleanup for the resulting client is added to t. +// If bus == nil, a new empty event bus is constructed that is cleaned up when t exits. +// A cleanup for the resulting client is also added to t. func newTestClient(t *testing.T, igd *TestIGD, bus *eventbus.Bus) *Client { + if bus == nil { + bus = eventbus.New() + t.Log("Created empty event bus for test client") + t.Cleanup(bus.Close) + } var c *Client c = NewClient(Config{ Logf: tstest.WhileTestRunningLogger(t), NetMon: netmon.NewStatic(), ControlKnobs: new(controlknobs.Knobs), EventBus: bus, - OnChange: func() { + OnChange: func() { // TODO(creachadair): Remove. t.Logf("port map changed") t.Logf("have mapping: %v", c.HaveMapping()) }, diff --git a/net/portmapper/portmapper.go b/net/portmapper/portmapper.go index 1c6c7634b..6c798c5a9 100644 --- a/net/portmapper/portmapper.go +++ b/net/portmapper/portmapper.go @@ -85,7 +85,7 @@ const trustServiceStillAvailableDuration = 10 * time.Minute // Client is a port mapping client. type Client struct { - // The following two fields must either both be nil, or both non-nil. + // The following two fields must both be non-nil. // Both are immutable after construction. pubClient *eventbus.Client updates *eventbus.Publisher[Mapping] @@ -238,8 +238,11 @@ type Config struct { // NewClient constructs a new portmapping [Client] from c. It will panic if any // required parameters are omitted. func NewClient(c Config) *Client { - if c.NetMon == nil { - panic("nil netMon") + switch { + case c.NetMon == nil: + panic("nil NetMon") + case c.EventBus == nil: + panic("nil EventBus") } ret := &Client{ logf: c.Logf, @@ -248,10 +251,8 @@ func NewClient(c Config) *Client { onChange: c.OnChange, controlKnobs: c.ControlKnobs, } - if c.EventBus != nil { - ret.pubClient = c.EventBus.Client("portmapper") - ret.updates = eventbus.Publish[Mapping](ret.pubClient) - } + ret.pubClient = c.EventBus.Client("portmapper") + ret.updates = eventbus.Publish[Mapping](ret.pubClient) if ret.logf == nil { ret.logf = logger.Discard } @@ -286,10 +287,9 @@ func (c *Client) Close() error { } c.closed = true c.invalidateMappingsLocked(true) - if c.updates != nil { - c.updates.Close() - c.pubClient.Close() - } + c.updates.Close() + c.pubClient.Close() + // TODO: close some future ever-listening UDP socket(s), // waiting for multicast announcements from router. return nil @@ -508,14 +508,14 @@ func (c *Client) createMapping() { } return } - if c.updates != nil { - c.updates.Publish(Mapping{ - External: mapping.External(), - Type: mapping.MappingType(), - GoodUntil: mapping.GoodUntil(), - }) - } - if c.onChange != nil && c.pubClient == nil { + c.updates.Publish(Mapping{ + External: mapping.External(), + Type: mapping.MappingType(), + GoodUntil: mapping.GoodUntil(), + }) + // TODO(creachadair): Remove this entirely once there are no longer any + // places where the callback is set. + if c.onChange != nil { go c.onChange() } } diff --git a/net/udprelay/server.go b/net/udprelay/server.go index c34a4b5f6..aece3bc59 100644 --- a/net/udprelay/server.go +++ b/net/udprelay/server.go @@ -291,6 +291,9 @@ func NewServer(logf logger.Logf, port int, overrideAddrs []netip.Addr) (s *Serve s.vniPool = append(s.vniPool, uint32(i)) } + // TODO(creachadair): Find a way to plumb this in during initialization. + // As-written, messages published here will not be seen by other components + // in a running client. bus := eventbus.New() s.bus = bus netMon, err := netmon.New(s.bus, logf) diff --git a/tsnet/tsnet.go b/tsnet/tsnet.go index 65367f235..d81dec7d6 100644 --- a/tsnet/tsnet.go +++ b/tsnet/tsnet.go @@ -435,10 +435,7 @@ func (s *Server) Close() error { ln.closeLocked() } wg.Wait() - - if bus := s.sys.Bus.Get(); bus != nil { - bus.Close() - } + s.sys.Bus.Get().Close() s.closed = true return nil } diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index fb7f5edcb..2c1fb1cbf 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -715,8 +715,11 @@ func (c *Conn) Synchronize() { // As the set of possible endpoints for a Conn changes, the // callback opts.EndpointsFunc is called. func NewConn(opts Options) (*Conn, error) { - if opts.NetMon == nil { + switch { + case opts.NetMon == nil: return nil, errors.New("magicsock.Options.NetMon must be non-nil") + case opts.EventBus == nil: + return nil, errors.New("magicsock.Options.EventBus must be non-nil") } c := newConn(opts.logf()) @@ -729,22 +732,20 @@ func NewConn(opts Options) (*Conn, error) { c.testOnlyPacketListener = opts.TestOnlyPacketListener c.noteRecvActivity = opts.NoteRecvActivity - if c.eventBus != nil { - c.eventClient = c.eventBus.Client("magicsock.Conn") + c.eventClient = c.eventBus.Client("magicsock.Conn") - // Subscribe calls must return before NewConn otherwise published - // events can be missed. - c.pmSub = eventbus.Subscribe[portmapper.Mapping](c.eventClient) - c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient) - c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient) - c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient) - c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient) - c.syncPub = eventbus.Publish[syncPoint](c.eventClient) - c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient) - c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient) - c.subsDoneCh = make(chan struct{}) - go c.consumeEventbusTopics() - } + // Subscribe calls must return before NewConn otherwise published + // events can be missed. + c.pmSub = eventbus.Subscribe[portmapper.Mapping](c.eventClient) + c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient) + c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient) + c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient) + c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient) + c.syncPub = eventbus.Publish[syncPoint](c.eventClient) + c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient) + c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient) + c.subsDoneCh = make(chan struct{}) + go c.consumeEventbusTopics() // Don't log the same log messages possibly every few seconds in our // portmapper. @@ -3327,10 +3328,8 @@ func (c *Conn) Close() error { // deadlock with c.Close(). // 2. Conn.consumeEventbusTopics event handlers may not guard against // undesirable post/in-progress Conn.Close() behaviors. - if c.eventClient != nil { - c.eventClient.Close() - <-c.subsDoneCh - } + c.eventClient.Close() + <-c.subsDoneCh c.mu.Lock() defer c.mu.Unlock() diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index 8a09df27d..480faa694 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -179,7 +179,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen t.Helper() bus := eventbus.New() - defer bus.Close() + t.Cleanup(bus.Close) netMon, err := netmon.New(bus, logf) if err != nil { @@ -191,6 +191,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen epCh := make(chan []tailcfg.Endpoint, 100) // arbitrary conn, err := NewConn(Options{ NetMon: netMon, + EventBus: bus, Metrics: ®, Logf: logf, HealthTracker: ht, @@ -406,7 +407,7 @@ func TestNewConn(t *testing.T) { } bus := eventbus.New() - defer bus.Close() + t.Cleanup(bus.Close) netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: ")) if err != nil { @@ -424,6 +425,7 @@ func TestNewConn(t *testing.T) { EndpointsFunc: epFunc, Logf: t.Logf, NetMon: netMon, + EventBus: bus, Metrics: new(usermetric.Registry), }) if err != nil { @@ -542,7 +544,7 @@ func TestDeviceStartStop(t *testing.T) { tstest.ResourceCheck(t) bus := eventbus.New() - defer bus.Close() + t.Cleanup(bus.Close) netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: ")) if err != nil { @@ -554,6 +556,7 @@ func TestDeviceStartStop(t *testing.T) { EndpointsFunc: func(eps []tailcfg.Endpoint) {}, Logf: t.Logf, NetMon: netMon, + EventBus: bus, Metrics: new(usermetric.Registry), }) if err != nil { @@ -1349,7 +1352,7 @@ func newTestConn(t testing.TB) *Conn { port := pickPort(t) bus := eventbus.New() - defer bus.Close() + t.Cleanup(bus.Close) netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: ")) if err != nil { @@ -1359,6 +1362,7 @@ func newTestConn(t testing.TB) *Conn { conn, err := NewConn(Options{ NetMon: netMon, + EventBus: bus, HealthTracker: new(health.Tracker), Metrics: new(usermetric.Registry), DisablePortMapper: true, @@ -3147,6 +3151,7 @@ func TestNetworkDownSendErrors(t *testing.T) { Logf: t.Logf, NetMon: netMon, Metrics: reg, + EventBus: bus, })) defer conn.Close()