batcher: ensure deleted nodes are removed from the batcher

Fixes #2924

Signed-off-by: Kristoffer Dalby <kristoffer@dalby.cc>
Author: Kristoffer Dalby
Date:   2025-12-16 18:31:14 +00:00
Parent: 5c6cd62df1
Commit: f3767dddf8

3 changed files with 178 additions and 4 deletions

@@ -285,9 +285,38 @@ func (b *LockFreeBatcher) queueWork(w work) {
 	}
 }
 
-// addToBatch adds a response to the pending batch.
-func (b *LockFreeBatcher) addToBatch(responses ...change.Change) {
-	// Short circuit if any of the responses is a full update, which
+// addToBatch adds changes to the pending batch.
+func (b *LockFreeBatcher) addToBatch(changes ...change.Change) {
+	// Clean up any nodes being permanently removed from the system.
+	//
+	// This handles the case where a node is deleted from state but the batcher
+	// still has it registered. By cleaning up here, we prevent "node not found"
+	// errors when workers try to generate map responses for deleted nodes.
+	//
+	// Safety: change.Change.PeersRemoved is ONLY populated when nodes are actually
+	// deleted from the system (via change.NodeRemoved in state.DeleteNode). Policy
+	// changes that affect peer visibility do NOT use this field - they set
+	// RequiresRuntimePeerComputation=true and compute removed peers at runtime,
+	// putting them in tailcfg.MapResponse.PeersRemoved (a different struct).
+	// Therefore, this cleanup only removes nodes that are truly being deleted,
+	// not nodes that are still connected but have lost visibility of certain peers.
+	//
+	// See: https://github.com/juanfont/headscale/issues/2924
+	for _, ch := range changes {
+		for _, removedID := range ch.PeersRemoved {
+			if _, existed := b.nodes.LoadAndDelete(removedID); existed {
+				b.totalNodes.Add(-1)
+				log.Debug().
+					Uint64("node.id", removedID.Uint64()).
+					Msg("Removed deleted node from batcher")
+			}
+			b.connected.Delete(removedID)
+			b.pendingChanges.Delete(removedID)
+		}
+	}
+
+	// Short circuit if any of the changes is a full update, which
 	// means we can skip sending individual changes.
-	if change.HasFull(responses) {
+	if change.HasFull(changes) {
 		b.nodes.Range(func(nodeID types.NodeID, _ *multiChannelNodeConn) bool {
@@ -789,3 +818,9 @@ func (b *LockFreeBatcher) Debug() map[types.NodeID]DebugNodeInfo {
 func (b *LockFreeBatcher) DebugMapResponses() (map[types.NodeID][]tailcfg.MapResponse, error) {
 	return b.mapper.debugMapResponses()
 }
+
+// WorkErrors returns the count of work errors encountered.
+// This is primarily useful for testing and debugging.
+func (b *LockFreeBatcher) WorkErrors() int64 {
+	return b.workErrors.Load()
+}
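The cleanup in addToBatch is deliberately idempotent: LoadAndDelete reports whether the node was actually registered, so the totalNodes gauge is decremented at most once even if the same removal change is delivered twice, and the plain Delete calls are no-ops for absent keys. A minimal, self-contained sketch of that pattern, using the standard library's sync.Map and atomic.Int64 as stand-ins for the batcher's own concurrent map types (the batcherState type and its field names are illustrative, not headscale's real ones):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// batcherState is a simplified stand-in for the batcher's bookkeeping maps.
type batcherState struct {
	nodes      sync.Map     // node ID -> connection (value type elided here)
	connected  sync.Map     // node ID -> connection status
	totalNodes atomic.Int64 // gauge of registered nodes
}

// removeNode deletes all batcher state for a node. LoadAndDelete tells us
// whether the node was actually registered, so the gauge is decremented at
// most once even if the same removal change is delivered twice.
func (b *batcherState) removeNode(id uint64) {
	if _, existed := b.nodes.LoadAndDelete(id); existed {
		b.totalNodes.Add(-1)
	}
	// Delete is a no-op for absent keys, so it is safe to repeat.
	b.connected.Delete(id)
}

func main() {
	b := &batcherState{}
	b.nodes.Store(uint64(3), struct{}{})
	b.totalNodes.Add(1)

	b.removeNode(3)
	b.removeNode(3)                  // duplicate delivery: gauge stays correct
	fmt.Println(b.totalNodes.Load()) // 0
}

Because every step tolerates repetition, the cleanup can run on every change delivery without guarding against duplicate removals.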

@@ -2635,3 +2635,140 @@ func TestBatcherMultiConnection(t *testing.T) {
 		})
 	}
 }
+
+// TestNodeDeletedWhileChangesPending reproduces issue #2924, where deleting a node
+// from state while there are pending changes for that node in the batcher causes
+// "node not found" errors. The race condition occurs when:
+// 1. A node is connected and changes are queued for it.
+// 2. The node is deleted from state (NodeStore) but not from the batcher.
+// 3. A batcher worker tries to generate a map response for the deleted node.
+// 4. The mapper fails to find the node in state, causing repeated "node not found" errors.
+func TestNodeDeletedWhileChangesPending(t *testing.T) {
+	for _, batcherFunc := range allBatcherFunctions {
+		t.Run(batcherFunc.name, func(t *testing.T) {
+			// Create test environment with 3 nodes.
+			testData, cleanup := setupBatcherWithTestData(t, batcherFunc.fn, 1, 3, NORMAL_BUFFER_SIZE)
+			defer cleanup()
+
+			batcher := testData.Batcher
+			st := testData.State
+			node1 := &testData.Nodes[0]
+			node2 := &testData.Nodes[1]
+			node3 := &testData.Nodes[2]
+
+			t.Logf("Testing issue #2924: Node1=%d, Node2=%d, Node3=%d",
+				node1.n.ID, node2.n.ID, node3.n.ID)
+
+			// Helper to drain channels.
+			drainCh := func(ch chan *tailcfg.MapResponse) {
+				for {
+					select {
+					case <-ch:
+						// drain
+					default:
+						return
+					}
+				}
+			}
+
+			// Start update consumers for all nodes.
+			node1.start()
+			node2.start()
+			node3.start()
+			defer node1.cleanup()
+			defer node2.cleanup()
+			defer node3.cleanup()
+
+			// Connect all nodes to the batcher.
+			require.NoError(t, batcher.AddNode(node1.n.ID, node1.ch, tailcfg.CapabilityVersion(100)))
+			require.NoError(t, batcher.AddNode(node2.n.ID, node2.ch, tailcfg.CapabilityVersion(100)))
+			require.NoError(t, batcher.AddNode(node3.n.ID, node3.ch, tailcfg.CapabilityVersion(100)))
+
+			// Wait for all nodes to be connected.
+			assert.EventuallyWithT(t, func(c *assert.CollectT) {
+				assert.True(c, batcher.IsConnected(node1.n.ID), "node1 should be connected")
+				assert.True(c, batcher.IsConnected(node2.n.ID), "node2 should be connected")
+				assert.True(c, batcher.IsConnected(node3.n.ID), "node3 should be connected")
+			}, 5*time.Second, 50*time.Millisecond, "waiting for nodes to connect")
+
+			// Get the initial work error count.
+			var initialWorkErrors int64
+			if lfb, ok := unwrapBatcher(batcher).(*LockFreeBatcher); ok {
+				initialWorkErrors = lfb.WorkErrors()
+				t.Logf("Initial work errors: %d", initialWorkErrors)
+			}
+
+			// Clear channels to prepare for the test.
+			drainCh(node1.ch)
+			drainCh(node2.ch)
+			drainCh(node3.ch)
+
+			// Get the node view for deletion.
+			nodeToDelete, ok := st.GetNodeByID(node3.n.ID)
+			require.True(t, ok, "node3 should exist in state")
+
+			// Delete the node from state - this returns a NodeRemoved change.
+			// In production, this change is sent to the batcher via app.Change().
+			nodeChange, err := st.DeleteNode(nodeToDelete)
+			require.NoError(t, err, "should be able to delete node from state")
+			t.Logf("Deleted node %d from state, change: %s", node3.n.ID, nodeChange.Reason)
+
+			// Verify the node is deleted from state.
+			_, exists := st.GetNodeByID(node3.n.ID)
+			require.False(t, exists, "node3 should be deleted from state")
+
+			// Send the NodeRemoved change to the batcher (this is what app.Change() does).
+			// With the fix, this should clean up node3 from the batcher's internal state.
+			batcher.AddWork(nodeChange)
+
+			// Wait for the batcher to process the removal and clean up the node.
+			assert.EventuallyWithT(t, func(c *assert.CollectT) {
+				assert.False(c, batcher.IsConnected(node3.n.ID), "node3 should be disconnected from batcher")
+			}, 5*time.Second, 50*time.Millisecond, "waiting for node removal to be processed")
+			t.Logf("Node %d connected in batcher after NodeRemoved: %v", node3.n.ID, batcher.IsConnected(node3.n.ID))
+
+			// Now queue changes that would have caused errors before the fix.
+			// With the fix, these should NOT cause "node not found" errors,
+			// because node3 was cleaned up when NodeRemoved was processed.
+			batcher.AddWork(change.FullUpdate())
+			batcher.AddWork(change.PolicyChange())
+
+			// Wait for the work to be processed and verify no new errors occurred.
+			assert.EventuallyWithT(t, func(c *assert.CollectT) {
+				var finalWorkErrors int64
+				if lfb, ok := unwrapBatcher(batcher).(*LockFreeBatcher); ok {
+					finalWorkErrors = lfb.WorkErrors()
+				}
+				newErrors := finalWorkErrors - initialWorkErrors
+				assert.Zero(c, newErrors, "Fix for #2924: should have no work errors after node deletion")
+			}, 5*time.Second, 100*time.Millisecond, "waiting for work processing to complete without errors")
+
+			// Verify the remaining nodes still work correctly.
+			drainCh(node1.ch)
+			drainCh(node2.ch)
+			batcher.AddWork(change.NodeAdded(node1.n.ID))
+
+			assert.EventuallyWithT(t, func(c *assert.CollectT) {
+				// Nodes 1 and 2 should receive updates.
+				stats1 := NodeStats{TotalUpdates: atomic.LoadInt64(&node1.updateCount)}
+				stats2 := NodeStats{TotalUpdates: atomic.LoadInt64(&node2.updateCount)}
+				assert.Positive(c, stats1.TotalUpdates, "node1 should have received updates")
+				assert.Positive(c, stats2.TotalUpdates, "node2 should have received updates")
+			}, 5*time.Second, 100*time.Millisecond, "waiting for remaining nodes to receive updates")
+		})
+	}
+}
+
+// unwrapBatcher extracts the underlying batcher from wrapper types.
+func unwrapBatcher(b Batcher) Batcher {
+	if wrapper, ok := b.(*testBatcherWrapper); ok {
+		return unwrapBatcher(wrapper.Batcher)
+	}
+	return b
+}
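Throughout, the test avoids fixed sleeps by polling with testify's assert.EventuallyWithT: the callback receives a fresh *assert.CollectT on each tick, and the assertion only passes once an entire run of the callback records no failures. A small, self-contained example of the pattern (the background counter here is invented for illustration):

package example

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestEventuallyPattern(t *testing.T) {
	var processed atomic.Int64

	// Simulate background work finishing after a short delay.
	go func() {
		time.Sleep(200 * time.Millisecond)
		processed.Store(1)
	}()

	// The callback is retried every 50ms for up to 5s; the test only
	// passes once a whole run of the callback records no failures on c.
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		assert.Positive(c, processed.Load(), "work should have completed")
	}, 5*time.Second, 50*time.Millisecond, "waiting for background work")
}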

@@ -478,7 +478,9 @@ func (s *State) DeleteNode(node types.NodeView) (change.Change, error) {
 	}
 
 	if !policyChange.IsEmpty() {
-		c = policyChange
+		// Merge the policy change with NodeRemoved to preserve PeersRemoved info.
+		// This ensures the batcher cleans up the deleted node from its state.
+		c = c.Merge(policyChange)
 	}
 
 	return c, nil
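This one-line state change is the crux of the fix: previously, c = policyChange overwrote the NodeRemoved change and discarded its PeersRemoved list, so the batcher never learned the node was gone. Merging keeps the removal information while still carrying the policy change. A toy sketch of the idea - the Change type below is a simplified stand-in for headscale's change.Change, and this Merge is illustrative, not the real implementation:

package main

import "fmt"

// Change is a toy stand-in for headscale's change.Change.
type Change struct {
	Reason       string
	PeersRemoved []uint64
}

// Merge combines two changes without losing removal info. This mirrors the
// intent of the fix: adopt the policy change's scope while preserving the
// PeersRemoved populated by NodeRemoved.
func (c Change) Merge(other Change) Change {
	merged := other
	merged.PeersRemoved = append(merged.PeersRemoved, c.PeersRemoved...)
	if merged.Reason == "" {
		merged.Reason = c.Reason
	}
	return merged
}

func main() {
	nodeRemoved := Change{Reason: "node-removed", PeersRemoved: []uint64{3}}
	policy := Change{Reason: "policy-changed"}

	// Before the fix: assignment drops PeersRemoved entirely.
	buggy := policy
	fmt.Println(buggy.PeersRemoved) // []

	// After the fix: merging preserves it, so the batcher can clean up.
	fixed := nodeRemoved.Merge(policy)
	fmt.Println(fixed.PeersRemoved) // [3]
}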