net/netmon: add socket reopen to darwin route monitor

As we see offset based errors from ParseRIB, it is possible that we lose
track of framing on the socket. Restart the socket when this may have
occurred.

This change contains typically hairy concurrency around concurrent fd
handling. In the future we should change the netmon interface to have a
PAL that hides this kind of behavior to implicitly provide a concurrency
safe interface, in particular look to replace the receive API with
something more natural to the go concurrency environment.

Updates #14201

Signed-off-by: James Tucker <james@tailscale.com>
This commit is contained in:
James Tucker 2025-02-03 10:50:55 -08:00
parent 66f36e3bb2
commit 0d92be0cdf
No known key found for this signature in database

View File

@ -5,9 +5,11 @@ package netmon
import ( import (
"fmt" "fmt"
"io"
"net/netip" "net/netip"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"golang.org/x/net/route" "golang.org/x/net/route"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
@ -25,34 +27,87 @@ type unspecifiedMessage struct{}
func (unspecifiedMessage) ignore() bool { return false } func (unspecifiedMessage) ignore() bool { return false }
func newOSMon(logf logger.Logf, _ *Monitor) (osMon, error) { func newOSMon(logf logger.Logf, _ *Monitor) (osMon, error) {
fd, err := unix.Socket(unix.AF_ROUTE, unix.SOCK_RAW, 0) m := &darwinRouteMon{
if err != nil { logf: logf,
}
m.fd.Store(fdNeedReopen)
if err := m.reopen(); err != nil {
return nil, err return nil, err
} }
return &darwinRouteMon{ return m, nil
logf: logf,
fd: fd,
}, nil
} }
const (
fdNeedReopen = -1
fdClosed = -2
)
type darwinRouteMon struct { type darwinRouteMon struct {
logf logger.Logf logf logger.Logf
fd int // AF_ROUTE socket buf [2 << 10]byte
buf [2 << 10]byte mu sync.Mutex // synchronizes reopen and read
closeOnce sync.Once fd atomic.Int64 // AF_ROUTE socket, -1 when not open, -2 when closed
}
func (m *darwinRouteMon) reopen() error {
m.mu.Lock()
defer m.mu.Unlock()
fd := m.fd.Swap(fdNeedReopen)
if fd == fdClosed {
return io.EOF
}
if fd >= 0 {
unix.Close(int(fd))
}
nfd, err := unix.Socket(unix.AF_ROUTE, unix.SOCK_RAW, 0)
if err != nil {
return err
}
m.fd.Store(int64(nfd))
return nil
}
func (m *darwinRouteMon) closeForReopen() {
m.mu.Lock()
defer m.mu.Unlock()
fd := m.fd.Swap(fdNeedReopen)
if fd >= 0 {
unix.Close(int(fd))
}
}
func (m *darwinRouteMon) read(buf []byte) (int, error) {
m.mu.Lock()
defer m.mu.Unlock()
fd := m.fd.Load()
if fd == fdNeedReopen {
if err := m.reopen(); err != nil {
return 0, err
}
}
if fd == fdClosed {
return 0, io.EOF
}
n, err := unix.Read(int(fd), buf)
return n, err
} }
func (m *darwinRouteMon) Close() error { func (m *darwinRouteMon) Close() error {
var err error // close can not take m.mu, otherwise it could be blocked by reopen or read.
m.closeOnce.Do(func() { fd := m.fd.Swap(fdClosed)
err = unix.Close(m.fd) if fd >= 0 {
}) return unix.Close(int(fd))
return err }
return nil
} }
func (m *darwinRouteMon) Receive() (message, error) { func (m *darwinRouteMon) Receive() (message, error) {
// Future hazard: the reopen logic assumes that the receive caller has some
// kind of back-off. At the time of writing it will spam this at most at
// 1hz. If changed carelessly, the reopen logic here can turn this into a
// hot loop instead.
for { for {
n, err := unix.Read(m.fd, m.buf[:]) n, err := m.read(m.buf[:])
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -73,6 +128,7 @@ func (m *darwinRouteMon) Receive() (message, error) {
if debugRouteMessages { if debugRouteMessages {
m.logf("read %d bytes (% 02x), failed to parse RIB: %v", n, m.buf[:n], err) m.logf("read %d bytes (% 02x), failed to parse RIB: %v", n, m.buf[:n], err)
} }
m.closeForReopen()
return unspecifiedMessage{}, nil return unspecifiedMessage{}, nil
} }
if len(msgs) == 0 { if len(msgs) == 0 {