package mapper

import (
	"crypto/rand"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/juanfont/headscale/hscontrol/types"
	"github.com/juanfont/headscale/hscontrol/types/change"
	"github.com/puzpuzpuz/xsync/v4"
	"github.com/rs/zerolog/log"
	"tailscale.com/tailcfg"
	"tailscale.com/types/ptr"
)

var errConnectionClosed = errors.New("connection channel already closed")

// LockFreeBatcher uses atomic operations and concurrent maps to eliminate mutex contention.
type LockFreeBatcher struct {
	tick    *time.Ticker
	mapper  *mapper
	workers int

	nodes     *xsync.Map[types.NodeID, *multiChannelNodeConn]
	connected *xsync.Map[types.NodeID, *time.Time]

	// Work queue channel
	workCh     chan work
	workChOnce sync.Once // Ensures workCh is only closed once
	done       chan struct{}
	doneOnce   sync.Once // Ensures done is only closed once

	// Batching state
	pendingChanges *xsync.Map[types.NodeID, []change.Change]

	// Metrics
	totalNodes      atomic.Int64
	workQueuedCount atomic.Int64
	workProcessed   atomic.Int64
	workErrors      atomic.Int64
}

// AddNode registers a new node connection with the batcher and sends an initial map response.
// It creates or updates the node's connection data, validates the initial map generation,
// and notifies other nodes that this node has come online.
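//
// A minimal usage sketch (the channel buffering and the caps value shown here
// are illustrative assumptions, not requirements imposed by this method):
//
//	ch := make(chan *tailcfg.MapResponse, 1)
//	if err := b.AddNode(node.ID, ch, caps); err != nil {
//		// the initial map could not be generated or delivered; the
//		// connection has already been cleaned up
//	}
//	defer b.RemoveNode(node.ID, ch)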
func (b *LockFreeBatcher) AddNode(id types.NodeID, c chan<- *tailcfg.MapResponse, version tailcfg.CapabilityVersion) error {
	addNodeStart := time.Now()

	// Generate connection ID
	connID := generateConnectionID()

	// Create new connection entry
	now := time.Now()
	newEntry := &connectionEntry{
		id:      connID,
		c:       c,
		version: version,
		created: now,
	}

	// Initialize last used timestamp
	newEntry.lastUsed.Store(now.Unix())

	// Get or create multiChannelNodeConn - this reuses existing offline nodes for rapid reconnection
	nodeConn, loaded := b.nodes.LoadOrStore(id, newMultiChannelNodeConn(id, b.mapper))
	if !loaded {
		b.totalNodes.Add(1)
	}

	// Add connection to the list (lock-free)
	nodeConn.addConnection(newEntry)

	// Use the worker pool for controlled concurrency instead of direct generation
	initialMap, err := b.MapResponseFromChange(id, change.FullSelf(id))
	if err != nil {
		log.Error().Uint64("node.id", id.Uint64()).Err(err).Msg("Initial map generation failed")
		nodeConn.removeConnectionByChannel(c)

		return fmt.Errorf("failed to generate initial map for node %d: %w", id, err)
	}

	// Use a blocking send with timeout for initial map since the channel should be ready
	// and we want to avoid the race condition where the receiver isn't ready yet
	select {
	case c <- initialMap:
		// Success
	case <-time.After(5 * time.Second):
		log.Error().Uint64("node.id", id.Uint64()).Err(fmt.Errorf("timeout")).Msg("Initial map send timeout")
		log.Debug().Caller().Uint64("node.id", id.Uint64()).Dur("timeout.duration", 5*time.Second).
			Msg("Initial map send timed out because channel was blocked or receiver not ready")
		nodeConn.removeConnectionByChannel(c)

		return fmt.Errorf("failed to send initial map to node %d: timeout", id)
	}

	// Update connection status
	b.connected.Store(id, nil) // nil = connected

	// Node will automatically receive updates through the normal flow
	// The initial full map already contains all current state
	log.Debug().Caller().Uint64("node.id", id.Uint64()).Dur("total.duration", time.Since(addNodeStart)).
		Int("active.connections", nodeConn.getActiveConnectionCount()).
		Msg("Node connection established in batcher because AddNode completed successfully")

	return nil
}

// RemoveNode disconnects a node from the batcher, marking it as offline and cleaning up its state.
// It validates the connection channel matches one of the current connections, closes that specific connection,
// and keeps the node entry alive for rapid reconnections instead of aggressive deletion.
// Reports if the node still has active connections after removal.
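//
// A sketch of how a caller might use the return value (names are illustrative):
//
//	stillOnline := b.RemoveNode(node.ID, ch)
//	if !stillOnline {
//		// every channel for this node is gone; the node is treated as
//		// disconnected until it reconnects
//	}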
func (b *LockFreeBatcher) RemoveNode(id types.NodeID, c chan<- *tailcfg.MapResponse) bool {
	nodeConn, exists := b.nodes.Load(id)
	if !exists {
		log.Debug().Caller().Uint64("node.id", id.Uint64()).Msg("RemoveNode called for non-existent node because node not found in batcher")
		return false
	}

	// Remove specific connection
	removed := nodeConn.removeConnectionByChannel(c)
	if !removed {
		log.Debug().Caller().Uint64("node.id", id.Uint64()).Msg("RemoveNode: channel not found because connection already removed or invalid")
		return false
	}

	// Check if node has any remaining active connections
	if nodeConn.hasActiveConnections() {
		log.Debug().Caller().Uint64("node.id", id.Uint64()).
			Int("active.connections", nodeConn.getActiveConnectionCount()).
			Msg("Node connection removed but keeping online because other connections remain")

		return true // Node still has active connections
	}

	// No active connections - keep the node entry alive for rapid reconnections
	// The node will get a fresh full map when it reconnects
	log.Debug().Caller().Uint64("node.id", id.Uint64()).Msg("Node disconnected from batcher because all connections removed, keeping entry for rapid reconnection")
	b.connected.Store(id, ptr.To(time.Now()))

	return false
}

// AddWork queues a change to be processed by the batcher.
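//
// For example, queueing a full update for all nodes (this constructor is the
// one used elsewhere in this file; other change constructors exist and this is
// only an illustrative sketch):
//
//	b.AddWork(change.FullUpdate())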
func (b *LockFreeBatcher) AddWork(r ...change.Change) {
	b.addWork(r...)
}

func (b *LockFreeBatcher) Start() {
	b.done = make(chan struct{})
	go b.doWork()
}

func (b *LockFreeBatcher) Close() {
	// Signal shutdown to all goroutines, only once
	b.doneOnce.Do(func() {
		if b.done != nil {
			close(b.done)
		}
	})

	// Only close workCh once using sync.Once to prevent races
	b.workChOnce.Do(func() {
		close(b.workCh)
	})

	// Close the underlying channels supplying the data to the clients.
	b.nodes.Range(func(nodeID types.NodeID, conn *multiChannelNodeConn) bool {
		conn.close()
		return true
	})
}

func (b *LockFreeBatcher) doWork() {
	for i := range b.workers {
		go b.worker(i + 1)
	}

	// Create a cleanup ticker for removing truly disconnected nodes
	cleanupTicker := time.NewTicker(5 * time.Minute)
	defer cleanupTicker.Stop()

	for {
		select {
		case <-b.tick.C:
			// Process batched changes
			b.processBatchedChanges()
		case <-cleanupTicker.C:
			// Clean up nodes that have been offline for too long
			b.cleanupOfflineNodes()
		case <-b.done:
			log.Info().Msg("batcher done channel closed, stopping work distribution to workers")
			return
		}
	}
}

func (b *LockFreeBatcher) worker(workerID int) {
	for {
		select {
		case w, ok := <-b.workCh:
			if !ok {
				log.Debug().Int("worker.id", workerID).Msgf("worker channel closing, shutting down worker %d", workerID)
				return
			}

			b.workProcessed.Add(1)

			// If the resultCh is set, it means that this is a work request
			// where there is a blocking function waiting for the map that
			// is being generated.
			// This is used for synchronous map generation.
			if w.resultCh != nil {
				var result workResult
				if nc, exists := b.nodes.Load(w.nodeID); exists {
					var err error
					result.mapResponse, err = generateMapResponse(nc, b.mapper, w.c)
					result.err = err
					if result.err != nil {
						b.workErrors.Add(1)
						log.Error().Err(result.err).
							Int("worker.id", workerID).
							Uint64("node.id", w.nodeID.Uint64()).
							Str("reason", w.c.Reason).
							Msg("failed to generate map response for synchronous work")
					} else if result.mapResponse != nil {
						// Update peer tracking for synchronous responses too
						nc.updateSentPeers(result.mapResponse)
					}
				} else {
					result.err = fmt.Errorf("node %d not found", w.nodeID)
					b.workErrors.Add(1)
					log.Error().Err(result.err).
						Int("worker.id", workerID).
						Uint64("node.id", w.nodeID.Uint64()).
						Msg("node not found for synchronous work")
				}

				// Send result
				select {
				case w.resultCh <- result:
				case <-b.done:
					return
				}

				continue
			}

			// If resultCh is nil, this is an asynchronous work request
			// that should be processed and sent to the node instead of
			// returned to the caller.
			if nc, exists := b.nodes.Load(w.nodeID); exists {
				// Apply change to node - this will handle offline nodes gracefully
				// and queue work for when they reconnect
				err := nc.change(w.c)
				if err != nil {
					b.workErrors.Add(1)
					log.Error().Err(err).
						Int("worker.id", workerID).
						Uint64("node.id", w.nodeID.Uint64()).
						Str("reason", w.c.Reason).
						Msg("failed to apply change")
				}
			}
		case <-b.done:
			log.Debug().Int("worker.id", workerID).Msg("batcher shutting down, exiting worker")
			return
		}
	}
}

func (b *LockFreeBatcher) addWork(r ...change.Change) {
	b.addToBatch(r...)
}

// queueWork safely queues work.
func (b *LockFreeBatcher) queueWork(w work) {
	b.workQueuedCount.Add(1)

	select {
	case b.workCh <- w:
		// Successfully queued
	case <-b.done:
		// Batcher is shutting down
		return
	}
}

// addToBatch adds changes to the pending batch.
func (b *LockFreeBatcher) addToBatch(changes ...change.Change) {
	// Clean up any nodes being permanently removed from the system.
	//
	// This handles the case where a node is deleted from state but the batcher
	// still has it registered. By cleaning up here, we prevent "node not found"
	// errors when workers try to generate map responses for deleted nodes.
	//
	// Safety: change.Change.PeersRemoved is ONLY populated when nodes are actually
	// deleted from the system (via change.NodeRemoved in state.DeleteNode). Policy
	// changes that affect peer visibility do NOT use this field - they set
	// RequiresRuntimePeerComputation=true and compute removed peers at runtime,
	// putting them in tailcfg.MapResponse.PeersRemoved (a different struct).
	// Therefore, this cleanup only removes nodes that are truly being deleted,
	// not nodes that are still connected but have lost visibility of certain peers.
	//
	// See: https://github.com/juanfont/headscale/issues/2924
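	//
	// As an illustrative sketch of the distinction (the literals below are
	// assumptions; real changes are built by the change package constructors):
	//
	//	deletion := change.Change{PeersRemoved: []types.NodeID{id}}    // cleaned up here
	//	policy := change.Change{RequiresRuntimePeerComputation: true}  // NOT cleaned up here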
	for _, ch := range changes {
		for _, removedID := range ch.PeersRemoved {
			if _, existed := b.nodes.LoadAndDelete(removedID); existed {
				b.totalNodes.Add(-1)
				log.Debug().
					Uint64("node.id", removedID.Uint64()).
					Msg("Removed deleted node from batcher")
			}

			b.connected.Delete(removedID)
			b.pendingChanges.Delete(removedID)
		}
	}

	// Short circuit if any of the changes is a full update, which
	// means we can skip sending individual changes.
	if change.HasFull(changes) {
		b.nodes.Range(func(nodeID types.NodeID, _ *multiChannelNodeConn) bool {
			b.pendingChanges.Store(nodeID, []change.Change{change.FullUpdate()})
			return true
		})

		return
	}

	broadcast, targeted := change.SplitTargetedAndBroadcast(changes)

	// Handle targeted changes - send only to the specific node
	for _, ch := range targeted {
		pending, _ := b.pendingChanges.LoadOrStore(ch.TargetNode, []change.Change{})
		pending = append(pending, ch)
		b.pendingChanges.Store(ch.TargetNode, pending)
	}

	// Handle broadcast changes - send to all nodes, filtering as needed
	if len(broadcast) > 0 {
		b.nodes.Range(func(nodeID types.NodeID, _ *multiChannelNodeConn) bool {
			filtered := change.FilterForNode(nodeID, broadcast)
			if len(filtered) > 0 {
				pending, _ := b.pendingChanges.LoadOrStore(nodeID, []change.Change{})
				pending = append(pending, filtered...)
				b.pendingChanges.Store(nodeID, pending)
			}

			return true
		})
	}
}

// processBatchedChanges processes all pending batched changes.
func (b *LockFreeBatcher) processBatchedChanges() {
	if b.pendingChanges == nil {
		return
	}

	// Process all pending changes
	b.pendingChanges.Range(func(nodeID types.NodeID, pending []change.Change) bool {
		if len(pending) == 0 {
			return true
		}

		// Send all batched changes for this node
		for _, ch := range pending {
			b.queueWork(work{c: ch, nodeID: nodeID, resultCh: nil})
		}

		// Clear the pending changes for this node
		b.pendingChanges.Delete(nodeID)

		return true
	})
}

// cleanupOfflineNodes removes nodes that have been offline for too long to prevent memory leaks.
// TODO(kradalby): reevaluate if we want to keep this.
func (b *LockFreeBatcher) cleanupOfflineNodes() {
	cleanupThreshold := 15 * time.Minute
	now := time.Now()

	var nodesToCleanup []types.NodeID

	// Find nodes that have been offline for too long
	b.connected.Range(func(nodeID types.NodeID, disconnectTime *time.Time) bool {
		if disconnectTime != nil && now.Sub(*disconnectTime) > cleanupThreshold {
			// Double-check the node doesn't have active connections
			if nodeConn, exists := b.nodes.Load(nodeID); exists {
				if !nodeConn.hasActiveConnections() {
					nodesToCleanup = append(nodesToCleanup, nodeID)
				}
			}
		}

		return true
	})

	// Clean up the identified nodes
	for _, nodeID := range nodesToCleanup {
		log.Info().Uint64("node.id", nodeID.Uint64()).
			Dur("offline_duration", cleanupThreshold).
			Msg("Cleaning up node that has been offline for too long")

		b.nodes.Delete(nodeID)
		b.connected.Delete(nodeID)
		b.totalNodes.Add(-1)
	}

	if len(nodesToCleanup) > 0 {
		log.Info().Int("cleaned_nodes", len(nodesToCleanup)).
			Msg("Completed cleanup of long-offline nodes")
	}
}

// IsConnected is a lock-free read that checks whether a node has any active connections.
func (b *LockFreeBatcher) IsConnected(id types.NodeID) bool {
	// First check if we have active connections for this node
	if nodeConn, exists := b.nodes.Load(id); exists {
		if nodeConn.hasActiveConnections() {
			return true
		}
	}

	// Check disconnected timestamp with grace period
	val, ok := b.connected.Load(id)
	if !ok {
		return false
	}

	// nil means connected
	if val == nil {
		return true
	}

	return false
}

// ConnectedMap returns a lock-free map of all connected nodes.
func (b *LockFreeBatcher) ConnectedMap() *xsync.Map[types.NodeID, bool] {
	ret := xsync.NewMap[types.NodeID, bool]()

	// First, add all nodes with active connections
	b.nodes.Range(func(id types.NodeID, nodeConn *multiChannelNodeConn) bool {
		if nodeConn.hasActiveConnections() {
			ret.Store(id, true)
		}

		return true
	})

	// Then add all entries from the connected map
	b.connected.Range(func(id types.NodeID, val *time.Time) bool {
		// Only add if not already added as connected above
		if _, exists := ret.Load(id); !exists {
			if val == nil {
				// nil means connected
				ret.Store(id, true)
			} else {
				// timestamp means disconnected
				ret.Store(id, false)
			}
		}

		return true
	})

	return ret
}

// MapResponseFromChange queues work to generate a map response and waits for the result.
// This allows synchronous map generation using the same worker pool.
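//
// A synchronous-generation sketch (error handling is illustrative; the change
// constructor mirrors the one AddNode uses above):
//
//	resp, err := b.MapResponseFromChange(node.ID, change.FullSelf(node.ID))
//	if err != nil {
//		// the batcher is shutting down or the node is unknown
//	}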
func (b *LockFreeBatcher) MapResponseFromChange(id types.NodeID, ch change.Change) (*tailcfg.MapResponse, error) {
	resultCh := make(chan workResult, 1)

	// Queue the work with a result channel using the safe queueing method
	b.queueWork(work{c: ch, nodeID: id, resultCh: resultCh})

	// Wait for the result
	select {
	case result := <-resultCh:
		return result.mapResponse, result.err
	case <-b.done:
		return nil, fmt.Errorf("batcher shutting down while generating map response for node %d", id)
	}
}

// connectionEntry represents a single connection to a node.
type connectionEntry struct {
	id       string // unique connection ID
	c        chan<- *tailcfg.MapResponse
	version  tailcfg.CapabilityVersion
	created  time.Time
	lastUsed atomic.Int64 // Unix timestamp of last successful send
	closed   atomic.Bool  // Indicates if this connection has been closed
}

// multiChannelNodeConn manages multiple concurrent connections for a single node.
type multiChannelNodeConn struct {
	id     types.NodeID
	mapper *mapper

	mutex       sync.RWMutex
	connections []*connectionEntry

	updateCount atomic.Int64

	// lastSentPeers tracks which peers were last sent to this node.
	// This enables computing diffs for policy changes instead of sending
	// full peer lists (which clients interpret as "no change" when empty).
	// Using xsync.Map for lock-free concurrent access.
	lastSentPeers *xsync.Map[tailcfg.NodeID, struct{}]
}

// generateConnectionID generates a unique connection identifier.
func generateConnectionID() string {
	bytes := make([]byte, 8)
	rand.Read(bytes)

	return fmt.Sprintf("%x", bytes)
}

// newMultiChannelNodeConn creates a new multi-channel node connection.
func newMultiChannelNodeConn(id types.NodeID, mapper *mapper) *multiChannelNodeConn {
	return &multiChannelNodeConn{
		id:            id,
		mapper:        mapper,
		lastSentPeers: xsync.NewMap[tailcfg.NodeID, struct{}](),
	}
}

func (mc *multiChannelNodeConn) close() {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()

	for _, conn := range mc.connections {
		// Mark as closed before closing the channel to prevent
		// send on closed channel panics from concurrent workers
		conn.closed.Store(true)
		close(conn.c)
	}
}

// addConnection adds a new connection.
func (mc *multiChannelNodeConn) addConnection(entry *connectionEntry) {
	mutexWaitStart := time.Now()
	log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", entry.c)).Str("conn.id", entry.id).
		Msg("addConnection: waiting for mutex - POTENTIAL CONTENTION POINT")

	mc.mutex.Lock()
	mutexWaitDur := time.Since(mutexWaitStart)
	defer mc.mutex.Unlock()

	mc.connections = append(mc.connections, entry)

	log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", entry.c)).Str("conn.id", entry.id).
		Int("total_connections", len(mc.connections)).
		Dur("mutex_wait_time", mutexWaitDur).
		Msg("Successfully added connection after mutex wait")
}

// removeConnectionByChannel removes a connection by matching channel pointer.
func (mc *multiChannelNodeConn) removeConnectionByChannel(c chan<- *tailcfg.MapResponse) bool {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()

	for i, entry := range mc.connections {
		if entry.c == c {
			// Remove this connection
			mc.connections = append(mc.connections[:i], mc.connections[i+1:]...)
			log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", c)).
				Int("remaining_connections", len(mc.connections)).
				Msg("Successfully removed connection")

			return true
		}
	}

	return false
}

// hasActiveConnections checks if the node has any active connections.
func (mc *multiChannelNodeConn) hasActiveConnections() bool {
	mc.mutex.RLock()
	defer mc.mutex.RUnlock()

	return len(mc.connections) > 0
}

// getActiveConnectionCount returns the number of active connections.
func (mc *multiChannelNodeConn) getActiveConnectionCount() int {
	mc.mutex.RLock()
	defer mc.mutex.RUnlock()

	return len(mc.connections)
}

// send broadcasts data to all active connections for the node.
func (mc *multiChannelNodeConn) send(data *tailcfg.MapResponse) error {
	if data == nil {
		return nil
	}

	mc.mutex.Lock()
	defer mc.mutex.Unlock()

	if len(mc.connections) == 0 {
		// During rapid reconnection, nodes may temporarily have no active connections
		// This is not an error - the node will receive a full map when it reconnects
		log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).
			Msg("send: skipping send to node with no active connections (likely rapid reconnection)")
		return nil // Return success instead of error
	}

	log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).
		Int("total_connections", len(mc.connections)).
		Msg("send: broadcasting to all connections")

	var lastErr error
	successCount := 0
	var failedConnections []int // Track failed connections for removal

	// Send to all connections
	for i, conn := range mc.connections {
		log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", conn.c)).
			Str("conn.id", conn.id).Int("connection_index", i).
			Msg("send: attempting to send to connection")

		if err := conn.send(data); err != nil {
			lastErr = err
			failedConnections = append(failedConnections, i)
			log.Warn().Err(err).
				Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", conn.c)).
				Str("conn.id", conn.id).Int("connection_index", i).
				Msg("send: connection send failed")
		} else {
			successCount++
			log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", conn.c)).
				Str("conn.id", conn.id).Int("connection_index", i).
				Msg("send: successfully sent to connection")
		}
	}

	// Remove failed connections (in reverse order to maintain indices)
	for i := len(failedConnections) - 1; i >= 0; i-- {
		idx := failedConnections[i]
		log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).
			Str("conn.id", mc.connections[idx].id).
			Msg("send: removing failed connection")
		mc.connections = append(mc.connections[:idx], mc.connections[idx+1:]...)
	}

	mc.updateCount.Add(1)

	log.Debug().Uint64("node.id", mc.id.Uint64()).
		Int("successful_sends", successCount).
		Int("failed_connections", len(failedConnections)).
		Int("remaining_connections", len(mc.connections)).
		Msg("send: completed broadcast")

	// Success if at least one send succeeded
	if successCount > 0 {
		return nil
	}

	return fmt.Errorf("node %d: all connections failed, last error: %w", mc.id, lastErr)
}

// send sends data to a single connection entry with timeout-based stale connection detection.
func (entry *connectionEntry) send(data *tailcfg.MapResponse) error {
	if data == nil {
		return nil
	}

	// Check if the connection has been closed to prevent send on closed channel panic.
	// This can happen during shutdown when Close() is called while workers are still processing.
	if entry.closed.Load() {
		return fmt.Errorf("connection %s: %w", entry.id, errConnectionClosed)
	}

	// Use a short timeout to detect stale connections where the client isn't reading the channel.
	// This is critical for detecting Docker containers that are forcefully terminated
	// but still have channels that appear open.
	select {
	case entry.c <- data:
		// Update last used timestamp on successful send
		entry.lastUsed.Store(time.Now().Unix())
		return nil
	case <-time.After(50 * time.Millisecond):
		// Connection is likely stale - client isn't reading from channel
		// This catches the case where Docker containers are killed but channels remain open
		return fmt.Errorf("connection %s: timeout sending to channel (likely stale connection)", entry.id)
	}
}

// nodeID returns the node ID.
func (mc *multiChannelNodeConn) nodeID() types.NodeID {
	return mc.id
}

// version returns the capability version from the first active connection.
// All connections for a node should have the same version in practice.
func (mc *multiChannelNodeConn) version() tailcfg.CapabilityVersion {
	mc.mutex.RLock()
	defer mc.mutex.RUnlock()

	if len(mc.connections) == 0 {
		return 0
	}

	return mc.connections[0].version
}

// updateSentPeers updates the tracked peer state based on a sent MapResponse.
// This must be called after successfully sending a response to keep track of
// what the client knows about, enabling accurate diffs for future updates.
func (mc *multiChannelNodeConn) updateSentPeers(resp *tailcfg.MapResponse) {
	if resp == nil {
		return
	}

	// Full peer list replaces tracked state entirely
	if resp.Peers != nil {
		mc.lastSentPeers.Clear()
		for _, peer := range resp.Peers {
			mc.lastSentPeers.Store(peer.ID, struct{}{})
		}
	}

	// Incremental additions
	for _, peer := range resp.PeersChanged {
		mc.lastSentPeers.Store(peer.ID, struct{}{})
	}

	// Incremental removals
	for _, id := range resp.PeersRemoved {
		mc.lastSentPeers.Delete(id)
	}
}

// computePeerDiff compares the current peer list against what was last sent
// and returns the peers that were removed (in lastSentPeers but not in current).
func (mc *multiChannelNodeConn) computePeerDiff(currentPeers []tailcfg.NodeID) []tailcfg.NodeID {
	currentSet := make(map[tailcfg.NodeID]struct{}, len(currentPeers))
	for _, id := range currentPeers {
		currentSet[id] = struct{}{}
	}

	var removed []tailcfg.NodeID

	// Find removed: in lastSentPeers but not in current
	mc.lastSentPeers.Range(func(id tailcfg.NodeID, _ struct{}) bool {
		if _, exists := currentSet[id]; !exists {
			removed = append(removed, id)
		}

		return true
	})

	return removed
}
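
// A worked sketch of computePeerDiff (the IDs are arbitrary assumptions): if
// the node was last sent peers 1, 2 and 3 and the currently visible set is
// only 1 and 3, the diff is 2, which is what should end up in
// tailcfg.MapResponse.PeersRemoved so the client drops the peer instead of
// treating an empty peer list as "no change".
//
//	// previously sent: 1, 2, 3; currently visible: 1, 3
//	removed := mc.computePeerDiff([]tailcfg.NodeID{1, 3})
//	// removed == []tailcfg.NodeID{2}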

// change applies a change to all active connections for the node.
func (mc *multiChannelNodeConn) change(r change.Change) error {
	return handleNodeChange(mc, mc.mapper, r)
}

// DebugNodeInfo contains debug information about a node's connections.
type DebugNodeInfo struct {
	Connected         bool `json:"connected"`
	ActiveConnections int  `json:"active_connections"`
}

// Debug returns a pre-baked map of node debug information for the debug interface.
func (b *LockFreeBatcher) Debug() map[types.NodeID]DebugNodeInfo {
	result := make(map[types.NodeID]DebugNodeInfo)

	// Get all nodes with their connection status using immediate connection logic
	// (no grace period) for debug purposes
	b.nodes.Range(func(id types.NodeID, nodeConn *multiChannelNodeConn) bool {
		nodeConn.mutex.RLock()
		activeConnCount := len(nodeConn.connections)
		nodeConn.mutex.RUnlock()

		// Use immediate connection status: if active connections exist, node is connected
		// If not, check the connected map for nil (connected) vs timestamp (disconnected)
		connected := false
		if activeConnCount > 0 {
			connected = true
		} else {
			// Check connected map for immediate status
			if val, ok := b.connected.Load(id); ok && val == nil {
				connected = true
			}
		}

		result[id] = DebugNodeInfo{
			Connected:         connected,
			ActiveConnections: activeConnCount,
		}

		return true
	})

	// Add all entries from the connected map to capture both connected and disconnected nodes
	b.connected.Range(func(id types.NodeID, val *time.Time) bool {
		// Only add if not already processed above
		if _, exists := result[id]; !exists {
			// Use immediate connection status for debug (no grace period)
			connected := val == nil // nil means connected, timestamp means disconnected
			result[id] = DebugNodeInfo{
				Connected:         connected,
				ActiveConnections: 0,
			}
		}

		return true
	})

	return result
}

func (b *LockFreeBatcher) DebugMapResponses() (map[types.NodeID][]tailcfg.MapResponse, error) {
	return b.mapper.debugMapResponses()
}

// WorkErrors returns the count of work errors encountered.
// This is primarily useful for testing and debugging.
func (b *LockFreeBatcher) WorkErrors() int64 {
	return b.workErrors.Load()
}