net/netmon: publish events to event bus

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
This commit is contained in:
David Anderson
2025-03-19 10:47:25 -07:00
committed by M. J. Fromberger
parent 6d6f69e735
commit 5399fa159a
41 changed files with 196 additions and 56 deletions

View File

@@ -7,10 +7,14 @@ import (
"bytes"
"fmt"
"testing"
"tailscale.com/util/eventbus"
)
func TestLinkChangeLogLimiter(t *testing.T) {
mon, err := New(t.Logf)
bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil {
t.Fatal(err)
}

View File

@@ -16,6 +16,7 @@ import (
"tailscale.com/types/logger"
"tailscale.com/util/clientmetric"
"tailscale.com/util/eventbus"
"tailscale.com/util/set"
)
@@ -50,7 +51,10 @@ type osMon interface {
// Monitor represents a monitoring instance.
type Monitor struct {
logf logger.Logf
logf logger.Logf
b *eventbus.Client
changed *eventbus.Publisher[*ChangeDelta]
om osMon // nil means not supported on this platform
change chan bool // send false to wake poller, true to also force ChangeDeltas be sent
stop chan struct{} // closed on Stop
@@ -114,21 +118,23 @@ type ChangeDelta struct {
// New instantiates and starts a monitoring instance.
// The returned monitor is inactive until it's started by the Start method.
// Use RegisterChangeCallback to get notified of network changes.
func New(logf logger.Logf) (*Monitor, error) {
func New(bus *eventbus.Bus, logf logger.Logf) (*Monitor, error) {
logf = logger.WithPrefix(logf, "monitor: ")
m := &Monitor{
logf: logf,
b: bus.Client("netmon"),
change: make(chan bool, 1),
stop: make(chan struct{}),
lastWall: wallTime(),
}
m.changed = eventbus.Publish[*ChangeDelta](m.b)
st, err := m.interfaceStateUncached()
if err != nil {
return nil, err
}
m.ifState = st
m.om, err = newOSMon(logf, m)
m.om, err = newOSMon(bus, logf, m)
if err != nil {
return nil, err
}
@@ -465,6 +471,7 @@ func (m *Monitor) handlePotentialChange(newState *State, forceCallbacks bool) {
if delta.TimeJumped {
metricChangeTimeJump.Add(1)
}
m.changed.Publish(delta)
for _, cb := range m.cbs {
go cb(delta)
}

View File

@@ -13,6 +13,7 @@ import (
"golang.org/x/sys/unix"
"tailscale.com/net/netaddr"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
const debugRouteMessages = false
@@ -24,7 +25,7 @@ type unspecifiedMessage struct{}
func (unspecifiedMessage) ignore() bool { return false }
func newOSMon(logf logger.Logf, _ *Monitor) (osMon, error) {
func newOSMon(_ *eventbus.Bus, logf logger.Logf, _ *Monitor) (osMon, error) {
fd, err := unix.Socket(unix.AF_ROUTE, unix.SOCK_RAW, 0)
if err != nil {
return nil, err

View File

@@ -10,6 +10,7 @@ import (
"strings"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
// unspecifiedMessage is a minimal message implementation that should not
@@ -24,7 +25,7 @@ type devdConn struct {
conn net.Conn
}
func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) {
func newOSMon(_ *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) {
conn, err := net.Dial("unixpacket", "/var/run/devd.seqpacket.pipe")
if err != nil {
logf("devd dial error: %v, falling back to polling method", err)

View File

@@ -16,6 +16,7 @@ import (
"tailscale.com/envknob"
"tailscale.com/net/tsaddr"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
var debugNetlinkMessages = envknob.RegisterBool("TS_DEBUG_NETLINK")
@@ -27,15 +28,26 @@ type unspecifiedMessage struct{}
func (unspecifiedMessage) ignore() bool { return false }
// RuleDeleted reports that one of Tailscale's policy routing rules
// was deleted.
type RuleDeleted struct {
// Table is the table number that the deleted rule referenced.
Table uint8
// Priority is the lookup priority of the deleted rule.
Priority uint32
}
// nlConn wraps a *netlink.Conn and returns a monitor.Message
// instead of a netlink.Message. Currently, messages are discarded,
// but down the line, when messages trigger different logic depending
// on the type of event, this provides the capability of handling
// each architecture-specific message in a generic fashion.
type nlConn struct {
logf logger.Logf
conn *netlink.Conn
buffered []netlink.Message
busClient *eventbus.Client
rulesDeleted *eventbus.Publisher[RuleDeleted]
logf logger.Logf
conn *netlink.Conn
buffered []netlink.Message
// addrCache maps interface indices to a set of addresses, and is
// used to suppress duplicate RTM_NEWADDR messages. It is populated
@@ -44,7 +56,7 @@ type nlConn struct {
addrCache map[uint32]map[netip.Addr]bool
}
func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) {
func newOSMon(bus *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) {
conn, err := netlink.Dial(unix.NETLINK_ROUTE, &netlink.Config{
// Routes get us most of the events of interest, but we need
// address as well to cover things like DHCP deciding to give
@@ -59,12 +71,22 @@ func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) {
logf("monitor_linux: AF_NETLINK RTMGRP failed, falling back to polling")
return newPollingMon(logf, m)
}
return &nlConn{logf: logf, conn: conn, addrCache: make(map[uint32]map[netip.Addr]bool)}, nil
client := bus.Client("netmon-iprules")
return &nlConn{
busClient: client,
rulesDeleted: eventbus.Publish[RuleDeleted](client),
logf: logf,
conn: conn,
addrCache: make(map[uint32]map[netip.Addr]bool),
}, nil
}
func (c *nlConn) IsInterestingInterface(iface string) bool { return true }
func (c *nlConn) Close() error { return c.conn.Close() }
func (c *nlConn) Close() error {
c.busClient.Close()
return c.conn.Close()
}
func (c *nlConn) Receive() (message, error) {
if len(c.buffered) == 0 {
@@ -219,6 +241,10 @@ func (c *nlConn) Receive() (message, error) {
// On `ip -4 rule del pref 5210 table main`, logs:
// monitor: ip rule deleted: {Family:2 DstLength:0 SrcLength:0 Tos:0 Table:254 Protocol:0 Scope:0 Type:1 Flags:0 Attributes:{Dst:<nil> Src:<nil> Gateway:<nil> OutIface:0 Priority:5210 Table:254 Mark:4294967295 Expires:<nil> Metrics:<nil> Multipath:[]}}
}
c.rulesDeleted.Publish(RuleDeleted{
Table: rmsg.Table,
Priority: rmsg.Attributes.Priority,
})
rdm := ipRuleDeletedMessage{
table: rmsg.Table,
priority: rmsg.Attributes.Priority,

View File

@@ -7,9 +7,10 @@ package netmon
import (
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
func newOSMon(logf logger.Logf, m *Monitor) (osMon, error) {
func newOSMon(_ *eventbus.Bus, logf logger.Logf, m *Monitor) (osMon, error) {
return newPollingMon(logf, m)
}

View File

@@ -11,11 +11,15 @@ import (
"testing"
"time"
"tailscale.com/util/eventbus"
"tailscale.com/util/mak"
)
func TestMonitorStartClose(t *testing.T) {
mon, err := New(t.Logf)
bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil {
t.Fatal(err)
}
@@ -26,7 +30,10 @@ func TestMonitorStartClose(t *testing.T) {
}
func TestMonitorJustClose(t *testing.T) {
mon, err := New(t.Logf)
bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil {
t.Fatal(err)
}
@@ -36,7 +43,10 @@ func TestMonitorJustClose(t *testing.T) {
}
func TestMonitorInjectEvent(t *testing.T) {
mon, err := New(t.Logf)
bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil {
t.Fatal(err)
}
@@ -71,7 +81,11 @@ func TestMonitorMode(t *testing.T) {
default:
t.Skipf(`invalid --monitor value: must be "raw" or "callback"`)
}
mon, err := New(t.Logf)
bus := eventbus.New()
defer bus.Close()
mon, err := New(bus, t.Logf)
if err != nil {
t.Fatal(err)
}

View File

@@ -13,6 +13,7 @@ import (
"golang.zx2c4.com/wireguard/windows/tunnel/winipcfg"
"tailscale.com/net/tsaddr"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
)
var (
@@ -45,7 +46,7 @@ type winMon struct {
noDeadlockTicker *time.Ticker
}
func newOSMon(logf logger.Logf, pm *Monitor) (osMon, error) {
func newOSMon(_ *eventbus.Bus, logf logger.Logf, pm *Monitor) (osMon, error) {
m := &winMon{
logf: logf,
isActive: pm.isActive,