wgengine/router: rely on events for deleted IP rules (#16744)

Adds the eventbus to the router subsystem.

The event is currently only used on linux.

Also includes facilities to inject events into the bus.

Updates #15160

Signed-off-by: Claus Lensbøl <claus@tailscale.com>
This commit is contained in:
Claus Lensbøl
2025-08-05 08:31:51 -04:00
committed by GitHub
parent b0018f1e7d
commit 5bb42e3018
15 changed files with 132 additions and 82 deletions

View File

@@ -29,6 +29,7 @@ import (
"tailscale.com/types/logger"
"tailscale.com/types/opt"
"tailscale.com/types/preftype"
"tailscale.com/util/eventbus"
"tailscale.com/util/linuxfw"
"tailscale.com/util/multierr"
"tailscale.com/version/distro"
@@ -48,6 +49,9 @@ type linuxRouter struct {
tunname string
netMon *netmon.Monitor
health *health.Tracker
eventClient *eventbus.Client
ruleDeletedSub *eventbus.Subscriber[netmon.RuleDeleted]
rulesAddedPub *eventbus.Publisher[AddIPRules]
unregNetMon func()
addrs map[netip.Prefix]bool
routes map[netip.Prefix]bool
@@ -77,7 +81,7 @@ type linuxRouter struct {
magicsockPortV6 uint16
}
func newUserspaceRouter(logf logger.Logf, tunDev tun.Device, netMon *netmon.Monitor, health *health.Tracker) (Router, error) {
func newUserspaceRouter(logf logger.Logf, tunDev tun.Device, netMon *netmon.Monitor, health *health.Tracker, bus *eventbus.Bus) (Router, error) {
tunname, err := tunDev.Name()
if err != nil {
return nil, err
@@ -87,15 +91,16 @@ func newUserspaceRouter(logf logger.Logf, tunDev tun.Device, netMon *netmon.Moni
ambientCapNetAdmin: useAmbientCaps(),
}
return newUserspaceRouterAdvanced(logf, tunname, netMon, cmd, health)
return newUserspaceRouterAdvanced(logf, tunname, netMon, cmd, health, bus)
}
func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon.Monitor, cmd commandRunner, health *health.Tracker) (Router, error) {
func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon.Monitor, cmd commandRunner, health *health.Tracker, bus *eventbus.Bus) (Router, error) {
r := &linuxRouter{
logf: logf,
tunname: tunname,
netfilterMode: netfilterOff,
netMon: netMon,
eventClient: bus.Client("router-linux"),
health: health,
cmd: cmd,
@@ -103,6 +108,10 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon
ipRuleFixLimiter: rate.NewLimiter(rate.Every(5*time.Second), 10),
ipPolicyPrefBase: 5200,
}
r.ruleDeletedSub = eventbus.Subscribe[netmon.RuleDeleted](r.eventClient)
r.rulesAddedPub = eventbus.Publish[AddIPRules](r.eventClient)
go r.consumeEventbusTopics()
if r.useIPCommand() {
r.ipRuleAvailable = (cmd.run("ip", "rule") == nil)
} else {
@@ -145,6 +154,24 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon
return r, nil
}
// consumeEventbusTopics consumes events from all [Conn]-relevant
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
// always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the
// [portmapper.Mapping] subscriber is closed, which is interpreted to be the
// same as the [eventbus.Client] closing ([eventbus.Subscribers] are either
// all open or all closed).
func (r *linuxRouter) consumeEventbusTopics() {
for {
select {
case <-r.ruleDeletedSub.Done():
return
case rulesDeleted := <-r.ruleDeletedSub.Events():
r.onIPRuleDeleted(rulesDeleted.Table, rulesDeleted.Priority)
}
}
}
// ipCmdSupportsFwmask returns true if the system 'ip' binary supports using a
// fwmark stanza with a mask specified. To our knowledge, everything except busybox
// pre-1.33 supports this.
@@ -276,6 +303,10 @@ func (r *linuxRouter) fwmaskWorks() bool {
return v
}
// AddIPRules is used as an event signal to signify that rules have been added.
// It is added to aid testing, but could be extended if there's a reason for it.
type AddIPRules struct{}
// onIPRuleDeleted is the callback from the network monitor for when an IP
// policy rule is deleted. See Issue 1591.
//
@@ -303,6 +334,9 @@ func (r *linuxRouter) onIPRuleDeleted(table uint8, priority uint32) {
r.ruleRestorePending.Swap(false)
return
}
r.rulesAddedPub.Publish(AddIPRules{})
time.AfterFunc(rr.Delay()+250*time.Millisecond, func() {
if r.ruleRestorePending.Swap(false) && !r.closed.Load() {
r.logf("somebody (likely systemd-networkd) deleted ip rules; restoring Tailscale's")
@@ -312,9 +346,6 @@ func (r *linuxRouter) onIPRuleDeleted(table uint8, priority uint32) {
}
func (r *linuxRouter) Up() error {
if r.unregNetMon == nil && r.netMon != nil {
r.unregNetMon = r.netMon.RegisterRuleDeleteCallback(r.onIPRuleDeleted)
}
if err := r.setNetfilterMode(netfilterOff); err != nil {
return fmt.Errorf("setting netfilter mode: %w", err)
}
@@ -333,6 +364,7 @@ func (r *linuxRouter) Close() error {
if r.unregNetMon != nil {
r.unregNetMon()
}
r.eventClient.Close()
if err := r.downInterface(); err != nil {
return err
}
@@ -1276,7 +1308,6 @@ func (r *linuxRouter) justAddIPRules() error {
}
var errAcc error
for _, family := range r.addrFamilies() {
for _, ru := range ipRules() {
// Note: r is a value type here; safe to mutate it.
ru.Family = family.netlinkInt()