From 8e6f63cf110364b52e3f6c232b23196f16484473 Mon Sep 17 00:00:00 2001
From: Jordan Whited
Date: Mon, 16 Jun 2025 08:42:09 -0700
Subject: [PATCH] ipn/ipnlocal,wgengine/magicsock: use eventbus for node & filter updates (#16271)

nodeBackend now publishes filter and node changes to eventbus topics
that are consumed by magicsock.Conn

Updates tailscale/corp#27502
Updates tailscale/corp#29543

Signed-off-by: Jordan Whited
---
 ipn/ipnlocal/local.go             | 16 ++++++--
 ipn/ipnlocal/node_backend.go      | 40 ++++++++++++++++---
 ipn/ipnlocal/node_backend_test.go | 12 +++---
 wgengine/magicsock/magicsock.go   | 65 +++++++++++++++++++++++++------
 4 files changed, 108 insertions(+), 25 deletions(-)

diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go
index daedb1e19..cd30e92bb 100644
--- a/ipn/ipnlocal/local.go
+++ b/ipn/ipnlocal/local.go
@@ -98,6 +98,7 @@ import (
 	"tailscale.com/util/clientmetric"
 	"tailscale.com/util/deephash"
 	"tailscale.com/util/dnsname"
+	"tailscale.com/util/eventbus"
 	"tailscale.com/util/goroutines"
 	"tailscale.com/util/httpm"
 	"tailscale.com/util/mak"
@@ -514,7 +515,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
 		captiveCancel:         nil, // so that we start checkCaptivePortalLoop when Running
 		needsCaptiveDetection: make(chan bool),
 	}
-	nb := newNodeBackend(ctx)
+	nb := newNodeBackend(ctx, b.sys.Bus.Get())
 	b.currentNodeAtomic.Store(nb)
 	nb.ready()
 
@@ -599,8 +600,15 @@ func (b *LocalBackend) currentNode() *nodeBackend {
 	if v := b.currentNodeAtomic.Load(); v != nil || !testenv.InTest() {
 		return v
 	}
-	// Auto-init one in tests for LocalBackend created without the NewLocalBackend constructor...
-	v := newNodeBackend(cmp.Or(b.ctx, context.Background()))
+	// Auto-init [nodeBackend] in tests for LocalBackend created without the
+	// NewLocalBackend() constructor. Same reasoning for checking b.sys.
+	var bus *eventbus.Bus
+	if b.sys == nil {
+		bus = eventbus.New()
+	} else {
+		bus = b.sys.Bus.Get()
+	}
+	v := newNodeBackend(cmp.Or(b.ctx, context.Background()), bus)
 	if b.currentNodeAtomic.CompareAndSwap(nil, v) {
 		v.ready()
 	}
@@ -7009,7 +7017,7 @@ func (b *LocalBackend) resetForProfileChangeLockedOnEntry(unlock unlockOnce) err
 		// down, so no need to do any work.
 		return nil
 	}
-	newNode := newNodeBackend(b.ctx)
+	newNode := newNodeBackend(b.ctx, b.sys.Bus.Get())
 	if oldNode := b.currentNodeAtomic.Swap(newNode); oldNode != nil {
 		oldNode.shutdown(errNodeContextChanged)
 	}
diff --git a/ipn/ipnlocal/node_backend.go b/ipn/ipnlocal/node_backend.go
index 361d10bb6..efa74577b 100644
--- a/ipn/ipnlocal/node_backend.go
+++ b/ipn/ipnlocal/node_backend.go
@@ -23,9 +23,11 @@ import (
 	"tailscale.com/types/ptr"
 	"tailscale.com/types/views"
 	"tailscale.com/util/dnsname"
+	"tailscale.com/util/eventbus"
 	"tailscale.com/util/mak"
 	"tailscale.com/util/slicesx"
 	"tailscale.com/wgengine/filter"
+	"tailscale.com/wgengine/magicsock"
 )
 
 // nodeBackend is node-specific [LocalBackend] state. It is usually the current node.
@@ -69,6 +71,11 @@ type nodeBackend struct {
 	// replaced with a new one.
 	filterAtomic atomic.Pointer[filter.Filter]
 
+	// initialized once and immutable
+	eventClient   *eventbus.Client
+	filterUpdates *eventbus.Publisher[magicsock.FilterUpdate]
+	nodeUpdates   *eventbus.Publisher[magicsock.NodeAddrsHostInfoUpdate]
+
 	// TODO(nickkhyl): maybe use sync.RWMutex?
 	mu sync.Mutex // protects the following fields
 
@@ -95,16 +102,20 @@ type nodeBackend struct {
 	nodeByAddr map[netip.Addr]tailcfg.NodeID
 }
 
-func newNodeBackend(ctx context.Context) *nodeBackend {
+func newNodeBackend(ctx context.Context, bus *eventbus.Bus) *nodeBackend {
 	ctx, ctxCancel := context.WithCancelCause(ctx)
 	nb := &nodeBackend{
-		ctx:       ctx,
-		ctxCancel: ctxCancel,
-		readyCh:   make(chan struct{}),
+		ctx:         ctx,
+		ctxCancel:   ctxCancel,
+		eventClient: bus.Client("ipnlocal.nodeBackend"),
+		readyCh:     make(chan struct{}),
 	}
 	// Default filter blocks everything and logs nothing.
 	noneFilter := filter.NewAllowNone(logger.Discard, &netipx.IPSet{})
 	nb.filterAtomic.Store(noneFilter)
+	nb.filterUpdates = eventbus.Publish[magicsock.FilterUpdate](nb.eventClient)
+	nb.nodeUpdates = eventbus.Publish[magicsock.NodeAddrsHostInfoUpdate](nb.eventClient)
+	nb.filterUpdates.Publish(magicsock.FilterUpdate{Filter: nb.filterAtomic.Load()})
 	return nb
 }
 
@@ -418,9 +429,16 @@ func (nb *nodeBackend) updatePeersLocked() {
 		nb.peers[k] = tailcfg.NodeView{}
 	}
 
+	changed := magicsock.NodeAddrsHostInfoUpdate{
+		Complete: true,
+	}
 	// Second pass, add everything wanted.
 	for _, p := range nm.Peers {
 		mak.Set(&nb.peers, p.ID(), p)
+		mak.Set(&changed.NodesByID, p.ID(), magicsock.NodeAddrsHostInfo{
+			Addresses: p.Addresses(),
+			Hostinfo:  p.Hostinfo(),
+		})
 	}
 
 	// Third pass, remove deleted things.
@@ -429,6 +447,7 @@ func (nb *nodeBackend) updatePeersLocked() {
 			delete(nb.peers, k)
 		}
 	}
+	nb.nodeUpdates.Publish(changed)
 }
 
 func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
@@ -443,6 +462,9 @@ func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo
 	// call (e.g. its endpoints + online status both change)
 	var mutableNodes map[tailcfg.NodeID]*tailcfg.Node
 
+	changed := magicsock.NodeAddrsHostInfoUpdate{
+		Complete: false,
+	}
 	for _, m := range muts {
 		n, ok := mutableNodes[m.NodeIDBeingMutated()]
 		if !ok {
@@ -457,8 +479,14 @@ func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo
 		m.Apply(n)
 	}
 	for nid, n := range mutableNodes {
-		nb.peers[nid] = n.View()
+		nv := n.View()
+		nb.peers[nid] = nv
+		mak.Set(&changed.NodesByID, nid, magicsock.NodeAddrsHostInfo{
+			Addresses: nv.Addresses(),
+			Hostinfo:  nv.Hostinfo(),
+		})
 	}
+	nb.nodeUpdates.Publish(changed)
 	return true
 }
 
@@ -480,6 +508,7 @@ func (nb *nodeBackend) filter() *filter.Filter {
 
 func (nb *nodeBackend) setFilter(f *filter.Filter) {
 	nb.filterAtomic.Store(f)
+	nb.filterUpdates.Publish(magicsock.FilterUpdate{Filter: f})
 }
 
 func (nb *nodeBackend) dnsConfigForNetmap(prefs ipn.PrefsView, selfExpired bool, logf logger.Logf, versionOS string) *dns.Config {
@@ -545,6 +574,7 @@ func (nb *nodeBackend) doShutdown(cause error) {
 	defer nb.mu.Unlock()
 	nb.ctxCancel(cause)
 	nb.readyCh = nil
+	nb.eventClient.Close()
 }
 
 // dnsConfigForNetmap returns a *dns.Config for the given netmap,
diff --git a/ipn/ipnlocal/node_backend_test.go b/ipn/ipnlocal/node_backend_test.go
index a82b60a9a..dc67d327c 100644
--- a/ipn/ipnlocal/node_backend_test.go
+++ b/ipn/ipnlocal/node_backend_test.go
@@ -8,10 +8,12 @@ import (
 	"errors"
 	"testing"
 	"time"
+
+	"tailscale.com/util/eventbus"
 )
 
 func TestNodeBackendReadiness(t *testing.T) {
-	nb := newNodeBackend(t.Context())
+	nb := newNodeBackend(t.Context(), eventbus.New())
 
 	// The node backend is not ready until [nodeBackend.ready] is called,
 	// and [nodeBackend.Wait] should fail with [context.DeadlineExceeded].
@@ -42,7 +44,7 @@ func TestNodeBackendReadiness(t *testing.T) {
 }
 
 func TestNodeBackendShutdown(t *testing.T) {
-	nb := newNodeBackend(t.Context())
+	nb := newNodeBackend(t.Context(), eventbus.New())
 
 	shutdownCause := errors.New("test shutdown")
 
@@ -80,7 +82,7 @@ func TestNodeBackendShutdown(t *testing.T) {
 }
 
 func TestNodeBackendReadyAfterShutdown(t *testing.T) {
-	nb := newNodeBackend(t.Context())
+	nb := newNodeBackend(t.Context(), eventbus.New())
 
 	shutdownCause := errors.New("test shutdown")
 	nb.shutdown(shutdownCause)
@@ -92,7 +94,7 @@ func TestNodeBackendReadyAfterShutdown(t *testing.T) {
 
 func TestNodeBackendParentContextCancellation(t *testing.T) {
 	ctx, cancelCtx := context.WithCancel(context.Background())
-	nb := newNodeBackend(ctx)
+	nb := newNodeBackend(ctx, eventbus.New())
 
 	cancelCtx()
 
@@ -109,7 +111,7 @@ func TestNodeBackendParentContextCancellation(t *testing.T) {
 }
 
 func TestNodeBackendConcurrentReadyAndShutdown(t *testing.T) {
-	nb := newNodeBackend(t.Context())
+	nb := newNodeBackend(t.Context(), eventbus.New())
 
 	// Calling [nodeBackend.ready] and [nodeBackend.shutdown] concurrently
 	// should not cause issues, and [nodeBackend.Wait] should unblock,
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go
index e5cc87dc3..1042e6794 100644
--- a/wgengine/magicsock/magicsock.go
+++ b/wgengine/magicsock/magicsock.go
@@ -63,6 +63,7 @@ import (
 	"tailscale.com/util/set"
 	"tailscale.com/util/testenv"
 	"tailscale.com/util/usermetric"
+	"tailscale.com/wgengine/filter"
 	"tailscale.com/wgengine/wgint"
 )
 
@@ -502,6 +503,30 @@ func (o *Options) derpActiveFunc() func() {
 	return o.DERPActiveFunc
 }
 
+// NodeAddrsHostInfoUpdate represents an update event of the addresses and
+// [tailcfg.HostinfoView] for a node set. This event is published over an
+// [eventbus.Bus]. [magicsock.Conn] is the sole subscriber as of 2025-06. If
+// you are adding more subscribers consider moving this type out of magicsock.
+type NodeAddrsHostInfoUpdate struct {
+	NodesByID map[tailcfg.NodeID]NodeAddrsHostInfo
+	Complete  bool // true if NodesByID contains all known nodes, false if it may be a subset
+}
+
+// NodeAddrsHostInfo represents the addresses and [tailcfg.HostinfoView] for a
+// Tailscale node.
+type NodeAddrsHostInfo struct {
+	Addresses views.Slice[netip.Prefix]
+	Hostinfo  tailcfg.HostinfoView
+}
+
+// FilterUpdate represents an update event for a [*filter.Filter]. This event is
+// signaled over an [eventbus.Bus]. [magicsock.Conn] is the sole subscriber as
+// of 2025-06. If you are adding more subscribers consider moving this type out
+// of magicsock.
+type FilterUpdate struct {
+	*filter.Filter
+}
+
 // newConn is the error-free, network-listening-side-effect-free based
 // of NewConn. Mostly for tests.
 func newConn(logf logger.Logf) *Conn {
@@ -535,6 +560,20 @@ func newConn(logf logger.Logf) *Conn {
 	return c
 }
 
+// consumeEventbusTopic consumes events from sub and passes them to
+// handlerFn until sub.Done() is closed.
+func consumeEventbusTopic[T any](sub *eventbus.Subscriber[T], handlerFn func(t T)) {
+	defer sub.Close()
+	for {
+		select {
+		case evt := <-sub.Events():
+			handlerFn(evt)
+		case <-sub.Done():
+			return
+		}
+	}
+}
+
 // NewConn creates a magic Conn listening on opts.Port.
 // As the set of possible endpoints for a Conn changes, the
 // callback opts.EndpointsFunc is called.
@@ -562,17 +601,17 @@ func NewConn(opts Options) (*Conn, error) {
 	c.eventClient = c.eventBus.Client("magicsock.Conn")
 
 	pmSub := eventbus.Subscribe[portmapper.Mapping](c.eventClient)
-	go func() {
-		defer pmSub.Close()
-		for {
-			select {
-			case <-pmSub.Events():
-				c.onPortMapChanged()
-			case <-pmSub.Done():
-				return
-			}
-		}
-	}()
+	go consumeEventbusTopic(pmSub, func(_ portmapper.Mapping) {
+		c.onPortMapChanged()
+	})
+	filterSub := eventbus.Subscribe[FilterUpdate](c.eventClient)
+	go consumeEventbusTopic(filterSub, func(t FilterUpdate) {
+		// TODO(jwhited): implement
+	})
+	nodeSub := eventbus.Subscribe[NodeAddrsHostInfoUpdate](c.eventClient)
+	go consumeEventbusTopic(nodeSub, func(t NodeAddrsHostInfoUpdate) {
+		// TODO(jwhited): implement
+	})
 
 	// Disable the explicit callback from the portmapper, the subscriber handles it.
 	onPortMapChanged = nil
@@ -2798,6 +2837,10 @@ func (c *connBind) Close() error {
 		return nil
 	}
 	c.closed = true
+	// Close the [eventbus.Client].
+	if c.eventClient != nil {
+		c.eventClient.Close()
+	}
 	// Unblock all outstanding receives.
 	c.pconn4.Close()
 	c.pconn6.Close()
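
The topology this patch sets up: ipnlocal.nodeBackend owns an eventbus.Client and publishes FilterUpdate and NodeAddrsHostInfoUpdate events whenever the packet filter or the peer set changes, and magicsock.Conn subscribes to both topics through the new consumeEventbusTopic helper (its handlers are still TODO). Below is a minimal, self-contained sketch of that publish/subscribe flow using only the eventbus calls that appear in this diff; the exampleUpdate type, the client names, and the consumeTopic helper are illustrative stand-ins, not code from the patch.

package main

import (
	"fmt"

	"tailscale.com/util/eventbus"
)

// exampleUpdate stands in for event types like FilterUpdate or
// NodeAddrsHostInfoUpdate; any struct can be a topic on the bus.
type exampleUpdate struct {
	Seq int
}

// consumeTopic mirrors the consumeEventbusTopic helper added in this
// patch: it drains a subscriber until its Done channel is closed.
func consumeTopic[T any](sub *eventbus.Subscriber[T], handlerFn func(T)) {
	defer sub.Close()
	for {
		select {
		case evt := <-sub.Events():
			handlerFn(evt)
		case <-sub.Done():
			return
		}
	}
}

func main() {
	bus := eventbus.New()

	// Publisher side, analogous to ipnlocal.nodeBackend.
	pubClient := bus.Client("example.publisher")
	pub := eventbus.Publish[exampleUpdate](pubClient)

	// Subscriber side, analogous to magicsock.Conn.
	subClient := bus.Client("example.subscriber")
	sub := eventbus.Subscribe[exampleUpdate](subClient)

	got := make(chan int, 2)
	go consumeTopic(sub, func(u exampleUpdate) {
		got <- u.Seq
	})

	// Events published on one client are delivered asynchronously to
	// subscribers of the same event type on other clients of the bus.
	pub.Publish(exampleUpdate{Seq: 1})
	pub.Publish(exampleUpdate{Seq: 2})
	fmt.Println("received", <-got, <-got)

	// Closing the clients tears down the publisher and subscriber,
	// which unblocks consumeTopic via sub.Done().
	subClient.Close()
	pubClient.Close()
}

In the patch itself the subscriber side lives in NewConn, and connBind.Close closes c.eventClient, which is what ends the three consumeEventbusTopic goroutines.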