2023-05-10 09:24:05 +02:00
package hscontrol
2021-08-13 10:33:50 +01:00
import (
2021-12-31 20:51:20 +01:00
"context"
2025-07-28 11:15:53 +02:00
"encoding/binary"
"encoding/json"
2021-12-31 20:51:20 +01:00
"fmt"
2024-02-23 10:59:24 +01:00
"math/rand/v2"
2021-08-18 23:24:22 +01:00
"net/http"
2021-08-13 10:33:50 +01:00
"time"
2023-05-21 19:37:59 +03:00
"github.com/juanfont/headscale/hscontrol/types"
2025-07-28 11:15:53 +02:00
"github.com/juanfont/headscale/hscontrol/util"
2021-08-13 10:33:50 +01:00
"github.com/rs/zerolog/log"
2024-05-24 09:15:34 +01:00
"github.com/sasha-s/go-deadlock"
2021-08-13 10:33:50 +01:00
"tailscale.com/tailcfg"
2025-07-28 11:15:53 +02:00
"tailscale.com/util/zstdframe"
2021-08-13 10:33:50 +01:00
)
2021-11-14 18:31:51 +01:00
// keepAliveInterval is the base interval between keep-alive messages sent on
// a streaming map session. newMapSession adds a random jitter on top of it.
const keepAliveInterval = 50 * time.Second

// contextKey is a dedicated type for the context keys defined in this file.
type contextKey string

// nodeNameContextKey is the context key under which serveLongPoll stores the
// hostname of the node being served.
const nodeNameContextKey contextKey = "nodeName"
2022-05-16 14:59:46 +02:00
2024-02-23 10:59:24 +01:00
type mapSession struct {
h * Headscale
req tailcfg . MapRequest
ctx context . Context
capVer tailcfg . CapabilityVersion
2024-01-18 17:30:25 +01:00
2024-05-24 09:15:34 +01:00
cancelChMu deadlock . Mutex
2023-09-11 06:18:31 -05:00
2025-07-28 11:15:53 +02:00
ch chan * tailcfg . MapResponse
2024-05-24 09:15:34 +01:00
cancelCh chan struct { }
cancelChOpen bool
2023-09-11 11:45:46 -05:00
2024-05-24 09:15:34 +01:00
keepAlive time . Duration
2024-04-10 15:35:09 +02:00
keepAliveTicker * time . Ticker
2025-07-28 11:15:53 +02:00
node * types . Node
2024-02-23 10:59:24 +01:00
w http . ResponseWriter
2023-09-11 11:45:46 -05:00
2024-02-23 10:59:24 +01:00
warnf func ( string , ... any )
infof func ( string , ... any )
tracef func ( string , ... any )
errf func ( error , string , ... any )
}
2023-09-11 06:18:31 -05:00
2024-02-23 10:59:24 +01:00
func ( h * Headscale ) newMapSession (
ctx context . Context ,
req tailcfg . MapRequest ,
w http . ResponseWriter ,
2025-07-28 11:15:53 +02:00
node * types . Node ,
2024-02-23 10:59:24 +01:00
) * mapSession {
2025-07-28 11:15:53 +02:00
warnf , infof , tracef , errf := logPollFunc ( req , node )
2024-02-23 10:59:24 +01:00
2024-05-24 09:15:34 +01:00
ka := keepAliveInterval + ( time . Duration ( rand . IntN ( 9000 ) ) * time . Millisecond )
2024-02-23 10:59:24 +01:00
return & mapSession {
h : h ,
ctx : ctx ,
req : req ,
w : w ,
2025-07-28 11:15:53 +02:00
node : node ,
2024-02-23 10:59:24 +01:00
capVer : req . Version ,
2025-07-28 11:15:53 +02:00
ch : make ( chan * tailcfg . MapResponse , h . cfg . Tuning . NodeMapSessionBufferedChanSize ) ,
2024-05-24 09:15:34 +01:00
cancelCh : make ( chan struct { } ) ,
cancelChOpen : true ,
2024-02-23 10:59:24 +01:00
2024-05-24 09:15:34 +01:00
keepAlive : ka ,
keepAliveTicker : nil ,
2024-04-10 15:35:09 +02:00
2024-02-23 10:59:24 +01:00
// Loggers
warnf : warnf ,
infof : infof ,
tracef : tracef ,
errf : errf ,
}
}
2023-09-11 06:18:31 -05:00
2024-02-23 10:59:24 +01:00
func ( m * mapSession ) isStreaming ( ) bool {
2025-07-28 11:15:53 +02:00
return m . req . Stream
2024-02-23 10:59:24 +01:00
}
2023-12-09 18:09:24 +01:00
2024-02-23 10:59:24 +01:00
func ( m * mapSession ) isEndpointUpdate ( ) bool {
2025-07-28 11:15:53 +02:00
return ! m . req . Stream && m . req . OmitPeers
2024-02-23 10:59:24 +01:00
}
2023-12-09 18:09:24 +01:00
2024-05-24 09:15:34 +01:00
// resetKeepAlive restarts the keep-alive ticker with the session's interval.
// The ticker must already have been created, which serveLongPoll does.
func (m *mapSession) resetKeepAlive() {
	ticker := m.keepAliveTicker
	ticker.Reset(m.keepAlive)
}
2022-08-24 20:53:55 +10:00
2024-07-18 10:01:59 +02:00
func ( m * mapSession ) beforeServeLongPoll ( ) {
if m . node . IsEphemeral ( ) {
2025-07-28 11:15:53 +02:00
m . h . ephemeralGC . Cancel ( m . node . ID )
2024-07-18 10:01:59 +02:00
}
}
func ( m * mapSession ) afterServeLongPoll ( ) {
if m . node . IsEphemeral ( ) {
2025-07-28 11:15:53 +02:00
m . h . ephemeralGC . Schedule ( m . node . ID , m . h . cfg . EphemeralNodeInactivityTimeout )
2024-07-18 10:01:59 +02:00
}
}
2024-05-24 09:15:34 +01:00
// serve handles non-streaming requests.
func ( m * mapSession ) serve ( ) {
2024-02-23 10:59:24 +01:00
// This is the mechanism where the node gives us information about its
// current configuration.
//
2025-07-05 23:30:47 +02:00
// Process the MapRequest to update node state (endpoints, hostinfo, etc.)
c , err := m . h . state . UpdateNodeFromMapRequest ( m . node . ID , m . req )
if err != nil {
httpError ( m . w , err )
return
}
m . h . Change ( c )
2025-07-28 11:15:53 +02:00
// If OmitPeers is true and Stream is false
2025-02-05 16:10:18 +01:00
// then the server will let clients update their endpoints without
2024-02-23 10:59:24 +01:00
// breaking existing long-polling (Stream == true) connections.
// In this case, the server can omit the entire response; the client
// only checks the HTTP response status code.
//
// This is what Tailscale calls a Lite update, the client ignores
// the response and just wants a 200.
2025-07-28 11:15:53 +02:00
// !req.stream && req.OmitPeers
2024-02-23 10:59:24 +01:00
if m . isEndpointUpdate ( ) {
2025-07-28 11:15:53 +02:00
m . w . WriteHeader ( http . StatusOK )
mapResponseEndpointUpdates . WithLabelValues ( "ok" ) . Inc ( )
2023-06-21 11:29:52 +02:00
}
2024-05-24 09:15:34 +01:00
}
// serveLongPoll ensures the node gets the appropriate updates from either
// polling or immediate responses.
//
//nolint:gocyclo
func ( m * mapSession ) serveLongPoll ( ) {
2024-07-18 10:01:59 +02:00
m . beforeServeLongPoll ( )
2025-07-05 23:30:47 +02:00
log . Trace ( ) . Caller ( ) . Uint64 ( "node.id" , m . node . ID . Uint64 ( ) ) . Str ( "node.name" , m . node . Hostname ) . Msg ( "Long poll session started because client connected" )
2024-05-24 09:15:34 +01:00
// Clean up the session when the client disconnects
defer func ( ) {
m . cancelChMu . Lock ( )
m . cancelChOpen = false
close ( m . cancelCh )
m . cancelChMu . Unlock ( )
2025-07-05 23:30:47 +02:00
_ = m . h . mapBatcher . RemoveNode ( m . node . ID , m . ch )
// When a node disconnects, it might rapidly reconnect (e.g. mobile clients, network weather).
// Instead of immediately marking the node as offline, we wait a few seconds to see if it reconnects.
// If it does reconnect, the existing mapSession will be replaced and the node remains online.
// If it doesn't reconnect within the timeout, we mark it as offline.
//
// This avoids flapping nodes in the UI and unnecessary churn in the network.
// This is not my favourite solution, but it kind of works in our eventually consistent world.
ticker := time . NewTicker ( time . Second )
defer ticker . Stop ( )
disconnected := true
// Wait up to 10 seconds for the node to reconnect.
// 10 seconds was arbitrary chosen as a reasonable time to reconnect.
for range 10 {
if m . h . mapBatcher . IsConnected ( m . node . ID ) {
disconnected = false
break
}
<- ticker . C
2024-05-24 09:15:34 +01:00
}
2025-07-28 11:15:53 +02:00
2025-07-05 23:30:47 +02:00
if disconnected {
disconnectChanges , err := m . h . state . Disconnect ( m . node . ID )
if err != nil {
m . errf ( err , "Failed to disconnect node %s" , m . node . Hostname )
}
2024-05-24 09:15:34 +01:00
2025-07-05 23:30:47 +02:00
m . h . Change ( disconnectChanges ... )
m . afterServeLongPoll ( )
m . infof ( "node has disconnected, mapSession: %p, chan: %p" , m , m . ch )
}
2024-05-24 09:15:34 +01:00
} ( )
2024-02-23 10:59:24 +01:00
// Set up the client stream
m . h . pollNetMapStreamWG . Add ( 1 )
defer m . h . pollNetMapStreamWG . Done ( )
2023-09-11 06:18:31 -05:00
2025-07-28 11:15:53 +02:00
ctx , cancel := context . WithCancel ( context . WithValue ( m . ctx , nodeNameContextKey , m . node . Hostname ) )
2022-06-20 12:30:51 +02:00
defer cancel ( )
2022-04-10 00:37:13 +02:00
2024-05-24 09:15:34 +01:00
m . keepAliveTicker = time . NewTicker ( m . keepAlive )
2025-07-05 23:30:47 +02:00
// Process the initial MapRequest to update node state (endpoints, hostinfo, etc.)
// CRITICAL: This must be done BEFORE calling Connect() to ensure routes are properly
// synchronized. When nodes reconnect, they send their hostinfo with announced routes
// in the MapRequest. We need this data in NodeStore before Connect() sets up the
// primary routes, otherwise SubnetRoutes() returns empty and the node is removed
// from AvailableRoutes.
mapReqChange , err := m . h . state . UpdateNodeFromMapRequest ( m . node . ID , m . req )
if err != nil {
m . errf ( err , "failed to update node from initial MapRequest" )
2025-07-28 11:15:53 +02:00
return
}
2025-07-05 23:30:47 +02:00
// Connect the node after its state has been updated.
// We send two separate change notifications because these are distinct operations:
// 1. UpdateNodeFromMapRequest: processes the client's reported state (routes, endpoints, hostinfo)
// 2. Connect: marks the node online and recalculates primary routes based on the updated state
// While this results in two notifications, it ensures route data is synchronized before
// primary route selection occurs, which is critical for proper HA subnet router failover.
connectChanges := m . h . state . Connect ( m . node . ID )
2024-05-24 09:15:34 +01:00
m . infof ( "node has connected, mapSession: %p, chan: %p" , m , m . ch )
2025-09-05 16:32:46 +02:00
// TODO(kradalby): Redo the comments here
// Add node to batcher so it can receive updates,
// adding this before connecting it to the state ensure that
// it does not miss any updates that might be sent in the split
// time between the node connecting and the batcher being ready.
if err := m . h . mapBatcher . AddNode ( m . node . ID , m . ch , m . capVer ) ; err != nil {
m . errf ( err , "failed to add node to batcher" )
log . Error ( ) . Uint64 ( "node.id" , m . node . ID . Uint64 ( ) ) . Str ( "node.name" , m . node . Hostname ) . Err ( err ) . Msg ( "AddNode failed in poll session" )
return
}
log . Debug ( ) . Caller ( ) . Uint64 ( "node.id" , m . node . ID . Uint64 ( ) ) . Str ( "node.name" , m . node . Hostname ) . Msg ( "AddNode succeeded in poll session because node added to batcher" )
m . h . Change ( mapReqChange )
m . h . Change ( connectChanges ... )
2024-02-23 10:59:24 +01:00
// Loop through updates and continuously send them to the
// client.
for {
2024-04-27 10:47:39 +02:00
// consume channels with update, keep alives or "batch" blocking signals
select {
case <- m . cancelCh :
m . tracef ( "poll cancelled received" )
2024-05-24 09:15:34 +01:00
mapResponseEnded . WithLabelValues ( "cancelled" ) . Inc ( )
2024-04-27 10:47:39 +02:00
return
2024-05-24 09:15:34 +01:00
2024-04-27 10:47:39 +02:00
case <- ctx . Done ( ) :
2025-09-05 16:32:46 +02:00
m . tracef ( "poll context done chan:%p" , m . ch )
2024-05-24 09:15:34 +01:00
mapResponseEnded . WithLabelValues ( "done" ) . Inc ( )
2024-04-27 10:47:39 +02:00
return
2024-05-24 09:15:34 +01:00
// Consume updates sent to node
case update , ok := <- m . ch :
2025-07-28 11:15:53 +02:00
m . tracef ( "received update from channel, ok: %t" , ok )
2024-05-24 09:15:34 +01:00
if ! ok {
m . tracef ( "update channel closed, streaming session is likely being replaced" )
return
}
2025-07-28 11:15:53 +02:00
if err := m . writeMap ( update ) ; err != nil {
m . errf ( err , "cannot write update to client" )
2024-09-11 12:00:32 +02:00
return
}
2025-07-28 11:15:53 +02:00
m . tracef ( "update sent" )
m . resetKeepAlive ( )
2023-12-09 18:09:24 +01:00
2024-04-10 15:35:09 +02:00
case <- m . keepAliveTicker . C :
2025-07-28 11:15:53 +02:00
if err := m . writeMap ( & keepAlive ) ; err != nil {
m . errf ( err , "cannot write keep alive" )
2024-02-23 10:59:24 +01:00
return
}
2024-04-21 18:28:17 +02:00
2024-05-24 09:15:34 +01:00
if debugHighCardinalityMetrics {
2025-07-28 11:15:53 +02:00
mapResponseLastSentSeconds . WithLabelValues ( "keepalive" , m . node . ID . String ( ) ) . Set ( float64 ( time . Now ( ) . Unix ( ) ) )
2024-05-24 09:15:34 +01:00
}
2024-04-21 18:28:17 +02:00
mapResponseSent . WithLabelValues ( "ok" , "keepalive" ) . Inc ( )
2025-07-05 23:30:47 +02:00
m . resetKeepAlive ( )
2021-08-13 10:33:50 +01:00
}
2022-06-20 12:30:51 +02:00
}
2021-08-13 10:33:50 +01:00
}
2021-08-18 23:24:22 +01:00
2025-07-28 11:15:53 +02:00
// writeMap writes the map response to the client.
// It handles compression if requested and any headers that need to be set.
// It also handles flushing the response if the ResponseWriter
// implements http.Flusher.
func ( m * mapSession ) writeMap ( msg * tailcfg . MapResponse ) error {
jsonBody , err := json . Marshal ( msg )
2025-07-08 09:49:05 +02:00
if err != nil {
2025-07-28 11:15:53 +02:00
return fmt . Errorf ( "marshalling map response: %w" , err )
2025-07-08 09:49:05 +02:00
}
2025-07-28 11:15:53 +02:00
if m . req . Compress == util . ZstdCompression {
jsonBody = zstdframe . AppendEncode ( nil , jsonBody , zstdframe . FastestCompression )
2024-02-23 10:59:24 +01:00
}
2025-07-28 11:15:53 +02:00
data := make ( [ ] byte , reservedResponseHeaderSize )
binary . LittleEndian . PutUint32 ( data , uint32 ( len ( jsonBody ) ) )
data = append ( data , jsonBody ... )
2025-07-08 09:49:05 +02:00
2025-07-28 11:15:53 +02:00
startWrite := time . Now ( )
2024-10-17 18:45:33 +03:00
2025-07-28 11:15:53 +02:00
_ , err = m . w . Write ( data )
2025-05-27 16:27:16 +02:00
if err != nil {
2025-07-28 11:15:53 +02:00
return err
2024-02-23 10:59:24 +01:00
}
2025-07-28 11:15:53 +02:00
if m . isStreaming ( ) {
if f , ok := m . w . ( http . Flusher ) ; ok {
f . Flush ( )
} else {
m . errf ( nil , "ResponseWriter does not implement http.Flusher, cannot flush" )
}
2025-05-27 16:27:16 +02:00
}
2025-09-05 16:32:46 +02:00
log . Trace ( ) .
Caller ( ) .
Str ( "node.name" , m . node . Hostname ) .
Uint64 ( "node.id" , m . node . ID . Uint64 ( ) ) .
Str ( "chan" , fmt . Sprintf ( "%p" , m . ch ) ) .
TimeDiff ( "timeSpent" , time . Now ( ) , startWrite ) .
Str ( "machine.key" , m . node . MachineKey . String ( ) ) .
Bool ( "keepalive" , msg . KeepAlive ) .
Msgf ( "finished writing mapresp to node chan(%p)" , m . ch )
2024-02-23 10:59:24 +01:00
2025-07-28 11:15:53 +02:00
return nil
2024-02-23 10:59:24 +01:00
}
2025-07-28 11:15:53 +02:00
var keepAlive = tailcfg . MapResponse {
KeepAlive : true ,
2023-09-11 06:18:31 -05:00
}
2023-12-09 18:09:24 +01:00
2025-07-28 11:15:53 +02:00
func logTracePeerChange ( hostname string , hostinfoChange bool , peerChange * tailcfg . PeerChange ) {
2025-09-05 16:32:46 +02:00
trace := log . Trace ( ) . Caller ( ) . Uint64 ( "node.id" , uint64 ( peerChange . NodeID ) ) . Str ( "hostname" , hostname )
2023-12-09 18:09:24 +01:00
2025-07-28 11:15:53 +02:00
if peerChange . Key != nil {
2025-09-05 16:32:46 +02:00
trace = trace . Str ( "node.key" , peerChange . Key . ShortString ( ) )
2023-12-09 18:09:24 +01:00
}
2025-07-28 11:15:53 +02:00
if peerChange . DiscoKey != nil {
2025-09-05 16:32:46 +02:00
trace = trace . Str ( "disco.key" , peerChange . DiscoKey . ShortString ( ) )
2023-12-09 18:09:24 +01:00
}
2025-07-28 11:15:53 +02:00
if peerChange . Online != nil {
trace = trace . Bool ( "online" , * peerChange . Online )
2023-12-09 18:09:24 +01:00
}
2025-07-28 11:15:53 +02:00
if peerChange . Endpoints != nil {
eps := make ( [ ] string , len ( peerChange . Endpoints ) )
for idx , ep := range peerChange . Endpoints {
2023-12-09 18:09:24 +01:00
eps [ idx ] = ep . String ( )
}
trace = trace . Strs ( "endpoints" , eps )
}
if hostinfoChange {
trace = trace . Bool ( "hostinfo_changed" , hostinfoChange )
}
2025-07-28 11:15:53 +02:00
if peerChange . DERPRegion != 0 {
trace = trace . Int ( "derp_region" , peerChange . DERPRegion )
2023-12-09 18:09:24 +01:00
}
2025-07-28 11:15:53 +02:00
trace . Time ( "last_seen" , * peerChange . LastSeen ) . Msg ( "PeerChange received" )
2024-02-23 10:59:24 +01:00
}
func logPollFunc (
mapRequest tailcfg . MapRequest ,
node * types . Node ,
) ( func ( string , ... any ) , func ( string , ... any ) , func ( string , ... any ) , func ( error , string , ... any ) ) {
return func ( msg string , a ... any ) {
log . Warn ( ) .
Caller ( ) .
Bool ( "omitPeers" , mapRequest . OmitPeers ) .
Bool ( "stream" , mapRequest . Stream ) .
Uint64 ( "node.id" , node . ID . Uint64 ( ) ) .
2025-09-05 16:32:46 +02:00
Str ( "node.name" , node . Hostname ) .
2024-02-23 10:59:24 +01:00
Msgf ( msg , a ... )
} ,
func ( msg string , a ... any ) {
log . Info ( ) .
Caller ( ) .
Bool ( "omitPeers" , mapRequest . OmitPeers ) .
Bool ( "stream" , mapRequest . Stream ) .
Uint64 ( "node.id" , node . ID . Uint64 ( ) ) .
2025-09-05 16:32:46 +02:00
Str ( "node.name" , node . Hostname ) .
2024-02-23 10:59:24 +01:00
Msgf ( msg , a ... )
} ,
func ( msg string , a ... any ) {
log . Trace ( ) .
Caller ( ) .
Bool ( "omitPeers" , mapRequest . OmitPeers ) .
Bool ( "stream" , mapRequest . Stream ) .
Uint64 ( "node.id" , node . ID . Uint64 ( ) ) .
2025-09-05 16:32:46 +02:00
Str ( "node.name" , node . Hostname ) .
2024-02-23 10:59:24 +01:00
Msgf ( msg , a ... )
} ,
func ( err error , msg string , a ... any ) {
log . Error ( ) .
Caller ( ) .
Bool ( "omitPeers" , mapRequest . OmitPeers ) .
Bool ( "stream" , mapRequest . Stream ) .
Uint64 ( "node.id" , node . ID . Uint64 ( ) ) .
2025-09-05 16:32:46 +02:00
Str ( "node.name" , node . Hostname ) .
2024-02-23 10:59:24 +01:00
Err ( err ) .
Msgf ( msg , a ... )
}
}