From fcab50b2763a1c7cd51f3c5d9cf8d2198eb7fa90 Mon Sep 17 00:00:00 2001
From: Jordan Whited
Date: Wed, 18 Jun 2025 10:31:00 -0700
Subject: [PATCH] ipn/ipnlocal,wgengine{/magicsock}: replace SetNetworkMap with
 eventbus (#16299)

Same with UpdateNetmapDelta.

Updates tailscale/corp#27502
Updates #15160

Signed-off-by: Jordan Whited
---
 ipn/ipnlocal/local.go                |   4 -
 ipn/ipnlocal/local_test.go           |  49 +++++++-
 ipn/ipnlocal/node_backend.go         |  44 ++++----
 ipn/ipnlocal/serve_test.go           |   2 +
 wgengine/magicsock/magicsock.go      | 161 ++++++++++++++-------
 wgengine/magicsock/magicsock_test.go |  62 +++++++----
 wgengine/userspace.go                |   1 -
 7 files changed, 194 insertions(+), 129 deletions(-)

diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go
index cd30e92bb..908418d4a 100644
--- a/ipn/ipnlocal/local.go
+++ b/ipn/ipnlocal/local.go
@@ -1946,10 +1946,6 @@ var _ controlclient.NetmapDeltaUpdater = (*LocalBackend)(nil)
 
 // UpdateNetmapDelta implements controlclient.NetmapDeltaUpdater.
 func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
-	if !b.MagicConn().UpdateNetmapDelta(muts) {
-		return false
-	}
-
 	var notify *ipn.Notify // non-nil if we need to send a Notify
 	defer func() {
 		if notify != nil {
diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go
index 281d0e9c4..6e24f4300 100644
--- a/ipn/ipnlocal/local_test.go
+++ b/ipn/ipnlocal/local_test.go
@@ -5,6 +5,7 @@ package ipnlocal
 
 import (
 	"context"
+	"encoding/binary"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -23,6 +24,7 @@ import (
 	"time"
 
 	"github.com/google/go-cmp/cmp"
+	memro "go4.org/mem"
 	"go4.org/netipx"
 	"golang.org/x/net/dns/dnsmessage"
 	"tailscale.com/appc"
@@ -77,6 +79,12 @@ func inRemove(ip netip.Addr) bool {
 	return false
 }
 
+func makeNodeKeyFromID(nodeID tailcfg.NodeID) key.NodePublic {
+	raw := make([]byte, 32)
+	binary.BigEndian.PutUint64(raw[24:], uint64(nodeID))
+	return key.NodePublicFromRaw32(memro.B(raw))
+}
+
 func TestShrinkDefaultRoute(t *testing.T) {
 	tests := []struct {
 		route string
@@ -794,6 +802,7 @@ func TestStatusPeerCapabilities(t *testing.T) {
 				(&tailcfg.Node{
 					ID:              1,
 					StableID:        "foo",
+					Key:             makeNodeKeyFromID(1),
 					IsWireGuardOnly: true,
 					Hostinfo:        (&tailcfg.Hostinfo{}).View(),
 					Capabilities:    []tailcfg.NodeCapability{tailcfg.CapabilitySSH},
@@ -804,6 +813,7 @@
 				(&tailcfg.Node{
 					ID:           2,
 					StableID:     "bar",
+					Key:          makeNodeKeyFromID(2),
 					Hostinfo:     (&tailcfg.Hostinfo{}).View(),
 					Capabilities: []tailcfg.NodeCapability{tailcfg.CapabilityAdmin},
 					CapMap: (tailcfg.NodeCapMap)(map[tailcfg.NodeCapability][]tailcfg.RawMessage{
@@ -830,12 +840,14 @@
 				(&tailcfg.Node{
 					ID:              1,
 					StableID:        "foo",
+					Key:             makeNodeKeyFromID(1),
 					IsWireGuardOnly: true,
 					Hostinfo:        (&tailcfg.Hostinfo{}).View(),
 				}).View(),
 				(&tailcfg.Node{
 					ID:       2,
 					StableID: "bar",
+					Key:      makeNodeKeyFromID(2),
 					Hostinfo: (&tailcfg.Hostinfo{}).View(),
 				}).View(),
 			},
@@ -927,7 +939,11 @@ func TestUpdateNetmapDelta(t *testing.T) {
 	nm := &netmap.NetworkMap{}
 	for i := range 5 {
-		nm.Peers = append(nm.Peers, (&tailcfg.Node{ID: (tailcfg.NodeID(i) + 1)}).View())
+		id := tailcfg.NodeID(i + 1)
+		nm.Peers = append(nm.Peers, (&tailcfg.Node{
+			ID:  id,
+			Key: makeNodeKeyFromID(id),
+		}).View())
 	}
 	b.currentNode().SetNetMap(nm)
@@ -963,18 +979,22 @@
 	wants := []*tailcfg.Node{
 		{
 			ID:       1,
+			Key:      makeNodeKeyFromID(1),
 			HomeDERP: 1,
 		},
 		{
 			ID:     2,
+			Key:    makeNodeKeyFromID(2),
 			Online: ptr.To(true),
 		},
 		{
 			ID:     3,
+			Key:    makeNodeKeyFromID(3),
 			Online: ptr.To(false),
 		},
 		{
 			ID:       4,
+			Key:      makeNodeKeyFromID(4),
 			LastSeen: ptr.To(someTime),
 		},
 	}
@@ -998,12 +1018,14 @@ func TestWhoIs(t *testing.T) {
 		SelfNode: (&tailcfg.Node{
 			ID:        1,
 			User:      10,
+			Key:       makeNodeKeyFromID(1),
 			Addresses: []netip.Prefix{netip.MustParsePrefix("100.101.102.103/32")},
 		}).View(),
 		Peers: []tailcfg.NodeView{
 			(&tailcfg.Node{
 				ID:        2,
 				User:      20,
+				Key:       makeNodeKeyFromID(2),
 				Addresses: []netip.Prefix{netip.MustParsePrefix("100.200.200.200/32")},
 			}).View(),
 		},
@@ -1593,6 +1615,7 @@ func dnsResponse(domain, address string) []byte {
 }
 
 func TestSetExitNodeIDPolicy(t *testing.T) {
+	zeroValHostinfoView := new(tailcfg.Hostinfo).View()
 	pfx := netip.MustParsePrefix
 	tests := []struct {
 		name string
@@ -1669,14 +1692,18 @@ func TestSetExitNodeIDPolicy(t *testing.T) {
 				}).View(),
 				Peers: []tailcfg.NodeView{
 					(&tailcfg.Node{
+						ID:   201,
 						Name: "a.tailnet",
+						Key:  makeNodeKeyFromID(201),
 						Addresses: []netip.Prefix{
 							pfx("100.0.0.201/32"),
 							pfx("100::201/128"),
 						},
 					}).View(),
 					(&tailcfg.Node{
+						ID:   202,
 						Name: "b.tailnet",
+						Key:  makeNodeKeyFromID(202),
 						Addresses: []netip.Prefix{
 							pfx("100::202/128"),
 						},
@@ -1702,18 +1729,24 @@ func TestSetExitNodeIDPolicy(t *testing.T) {
 				}).View(),
 				Peers: []tailcfg.NodeView{
 					(&tailcfg.Node{
+						ID:       123,
 						Name:     "a.tailnet",
 						StableID: tailcfg.StableNodeID("123"),
+						Key:      makeNodeKeyFromID(123),
 						Addresses: []netip.Prefix{
 							pfx("127.0.0.1/32"),
 							pfx("100::201/128"),
 						},
+						Hostinfo: zeroValHostinfoView,
 					}).View(),
 					(&tailcfg.Node{
+						ID:   202,
 						Name: "b.tailnet",
+						Key:  makeNodeKeyFromID(202),
 						Addresses: []netip.Prefix{
 							pfx("100::202/128"),
 						},
+						Hostinfo: zeroValHostinfoView,
 					}).View(),
 				},
 			},
@@ -1734,18 +1767,24 @@ func TestSetExitNodeIDPolicy(t *testing.T) {
 				}).View(),
 				Peers: []tailcfg.NodeView{
 					(&tailcfg.Node{
+						ID:       123,
 						Name:     "a.tailnet",
 						StableID: tailcfg.StableNodeID("123"),
+						Key:      makeNodeKeyFromID(123),
 						Addresses: []netip.Prefix{
 							pfx("127.0.0.1/32"),
 							pfx("100::201/128"),
 						},
+						Hostinfo: zeroValHostinfoView,
 					}).View(),
 					(&tailcfg.Node{
+						ID:   202,
 						Name: "b.tailnet",
+						Key:  makeNodeKeyFromID(202),
 						Addresses: []netip.Prefix{
 							pfx("100::202/128"),
 						},
+						Hostinfo: zeroValHostinfoView,
 					}).View(),
 				},
 			},
@@ -1768,18 +1807,24 @@ func TestSetExitNodeIDPolicy(t *testing.T) {
 				}).View(),
 				Peers: []tailcfg.NodeView{
 					(&tailcfg.Node{
+						ID:       123,
 						Name:     "a.tailnet",
 						StableID: tailcfg.StableNodeID("123"),
+						Key:      makeNodeKeyFromID(123),
 						Addresses: []netip.Prefix{
 							pfx("100.64.5.6/32"),
 							pfx("100::201/128"),
 						},
+						Hostinfo: zeroValHostinfoView,
 					}).View(),
 					(&tailcfg.Node{
+						ID:   202,
 						Name: "b.tailnet",
+						Key:  makeNodeKeyFromID(202),
 						Addresses: []netip.Prefix{
 							pfx("100::202/128"),
 						},
+						Hostinfo: zeroValHostinfoView,
 					}).View(),
 				},
 			},
@@ -1827,7 +1872,6 @@ func TestSetExitNodeIDPolicy(t *testing.T) {
 			b.currentNode().SetNetMap(test.nm)
 			b.pm = pm
 			b.lastSuggestedExitNode = test.lastSuggestedExitNode
-
 			prefs := b.pm.prefs.AsStruct()
 			if changed := applySysPolicy(prefs, test.lastSuggestedExitNode, false) || setExitNodeID(prefs, test.nm); changed != test.prefsChanged {
 				t.Errorf("wanted prefs changed %v, got prefs changed %v", test.prefsChanged, changed)
@@ -3218,6 +3262,7 @@ type peerOptFunc func(*tailcfg.Node)
 func makePeer(id tailcfg.NodeID, opts ...peerOptFunc) tailcfg.NodeView {
 	node := &tailcfg.Node{
 		ID:       id,
+		Key:      makeNodeKeyFromID(id),
 		StableID: tailcfg.StableNodeID(fmt.Sprintf("stable%d", id)),
 		Name:     fmt.Sprintf("peer%d", id),
 		HomeDERP: int(id),
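
The new makeNodeKeyFromID helper is worth a note: these test netmaps now reach
magicsock's peer map, which tracks peers by node key (see keep.Add(n.Key())
below), so every test node needs a distinct, valid key. A minimal standalone
sketch of the same trick — the helper body is copied from the test file above;
the package main wrapper is only for illustration:

	package main

	import (
		"encoding/binary"
		"fmt"

		memro "go4.org/mem"
		"tailscale.com/tailcfg"
		"tailscale.com/types/key"
	)

	// makeNodeKeyFromID packs the node ID into the last 8 bytes of an
	// otherwise-zero 32-byte key, so each distinct tailcfg.NodeID yields a
	// distinct, deterministic key.NodePublic.
	func makeNodeKeyFromID(nodeID tailcfg.NodeID) key.NodePublic {
		raw := make([]byte, 32)
		binary.BigEndian.PutUint64(raw[24:], uint64(nodeID))
		return key.NodePublicFromRaw32(memro.B(raw))
	}

	func main() {
		a, b := makeNodeKeyFromID(1), makeNodeKeyFromID(2)
		fmt.Println(a == b) // false: keys are unique per NodeID
	}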
diff --git a/ipn/ipnlocal/node_backend.go b/ipn/ipnlocal/node_backend.go
index efa74577b..05389a677 100644
--- a/ipn/ipnlocal/node_backend.go
+++ b/ipn/ipnlocal/node_backend.go
@@ -72,9 +72,10 @@ type nodeBackend struct {
 	filterAtomic atomic.Pointer[filter.Filter]
 
 	// initialized once and immutable
-	eventClient   *eventbus.Client
-	filterUpdates *eventbus.Publisher[magicsock.FilterUpdate]
-	nodeUpdates   *eventbus.Publisher[magicsock.NodeAddrsHostInfoUpdate]
+	eventClient  *eventbus.Client
+	filterPub    *eventbus.Publisher[magicsock.FilterUpdate]
+	nodeViewsPub *eventbus.Publisher[magicsock.NodeViewsUpdate]
+	nodeMutsPub  *eventbus.Publisher[magicsock.NodeMutationsUpdate]
 
 	// TODO(nickkhyl): maybe use sync.RWMutex?
 	mu sync.Mutex // protects the following fields
@@ -113,9 +114,10 @@ func newNodeBackend(ctx context.Context, bus *eventbus.Bus) *nodeBackend {
 	// Default filter blocks everything and logs nothing.
 	noneFilter := filter.NewAllowNone(logger.Discard, &netipx.IPSet{})
 	nb.filterAtomic.Store(noneFilter)
-	nb.filterUpdates = eventbus.Publish[magicsock.FilterUpdate](nb.eventClient)
-	nb.nodeUpdates = eventbus.Publish[magicsock.NodeAddrsHostInfoUpdate](nb.eventClient)
-	nb.filterUpdates.Publish(magicsock.FilterUpdate{Filter: nb.filterAtomic.Load()})
+	nb.filterPub = eventbus.Publish[magicsock.FilterUpdate](nb.eventClient)
+	nb.nodeViewsPub = eventbus.Publish[magicsock.NodeViewsUpdate](nb.eventClient)
+	nb.nodeMutsPub = eventbus.Publish[magicsock.NodeMutationsUpdate](nb.eventClient)
+	nb.filterPub.Publish(magicsock.FilterUpdate{Filter: nb.filterAtomic.Load()})
 	return nb
 }
@@ -379,6 +381,12 @@ func (nb *nodeBackend) SetNetMap(nm *netmap.NetworkMap) {
 	nb.netMap = nm
 	nb.updateNodeByAddrLocked()
 	nb.updatePeersLocked()
+	nv := magicsock.NodeViewsUpdate{}
+	if nm != nil {
+		nv.SelfNode = nm.SelfNode
+		nv.Peers = nm.Peers
+	}
+	nb.nodeViewsPub.Publish(nv)
 }
 
 func (nb *nodeBackend) updateNodeByAddrLocked() {
@@ -429,16 +437,9 @@ func (nb *nodeBackend) updatePeersLocked() {
 		nb.peers[k] = tailcfg.NodeView{}
 	}
 
-	changed := magicsock.NodeAddrsHostInfoUpdate{
-		Complete: true,
-	}
 	// Second pass, add everything wanted.
 	for _, p := range nm.Peers {
 		mak.Set(&nb.peers, p.ID(), p)
-		mak.Set(&changed.NodesByID, p.ID(), magicsock.NodeAddrsHostInfo{
-			Addresses: p.Addresses(),
-			Hostinfo:  p.Hostinfo(),
-		})
 	}
 
 	// Third pass, remove deleted things.
@@ -447,7 +448,6 @@ func (nb *nodeBackend) updatePeersLocked() {
 			delete(nb.peers, k)
 		}
 	}
-	nb.nodeUpdates.Publish(changed)
 }
 
 func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
@@ -462,8 +462,8 @@ func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo
 	// call (e.g. its endpoints + online status both change)
 	var mutableNodes map[tailcfg.NodeID]*tailcfg.Node
 
-	changed := magicsock.NodeAddrsHostInfoUpdate{
-		Complete: false,
+	update := magicsock.NodeMutationsUpdate{
+		Mutations: make([]netmap.NodeMutation, 0, len(muts)),
 	}
 	for _, m := range muts {
 		n, ok := mutableNodes[m.NodeIDBeingMutated()]
@@ -475,18 +475,14 @@ func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo
 			}
 			n = nv.AsStruct()
 			mak.Set(&mutableNodes, nv.ID(), n)
+			update.Mutations = append(update.Mutations, m)
 		}
 		m.Apply(n)
 	}
 	for nid, n := range mutableNodes {
-		nv := n.View()
-		nb.peers[nid] = nv
-		mak.Set(&changed.NodesByID, nid, magicsock.NodeAddrsHostInfo{
-			Addresses: nv.Addresses(),
-			Hostinfo:  nv.Hostinfo(),
-		})
+		nb.peers[nid] = n.View()
 	}
-	nb.nodeUpdates.Publish(changed)
+	nb.nodeMutsPub.Publish(update)
 	return true
 }
@@ -508,7 +504,7 @@ func (nb *nodeBackend) filter() *filter.Filter {
 
 func (nb *nodeBackend) setFilter(f *filter.Filter) {
 	nb.filterAtomic.Store(f)
-	nb.filterUpdates.Publish(magicsock.FilterUpdate{Filter: f})
+	nb.filterPub.Publish(magicsock.FilterUpdate{Filter: f})
 }
 
 func (nb *nodeBackend) dnsConfigForNetmap(prefs ipn.PrefsView, selfExpired bool, logf logger.Logf, versionOS string) *dns.Config {
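
nodeBackend now publishes three topics that magicsock.Conn consumes. A rough
round-trip sketch of that pattern follows; the Client, Publish, Subscribe,
Events, and Done calls all appear in this patch, while eventbus.New and
Bus.Close are assumed from tailscale.com/util/eventbus:

	package main

	import (
		"fmt"

		"tailscale.com/tailcfg"
		"tailscale.com/util/eventbus"
		"tailscale.com/wgengine/magicsock"
	)

	func main() {
		bus := eventbus.New() // assumed constructor
		defer bus.Close()     // assumed

		// Subscriber side (what magicsock.Conn does in NewConn): subscribe
		// before anything publishes, or events are missed.
		subClient := bus.Client("example.subscriber")
		defer subClient.Close()
		sub := eventbus.Subscribe[magicsock.NodeViewsUpdate](subClient)

		// Publisher side (what nodeBackend.SetNetMap does).
		pubClient := bus.Client("example.publisher")
		defer pubClient.Close()
		pub := eventbus.Publish[magicsock.NodeViewsUpdate](pubClient)
		pub.Publish(magicsock.NodeViewsUpdate{
			Peers: []tailcfg.NodeView{(&tailcfg.Node{ID: 1}).View()},
		})

		select {
		case update := <-sub.Events():
			fmt.Printf("got %d peer(s)\n", len(update.Peers))
		case <-sub.Done():
			// Subscription closed before an event arrived.
		}
	}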
diff --git a/ipn/ipnlocal/serve_test.go b/ipn/ipnlocal/serve_test.go
index b9370f877..57d1a4745 100644
--- a/ipn/ipnlocal/serve_test.go
+++ b/ipn/ipnlocal/serve_test.go
@@ -918,6 +918,7 @@ func newTestBackend(t *testing.T) *LocalBackend {
 					ID:           152,
 					ComputedName: "some-peer",
 					User:         tailcfg.UserID(1),
+					Key:          makeNodeKeyFromID(152),
 					Addresses: []netip.Prefix{
 						netip.MustParsePrefix("100.150.151.152/32"),
 					},
@@ -927,6 +928,7 @@ func newTestBackend(t *testing.T) *LocalBackend {
 					ComputedName: "some-tagged-peer",
 					Tags:         []string{"tag:server", "tag:test"},
 					User:         tailcfg.UserID(1),
+					Key:          makeNodeKeyFromID(153),
 					Addresses: []netip.Prefix{
 						netip.MustParsePrefix("100.150.151.153/32"),
 					},
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go
index 1042e6794..a6c6a3fb6 100644
--- a/wgengine/magicsock/magicsock.go
+++ b/wgengine/magicsock/magicsock.go
@@ -160,6 +160,14 @@ type Conn struct {
 	connCtxCancel func()          // closes connCtx
 	donec         <-chan struct{} // connCtx.Done()'s to avoid context.cancelCtx.Done()'s mutex per call
 
+	// These [eventbus.Subscriber] fields are solely accessed by
+	// consumeEventbusTopics once initialized.
+	pmSub        *eventbus.Subscriber[portmapper.Mapping]
+	filterSub    *eventbus.Subscriber[FilterUpdate]
+	nodeViewsSub *eventbus.Subscriber[NodeViewsUpdate]
+	nodeMutsSub  *eventbus.Subscriber[NodeMutationsUpdate]
+	subsDoneCh   chan struct{} // closed when consumeEventbusTopics returns
+
 	// pconn4 and pconn6 are the underlying UDP sockets used to
 	// send/receive packets for wireguard and other magicsock
 	// protocols.
@@ -341,9 +349,9 @@ type Conn struct {
 	netInfoLast *tailcfg.NetInfo
 
 	derpMap          *tailcfg.DERPMap              // nil (or zero regions/nodes) means DERP is disabled
-	peers            views.Slice[tailcfg.NodeView] // from last SetNetworkMap update
-	lastFlags        debugFlags                    // at time of last SetNetworkMap
-	firstAddrForTest netip.Addr                    // from last SetNetworkMap update; for tests only
+	peers            views.Slice[tailcfg.NodeView] // from last onNodeViewsUpdate update
+	lastFlags        debugFlags                    // at time of last onNodeViewsUpdate
+	firstAddrForTest netip.Addr                    // from last onNodeViewsUpdate update; for tests only
 	privateKey       key.NodePrivate               // WireGuard private key for this node
 	everHadKey       bool                          // whether we ever had a non-zero private key
 	myDerp           int                           // nearest DERP region ID; 0 means none/unknown
@@ -411,10 +419,8 @@ func (c *Conn) dlogf(format string, a ...any) {
 // Options contains options for Listen.
 type Options struct {
 	// EventBus, if non-nil, is used for event publication and subscription by
-	// each Conn created from these Options.
-	//
-	// TODO(creachadair): As of 2025-03-19 this is optional, but is intended to
-	// become required non-nil.
+	// each Conn created from these Options. It must not be nil outside of
+	// tests.
 	EventBus *eventbus.Bus
 
 	// Logf provides a log function to use. It must not be nil.
@@ -503,20 +509,22 @@ func (o *Options) derpActiveFunc() func() {
 	return o.DERPActiveFunc
 }
 
-// NodeAddrsHostInfoUpdate represents an update event of the addresses and
-// [tailcfg.HostInfoView] for a node set. This event is published over an
-// [eventbus.Bus]. [magicsock.Conn] is the sole subscriber as of 2025-06. If
-// you are adding more subscribers consider moving this type out of magicsock.
-type NodeAddrsHostInfoUpdate struct {
-	NodesByID map[tailcfg.NodeID]NodeAddrsHostInfo
-	Complete  bool // true if NodesByID contains all known nodes, false if it may be a subset
+// NodeViewsUpdate represents an update event of [tailcfg.NodeView] for all
+// nodes. This event is published over an [eventbus.Bus]. It may be published
+// with an invalid SelfNode, and/or zero/nil Peers. [magicsock.Conn] is the sole
+// subscriber as of 2025-06. If you are adding more subscribers consider moving
+// this type out of magicsock.
+type NodeViewsUpdate struct {
+	SelfNode tailcfg.NodeView
+	Peers    []tailcfg.NodeView
 }
 
-// NodeAddrsHostInfo represents the addresses and [tailcfg.HostinfoView] for a
-// Tailscale node.
-type NodeAddrsHostInfo struct {
-	Addresses views.Slice[netip.Prefix]
-	Hostinfo  tailcfg.HostinfoView
+// NodeMutationsUpdate represents an update event of one or more
+// [netmap.NodeMutation]. This event is published over an [eventbus.Bus].
+// [magicsock.Conn] is the sole subscriber as of 2025-06. If you are adding more
+// subscribers consider moving this type out of magicsock.
+type NodeMutationsUpdate struct {
+	Mutations []netmap.NodeMutation
 }
 
 // FilterUpdate represents an update event for a [*filter.Filter]. This event is
@@ -560,16 +568,28 @@ func newConn(logf logger.Logf) *Conn {
 	return c
 }
 
-// consumeEventbusTopic consumes events from sub and passes them to
-// handlerFn until sub.Done() is closed.
-func consumeEventbusTopic[T any](sub *eventbus.Subscriber[T], handlerFn func(t T)) {
-	defer sub.Close()
+// consumeEventbusTopics consumes events from all [Conn]-relevant
+// [eventbus.Subscriber]'s and passes them to their related handler. Events are
+// always handled in the order they are received, i.e. the next event is not
+// read until the previous event's handler has returned. It returns when the
+// [portmapper.Mapping] subscriber is closed, which is interpreted to be the
+// same as the [eventbus.Client] closing ([eventbus.Subscribers] are either
+// all open or all closed).
+func (c *Conn) consumeEventbusTopics() {
+	defer close(c.subsDoneCh)
+
 	for {
 		select {
-		case evt := <-sub.Events():
-			handlerFn(evt)
-		case <-sub.Done():
+		case <-c.pmSub.Done():
 			return
+		case <-c.pmSub.Events():
+			c.onPortMapChanged()
+		case filterUpdate := <-c.filterSub.Events():
+			c.onFilterUpdate(filterUpdate)
+		case nodeViews := <-c.nodeViewsSub.Events():
+			c.onNodeViewsUpdate(nodeViews)
+		case nodeMuts := <-c.nodeMutsSub.Events():
+			c.onNodeMutationsUpdate(nodeMuts)
 		}
 	}
 }
@@ -592,29 +612,17 @@ func NewConn(opts Options) (*Conn, error) {
 	c.testOnlyPacketListener = opts.TestOnlyPacketListener
 	c.noteRecvActivity = opts.NoteRecvActivity
 
-	// If an event bus is enabled, subscribe to portmapping changes; otherwise
-	// use the callback mechanism of portmapper.Client.
-	//
-	// TODO(creachadair): Remove the switch once the event bus is mandatory.
-	onPortMapChanged := c.onPortMapChanged
 	if c.eventBus != nil {
 		c.eventClient = c.eventBus.Client("magicsock.Conn")
-		pmSub := eventbus.Subscribe[portmapper.Mapping](c.eventClient)
-		go consumeEventbusTopic(pmSub, func(_ portmapper.Mapping) {
-			c.onPortMapChanged()
-		})
-		filterSub := eventbus.Subscribe[FilterUpdate](c.eventClient)
-		go consumeEventbusTopic(filterSub, func(t FilterUpdate) {
-			// TODO(jwhited): implement
-		})
-		nodeSub := eventbus.Subscribe[NodeAddrsHostInfoUpdate](c.eventClient)
-		go consumeEventbusTopic(nodeSub, func(t NodeAddrsHostInfoUpdate) {
-			// TODO(jwhited): implement
-		})
-
-		// Disable the explicit callback from the portmapper, the subscriber handles it.
-		onPortMapChanged = nil
+		// Subscribe calls must return before NewConn otherwise published
+		// events can be missed.
+		c.pmSub = eventbus.Subscribe[portmapper.Mapping](c.eventClient)
+		c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient)
+		c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient)
+		c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient)
+		c.subsDoneCh = make(chan struct{})
+		go c.consumeEventbusTopics()
 	}
 
 	// Don't log the same log messages possibly every few seconds in our
@@ -630,7 +638,6 @@ func NewConn(opts Options) (*Conn, error) {
 		NetMon:       opts.NetMon,
 		DebugKnobs:   portMapOpts,
 		ControlKnobs: opts.ControlKnobs,
-		OnChange:     onPortMapChanged,
 	})
 	c.portMapper.SetGatewayLookupFunc(opts.NetMon.GatewayAndSelfIP)
 	c.netMon = opts.NetMon
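
The shift from one goroutine per topic to a single consumeEventbusTopics
goroutine is what buys the cross-topic ordering guarantee described in the
doc comment above. A dependency-free sketch of the same select-loop shape,
with illustrative channel and type names (not magicsock's):

	package main

	import "fmt"

	type loop struct {
		filterCh chan string   // stands in for filterSub.Events()
		viewsCh  chan int      // stands in for nodeViewsSub.Events()
		doneCh   chan struct{} // closed by the producer side, like pmSub.Done()
		exitedCh chan struct{} // closed when run returns, like subsDoneCh
	}

	func (l *loop) run() {
		defer close(l.exitedCh)
		for {
			select {
			case <-l.doneCh:
				return
			case f := <-l.filterCh:
				// The next event is not read until this handler returns,
				// so handlers never run concurrently with each other.
				fmt.Println("filter update:", f)
			case v := <-l.viewsCh:
				fmt.Println("node views update:", v)
			}
		}
	}

	func main() {
		l := &loop{
			filterCh: make(chan string),
			viewsCh:  make(chan int),
			doneCh:   make(chan struct{}),
			exitedCh: make(chan struct{}),
		}
		go l.run()
		l.filterCh <- "allow-all"
		l.viewsCh <- 3
		close(l.doneCh) // shut down...
		<-l.exitedCh    // ...and wait, mirroring Conn.Close below
	}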
@@ -2551,12 +2558,13 @@ func capVerIsRelayCapable(version tailcfg.CapabilityVersion) bool {
 	return false
 }
 
-// SetNetworkMap is called when the control client gets a new network
-// map from the control server. It must always be non-nil.
-//
-// It should not use the DERPMap field of NetworkMap; that's
-// conditionally sent to SetDERPMap instead.
-func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
+func (c *Conn) onFilterUpdate(f FilterUpdate) {
+	// TODO(jwhited): implement
+}
+
+// onNodeViewsUpdate is called when a [NodeViewsUpdate] is received over the
+// [eventbus.Bus].
+func (c *Conn) onNodeViewsUpdate(update NodeViewsUpdate) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -2565,15 +2573,15 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
 	}
 
 	priorPeers := c.peers
-	metricNumPeers.Set(int64(len(nm.Peers)))
+	metricNumPeers.Set(int64(len(update.Peers)))
 
 	// Update c.netMap regardless, before the following early return.
-	curPeers := views.SliceOf(nm.Peers)
+	curPeers := views.SliceOf(update.Peers)
 	c.peers = curPeers
 
 	flags := c.debugFlagsLocked()
-	if addrs := nm.GetAddresses(); addrs.Len() > 0 {
-		c.firstAddrForTest = addrs.At(0).Addr()
+	if update.SelfNode.Valid() && update.SelfNode.Addresses().Len() > 0 {
+		c.firstAddrForTest = update.SelfNode.Addresses().At(0).Addr()
 	} else {
 		c.firstAddrForTest = netip.Addr{}
 	}
@@ -2588,16 +2596,16 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
 
 	c.lastFlags = flags
 
-	c.logf("[v1] magicsock: got updated network map; %d peers", len(nm.Peers))
+	c.logf("[v1] magicsock: got updated network map; %d peers", len(update.Peers))
 
-	entriesPerBuffer := debugRingBufferSize(len(nm.Peers))
+	entriesPerBuffer := debugRingBufferSize(len(update.Peers))
 
 	// Try a pass of just upserting nodes and creating missing
 	// endpoints. If the set of nodes is the same, this is an
 	// efficient alloc-free update. If the set of nodes is different,
 	// we'll fall through to the next pass, which allocates but can
 	// handle full set updates.
-	for _, n := range nm.Peers {
+	for _, n := range update.Peers {
 		if n.ID() == 0 {
 			devPanicf("node with zero ID")
 			continue
@@ -2697,14 +2705,14 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
 		c.peerMap.upsertEndpoint(ep, key.DiscoPublic{})
 	}
 
-	// If the set of nodes changed since the last SetNetworkMap, the
+	// If the set of nodes changed since the last onNodeViewsUpdate, the
 	// upsert loop just above made c.peerMap contain the union of the
 	// old and new peers - which will be larger than the set from the
 	// current netmap. If that happens, go through the allocful
 	// deletion path to clean up moribund nodes.
-	if c.peerMap.nodeCount() != len(nm.Peers) {
+	if c.peerMap.nodeCount() != len(update.Peers) {
 		keep := set.Set[key.NodePublic]{}
-		for _, n := range nm.Peers {
+		for _, n := range update.Peers {
 			keep.Add(n.Key())
 		}
 		c.peerMap.forEachEndpoint(func(ep *endpoint) {
@@ -2837,10 +2845,6 @@ func (c *connBind) Close() error {
 		return nil
 	}
 	c.closed = true
-	// Close the [eventbus.Client].
-	if c.eventClient != nil {
-		c.eventClient.Close()
-	}
 	// Unblock all outstanding receives.
 	c.pconn4.Close()
 	c.pconn6.Close()
@@ -2850,9 +2854,6 @@ func (c *connBind) Close() error {
 	if c.closeDisco6 != nil {
 		c.closeDisco6.Close()
 	}
-	if c.eventClient != nil {
-		c.eventClient.Close()
-	}
 	// Send an empty read result to unblock receiveDERP,
 	// which will then check connBind.Closed.
 	// connBind.Closed takes c.mu, but c.derpRecvCh is buffered.
@@ -2871,6 +2872,17 @@ func (c *connBind) isClosed() bool {
 //
 // Only the first close does anything. Any later closes return nil.
 func (c *Conn) Close() error {
+	// Close the [eventbus.Client] and wait for Conn.consumeEventbusTopics to
+	// return. Do this before acquiring c.mu:
+	//  1. Conn.consumeEventbusTopics event handlers also acquire c.mu, they can
+	//     deadlock with c.Close().
+	//  2. Conn.consumeEventbusTopics event handlers may not guard against
+	//     undesirable post/in-progress Conn.Close() behaviors.
+	if c.eventClient != nil {
+		c.eventClient.Close()
+		<-c.subsDoneCh
+	}
+
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	if c.closed {
@@ -2901,7 +2913,6 @@ func (c *Conn) Close() error {
 	if c.closeDisco6 != nil {
 		c.closeDisco6.Close()
 	}
-
 	// Wait on goroutines updating right at the end, once everything is
 	// already closed. We want everything else in the Conn to be
 	// consistently in the closed state before we release mu to wait
@@ -3233,12 +3244,13 @@ func simpleDur(d time.Duration) time.Duration {
 	return d.Round(time.Minute)
 }
 
-// UpdateNetmapDelta implements controlclient.NetmapDeltaUpdater.
-func (c *Conn) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
+// onNodeMutationsUpdate is called when a [NodeMutationsUpdate] is received over
+// the [eventbus.Bus].
+func (c *Conn) onNodeMutationsUpdate(update NodeMutationsUpdate) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	for _, m := range muts {
+	for _, m := range update.Mutations {
 		nodeID := m.NodeIDBeingMutated()
 		ep, ok := c.peerMap.endpointForNodeID(nodeID)
 		if !ok {
@@ -3257,7 +3269,6 @@ func (c *Conn) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
 			ep.mu.Unlock()
 		}
 	}
-	return true
 }
 
 // UpdateStatus implements the interface nede by ipnstate.StatusBuilder.
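
The Conn.Close ordering introduced above is load-bearing: the event consumer's
handlers take c.mu, so the eventbus client must be closed and the consumer
drained before c.mu is held. A small self-contained sketch of that shutdown
ordering, with illustrative names rather than magicsock's:

	package main

	import (
		"fmt"
		"sync"
	)

	type conn struct {
		mu       sync.Mutex
		events   chan int
		done     chan struct{} // closed to stop the consumer, like eventClient.Close
		subsDone chan struct{} // closed when consume returns, like subsDoneCh
	}

	func (c *conn) consume() {
		defer close(c.subsDone)
		for {
			select {
			case <-c.done:
				return
			case e := <-c.events:
				c.mu.Lock() // handlers take c.mu, as Conn's handlers do
				fmt.Println("handled", e)
				c.mu.Unlock()
			}
		}
	}

	func (c *conn) close() {
		// Stop and drain the consumer before taking c.mu: if we held c.mu
		// first, a handler blocked on c.mu could never finish, and the
		// wait below would deadlock.
		close(c.done)
		<-c.subsDone
		c.mu.Lock()
		defer c.mu.Unlock()
		// ...remaining teardown runs with no handler concurrency...
	}

	func main() {
		c := &conn{
			events:   make(chan int, 1),
			done:     make(chan struct{}),
			subsDone: make(chan struct{}),
		}
		go c.consume()
		c.events <- 1
		c.close()
	}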
diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go
index 5e71a40c9..7fa062fa8 100644
--- a/wgengine/magicsock/magicsock_test.go
+++ b/wgengine/magicsock/magicsock_test.go
@@ -166,7 +166,7 @@ type magicStack struct {
 }
 
 // newMagicStack builds and initializes an idle magicsock and
-// friends. You need to call conn.SetNetworkMap and dev.Reconfig
+// friends. You need to call conn.onNodeViewsUpdate and dev.Reconfig
 // before anything interesting happens.
 func newMagicStack(t testing.TB, logf logger.Logf, l nettype.PacketListener, derpMap *tailcfg.DERPMap) *magicStack {
 	privateKey := key.NewNode()
@@ -339,9 +339,13 @@ func meshStacks(logf logger.Logf, mutateNetmap func(idx int, nm *netmap.NetworkM
 
 	for i, m := range ms {
 		nm := buildNetmapLocked(i)
-		m.conn.SetNetworkMap(nm)
-		peerSet := make(set.Set[key.NodePublic], len(nm.Peers))
-		for _, peer := range nm.Peers {
+		nv := NodeViewsUpdate{
+			SelfNode: nm.SelfNode,
+			Peers:    nm.Peers,
+		}
+		m.conn.onNodeViewsUpdate(nv)
+		peerSet := make(set.Set[key.NodePublic], len(nv.Peers))
+		for _, peer := range nv.Peers {
 			peerSet.Add(peer.Key())
 		}
 		m.conn.UpdatePeers(peerSet)
@@ -1366,7 +1370,7 @@ func newTestConn(t testing.TB) *Conn {
 	return conn
 }
 
-// addTestEndpoint sets conn's network map to a single peer expected
+// addTestEndpoint sets conn's node views to a single peer expected
 // to receive packets from sendConn (or DERP), and returns that peer's
 // nodekey and discokey.
 func addTestEndpoint(tb testing.TB, conn *Conn, sendConn net.PacketConn) (key.NodePublic, key.DiscoPublic) {
@@ -1375,7 +1379,7 @@ func addTestEndpoint(tb testing.TB, conn *Conn, sendConn net.PacketConn) (key.No
 	// codepath.
 	discoKey := key.DiscoPublicFromRaw32(mem.B([]byte{31: 1}))
 	nodeKey := key.NodePublicFromRaw32(mem.B([]byte{0: 'N', 1: 'K', 31: 0}))
-	conn.SetNetworkMap(&netmap.NetworkMap{
+	conn.onNodeViewsUpdate(NodeViewsUpdate{
 		Peers: nodeViews([]*tailcfg.Node{
 			{
 				ID: 1,
@@ -1564,11 +1568,11 @@ func nodeViews(v []*tailcfg.Node) []tailcfg.NodeView {
 	return nv
 }
 
-// Test that a netmap update where node changes its node key but
+// Test that a node views update where node changes its node key but
 // doesn't change its disco key doesn't result in a broken state.
 //
 // https://github.com/tailscale/tailscale/issues/1391
-func TestSetNetworkMapChangingNodeKey(t *testing.T) {
+func TestOnNodeViewsUpdateChangingNodeKey(t *testing.T) {
 	conn := newTestConn(t)
 	t.Cleanup(func() { conn.Close() })
 	var buf tstest.MemLogger
@@ -1580,7 +1584,7 @@ func TestSetNetworkMapChangingNodeKey(t *testing.T) {
 	nodeKey1 := key.NodePublicFromRaw32(mem.B([]byte{0: 'N', 1: 'K', 2: '1', 31: 0}))
 	nodeKey2 := key.NodePublicFromRaw32(mem.B([]byte{0: 'N', 1: 'K', 2: '2', 31: 0}))
 
-	conn.SetNetworkMap(&netmap.NetworkMap{
+	conn.onNodeViewsUpdate(NodeViewsUpdate{
 		Peers: nodeViews([]*tailcfg.Node{
 			{
 				ID: 1,
@@ -1596,7 +1600,7 @@ func TestSetNetworkMapChangingNodeKey(t *testing.T) {
 	}
 
 	for range 3 {
-		conn.SetNetworkMap(&netmap.NetworkMap{
+		conn.onNodeViewsUpdate(NodeViewsUpdate{
 			Peers: nodeViews([]*tailcfg.Node{
 				{
 					ID: 2,
@@ -1921,7 +1925,7 @@ func eps(s ...string) []netip.AddrPort {
 	return eps
 }
 
-func TestStressSetNetworkMap(t *testing.T) {
+func TestStressOnNodeViewsUpdate(t *testing.T) {
 	t.Parallel()
 
 	conn := newTestConn(t)
@@ -1969,15 +1973,15 @@ func TestStressSetNetworkMap(t *testing.T) {
 				allPeers[j].Key = randNodeKey()
 			}
 		}
-		// Clone existing peers into a new netmap.
+		// Clone existing peers.
 		peers := make([]*tailcfg.Node, 0, len(allPeers))
 		for peerIdx, p := range allPeers {
 			if present[peerIdx] {
 				peers = append(peers, p.Clone())
 			}
 		}
-		// Set the netmap.
-		conn.SetNetworkMap(&netmap.NetworkMap{
+		// Set the node views.
+		conn.onNodeViewsUpdate(NodeViewsUpdate{
 			Peers: nodeViews(peers),
 		})
 		// Check invariants.
@@ -2102,10 +2106,10 @@ func TestRebindingUDPConn(t *testing.T) {
 }
 
 // https://github.com/tailscale/tailscale/issues/6680: don't ignore
-// SetNetworkMap calls when there are no peers. (A too aggressive fast path was
+// onNodeViewsUpdate calls when there are no peers. (A too aggressive fast path was
 // previously bailing out early, thinking there were no changes since all zero
-// peers didn't change, but the netmap has non-peer info in it too we shouldn't discard)
-func TestSetNetworkMapWithNoPeers(t *testing.T) {
+// peers didn't change, but the node views has non-peer info in it too we shouldn't discard)
+func TestOnNodeViewsUpdateWithNoPeers(t *testing.T) {
 	var c Conn
 	knobs := &controlknobs.Knobs{}
 	c.logf = logger.Discard
@@ -2114,9 +2118,9 @@ func TestSetNetworkMapWithNoPeers(t *testing.T) {
 	for i := 1; i <= 3; i++ {
 		v := !debugEnableSilentDisco()
 		envknob.Setenv("TS_DEBUG_ENABLE_SILENT_DISCO", fmt.Sprint(v))
-		nm := &netmap.NetworkMap{}
-		c.SetNetworkMap(nm)
-		t.Logf("ptr %d: %p", i, nm)
+		nv := NodeViewsUpdate{}
+		c.onNodeViewsUpdate(nv)
+		t.Logf("ptr %d: %p", i, nv)
 		if c.lastFlags.heartbeatDisabled != v {
 			t.Fatalf("call %d: didn't store netmap", i)
 		}
@@ -2213,7 +2217,11 @@ func TestIsWireGuardOnlyPeer(t *testing.T) {
 			},
 		}),
 	}
-	m.conn.SetNetworkMap(nm)
+	nv := NodeViewsUpdate{
+		SelfNode: nm.SelfNode,
+		Peers:    nm.Peers,
+	}
+	m.conn.onNodeViewsUpdate(nv)
 
 	cfg, err := nmcfg.WGCfg(nm, t.Logf, netmap.AllowSubnetRoutes, "")
 	if err != nil {
@@ -2275,7 +2283,11 @@ func TestIsWireGuardOnlyPeerWithMasquerade(t *testing.T) {
 			},
 		}),
 	}
-	m.conn.SetNetworkMap(nm)
+	nv := NodeViewsUpdate{
+		SelfNode: nm.SelfNode,
+		Peers:    nm.Peers,
+	}
+	m.conn.onNodeViewsUpdate(nv)
 
 	cfg, err := nmcfg.WGCfg(nm, t.Logf, netmap.AllowSubnetRoutes, "")
 	if err != nil {
@@ -2312,7 +2324,11 @@
 // configures WG.
 func applyNetworkMap(t *testing.T, m *magicStack, nm *netmap.NetworkMap) {
 	t.Helper()
-	m.conn.SetNetworkMap(nm)
+	nv := NodeViewsUpdate{
+		SelfNode: nm.SelfNode,
+		Peers:    nm.Peers,
+	}
+	m.conn.onNodeViewsUpdate(nv)
 	// Make sure we can't use v6 to avoid test failures.
 	m.conn.noV6.Store(true)

diff --git a/wgengine/userspace.go b/wgengine/userspace.go
index b1b82032b..4a9f32143 100644
--- a/wgengine/userspace.go
+++ b/wgengine/userspace.go
@@ -1300,7 +1300,6 @@ func (e *userspaceEngine) linkChange(delta *netmon.ChangeDelta) {
 }
 
 func (e *userspaceEngine) SetNetworkMap(nm *netmap.NetworkMap) {
-	e.magicConn.SetNetworkMap(nm)
 	e.mu.Lock()
 	e.netMap = nm
 	e.mu.Unlock()
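
With e.magicConn.SetNetworkMap gone from the engine path, the only way a
bus-wired Conn learns about netmap changes is a NodeViewsUpdate published on
the shared bus; the tests above call onNodeViewsUpdate directly only because
they construct Conns without a bus. A sketch of the publishing side, mirroring
nodeBackend.SetNetMap's nil-netmap guard; eventbus.New and Bus.Close are
assumed, and the client name is illustrative:

	package main

	import (
		"tailscale.com/tailcfg"
		"tailscale.com/types/netmap"
		"tailscale.com/util/eventbus"
		"tailscale.com/wgengine/magicsock"
	)

	func publishNetmap(bus *eventbus.Bus, nm *netmap.NetworkMap) {
		client := bus.Client("example.netmap-publisher")
		defer client.Close()
		pub := eventbus.Publish[magicsock.NodeViewsUpdate](client)

		// Mirror nodeBackend.SetNetMap: tolerate a nil netmap by
		// publishing an update with an invalid SelfNode and nil Peers,
		// which NodeViewsUpdate's doc comment explicitly allows.
		nv := magicsock.NodeViewsUpdate{}
		if nm != nil {
			nv.SelfNode = nm.SelfNode
			nv.Peers = nm.Peers
		}
		pub.Publish(nv)
	}

	func main() {
		bus := eventbus.New() // assumed constructor
		defer bus.Close()     // assumed
		publishNetmap(bus, &netmap.NetworkMap{
			SelfNode: (&tailcfg.Node{ID: 1}).View(),
		})
	}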