ipn/ipnlocal,wgengine/magicsock: use eventbus for node & filter updates (#16271)

nodeBackend now publishes filter and node changes to eventbus topics
that are consumed by magicsock.Conn.
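
For orientation, here is a minimal, self-contained sketch of the
publish/subscribe shape this commit introduces, using only the eventbus calls
that appear in the diff below. The Ping type and client names are hypothetical,
for illustration only; the real event types are magicsock.FilterUpdate and
magicsock.NodeAddrsHostInfoUpdate.

	package main

	import (
		"fmt"

		"tailscale.com/util/eventbus"
	)

	// Ping is a hypothetical event type standing in for the real ones.
	type Ping struct{ Seq int }

	func main() {
		bus := eventbus.New()

		// Publisher side, as nodeBackend does in newNodeBackend below.
		pubClient := bus.Client("example.publisher")
		defer pubClient.Close()
		pings := eventbus.Publish[Ping](pubClient)

		// Subscriber side, as magicsock.Conn does in NewConn below.
		subClient := bus.Client("example.subscriber")
		defer subClient.Close()
		sub := eventbus.Subscribe[Ping](subClient)

		done := make(chan struct{})
		go func() {
			defer close(done)
			select {
			case evt := <-sub.Events():
				fmt.Println("got ping", evt.Seq)
			case <-sub.Done():
			}
		}()

		pings.Publish(Ping{Seq: 1})
		<-done
	}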

Updates tailscale/corp#27502
Updates tailscale/corp#29543

Signed-off-by: Jordan Whited <jordan@tailscale.com>
Jordan Whited <jordan@tailscale.com>, committed via GitHub 2025-06-16 08:42:09 -07:00
parent 42da161b19
commit 8e6f63cf11
4 changed files with 108 additions and 25 deletions

ipn/ipnlocal/local.go

@@ -98,6 +98,7 @@ import (
 	"tailscale.com/util/clientmetric"
 	"tailscale.com/util/deephash"
 	"tailscale.com/util/dnsname"
+	"tailscale.com/util/eventbus"
 	"tailscale.com/util/goroutines"
 	"tailscale.com/util/httpm"
 	"tailscale.com/util/mak"
@@ -514,7 +515,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
 		captiveCancel:         nil, // so that we start checkCaptivePortalLoop when Running
 		needsCaptiveDetection: make(chan bool),
 	}
-	nb := newNodeBackend(ctx)
+	nb := newNodeBackend(ctx, b.sys.Bus.Get())
 	b.currentNodeAtomic.Store(nb)
 	nb.ready()
@@ -599,8 +600,15 @@ func (b *LocalBackend) currentNode() *nodeBackend {
 	if v := b.currentNodeAtomic.Load(); v != nil || !testenv.InTest() {
 		return v
 	}
-	// Auto-init one in tests for LocalBackend created without the NewLocalBackend constructor...
-	v := newNodeBackend(cmp.Or(b.ctx, context.Background()))
+	// Auto-init [nodeBackend] in tests for LocalBackend created without the
+	// NewLocalBackend() constructor. Same reasoning for checking b.sys.
+	var bus *eventbus.Bus
+	if b.sys == nil {
+		bus = eventbus.New()
+	} else {
+		bus = b.sys.Bus.Get()
+	}
+	v := newNodeBackend(cmp.Or(b.ctx, context.Background()), bus)
 	if b.currentNodeAtomic.CompareAndSwap(nil, v) {
 		v.ready()
 	}
@@ -7009,7 +7017,7 @@ func (b *LocalBackend) resetForProfileChangeLockedOnEntry(unlock unlockOnce) err
 		// down, so no need to do any work.
 		return nil
 	}
-	newNode := newNodeBackend(b.ctx)
+	newNode := newNodeBackend(b.ctx, b.sys.Bus.Get())
 	if oldNode := b.currentNodeAtomic.Swap(newNode); oldNode != nil {
 		oldNode.shutdown(errNodeContextChanged)
 	}

ipn/ipnlocal/node_backend.go

@@ -23,9 +23,11 @@ import (
 	"tailscale.com/types/ptr"
 	"tailscale.com/types/views"
 	"tailscale.com/util/dnsname"
+	"tailscale.com/util/eventbus"
 	"tailscale.com/util/mak"
 	"tailscale.com/util/slicesx"
 	"tailscale.com/wgengine/filter"
+	"tailscale.com/wgengine/magicsock"
 )
 
 // nodeBackend is node-specific [LocalBackend] state. It is usually the current node.
@@ -69,6 +71,11 @@ type nodeBackend struct {
 	// replaced with a new one.
 	filterAtomic atomic.Pointer[filter.Filter]
 
+	// initialized once and immutable
+	eventClient   *eventbus.Client
+	filterUpdates *eventbus.Publisher[magicsock.FilterUpdate]
+	nodeUpdates   *eventbus.Publisher[magicsock.NodeAddrsHostInfoUpdate]
+
 	// TODO(nickkhyl): maybe use sync.RWMutex?
 	mu sync.Mutex // protects the following fields
@@ -95,16 +102,20 @@ type nodeBackend struct {
 	nodeByAddr map[netip.Addr]tailcfg.NodeID
 }
 
-func newNodeBackend(ctx context.Context) *nodeBackend {
+func newNodeBackend(ctx context.Context, bus *eventbus.Bus) *nodeBackend {
 	ctx, ctxCancel := context.WithCancelCause(ctx)
 	nb := &nodeBackend{
-		ctx:       ctx,
-		ctxCancel: ctxCancel,
-		readyCh:   make(chan struct{}),
+		ctx:         ctx,
+		ctxCancel:   ctxCancel,
+		eventClient: bus.Client("ipnlocal.nodeBackend"),
+		readyCh:     make(chan struct{}),
 	}
 	// Default filter blocks everything and logs nothing.
 	noneFilter := filter.NewAllowNone(logger.Discard, &netipx.IPSet{})
 	nb.filterAtomic.Store(noneFilter)
+	nb.filterUpdates = eventbus.Publish[magicsock.FilterUpdate](nb.eventClient)
+	nb.nodeUpdates = eventbus.Publish[magicsock.NodeAddrsHostInfoUpdate](nb.eventClient)
+	nb.filterUpdates.Publish(magicsock.FilterUpdate{Filter: nb.filterAtomic.Load()})
 	return nb
 }
@@ -418,9 +429,16 @@ func (nb *nodeBackend) updatePeersLocked() {
 		nb.peers[k] = tailcfg.NodeView{}
 	}
 
+	changed := magicsock.NodeAddrsHostInfoUpdate{
+		Complete: true,
+	}
 	// Second pass, add everything wanted.
 	for _, p := range nm.Peers {
 		mak.Set(&nb.peers, p.ID(), p)
+		mak.Set(&changed.NodesByID, p.ID(), magicsock.NodeAddrsHostInfo{
+			Addresses: p.Addresses(),
+			Hostinfo:  p.Hostinfo(),
+		})
 	}
 
 	// Third pass, remove deleted things.
@@ -429,6 +447,7 @@ func (nb *nodeBackend) updatePeersLocked() {
 			delete(nb.peers, k)
 		}
 	}
+	nb.nodeUpdates.Publish(changed)
 }
@@ -443,6 +462,9 @@ func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
 	// call (e.g. its endpoints + online status both change)
 	var mutableNodes map[tailcfg.NodeID]*tailcfg.Node
 
+	changed := magicsock.NodeAddrsHostInfoUpdate{
+		Complete: false,
+	}
 	for _, m := range muts {
 		n, ok := mutableNodes[m.NodeIDBeingMutated()]
 		if !ok {
@@ -457,8 +479,14 @@ func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
 		m.Apply(n)
 	}
 	for nid, n := range mutableNodes {
-		nb.peers[nid] = n.View()
+		nv := n.View()
+		nb.peers[nid] = nv
+		mak.Set(&changed.NodesByID, nid, magicsock.NodeAddrsHostInfo{
+			Addresses: nv.Addresses(),
+			Hostinfo:  nv.Hostinfo(),
+		})
 	}
+	nb.nodeUpdates.Publish(changed)
 	return true
 }
@@ -480,6 +508,7 @@ func (nb *nodeBackend) filter() *filter.Filter {
 
 func (nb *nodeBackend) setFilter(f *filter.Filter) {
 	nb.filterAtomic.Store(f)
+	nb.filterUpdates.Publish(magicsock.FilterUpdate{Filter: f})
 }
@@ -545,6 +574,7 @@ func (nb *nodeBackend) doShutdown(cause error) {
 	defer nb.mu.Unlock()
 	nb.ctxCancel(cause)
 	nb.readyCh = nil
+	nb.eventClient.Close()
 }
 
 // dnsConfigForNetmap returns a *dns.Config for the given netmap,
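
As a hedged illustration (not part of this commit), a test in this package
could observe the FilterUpdate publication added to setFilter above. The test
name and body are hypothetical, built only from calls visible in this diff:

	func TestSetFilterPublishes(t *testing.T) {
		bus := eventbus.New()
		nb := newNodeBackend(t.Context(), bus)
		defer nb.shutdown(errors.New("test done"))

		// Subscribe after construction, so the initial FilterUpdate
		// published by newNodeBackend should not be delivered here.
		client := bus.Client("test")
		defer client.Close()
		sub := eventbus.Subscribe[magicsock.FilterUpdate](client)

		f := filter.NewAllowNone(logger.Discard, &netipx.IPSet{})
		nb.setFilter(f)

		select {
		case got := <-sub.Events():
			if got.Filter != f {
				t.Errorf("FilterUpdate carried an unexpected filter")
			}
		case <-time.After(5 * time.Second):
			t.Fatal("timed out waiting for FilterUpdate")
		}
	}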

ipn/ipnlocal/node_backend_test.go

@@ -8,10 +8,12 @@ import (
 	"errors"
 	"testing"
 	"time"
+
+	"tailscale.com/util/eventbus"
 )
 
 func TestNodeBackendReadiness(t *testing.T) {
-	nb := newNodeBackend(t.Context())
+	nb := newNodeBackend(t.Context(), eventbus.New())
 
 	// The node backend is not ready until [nodeBackend.ready] is called,
 	// and [nodeBackend.Wait] should fail with [context.DeadlineExceeded].
@@ -42,7 +44,7 @@ func TestNodeBackendReadiness(t *testing.T) {
 }
 
 func TestNodeBackendShutdown(t *testing.T) {
-	nb := newNodeBackend(t.Context())
+	nb := newNodeBackend(t.Context(), eventbus.New())
 
 	shutdownCause := errors.New("test shutdown")
@@ -80,7 +82,7 @@ func TestNodeBackendShutdown(t *testing.T) {
 }
 
 func TestNodeBackendReadyAfterShutdown(t *testing.T) {
-	nb := newNodeBackend(t.Context())
+	nb := newNodeBackend(t.Context(), eventbus.New())
 
 	shutdownCause := errors.New("test shutdown")
 	nb.shutdown(shutdownCause)
@@ -92,7 +94,7 @@ func TestNodeBackendReadyAfterShutdown(t *testing.T) {
 
 func TestNodeBackendParentContextCancellation(t *testing.T) {
 	ctx, cancelCtx := context.WithCancel(context.Background())
-	nb := newNodeBackend(ctx)
+	nb := newNodeBackend(ctx, eventbus.New())
 
 	cancelCtx()
} }
func TestNodeBackendConcurrentReadyAndShutdown(t *testing.T) { func TestNodeBackendConcurrentReadyAndShutdown(t *testing.T) {
nb := newNodeBackend(t.Context()) nb := newNodeBackend(t.Context(), eventbus.New())
// Calling [nodeBackend.ready] and [nodeBackend.shutdown] concurrently // Calling [nodeBackend.ready] and [nodeBackend.shutdown] concurrently
// should not cause issues, and [nodeBackend.Wait] should unblock, // should not cause issues, and [nodeBackend.Wait] should unblock,

wgengine/magicsock/magicsock.go

@@ -63,6 +63,7 @@ import (
 	"tailscale.com/util/set"
 	"tailscale.com/util/testenv"
 	"tailscale.com/util/usermetric"
+	"tailscale.com/wgengine/filter"
 	"tailscale.com/wgengine/wgint"
 )
@@ -502,6 +503,30 @@ func (o *Options) derpActiveFunc() func() {
 	return o.DERPActiveFunc
 }
 
+// NodeAddrsHostInfoUpdate represents an update event of the addresses and
+// [tailcfg.HostinfoView] for a node set. This event is published over an
+// [eventbus.Bus]. [magicsock.Conn] is the sole subscriber as of 2025-06. If
+// you are adding more subscribers consider moving this type out of magicsock.
+type NodeAddrsHostInfoUpdate struct {
+	NodesByID map[tailcfg.NodeID]NodeAddrsHostInfo
+	Complete  bool // true if NodesByID contains all known nodes, false if it may be a subset
+}
+
+// NodeAddrsHostInfo represents the addresses and [tailcfg.HostinfoView] for a
+// Tailscale node.
+type NodeAddrsHostInfo struct {
+	Addresses views.Slice[netip.Prefix]
+	Hostinfo  tailcfg.HostinfoView
+}
+
+// FilterUpdate represents an update event for a [*filter.Filter]. This event is
+// signaled over an [eventbus.Bus]. [magicsock.Conn] is the sole subscriber as
+// of 2025-06. If you are adding more subscribers consider moving this type out
+// of magicsock.
+type FilterUpdate struct {
+	*filter.Filter
+}
+
 // newConn is the error-free, network-listening-side-effect-free based
 // of NewConn. Mostly for tests.
 func newConn(logf logger.Logf) *Conn {
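
To make the Complete flag concrete: updatePeersLocked (earlier in this commit)
publishes a full snapshot with Complete set to true, while UpdateNetmapDelta
publishes partial updates with Complete set to false. A hedged, self-contained
sketch of building such an event follows; the peer values are placeholders:

	package main

	import (
		"net/netip"

		"tailscale.com/tailcfg"
		"tailscale.com/types/views"
		"tailscale.com/util/mak"
		"tailscale.com/wgengine/magicsock"
	)

	func main() {
		// Complete: true means NodesByID covers every known node; delta
		// updates set Complete: false because they may touch a subset.
		changed := magicsock.NodeAddrsHostInfoUpdate{Complete: true}

		addrs := []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}
		mak.Set(&changed.NodesByID, tailcfg.NodeID(1), magicsock.NodeAddrsHostInfo{
			Addresses: views.SliceOf(addrs),
			Hostinfo:  (&tailcfg.Hostinfo{Hostname: "peer-a"}).View(),
		})
		_ = changed // would be handed to an eventbus.Publisher[magicsock.NodeAddrsHostInfoUpdate]
	}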
@@ -535,6 +560,20 @@ func newConn(logf logger.Logf) *Conn {
 	return c
 }
 
+// consumeEventbusTopic consumes events from sub and passes them to
+// handlerFn until sub.Done() is closed.
+func consumeEventbusTopic[T any](sub *eventbus.Subscriber[T], handlerFn func(t T)) {
+	defer sub.Close()
+	for {
+		select {
+		case evt := <-sub.Events():
+			handlerFn(evt)
+		case <-sub.Done():
+			return
+		}
+	}
+}
+
 // NewConn creates a magic Conn listening on opts.Port.
 // As the set of possible endpoints for a Conn changes, the
 // callback opts.EndpointsFunc is called.
@@ -562,17 +601,17 @@ func NewConn(opts Options) (*Conn, error) {
 	c.eventClient = c.eventBus.Client("magicsock.Conn")
 	pmSub := eventbus.Subscribe[portmapper.Mapping](c.eventClient)
-	go func() {
-		defer pmSub.Close()
-		for {
-			select {
-			case <-pmSub.Events():
-				c.onPortMapChanged()
-			case <-pmSub.Done():
-				return
-			}
-		}
-	}()
+	go consumeEventbusTopic(pmSub, func(_ portmapper.Mapping) {
+		c.onPortMapChanged()
+	})
+	filterSub := eventbus.Subscribe[FilterUpdate](c.eventClient)
+	go consumeEventbusTopic(filterSub, func(t FilterUpdate) {
+		// TODO(jwhited): implement
+	})
+	nodeSub := eventbus.Subscribe[NodeAddrsHostInfoUpdate](c.eventClient)
+	go consumeEventbusTopic(nodeSub, func(t NodeAddrsHostInfoUpdate) {
+		// TODO(jwhited): implement
+	})
 
 	// Disable the explicit callback from the portmapper, the subscriber handles it.
 	onPortMapChanged = nil
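
The two new handlers are deliberately left as TODOs in this commit. Purely as a
hypothetical illustration of where this wiring could lead (not the author's
implementation), a future filter handler might mirror the atomic-pointer
pattern nodeBackend uses; the filterHolder type and its field are invented here:

	// filterHolder is a hypothetical stand-in for state a future Conn
	// might keep; this commit adds no such field to magicsock.Conn.
	type filterHolder struct {
		filterAtomic atomic.Pointer[filter.Filter]
	}

	// onFilterUpdate swaps in the new filter so packet-path readers can
	// load it lock-free, e.g. wired up via:
	//
	//	go consumeEventbusTopic(filterSub, h.onFilterUpdate)
	func (h *filterHolder) onFilterUpdate(t FilterUpdate) {
		h.filterAtomic.Store(t.Filter)
	}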
@@ -2798,6 +2837,10 @@ func (c *connBind) Close() error {
 		return nil
 	}
 	c.closed = true
+	// Close the [eventbus.Client].
+	if c.eventClient != nil {
+		c.eventClient.Close()
+	}
 	// Unblock all outstanding receives.
 	c.pconn4.Close()
 	c.pconn6.Close()