From 9e28bfc69c0127a21fbce6beeaee2d763fe78d2a Mon Sep 17 00:00:00 2001 From: Nick Khyl Date: Tue, 24 Jun 2025 13:39:29 -0500 Subject: [PATCH] ipn/ipnlocal,wgengine/magicsock: wait for magicsock to process pending events on authReconfig Updates #16369 Signed-off-by: Nick Khyl --- ipn/ipnlocal/local.go | 5 ++++ ipn/ipnlocal/local_test.go | 6 ++++ ipn/ipnlocal/state_test.go | 51 ++++++++++++++++++++++++++++++++- wgengine/magicsock/magicsock.go | 34 ++++++++++++++++++++++ 4 files changed, 95 insertions(+), 1 deletion(-) diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 908418d4a..5467088f7 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -4853,6 +4853,11 @@ func (b *LocalBackend) readvertiseAppConnectorRoutes() { // updates are not currently blocked, based on the cached netmap and // user prefs. func (b *LocalBackend) authReconfig() { + // Wait for magicsock to process pending [eventbus] events, + // such as netmap updates. This should be completed before + // wireguard-go is reconfigured. See tailscale/tailscale#16369. 
+ b.MagicConn().Synchronize() + b.mu.Lock() blocked := b.blocked prefs := b.pm.CurrentPrefs() diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go index 6e24f4300..6e6278688 100644 --- a/ipn/ipnlocal/local_test.go +++ b/ipn/ipnlocal/local_test.go @@ -85,6 +85,12 @@ func makeNodeKeyFromID(nodeID tailcfg.NodeID) key.NodePublic { return key.NodePublicFromRaw32(memro.B(raw)) } +func makeDiscoKeyFromID(nodeID tailcfg.NodeID) (ret key.DiscoPublic) { + raw := make([]byte, 32) + binary.BigEndian.PutUint64(raw[24:], uint64(nodeID)) + return key.DiscoPublicFromRaw32(memro.B(raw)) +} + func TestShrinkDefaultRoute(t *testing.T) { tests := []struct { route string diff --git a/ipn/ipnlocal/state_test.go b/ipn/ipnlocal/state_test.go index 5d9e8b169..2921de203 100644 --- a/ipn/ipnlocal/state_test.go +++ b/ipn/ipnlocal/state_test.go @@ -1114,6 +1114,8 @@ func TestEngineReconfigOnStateChange(t *testing.T) { disconnect := &ipn.MaskedPrefs{Prefs: ipn.Prefs{WantRunning: false}, WantRunningSet: true} node1 := testNetmapForNode(1, "node-1", []netip.Prefix{netip.MustParsePrefix("100.64.1.1/32")}) node2 := testNetmapForNode(2, "node-2", []netip.Prefix{netip.MustParsePrefix("100.64.1.2/32")}) + node3 := testNetmapForNode(3, "node-3", []netip.Prefix{netip.MustParsePrefix("100.64.1.3/32")}) + node3.Peers = []tailcfg.NodeView{node1.SelfNode, node2.SelfNode} routesWithQuad100 := func(extra ...netip.Prefix) []netip.Prefix { return append(extra, netip.MustParsePrefix("100.100.100.100/32")) } @@ -1308,6 +1310,40 @@ func TestEngineReconfigOnStateChange(t *testing.T) { Hosts: hostsFor(node1), }, }, + { + name: "Start/Connect/Login/WithPeers", + steps: func(t *testing.T, lb *LocalBackend, cc func() *mockControl) { + mustDo(t)(lb.Start(ipn.Options{})) + mustDo2(t)(lb.EditPrefs(connect)) + cc().authenticated(node3) + }, + wantState: ipn.Starting, + wantCfg: &wgcfg.Config{ + Name: "tailscale", + NodeID: node3.SelfNode.StableID(), + Peers: []wgcfg.Peer{ + { + PublicKey: 
node1.SelfNode.Key(), + DiscoKey: node1.SelfNode.DiscoKey(), + }, + { + PublicKey: node2.SelfNode.Key(), + DiscoKey: node2.SelfNode.DiscoKey(), + }, + }, + Addresses: node3.SelfNode.Addresses().AsSlice(), + }, + wantRouterCfg: &router.Config{ + SNATSubnetRoutes: true, + NetfilterMode: preftype.NetfilterOn, + LocalAddrs: node3.SelfNode.Addresses().AsSlice(), + Routes: routesWithQuad100(), + }, + wantDNSCfg: &dns.Config{ + Routes: map[dnsname.FQDN][]*dnstype.Resolver{}, + Hosts: hostsFor(node3), + }, + }, } for _, tt := range tests { @@ -1322,8 +1358,18 @@ func TestEngineReconfigOnStateChange(t *testing.T) { t.Errorf("State: got %v; want %v", gotState, tt.wantState) } + if engine.Config() != nil { + for _, p := range engine.Config().Peers { + pKey := p.PublicKey.UntypedHexString() + _, err := lb.MagicConn().ParseEndpoint(pKey) + if err != nil { + t.Errorf("ParseEndpoint(%q) failed: %v", pKey, err) + } + } + } + opts := []cmp.Option{ - cmpopts.EquateComparable(key.NodePublic{}, netip.Addr{}, netip.Prefix{}), + cmpopts.EquateComparable(key.NodePublic{}, key.DiscoPublic{}, netip.Addr{}, netip.Prefix{}), } if diff := cmp.Diff(tt.wantCfg, engine.Config(), opts...); diff != "" { t.Errorf("wgcfg.Config(+got -want): %v", diff) @@ -1356,6 +1402,8 @@ func testNetmapForNode(userID tailcfg.UserID, name string, addresses []netip.Pre Addresses: addresses, MachineAuthorized: true, } + self.Key = makeNodeKeyFromID(self.ID) + self.DiscoKey = makeDiscoKeyFromID(self.ID) return &netmap.NetworkMap{ SelfNode: self.View(), Name: self.Name, @@ -1403,6 +1451,7 @@ func newLocalBackendWithMockEngineAndControl(t *testing.T, enableLogging bool) ( magicConn, err := magicsock.NewConn(magicsock.Options{ Logf: logf, + EventBus: sys.Bus.Get(), NetMon: dialer.NetMon(), Metrics: sys.UserMetricsRegistry(), HealthTracker: sys.HealthTracker(), diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index a96eaf3d8..d7b522699 100644 --- a/wgengine/magicsock/magicsock.go +++ 
b/wgengine/magicsock/magicsock.go @@ -167,6 +167,8 @@ type Conn struct { filterSub *eventbus.Subscriber[FilterUpdate] nodeViewsSub *eventbus.Subscriber[NodeViewsUpdate] nodeMutsSub *eventbus.Subscriber[NodeMutationsUpdate] + syncSub *eventbus.Subscriber[syncPoint] + syncPub *eventbus.Publisher[syncPoint] subsDoneCh chan struct{} // closed when consumeEventbusTopics returns // pconn4 and pconn6 are the underlying UDP sockets used to @@ -538,6 +540,21 @@ type FilterUpdate struct { *filter.Filter } +// syncPoint is an event published over an [eventbus.Bus] by [Conn.Synchronize]. +// It serves as a synchronization point, allowing callers to wait until magicsock +// has processed all pending events. +type syncPoint chan struct{} + +// Wait blocks until [syncPoint.Signal] is called. +func (s syncPoint) Wait() { + <-s +} + +// Signal signals the sync point, unblocking the [syncPoint.Wait] call. +func (s syncPoint) Signal() { + close(s) +} + // newConn is the error-free, network-listening-side-effect-free based // of NewConn. Mostly for tests. func newConn(logf logger.Logf) *Conn { @@ -593,10 +610,25 @@ func (c *Conn) consumeEventbusTopics() { c.onNodeViewsUpdate(nodeViews) case nodeMuts := <-c.nodeMutsSub.Events(): c.onNodeMutationsUpdate(nodeMuts) + case syncPoint := <-c.syncSub.Events(): + c.dlogf("magicsock: received sync point after reconfig") + syncPoint.Signal() } } } +// Synchronize waits for all [eventbus] events published +// prior to this call to be processed by the receiver. +func (c *Conn) Synchronize() { + if c.syncPub == nil { + // Eventbus is not used; no need to synchronize (in certain tests). + return + } + sp := syncPoint(make(chan struct{})) + c.syncPub.Publish(sp) + sp.Wait() +} + // NewConn creates a magic Conn listening on opts.Port. // As the set of possible endpoints for a Conn changes, the // callback opts.EndpointsFunc is called. 
@@ -624,6 +656,8 @@ func NewConn(opts Options) (*Conn, error) { c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient) c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient) c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient) + c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient) + c.syncPub = eventbus.Publish[syncPoint](c.eventClient) c.subsDoneCh = make(chan struct{}) go c.consumeEventbusTopics() }