diff --git a/wgengine/router/router_linux.go b/wgengine/router/router_linux.go index a9edd7f96..dc1425708 100644 --- a/wgengine/router/router_linux.go +++ b/wgengine/router/router_linux.go @@ -49,8 +49,7 @@ type linuxRouter struct { tunname string netMon *netmon.Monitor health *health.Tracker - eventClient *eventbus.Client - ruleDeletedSub *eventbus.Subscriber[netmon.RuleDeleted] + eventSubs eventbus.Monitor rulesAddedPub *eventbus.Publisher[AddIPRules] unregNetMon func() addrs map[netip.Prefix]bool @@ -100,7 +99,6 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon tunname: tunname, netfilterMode: netfilterOff, netMon: netMon, - eventClient: bus.Client("router-linux"), health: health, cmd: cmd, @@ -108,9 +106,9 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon ipRuleFixLimiter: rate.NewLimiter(rate.Every(5*time.Second), 10), ipPolicyPrefBase: 5200, } - r.ruleDeletedSub = eventbus.Subscribe[netmon.RuleDeleted](r.eventClient) - r.rulesAddedPub = eventbus.Publish[AddIPRules](r.eventClient) - go r.consumeEventbusTopics() + ec := bus.Client("router-linux") + r.rulesAddedPub = eventbus.Publish[AddIPRules](ec) + r.eventSubs = ec.Monitor(r.consumeEventbusTopics(ec)) if r.useIPCommand() { r.ipRuleAvailable = (cmd.run("ip", "rule") == nil) @@ -159,13 +157,16 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon // always handled in the order they are received, i.e. the next event is not // read until the previous event's handler has returned. It returns when the // [eventbus.Client] is closed. -func (r *linuxRouter) consumeEventbusTopics() { - for { - select { - case <-r.eventClient.Done(): - return - case rulesDeleted := <-r.ruleDeletedSub.Events(): - r.onIPRuleDeleted(rulesDeleted.Table, rulesDeleted.Priority) +func (r *linuxRouter) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus.Client) { + ruleDeletedSub := eventbus.Subscribe[netmon.RuleDeleted](ec) + return func(ec *eventbus.Client) { + for { + select { + case <-ec.Done(): + return + case rs := <-ruleDeletedSub.Events(): + r.onIPRuleDeleted(rs.Table, rs.Priority) + } } } } @@ -362,7 +363,7 @@ func (r *linuxRouter) Close() error { if r.unregNetMon != nil { r.unregNetMon() } - r.eventClient.Close() + r.eventSubs.Close() if err := r.downInterface(); err != nil { return err }