diff --git a/ipn/ipnlocal/dnsconfig_test.go b/ipn/ipnlocal/dnsconfig_test.go index c0f5b25f3..71f175148 100644 --- a/ipn/ipnlocal/dnsconfig_test.go +++ b/ipn/ipnlocal/dnsconfig_test.go @@ -377,7 +377,7 @@ func peersMap(s []tailcfg.NodeView) map[tailcfg.NodeID]tailcfg.NodeView { } func TestAllowExitNodeDNSProxyToServeName(t *testing.T) { - b := &LocalBackend{} + b := newTestLocalBackend(t) if b.allowExitNodeDNSProxyToServeName("google.com") { t.Fatal("unexpected true on backend with nil NetMap") } diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 7154b942c..bf13b2ac1 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -99,7 +99,6 @@ import ( "tailscale.com/util/clientmetric" "tailscale.com/util/deephash" "tailscale.com/util/dnsname" - "tailscale.com/util/eventbus" "tailscale.com/util/goroutines" "tailscale.com/util/httpm" "tailscale.com/util/mak" @@ -618,15 +617,7 @@ func (b *LocalBackend) currentNode() *nodeBackend { if v := b.currentNodeAtomic.Load(); v != nil || !testenv.InTest() { return v } - // Auto-init [nodeBackend] in tests for LocalBackend created without the - // NewLocalBackend() constructor. Same reasoning for checking b.sys. - var bus *eventbus.Bus - if b.sys == nil { - bus = eventbus.New() - } else { - bus = b.sys.Bus.Get() - } - v := newNodeBackend(cmp.Or(b.ctx, context.Background()), bus) + v := newNodeBackend(cmp.Or(b.ctx, context.Background()), b.sys.Bus.Get()) if b.currentNodeAtomic.CompareAndSwap(nil, v) { v.ready() } diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go index dd2837022..b3853a617 100644 --- a/ipn/ipnlocal/local_test.go +++ b/ipn/ipnlocal/local_test.go @@ -463,6 +463,7 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend { var logf logger.Logf = logger.Discard if _, ok := sys.StateStore.GetOK(); !ok { sys.Set(new(mem.Store)) + t.Log("Added memory store for testing") } if _, ok := sys.Engine.GetOK(); !ok { eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) @@ -471,6 +472,11 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend { } t.Cleanup(eng.Close) sys.Set(eng) + t.Log("Added fake userspace engine for testing") + } + if _, ok := sys.Dialer.GetOK(); !ok { + sys.Set(tsdial.NewDialer(netmon.NewStatic())) + t.Log("Added static dialer for testing") } lb, err := NewLocalBackend(logf, logid.PublicID{}, sys, 0) if err != nil { diff --git a/ipn/ipnlocal/network-lock_test.go b/ipn/ipnlocal/network-lock_test.go index 838f16cb9..443539aec 100644 --- a/ipn/ipnlocal/network-lock_test.go +++ b/ipn/ipnlocal/network-lock_test.go @@ -28,6 +28,7 @@ import ( "tailscale.com/net/tsdial" "tailscale.com/tailcfg" "tailscale.com/tka" + "tailscale.com/tsd" "tailscale.com/types/key" "tailscale.com/types/netmap" "tailscale.com/types/persist" @@ -935,18 +936,21 @@ func TestTKAForceDisable(t *testing.T) { defer ts.Close() cc := fakeControlClient(t, client) - b := LocalBackend{ - varRoot: temp, - cc: cc, - ccAuto: cc, - logf: t.Logf, - tka: &tkaState{ - authority: authority, - storage: chonk, - }, - pm: pm, - store: pm.Store(), + sys := tsd.NewSystem() + sys.Set(pm.Store()) + + b := newTestLocalBackendWithSys(t, sys) + b.SetVarRoot(temp) + b.SetControlClientGetterForTesting(func(controlclient.Options) (controlclient.Client, error) { + return cc, nil + }) + b.mu.Lock() + b.tka = &tkaState{ + authority: authority, + storage: chonk, } + b.pm = pm + b.mu.Unlock() if err := b.NetworkLockForceLocalDisable(); err != nil { t.Fatalf("NetworkLockForceLocalDisable() failed: %v", err) diff --git a/ipn/ipnlocal/peerapi_test.go b/ipn/ipnlocal/peerapi_test.go index d8655afa0..5654cf277 100644 --- a/ipn/ipnlocal/peerapi_test.go +++ b/ipn/ipnlocal/peerapi_test.go @@ -21,10 +21,10 @@ import ( "tailscale.com/ipn" "tailscale.com/ipn/store/mem" "tailscale.com/tailcfg" + "tailscale.com/tsd" "tailscale.com/tstest" "tailscale.com/types/logger" "tailscale.com/types/netmap" - "tailscale.com/util/eventbus" "tailscale.com/util/must" "tailscale.com/util/usermetric" "tailscale.com/wgengine" @@ -156,10 +156,9 @@ func TestHandlePeerAPI(t *testing.T) { selfNode.CapMap = tailcfg.NodeCapMap{tailcfg.CapabilityDebug: nil} } var e peerAPITestEnv - lb := &LocalBackend{ - logf: e.logBuf.Logf, - clock: &tstest.Clock{}, - } + lb := newTestLocalBackend(t) + lb.logf = e.logBuf.Logf + lb.clock = &tstest.Clock{} lb.currentNode().SetNetMap(&netmap.NetworkMap{SelfNode: selfNode.View()}) e.ph = &peerAPIHandler{ isSelf: tt.isSelf, @@ -195,20 +194,20 @@ func TestPeerAPIReplyToDNSQueries(t *testing.T) { h.isSelf = false h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - bus := eventbus.New() - defer bus.Close() + sys := tsd.NewSystem() + t.Cleanup(sys.Bus.Get().Close) ht := new(health.Tracker) - reg := new(usermetric.Registry) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) - h.ps = &peerAPIServer{ - b: &LocalBackend{ - e: eng, - pm: pm, - store: pm.Store(), - }, - } + reg := new(usermetric.Registry) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set) + sys.Set(pm.Store()) + sys.Set(eng) + + b := newTestLocalBackendWithSys(t, sys) + b.pm = pm + + h.ps = &peerAPIServer{b: b} if h.ps.b.OfferingExitNode() { t.Fatal("unexpectedly offering exit node") } @@ -250,12 +249,12 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - bus := eventbus.New() - defer bus.Close() + sys := tsd.NewSystem() + t.Cleanup(sys.Bus.Get().Close) ht := new(health.Tracker) reg := new(usermetric.Registry) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) var a *appc.AppConnector if shouldStore { @@ -263,16 +262,14 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) { } else { a = appc.NewAppConnector(t.Logf, &appctest.RouteCollector{}, nil, nil) } - h.ps = &peerAPIServer{ - b: &LocalBackend{ - e: eng, - pm: pm, - store: pm.Store(), - // configure as an app connector just to enable the API. - appConnector: a, - }, - } + sys.Set(pm.Store()) + sys.Set(eng) + b := newTestLocalBackendWithSys(t, sys) + b.pm = pm + b.appConnector = a // configure as an app connector just to enable the API. + + h.ps = &peerAPIServer{b: b} h.ps.resolver = &fakeResolver{build: func(b *dnsmessage.Builder) { b.CNAMEResource( dnsmessage.ResourceHeader{ @@ -326,27 +323,29 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - bus := eventbus.New() - defer bus.Close() + sys := tsd.NewSystem() + t.Cleanup(sys.Bus.Get().Close) + rc := &appctest.RouteCollector{} ht := new(health.Tracker) - reg := new(usermetric.Registry) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) + + reg := new(usermetric.Registry) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set) var a *appc.AppConnector if shouldStore { a = appc.NewAppConnector(t.Logf, rc, &appc.RouteInfo{}, fakeStoreRoutes) } else { a = appc.NewAppConnector(t.Logf, rc, nil, nil) } - h.ps = &peerAPIServer{ - b: &LocalBackend{ - e: eng, - pm: pm, - store: pm.Store(), - appConnector: a, - }, - } + sys.Set(pm.Store()) + sys.Set(eng) + + b := newTestLocalBackendWithSys(t, sys) + b.pm = pm + b.appConnector = a + + h.ps = &peerAPIServer{b: b} h.ps.b.appConnector.UpdateDomains([]string{"example.com"}) h.ps.b.appConnector.Wait(ctx) @@ -393,12 +392,13 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - bus := eventbus.New() - defer bus.Close() + sys := tsd.NewSystem() + t.Cleanup(sys.Bus.Get().Close) + ht := new(health.Tracker) reg := new(usermetric.Registry) rc := &appctest.RouteCollector{} - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) var a *appc.AppConnector if shouldStore { @@ -406,14 +406,14 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) { } else { a = appc.NewAppConnector(t.Logf, rc, nil, nil) } - h.ps = &peerAPIServer{ - b: &LocalBackend{ - e: eng, - pm: pm, - store: pm.Store(), - appConnector: a, - }, - } + sys.Set(pm.Store()) + sys.Set(eng) + + b := newTestLocalBackendWithSys(t, sys) + b.pm = pm + b.appConnector = a + + h.ps = &peerAPIServer{b: b} h.ps.b.appConnector.UpdateDomains([]string{"www.example.com"}) h.ps.b.appConnector.Wait(ctx) diff --git a/net/portmapper/igd_test.go b/net/portmapper/igd_test.go index 3ef7989a3..cca87e0b8 100644 --- a/net/portmapper/igd_test.go +++ b/net/portmapper/igd_test.go @@ -263,16 +263,21 @@ func (d *TestIGD) handlePCPQuery(pkt []byte, src netip.AddrPort) { } // newTestClient configures a new test client connected to igd for mapping updates. -// If bus != nil, update events are published to it. -// A cleanup for the resulting client is added to t. +// If bus == nil, a new empty event bus is constructed that is cleaned up when t exits. +// A cleanup for the resulting client is also added to t. func newTestClient(t *testing.T, igd *TestIGD, bus *eventbus.Bus) *Client { + if bus == nil { + bus = eventbus.New() + t.Log("Created empty event bus for test client") + t.Cleanup(bus.Close) + } var c *Client c = NewClient(Config{ Logf: tstest.WhileTestRunningLogger(t), NetMon: netmon.NewStatic(), ControlKnobs: new(controlknobs.Knobs), EventBus: bus, - OnChange: func() { + OnChange: func() { // TODO(creachadair): Remove. t.Logf("port map changed") t.Logf("have mapping: %v", c.HaveMapping()) }, diff --git a/net/portmapper/portmapper.go b/net/portmapper/portmapper.go index 1c6c7634b..6c798c5a9 100644 --- a/net/portmapper/portmapper.go +++ b/net/portmapper/portmapper.go @@ -85,7 +85,7 @@ const trustServiceStillAvailableDuration = 10 * time.Minute // Client is a port mapping client. type Client struct { - // The following two fields must either both be nil, or both non-nil. + // The following two fields must both be non-nil. // Both are immutable after construction. pubClient *eventbus.Client updates *eventbus.Publisher[Mapping] @@ -238,8 +238,11 @@ type Config struct { // NewClient constructs a new portmapping [Client] from c. It will panic if any // required parameters are omitted. func NewClient(c Config) *Client { - if c.NetMon == nil { - panic("nil netMon") + switch { + case c.NetMon == nil: + panic("nil NetMon") + case c.EventBus == nil: + panic("nil EventBus") } ret := &Client{ logf: c.Logf, @@ -248,10 +251,8 @@ func NewClient(c Config) *Client { onChange: c.OnChange, controlKnobs: c.ControlKnobs, } - if c.EventBus != nil { - ret.pubClient = c.EventBus.Client("portmapper") - ret.updates = eventbus.Publish[Mapping](ret.pubClient) - } + ret.pubClient = c.EventBus.Client("portmapper") + ret.updates = eventbus.Publish[Mapping](ret.pubClient) if ret.logf == nil { ret.logf = logger.Discard } @@ -286,10 +287,9 @@ func (c *Client) Close() error { } c.closed = true c.invalidateMappingsLocked(true) - if c.updates != nil { - c.updates.Close() - c.pubClient.Close() - } + c.updates.Close() + c.pubClient.Close() + // TODO: close some future ever-listening UDP socket(s), // waiting for multicast announcements from router. return nil @@ -508,14 +508,14 @@ func (c *Client) createMapping() { } return } - if c.updates != nil { - c.updates.Publish(Mapping{ - External: mapping.External(), - Type: mapping.MappingType(), - GoodUntil: mapping.GoodUntil(), - }) - } - if c.onChange != nil && c.pubClient == nil { + c.updates.Publish(Mapping{ + External: mapping.External(), + Type: mapping.MappingType(), + GoodUntil: mapping.GoodUntil(), + }) + // TODO(creachadair): Remove this entirely once there are no longer any + // places where the callback is set. + if c.onChange != nil { go c.onChange() } } diff --git a/net/udprelay/server.go b/net/udprelay/server.go index c34a4b5f6..aece3bc59 100644 --- a/net/udprelay/server.go +++ b/net/udprelay/server.go @@ -291,6 +291,9 @@ func NewServer(logf logger.Logf, port int, overrideAddrs []netip.Addr) (s *Serve s.vniPool = append(s.vniPool, uint32(i)) } + // TODO(creachadair): Find a way to plumb this in during initialization. + // As-written, messages published here will not be seen by other components + // in a running client. bus := eventbus.New() s.bus = bus netMon, err := netmon.New(s.bus, logf) diff --git a/tsnet/tsnet.go b/tsnet/tsnet.go index 65367f235..d81dec7d6 100644 --- a/tsnet/tsnet.go +++ b/tsnet/tsnet.go @@ -435,10 +435,7 @@ func (s *Server) Close() error { ln.closeLocked() } wg.Wait() - - if bus := s.sys.Bus.Get(); bus != nil { - bus.Close() - } + s.sys.Bus.Get().Close() s.closed = true return nil } diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index fb7f5edcb..2c1fb1cbf 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -715,8 +715,11 @@ func (c *Conn) Synchronize() { // As the set of possible endpoints for a Conn changes, the // callback opts.EndpointsFunc is called. func NewConn(opts Options) (*Conn, error) { - if opts.NetMon == nil { + switch { + case opts.NetMon == nil: return nil, errors.New("magicsock.Options.NetMon must be non-nil") + case opts.EventBus == nil: + return nil, errors.New("magicsock.Options.EventBus must be non-nil") } c := newConn(opts.logf()) @@ -729,22 +732,20 @@ func NewConn(opts Options) (*Conn, error) { c.testOnlyPacketListener = opts.TestOnlyPacketListener c.noteRecvActivity = opts.NoteRecvActivity - if c.eventBus != nil { - c.eventClient = c.eventBus.Client("magicsock.Conn") + c.eventClient = c.eventBus.Client("magicsock.Conn") - // Subscribe calls must return before NewConn otherwise published - // events can be missed. - c.pmSub = eventbus.Subscribe[portmapper.Mapping](c.eventClient) - c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient) - c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient) - c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient) - c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient) - c.syncPub = eventbus.Publish[syncPoint](c.eventClient) - c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient) - c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient) - c.subsDoneCh = make(chan struct{}) - go c.consumeEventbusTopics() - } + // Subscribe calls must return before NewConn otherwise published + // events can be missed. + c.pmSub = eventbus.Subscribe[portmapper.Mapping](c.eventClient) + c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient) + c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient) + c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient) + c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient) + c.syncPub = eventbus.Publish[syncPoint](c.eventClient) + c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient) + c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient) + c.subsDoneCh = make(chan struct{}) + go c.consumeEventbusTopics() // Don't log the same log messages possibly every few seconds in our // portmapper. @@ -3327,10 +3328,8 @@ func (c *Conn) Close() error { // deadlock with c.Close(). // 2. Conn.consumeEventbusTopics event handlers may not guard against // undesirable post/in-progress Conn.Close() behaviors. - if c.eventClient != nil { - c.eventClient.Close() - <-c.subsDoneCh - } + c.eventClient.Close() + <-c.subsDoneCh c.mu.Lock() defer c.mu.Unlock() diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index 8a09df27d..480faa694 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -179,7 +179,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen t.Helper() bus := eventbus.New() - defer bus.Close() + t.Cleanup(bus.Close) netMon, err := netmon.New(bus, logf) if err != nil { @@ -191,6 +191,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen epCh := make(chan []tailcfg.Endpoint, 100) // arbitrary conn, err := NewConn(Options{ NetMon: netMon, + EventBus: bus, Metrics: ®, Logf: logf, HealthTracker: ht, @@ -406,7 +407,7 @@ func TestNewConn(t *testing.T) { } bus := eventbus.New() - defer bus.Close() + t.Cleanup(bus.Close) netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: ")) if err != nil { @@ -424,6 +425,7 @@ func TestNewConn(t *testing.T) { EndpointsFunc: epFunc, Logf: t.Logf, NetMon: netMon, + EventBus: bus, Metrics: new(usermetric.Registry), }) if err != nil { @@ -542,7 +544,7 @@ func TestDeviceStartStop(t *testing.T) { tstest.ResourceCheck(t) bus := eventbus.New() - defer bus.Close() + t.Cleanup(bus.Close) netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: ")) if err != nil { @@ -554,6 +556,7 @@ func TestDeviceStartStop(t *testing.T) { EndpointsFunc: func(eps []tailcfg.Endpoint) {}, Logf: t.Logf, NetMon: netMon, + EventBus: bus, Metrics: new(usermetric.Registry), }) if err != nil { @@ -1349,7 +1352,7 @@ func newTestConn(t testing.TB) *Conn { port := pickPort(t) bus := eventbus.New() - defer bus.Close() + t.Cleanup(bus.Close) netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: ")) if err != nil { @@ -1359,6 +1362,7 @@ func newTestConn(t testing.TB) *Conn { conn, err := NewConn(Options{ NetMon: netMon, + EventBus: bus, HealthTracker: new(health.Tracker), Metrics: new(usermetric.Registry), DisablePortMapper: true, @@ -3147,6 +3151,7 @@ func TestNetworkDownSendErrors(t *testing.T) { Logf: t.Logf, NetMon: netMon, Metrics: reg, + EventBus: bus, })) defer conn.Close()