From f3767dddf8b1af91e5b67126a9dd519bb284cfec Mon Sep 17 00:00:00 2001
From: Kristoffer Dalby
Date: Tue, 16 Dec 2025 18:31:14 +0000
Subject: [PATCH] batcher: ensure removal from batcher

Fixes #2924

Signed-off-by: Kristoffer Dalby
---
 hscontrol/mapper/batcher_lockfree.go |  43 ++++++++-
 hscontrol/mapper/batcher_test.go     | 137 +++++++++++++++++++++++++++
 hscontrol/state/state.go             |   4 +-
 3 files changed, 179 insertions(+), 5 deletions(-)

diff --git a/hscontrol/mapper/batcher_lockfree.go b/hscontrol/mapper/batcher_lockfree.go
index 28e53426..db8d89e9 100644
--- a/hscontrol/mapper/batcher_lockfree.go
+++ b/hscontrol/mapper/batcher_lockfree.go
@@ -285,9 +285,38 @@ func (b *LockFreeBatcher) queueWork(w work) {
 	}
 }
 
-// addToBatch adds a response to the pending batch.
-func (b *LockFreeBatcher) addToBatch(responses ...change.Change) {
-	// Short circuit if any of the responses is a full update, which
+// addToBatch adds changes to the pending batch.
+func (b *LockFreeBatcher) addToBatch(changes ...change.Change) {
+	// Clean up any nodes being permanently removed from the system.
+	//
+	// This handles the case where a node is deleted from state but the batcher
+	// still has it registered. By cleaning up here, we prevent "node not found"
+	// errors when workers try to generate map responses for deleted nodes.
+	//
+	// Safety: change.Change.PeersRemoved is ONLY populated when nodes are actually
+	// deleted from the system (via change.NodeRemoved in state.DeleteNode). Policy
+	// changes that affect peer visibility do NOT use this field - they set
+	// RequiresRuntimePeerComputation=true and compute removed peers at runtime,
+	// putting them in tailcfg.MapResponse.PeersRemoved (a different struct).
+	// Therefore, this cleanup only removes nodes that are truly being deleted,
+	// not nodes that are still connected but have lost visibility of certain peers.
+	//
+	// See: https://github.com/juanfont/headscale/issues/2924
+	for _, ch := range changes {
+		for _, removedID := range ch.PeersRemoved {
+			if _, existed := b.nodes.LoadAndDelete(removedID); existed {
+				b.totalNodes.Add(-1)
+				log.Debug().
+					Uint64("node.id", removedID.Uint64()).
+					Msg("Removed deleted node from batcher")
+			}
+
+			b.connected.Delete(removedID)
+			b.pendingChanges.Delete(removedID)
+		}
+	}
+
+	// Short circuit if any of the changes is a full update, which
 	// means we can skip sending individual changes.
-	if change.HasFull(responses) {
+	if change.HasFull(changes) {
 		b.nodes.Range(func(nodeID types.NodeID, _ *multiChannelNodeConn) bool {
@@ -789,3 +818,9 @@ func (b *LockFreeBatcher) Debug() map[types.NodeID]DebugNodeInfo {
 func (b *LockFreeBatcher) DebugMapResponses() (map[types.NodeID][]tailcfg.MapResponse, error) {
 	return b.mapper.debugMapResponses()
 }
+
+// WorkErrors returns the count of work errors encountered.
+// This is primarily useful for testing and debugging.
+func (b *LockFreeBatcher) WorkErrors() int64 {
+	return b.workErrors.Load()
+}
diff --git a/hscontrol/mapper/batcher_test.go b/hscontrol/mapper/batcher_test.go
index f67cb517..3cbd4e2d 100644
--- a/hscontrol/mapper/batcher_test.go
+++ b/hscontrol/mapper/batcher_test.go
@@ -2635,3 +2635,140 @@ func TestBatcherMultiConnection(t *testing.T) {
 		})
 	}
 }
+
+// TestNodeDeletedWhileChangesPending reproduces issue #2924 where deleting a node
+// from state while there are pending changes for that node in the batcher causes
+// "node not found" errors. The race condition occurs when:
+// 1. Node is connected and changes are queued for it
+// 2. Node is deleted from state (NodeStore) but not from batcher
+// 3. Batcher worker tries to generate map response for deleted node
+// 4. Mapper fails to find node in state, causing repeated "node not found" errors.
+func TestNodeDeletedWhileChangesPending(t *testing.T) {
+	for _, batcherFunc := range allBatcherFunctions {
+		t.Run(batcherFunc.name, func(t *testing.T) {
+			// Create test environment with 3 nodes
+			testData, cleanup := setupBatcherWithTestData(t, batcherFunc.fn, 1, 3, NORMAL_BUFFER_SIZE)
+			defer cleanup()
+
+			batcher := testData.Batcher
+			st := testData.State
+			node1 := &testData.Nodes[0]
+			node2 := &testData.Nodes[1]
+			node3 := &testData.Nodes[2]
+
+			t.Logf("Testing issue #2924: Node1=%d, Node2=%d, Node3=%d",
+				node1.n.ID, node2.n.ID, node3.n.ID)
+
+			// Helper to drain channels
+			drainCh := func(ch chan *tailcfg.MapResponse) {
+				for {
+					select {
+					case <-ch:
+						// drain
+					default:
+						return
+					}
+				}
+			}
+
+			// Start update consumers for all nodes
+			node1.start()
+			node2.start()
+			node3.start()
+
+			defer node1.cleanup()
+			defer node2.cleanup()
+			defer node3.cleanup()
+
+			// Connect all nodes to the batcher
+			require.NoError(t, batcher.AddNode(node1.n.ID, node1.ch, tailcfg.CapabilityVersion(100)))
+			require.NoError(t, batcher.AddNode(node2.n.ID, node2.ch, tailcfg.CapabilityVersion(100)))
+			require.NoError(t, batcher.AddNode(node3.n.ID, node3.ch, tailcfg.CapabilityVersion(100)))
+
+			// Wait for all nodes to be connected
+			assert.EventuallyWithT(t, func(c *assert.CollectT) {
+				assert.True(c, batcher.IsConnected(node1.n.ID), "node1 should be connected")
+				assert.True(c, batcher.IsConnected(node2.n.ID), "node2 should be connected")
+				assert.True(c, batcher.IsConnected(node3.n.ID), "node3 should be connected")
+			}, 5*time.Second, 50*time.Millisecond, "waiting for nodes to connect")
+
+			// Get initial work errors count
+			var initialWorkErrors int64
+			if lfb, ok := unwrapBatcher(batcher).(*LockFreeBatcher); ok {
+				initialWorkErrors = lfb.WorkErrors()
+				t.Logf("Initial work errors: %d", initialWorkErrors)
+			}
+
+			// Clear channels to prepare for the test
+			drainCh(node1.ch)
+			drainCh(node2.ch)
+			drainCh(node3.ch)
+
+			// Get node view for deletion
+			nodeToDelete, ok := st.GetNodeByID(node3.n.ID)
+			require.True(t, ok, "node3 should exist in state")
+
+			// Delete the node from state - this returns a NodeRemoved change
+			// In production, this change is sent to batcher via app.Change()
+			nodeChange, err := st.DeleteNode(nodeToDelete)
+			require.NoError(t, err, "should be able to delete node from state")
+			t.Logf("Deleted node %d from state, change: %s", node3.n.ID, nodeChange.Reason)
+
+			// Verify node is deleted from state
+			_, exists := st.GetNodeByID(node3.n.ID)
+			require.False(t, exists, "node3 should be deleted from state")
+
+			// Send the NodeRemoved change to batcher (this is what app.Change() does)
+			// With the fix, this should clean up node3 from batcher's internal state
+			batcher.AddWork(nodeChange)
+
+			// Wait for the batcher to process the removal and clean up the node
+			assert.EventuallyWithT(t, func(c *assert.CollectT) {
+				assert.False(c, batcher.IsConnected(node3.n.ID), "node3 should be disconnected from batcher")
+			}, 5*time.Second, 50*time.Millisecond, "waiting for node removal to be processed")
+
+			t.Logf("Node %d connected in batcher after NodeRemoved: %v", node3.n.ID, batcher.IsConnected(node3.n.ID))
+
+			// Now queue changes that would have caused errors before the fix
+			// With the fix, these should NOT cause "node not found" errors
+			// because node3 was cleaned up when NodeRemoved was processed
+			batcher.AddWork(change.FullUpdate())
+			batcher.AddWork(change.PolicyChange())
+
+			// Wait for work to be processed and verify no errors occurred
+			// With the fix, no new errors should occur because the deleted node
+			// was cleaned up from batcher state when NodeRemoved was processed
+			assert.EventuallyWithT(t, func(c *assert.CollectT) {
+				var finalWorkErrors int64
+				if lfb, ok := unwrapBatcher(batcher).(*LockFreeBatcher); ok {
+					finalWorkErrors = lfb.WorkErrors()
+				}
+
+				newErrors := finalWorkErrors - initialWorkErrors
+				assert.Zero(c, newErrors, "Fix for #2924: should have no work errors after node deletion")
+			}, 5*time.Second, 100*time.Millisecond, "waiting for work processing to complete without errors")
+
+			// Verify remaining nodes still work correctly
+			drainCh(node1.ch)
+			drainCh(node2.ch)
+			batcher.AddWork(change.NodeAdded(node1.n.ID))
+
+			assert.EventuallyWithT(t, func(c *assert.CollectT) {
+				// Node 1 and 2 should receive updates
+				stats1 := NodeStats{TotalUpdates: atomic.LoadInt64(&node1.updateCount)}
+				stats2 := NodeStats{TotalUpdates: atomic.LoadInt64(&node2.updateCount)}
+				assert.Positive(c, stats1.TotalUpdates, "node1 should have received updates")
+				assert.Positive(c, stats2.TotalUpdates, "node2 should have received updates")
+			}, 5*time.Second, 100*time.Millisecond, "waiting for remaining nodes to receive updates")
+		})
+	}
+}
+
+// unwrapBatcher extracts the underlying batcher from wrapper types.
+func unwrapBatcher(b Batcher) Batcher {
+	if wrapper, ok := b.(*testBatcherWrapper); ok {
+		return unwrapBatcher(wrapper.Batcher)
+	}
+
+	return b
+}
diff --git a/hscontrol/state/state.go b/hscontrol/state/state.go
index 43ce155d..b365269c 100644
--- a/hscontrol/state/state.go
+++ b/hscontrol/state/state.go
@@ -478,7 +478,9 @@ func (s *State) DeleteNode(node types.NodeView) (change.Change, error) {
 	}
 
 	if !policyChange.IsEmpty() {
-		c = policyChange
+		// Merge policy change with NodeRemoved to preserve PeersRemoved info
+		// This ensures the batcher cleans up the deleted node from its state
+		c = c.Merge(policyChange)
 	}
 
 	return c, nil
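
For reference, below the patch: a minimal, self-contained Go sketch (not part of the commit) of the bookkeeping pattern the new addToBatch cleanup relies on. The registry type and its add/cleanup helpers are illustrative stand-ins for the batcher's nodes/connected/pendingChanges maps and totalNodes counter, not headscale APIs. The point it demonstrates is that sync.Map.LoadAndDelete reports whether the entry actually existed, so the node counter is decremented only for nodes that were really registered, while the auxiliary maps are cleared unconditionally and idempotently.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// registry is a toy stand-in for the batcher's per-node bookkeeping.
type registry struct {
	nodes     sync.Map     // node ID -> connection state
	connected sync.Map     // node ID -> connected flag
	pending   sync.Map     // node ID -> queued changes
	total     atomic.Int64 // number of registered nodes
}

// add registers a node, counting it only the first time it is seen.
func (r *registry) add(id uint64) {
	if _, loaded := r.nodes.LoadOrStore(id, struct{}{}); !loaded {
		r.total.Add(1)
	}
	r.connected.Store(id, true)
}

// cleanup mirrors the PeersRemoved handling added to addToBatch: it is safe to
// call for IDs that were never registered, and it never double-decrements
// because LoadAndDelete reports whether the entry actually existed.
func (r *registry) cleanup(removed ...uint64) {
	for _, id := range removed {
		if _, existed := r.nodes.LoadAndDelete(id); existed {
			r.total.Add(-1)
		}
		r.connected.Delete(id)
		r.pending.Delete(id)
	}
}

func main() {
	r := &registry{}
	r.add(1)
	r.add(2)
	r.cleanup(2, 3) // ID 3 was never registered; the counter only drops once
	fmt.Println("registered nodes:", r.total.Load()) // registered nodes: 1
}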