ipnlocal,magicsock: decouple magicsock from ipnlocal

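Previously ipnlocal had to synchronize directly with magicsock before
reconfiguring. Instead, magicsock can publish a configuration-changed event
and ipnlocal can reconfigure in response to it, as a callback.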

Updates #15160

Signed-off-by: Claus Lensbøl <claus@tailscale.com>
This commit is contained in:
Claus Lensbøl 2025-07-15 11:54:53 -04:00
parent c572442548
commit 5b2e3f181c
No known key found for this signature in database
GPG Key ID: 060429CBEC62B1B4
2 changed files with 74 additions and 66 deletions

View File

@ -203,6 +203,8 @@ type LocalBackend struct {
keyLogf logger.Logf // for printing list of peers on change
statsLogf logger.Logf // for printing peers stats on change
sys *tsd.System
eventbus *eventbus.Bus
eventClient *eventbus.Client
health *health.Tracker // always non-nil
metrics metrics
e wgengine.Engine // non-nil; TODO(bradfitz): remove; use sys
@ -428,6 +430,8 @@ type LocalBackend struct {
//
// See tailscale/corp#29969.
overrideExitNodePolicy bool
magicSockConfChangeSub *eventbus.Subscriber[magicsock.ConfigurationChanged]
}
// HealthTracker returns the health tracker for the backend.
@ -533,7 +537,12 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running
needsCaptiveDetection: make(chan bool),
}
nb := newNodeBackend(ctx, b.sys.Bus.Get())
if bus, ok := sys.Bus.GetOK(); ok {
b.eventbus = bus
b.eventClient = bus.Client("ipnlocal.LocalBackend")
b.magicSockConfChangeSub = eventbus.Subscribe[magicsock.ConfigurationChanged](b.eventClient)
}
nb := newNodeBackend(ctx, b.eventbus)
b.currentNodeAtomic.Store(nb)
nb.ready()
@ -606,6 +615,21 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
return b, nil
}
// consumeEventbusTopics consumes events from the [eventbus] subscriptions
// owned by b until b.ctx is done. It blocks, so it is meant to run in its own
// goroutine.
//
// Each [magicsock.ConfigurationChanged] event triggers a call to
// [LocalBackend.authReconfig] in a new goroutine.
func (b *LocalBackend) consumeEventbusTopics() {
	sub := b.magicSockConfChangeSub
	if sub == nil {
		// The subscriber is only created when an event bus was available
		// at construction time; without it there is nothing to consume.
		return
	}
	// Close the subscriber on every exit path, not only on ctx.Done.
	defer sub.Close()

	for {
		select {
		case <-b.ctx.Done():
			return
		case <-sub.Events():
			// The context may have been canceled while the event was
			// pending; do not start a reconfig during shutdown.
			if b.ctx.Err() != nil {
				return
			}
			// authReconfig runs in its own goroutine, so this loop does
			// not wait for it before receiving the next event.
			go b.authReconfig()
		}
	}
}
// Clock returns the backend's clock (b.clock).
func (b *LocalBackend) Clock() tstime.Clock {
	return b.clock
}
// Sys returns the [tsd.System] held by the backend (b.sys).
func (b *LocalBackend) Sys() *tsd.System {
	return b.sys
}
@ -621,10 +645,10 @@ func (b *LocalBackend) currentNode() *nodeBackend {
// Auto-init [nodeBackend] in tests for LocalBackend created without the
// NewLocalBackend() constructor. Same reasoning for checking b.sys.
var bus *eventbus.Bus
if b.sys == nil {
if b.eventbus == nil {
bus = eventbus.New()
} else {
bus = b.sys.Bus.Get()
bus = b.eventbus
}
v := newNodeBackend(cmp.Or(b.ctx, context.Background()), bus)
if b.currentNodeAtomic.CompareAndSwap(nil, v) {
@ -1765,9 +1789,6 @@ func (b *LocalBackend) SetControlClientStatus(c controlclient.Client, st control
b.setAuthURL(st.URL)
}
b.stateMachine()
// This is currently (2020-07-28) necessary; conditionally disabling it is fragile!
// This is where netmap information gets propagated to router and magicsock.
b.authReconfig()
}
type preferencePolicyInfo struct {
@ -2283,6 +2304,7 @@ func (b *LocalBackend) getNewControlClientFuncLocked() clientGen {
// initOnce is called on the first call to [LocalBackend.Start].
// It starts the eventbus consumer goroutine and then initializes b.extHost.
func (b *LocalBackend) initOnce() {
// The consumer loops until b.ctx is done, so it gets its own goroutine.
go b.consumeEventbusTopics()
b.extHost.Init()
}
@ -4443,7 +4465,6 @@ func (b *LocalBackend) changeDisablesExitNodeLocked(prefs ipn.PrefsView, change
// but wasn't empty before, then the change disables
// exit node usage.
return tmpPrefs.ExitNodeID == ""
}
// adjustEditPrefsLocked applies additional changes to mp if necessary,
@ -5108,11 +5129,6 @@ func (b *LocalBackend) readvertiseAppConnectorRoutes() {
// updates are not currently blocked, based on the cached netmap and
// user prefs.
func (b *LocalBackend) authReconfig() {
// Wait for magicsock to process pending [eventbus] events,
// such as netmap updates. This should be completed before
// wireguard-go is reconfigured. See tailscale/tailscale#16369.
b.MagicConn().Synchronize()
b.mu.Lock()
blocked := b.blocked
prefs := b.pm.CurrentPrefs()
@ -7358,7 +7374,7 @@ func (b *LocalBackend) resetForProfileChangeLockedOnEntry(unlock unlockOnce) err
// down, so no need to do any work.
return nil
}
newNode := newNodeBackend(b.ctx, b.sys.Bus.Get())
newNode := newNodeBackend(b.ctx, b.eventbus)
if oldNode := b.currentNodeAtomic.Swap(newNode); oldNode != nil {
oldNode.shutdown(errNodeContextChanged)
}
@ -7697,10 +7713,8 @@ var (
// allowedAutoRoute determines if the route being added via AdvertiseRoute (the app connector feature) should be allowed.
func allowedAutoRoute(ipp netip.Prefix) bool {
// Note: blocking the addrs for globals, not solely the prefixes.
for _, addr := range disallowedAddrs {
if ipp.Addr() == addr {
return false
}
if slices.Contains(disallowedAddrs, ipp.Addr()) {
return false
}
for _, pfx := range disallowedRanges {
if pfx.Overlaps(ipp) {
@ -8134,7 +8148,6 @@ func isAllowedAutoExitNodeID(exitNodeID tailcfg.StableNodeID) bool {
}
if nodes, _ := syspolicy.GetStringArray(syspolicy.AllowedSuggestedExitNodes, nil); nodes != nil {
return slices.Contains(nodes, string(exitNodeID))
}
return true // no policy configured; allow all exit nodes
}
@ -8278,9 +8291,7 @@ func (b *LocalBackend) vipServicesFromPrefsLocked(prefs ipn.PrefsView) []*tailcf
return servicesList
}
var (
metricCurrentWatchIPNBus = clientmetric.NewGauge("localbackend_current_watch_ipn_bus")
)
var metricCurrentWatchIPNBus = clientmetric.NewGauge("localbackend_current_watch_ipn_bus")
func (b *LocalBackend) stateEncrypted() opt.Bool {
switch runtime.GOOS {

View File

@ -14,6 +14,7 @@ import (
"expvar"
"fmt"
"io"
"maps"
"net"
"net/netip"
"reflect"
@ -179,11 +180,11 @@ type Conn struct {
filterSub *eventbus.Subscriber[FilterUpdate]
nodeViewsSub *eventbus.Subscriber[NodeViewsUpdate]
nodeMutsSub *eventbus.Subscriber[NodeMutationsUpdate]
syncSub *eventbus.Subscriber[syncPoint]
syncPub *eventbus.Publisher[syncPoint]
allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq]
allocRelayEndpointSub *eventbus.Subscriber[UDPRelayAllocResp]
configChangedPub *eventbus.Publisher[ConfigurationChanged]
subsDoneCh chan struct{} // closed when consumeEventbusTopics returns
netInfoPub *eventbus.Publisher[tailcfg.NetInfo]
// pconn4 and pconn6 are the underlying UDP sockets used to
// send/receive packets for wireguard and other magicsock
@ -423,6 +424,8 @@ type Conn struct {
// metrics contains the metrics for the magicsock instance.
metrics *metrics
hasReconfigured chan any
}
// SetDebugLoggingEnabled controls whether spammy debug logging is enabled.
@ -562,20 +565,7 @@ type FilterUpdate struct {
*filter.Filter
}
// syncPoint is an event published over an [eventbus.Bus] by [Conn.Synchronize].
// It serves as a synchronization point, allowing callers to wait until
// magicsock has processed all pending events.
//
// [syncPoint.Signal] closes the channel rather than sending a value on it.
type syncPoint chan struct{}
// Wait blocks until [syncPoint.Signal] is called.
func (s syncPoint) Wait() {
<-s
}
// Signal signals the sync point, unblocking the [syncPoint.Wait] call.
// It closes the underlying channel, so it must be called at most once per
// sync point; closing an already-closed channel panics.
func (s syncPoint) Signal() {
close(s)
}
// ConfigurationChanged is an event published over an [eventbus.Bus] by
// [Conn] after it has processed updates that changed its configuration.
// It carries no data; it only signals that a change happened.
type ConfigurationChanged struct{}
// UDPRelayAllocReq represents a [*disco.AllocateUDPRelayEndpointRequest]
// reception event. This is signaled over an [eventbus.Bus] from
@ -612,15 +602,16 @@ type UDPRelayAllocResp struct {
func newConn(logf logger.Logf) *Conn {
discoPrivate := key.NewDisco()
c := &Conn{
logf: logf,
derpRecvCh: make(chan derpReadResult, 1), // must be buffered, see issue 3736
derpStarted: make(chan struct{}),
peerLastDerp: make(map[key.NodePublic]int),
peerMap: newPeerMap(),
discoInfo: make(map[key.DiscoPublic]*discoInfo),
discoPrivate: discoPrivate,
discoPublic: discoPrivate.Public(),
cloudInfo: newCloudInfo(logf),
logf: logf,
derpRecvCh: make(chan derpReadResult, 1), // must be buffered, see issue 3736
derpStarted: make(chan struct{}),
peerLastDerp: make(map[key.NodePublic]int),
peerMap: newPeerMap(),
discoInfo: make(map[key.DiscoPublic]*discoInfo),
discoPrivate: discoPrivate,
discoPublic: discoPrivate.Public(),
cloudInfo: newCloudInfo(logf),
hasReconfigured: make(chan any, 25),
}
c.discoShort = c.discoPublic.ShortString()
c.bind = &connBind{Conn: c, closed: true}
@ -658,15 +649,22 @@ func (c *Conn) consumeEventbusTopics() {
c.onPortMapChanged()
case filterUpdate := <-c.filterSub.Events():
c.onFilterUpdate(filterUpdate)
c.hasReconfigured <- new(any)
case nodeViews := <-c.nodeViewsSub.Events():
c.onNodeViewsUpdate(nodeViews)
c.hasReconfigured <- new(any)
case nodeMuts := <-c.nodeMutsSub.Events():
c.onNodeMutationsUpdate(nodeMuts)
case syncPoint := <-c.syncSub.Events():
c.dlogf("magicsock: received sync point after reconfig")
syncPoint.Signal()
case allocResp := <-c.allocRelayEndpointSub.Events():
c.onUDPRelayAllocResp(allocResp)
c.hasReconfigured <- new(any)
case <-c.hasReconfigured:
c.dlogf("magicsock: configuration has changed")
// Drain channel as we only want to reconfigure once
for len(c.hasReconfigured) > 0 {
<-c.hasReconfigured
}
c.configChangedPub.Publish(ConfigurationChanged{})
}
}
}
@ -699,18 +697,6 @@ func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) {
go c.sendDiscoMessage(epAddr{ap: derpAddr}, ep.publicKey, disco.key, allocResp.Message, discoVerboseLog)
}
// Synchronize blocks until the receiver has processed every [eventbus]
// event that was published before this call.
//
// It publishes a fresh [syncPoint] and waits for it to be signaled. When c
// has no sync publisher (the eventbus is not used, as in certain tests),
// there is nothing to wait for and it returns immediately.
func (c *Conn) Synchronize() {
	if c.syncPub != nil {
		done := make(syncPoint)
		c.syncPub.Publish(done)
		done.Wait()
	}
}
// NewConn creates a magic Conn listening on opts.Port.
// As the set of possible endpoints for a Conn changes, the
// callback opts.EndpointsFunc is called.
@ -738,10 +724,10 @@ func NewConn(opts Options) (*Conn, error) {
c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient)
c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient)
c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient)
c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient)
c.syncPub = eventbus.Publish[syncPoint](c.eventClient)
c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient)
c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient)
c.netInfoPub = eventbus.Publish[tailcfg.NetInfo](c.eventClient)
c.configChangedPub = eventbus.Publish[ConfigurationChanged](c.eventClient)
c.subsDoneCh = make(chan struct{})
go c.consumeEventbusTopics()
}
@ -1121,12 +1107,21 @@ func (c *Conn) callNetInfoCallback(ni *tailcfg.NetInfo) {
// callNetInfoCallbackLocked records ni as the most recent NetInfo and
// publishes it via [Conn.publishNetInfo]. If a netInfoFunc callback is set,
// it also logs the update and invokes the callback with ni in a new
// goroutine.
//
// NOTE(review): the "Locked" suffix suggests c.mu must be held by the
// caller — confirm against the call sites.
func (c *Conn) callNetInfoCallbackLocked(ni *tailcfg.NetInfo) {
	c.netInfoLast = ni
	c.publishNetInfo(ni)

	if c.netInfoFunc == nil {
		return
	}
	c.dlogf("[v1] magicsock: netInfo update: %+v", ni)
	go c.netInfoFunc(ni)
}
// publishNetInfo publishes a copy of *ni on c.netInfoPub. It does nothing
// when c has no NetInfo publisher.
//
// The DERPLatency map is cloned so that the published value does not share
// that map with ni.
func (c *Conn) publishNetInfo(ni *tailcfg.NetInfo) {
	if c.netInfoPub == nil {
		return
	}
	snapshot := *ni
	snapshot.DERPLatency = maps.Clone(ni.DERPLatency)
	c.netInfoPub.Publish(snapshot)
}
// addValidDiscoPathForTest makes addr a validated disco address for
// discoKey. It's used in tests to enable receiving of packets from
// addr without having to spin up the entire active discovery
@ -4097,9 +4092,11 @@ type lazyEndpoint struct {
src epAddr
}
var _ conn.InitiationAwareEndpoint = (*lazyEndpoint)(nil)
var _ conn.PeerAwareEndpoint = (*lazyEndpoint)(nil)
var _ conn.Endpoint = (*lazyEndpoint)(nil)
var (
_ conn.InitiationAwareEndpoint = (*lazyEndpoint)(nil)
_ conn.PeerAwareEndpoint = (*lazyEndpoint)(nil)
_ conn.Endpoint = (*lazyEndpoint)(nil)
)
// InitiationMessagePublicKey implements [conn.InitiationAwareEndpoint].
// wireguard-go calls us here if we passed it a [*lazyEndpoint] for an