ipn,net,tsnet,wgengine: make an eventbus mandatory where it is used (#16594)

In the components where an event bus is already plumbed through, remove the
exceptions that allow it to be omitted, and update all the tests that relied on
those workarounds execute properly.

This change applies only to the places where we're already using the bus; it
does not enforce the existence of a bus in other components (yet),

Updates #15160

Change-Id: Iebb92243caba82b5eb420c49fc3e089a77454f65
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger
2025-07-29 09:04:08 -07:00
committed by GitHub
parent e5e4386f33
commit b34cdc9710
11 changed files with 133 additions and 123 deletions

View File

@@ -715,8 +715,11 @@ func (c *Conn) Synchronize() {
// As the set of possible endpoints for a Conn changes, the
// callback opts.EndpointsFunc is called.
func NewConn(opts Options) (*Conn, error) {
if opts.NetMon == nil {
switch {
case opts.NetMon == nil:
return nil, errors.New("magicsock.Options.NetMon must be non-nil")
case opts.EventBus == nil:
return nil, errors.New("magicsock.Options.EventBus must be non-nil")
}
c := newConn(opts.logf())
@@ -729,22 +732,20 @@ func NewConn(opts Options) (*Conn, error) {
c.testOnlyPacketListener = opts.TestOnlyPacketListener
c.noteRecvActivity = opts.NoteRecvActivity
if c.eventBus != nil {
c.eventClient = c.eventBus.Client("magicsock.Conn")
c.eventClient = c.eventBus.Client("magicsock.Conn")
// Subscribe calls must return before NewConn otherwise published
// events can be missed.
c.pmSub = eventbus.Subscribe[portmapper.Mapping](c.eventClient)
c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient)
c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient)
c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient)
c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient)
c.syncPub = eventbus.Publish[syncPoint](c.eventClient)
c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient)
c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient)
c.subsDoneCh = make(chan struct{})
go c.consumeEventbusTopics()
}
// Subscribe calls must return before NewConn otherwise published
// events can be missed.
c.pmSub = eventbus.Subscribe[portmapper.Mapping](c.eventClient)
c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient)
c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient)
c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient)
c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient)
c.syncPub = eventbus.Publish[syncPoint](c.eventClient)
c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient)
c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient)
c.subsDoneCh = make(chan struct{})
go c.consumeEventbusTopics()
// Don't log the same log messages possibly every few seconds in our
// portmapper.
@@ -3327,10 +3328,8 @@ func (c *Conn) Close() error {
// deadlock with c.Close().
// 2. Conn.consumeEventbusTopics event handlers may not guard against
// undesirable post/in-progress Conn.Close() behaviors.
if c.eventClient != nil {
c.eventClient.Close()
<-c.subsDoneCh
}
c.eventClient.Close()
<-c.subsDoneCh
c.mu.Lock()
defer c.mu.Unlock()

View File

@@ -179,7 +179,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen
t.Helper()
bus := eventbus.New()
defer bus.Close()
t.Cleanup(bus.Close)
netMon, err := netmon.New(bus, logf)
if err != nil {
@@ -191,6 +191,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen
epCh := make(chan []tailcfg.Endpoint, 100) // arbitrary
conn, err := NewConn(Options{
NetMon: netMon,
EventBus: bus,
Metrics: &reg,
Logf: logf,
HealthTracker: ht,
@@ -406,7 +407,7 @@ func TestNewConn(t *testing.T) {
}
bus := eventbus.New()
defer bus.Close()
t.Cleanup(bus.Close)
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
if err != nil {
@@ -424,6 +425,7 @@ func TestNewConn(t *testing.T) {
EndpointsFunc: epFunc,
Logf: t.Logf,
NetMon: netMon,
EventBus: bus,
Metrics: new(usermetric.Registry),
})
if err != nil {
@@ -542,7 +544,7 @@ func TestDeviceStartStop(t *testing.T) {
tstest.ResourceCheck(t)
bus := eventbus.New()
defer bus.Close()
t.Cleanup(bus.Close)
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
if err != nil {
@@ -554,6 +556,7 @@ func TestDeviceStartStop(t *testing.T) {
EndpointsFunc: func(eps []tailcfg.Endpoint) {},
Logf: t.Logf,
NetMon: netMon,
EventBus: bus,
Metrics: new(usermetric.Registry),
})
if err != nil {
@@ -1349,7 +1352,7 @@ func newTestConn(t testing.TB) *Conn {
port := pickPort(t)
bus := eventbus.New()
defer bus.Close()
t.Cleanup(bus.Close)
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
if err != nil {
@@ -1359,6 +1362,7 @@ func newTestConn(t testing.TB) *Conn {
conn, err := NewConn(Options{
NetMon: netMon,
EventBus: bus,
HealthTracker: new(health.Tracker),
Metrics: new(usermetric.Registry),
DisablePortMapper: true,
@@ -3147,6 +3151,7 @@ func TestNetworkDownSendErrors(t *testing.T) {
Logf: t.Logf,
NetMon: netMon,
Metrics: reg,
EventBus: bus,
}))
defer conn.Close()