diff --git a/wgengine/router/osrouter/router_linux.go b/wgengine/router/osrouter/router_linux.go index 835a9050f..58bd0513a 100644 --- a/wgengine/router/osrouter/router_linux.go +++ b/wgengine/router/osrouter/router_linux.go @@ -60,7 +60,7 @@ type linuxRouter struct { tunname string netMon *netmon.Monitor health *health.Tracker - eventSubs eventbus.Monitor + eventClient *eventbus.Client rulesAddedPub *eventbus.Publisher[AddIPRules] unregNetMon func() @@ -120,7 +120,16 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon } ec := bus.Client("router-linux") r.rulesAddedPub = eventbus.Publish[AddIPRules](ec) - r.eventSubs = ec.Monitor(r.consumeEventbusTopics(ec)) + eventbus.SubscribeFunc(ec, func(rs netmon.RuleDeleted) { + r.onIPRuleDeleted(rs.Table, rs.Priority) + }) + eventbus.SubscribeFunc(ec, func(pu router.PortUpdate) { + r.logf("portUpdate(port=%v, network=%s)", pu.UDPPort, pu.EndpointNetwork) + if err := r.updateMagicsockPort(pu.UDPPort, pu.EndpointNetwork); err != nil { + r.logf("updateMagicsockPort(port=%v, network=%s) failed: %v", pu.UDPPort, pu.EndpointNetwork, err) + } + }) + r.eventClient = ec if r.useIPCommand() { r.ipRuleAvailable = (cmd.run("ip", "rule") == nil) @@ -164,31 +173,6 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon return r, nil } -// consumeEventbusTopics consumes events from all [Conn]-relevant -// [eventbus.Subscriber]'s and passes them to their related handler. Events are -// always handled in the order they are received, i.e. the next event is not -// read until the previous event's handler has returned. It returns when the -// [eventbus.Client] is closed. -func (r *linuxRouter) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus.Client) { - ruleDeletedSub := eventbus.Subscribe[netmon.RuleDeleted](ec) - portUpdateSub := eventbus.Subscribe[router.PortUpdate](ec) - return func(ec *eventbus.Client) { - for { - select { - case <-ec.Done(): - return - case rs := <-ruleDeletedSub.Events(): - r.onIPRuleDeleted(rs.Table, rs.Priority) - case pu := <-portUpdateSub.Events(): - r.logf("portUpdate(port=%v, network=%s)", pu.UDPPort, pu.EndpointNetwork) - if err := r.updateMagicsockPort(pu.UDPPort, pu.EndpointNetwork); err != nil { - r.logf("updateMagicsockPort(port=%v, network=%s) failed: %v", pu.UDPPort, pu.EndpointNetwork, err) - } - } - } - } -} - // ipCmdSupportsFwmask returns true if the system 'ip' binary supports using a // fwmark stanza with a mask specified. To our knowledge, everything except busybox // pre-1.33 supports this. @@ -385,7 +369,7 @@ func (r *linuxRouter) Close() error { if r.unregNetMon != nil { r.unregNetMon() } - r.eventSubs.Close() + r.eventClient.Close() if err := r.downInterface(); err != nil { return err }