various relay fixes

do not merge

Signed-off-by: Jordan Whited <jordan@tailscale.com>
This commit is contained in:
Jordan Whited 2025-06-21 20:56:31 -07:00
parent e52eed15c8
commit 0bd8c577b2
No known key found for this signature in database
GPG Key ID: 33DF352F65991EB8
4 changed files with 49 additions and 7 deletions

View File

@ -12,6 +12,7 @@ import (
"crypto/rand" "crypto/rand"
"errors" "errors"
"fmt" "fmt"
"log"
"net" "net"
"net/netip" "net/netip"
"slices" "slices"
@ -210,6 +211,8 @@ func (e *serverEndpoint) handleSealedDiscoControlMsg(from netip.AddrPort, b []by
return return
} }
log.Printf("XXX: handleSealedDiscoControlMsg: from=%v discoMsg=%T", from, discoMsg)
e.handleDiscoControlMsg(from, senderIndex, discoMsg, uw, serverDisco) e.handleDiscoControlMsg(from, senderIndex, discoMsg, uw, serverDisco)
} }

View File

@ -115,6 +115,7 @@ func (c *linuxBatchingConn) coalesceMessages(addr *net.UDPAddr, vni virtualNetwo
var gh packet.GeneveHeader var gh packet.GeneveHeader
if vniIsSet { if vniIsSet {
gh.VNI = vni.get() gh.VNI = vni.get()
gh.Protocol = packet.GeneveProtocolWireGuard
} }
for i, buff := range buffs { for i, buff := range buffs {
if vniIsSet { if vniIsSet {
@ -203,6 +204,7 @@ retry:
var gh packet.GeneveHeader var gh packet.GeneveHeader
if vniIsSet { if vniIsSet {
gh.VNI = addr.vni.get() gh.VNI = addr.vni.get()
gh.Protocol = packet.GeneveProtocolWireGuard
offset -= packet.GeneveFixedHeaderLength offset -= packet.GeneveFixedHeaderLength
} }
for i := range buffs { for i := range buffs {

View File

@ -86,6 +86,7 @@ func (c *RebindingUDPConn) WriteBatchTo(buffs [][]byte, addr epAddr, offset int)
if vniIsSet { if vniIsSet {
gh = packet.GeneveHeader{ gh = packet.GeneveHeader{
VNI: addr.vni.get(), VNI: addr.vni.get(),
Protocol: packet.GeneveProtocolWireGuard,
} }
} }
for _, buf := range buffs { for _, buf := range buffs {

View File

@ -10,6 +10,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log"
"net/http" "net/http"
"net/netip" "net/netip"
"strconv" "strconv"
@ -258,6 +259,7 @@ type relayHandshakeDiscoInfo struct {
// the server disco key associated with 'work'. Callers must also call // the server disco key associated with 'work'. Callers must also call
// derefDiscoInfoFor() when 'work' is complete. // derefDiscoInfoFor() when 'work' is complete.
func (r *relayManager) ensureDiscoInfoFor(work *relayHandshakeWork) { func (r *relayManager) ensureDiscoInfoFor(work *relayHandshakeWork) {
log.Printf("XXX: ensureDiscoInfoFor server=%v", work.se.ServerDisco.ShortString())
r.discoInfoMu.Lock() r.discoInfoMu.Lock()
defer r.discoInfoMu.Unlock() defer r.discoInfoMu.Unlock()
di, ok := r.discoInfoByServerDisco[work.se.ServerDisco] di, ok := r.discoInfoByServerDisco[work.se.ServerDisco]
@ -279,6 +281,7 @@ func (r *relayManager) ensureDiscoInfoFor(work *relayHandshakeWork) {
// derefDiscoInfoFor decrements the reference count of the [*discoInfo] // derefDiscoInfoFor decrements the reference count of the [*discoInfo]
// associated with 'work'. // associated with 'work'.
func (r *relayManager) derefDiscoInfoFor(work *relayHandshakeWork) { func (r *relayManager) derefDiscoInfoFor(work *relayHandshakeWork) {
log.Printf("XXX: derefDiscoInfoFor server=%v", work.se.ServerDisco.ShortString())
r.discoInfoMu.Lock() r.discoInfoMu.Lock()
defer r.discoInfoMu.Unlock() defer r.discoInfoMu.Unlock()
di, ok := r.discoInfoByServerDisco[work.se.ServerDisco] di, ok := r.discoInfoByServerDisco[work.se.ServerDisco]
@ -429,6 +432,7 @@ func (r *relayManager) handleRxHandshakeDiscoMsgRunLoop(event relayHandshakeDisc
r.handshakeWorkAwaitingPong[work] = apv r.handshakeWorkAwaitingPong[work] = apv
r.addrPortVNIToHandshakeWork[apv] = work r.addrPortVNIToHandshakeWork[apv] = work
case *disco.Ping: case *disco.Ping:
event.conn.logf("XXX: handleRxHandshakeDiscoMsgRunLoop: got ping from=%v", event.from)
// Always TX a pong. We might not have any associated work if ping // Always TX a pong. We might not have any associated work if ping
// reception raced with our call to [endpoint.relayEndpointReady()], so // reception raced with our call to [endpoint.relayEndpointReady()], so
// err on the side of enabling the remote side to use this path. // err on the side of enabling the remote side to use this path.
@ -451,6 +455,7 @@ func (r *relayManager) handleRxHandshakeDiscoMsgRunLoop(event relayHandshakeDisc
return return
} }
case *disco.Pong: case *disco.Pong:
event.conn.logf("XXX: handleRxHandshakeDiscoMsgRunLoop: got pong from=%v", event.from)
work, ok = r.addrPortVNIToHandshakeWork[apv] work, ok = r.addrPortVNIToHandshakeWork[apv]
if !ok { if !ok {
// No outstanding work tied to this [addrPortVNI], discard. // No outstanding work tied to this [addrPortVNI], discard.
@ -584,12 +589,13 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay
byServerDisco[newServerEndpoint.se.ServerDisco] = work byServerDisco[newServerEndpoint.se.ServerDisco] = work
r.handshakeWorkByServerDiscoVNI[sdv] = work r.handshakeWorkByServerDiscoVNI[sdv] = work
go r.handshakeServerEndpoint(work) go r.handshakeServerEndpoint(work, newServerEndpoint.server.IsValid())
} }
func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) { func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, sendCallMeMaybeVia bool) {
done := relayEndpointHandshakeWorkDoneEvent{work: work} done := relayEndpointHandshakeWorkDoneEvent{work: work}
r.ensureDiscoInfoFor(work) r.ensureDiscoInfoFor(work)
work.ep.c.logf("XXX: handshakeServerEndpoint starting")
defer func() { defer func() {
r.derefDiscoInfoFor(work) r.derefDiscoInfoFor(work)
@ -612,6 +618,30 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
return return
} }
if sendCallMeMaybeVia {
work.ep.c.logf("XXX: handshakeServerEndpoint, sending CallMeMaybeVia")
// Send CallMeMaybeVia to peer if we allocated this endpoint.
callMeMaybeVia := &disco.CallMeMaybeVia{
ServerDisco: work.se.ServerDisco,
LamportID: work.se.LamportID,
VNI: work.se.VNI,
BindLifetime: work.se.BindLifetime.Duration,
SteadyStateLifetime: work.se.SteadyStateLifetime.Duration,
AddrPorts: work.se.AddrPorts,
}
go func() {
work.ep.mu.Lock()
derpAddr := work.ep.derpAddr
work.ep.mu.Unlock()
epDisco := work.ep.disco.Load()
if epDisco == nil {
return
}
sent, err := work.ep.c.sendDiscoMessage(epAddr{ap: derpAddr}, work.ep.publicKey, epDisco.key, callMeMaybeVia, discoVerboseLog)
work.ep.c.logf("XXX: handshakeServerEndpoint, CallMeMaybeVia sent=%v err=%v", sent, err)
}()
}
// Limit goroutine lifetime to a reasonable duration. This is intentionally // Limit goroutine lifetime to a reasonable duration. This is intentionally
// detached and independent of 'BindLifetime' to prevent relay server // detached and independent of 'BindLifetime' to prevent relay server
// (mis)configuration from negatively impacting client resource usage. // (mis)configuration from negatively impacting client resource usage.
@ -645,7 +675,8 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
go func() { go func() {
if withAnswer != nil { if withAnswer != nil {
answer := &disco.BindUDPRelayEndpointAnswer{Answer: *withAnswer} answer := &disco.BindUDPRelayEndpointAnswer{Answer: *withAnswer}
work.ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, work.se.ServerDisco, answer, discoVerboseLog) sent, err := work.ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, work.se.ServerDisco, answer, discoVerboseLog)
work.ep.c.logf("XXX: handshakeServerEndpoint tx answer sent=%v err=%v", sent, err)
} }
work.ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, epDisco.key, ping, discoVerboseLog) work.ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, epDisco.key, ping, discoVerboseLog)
}() }()
@ -656,10 +687,12 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
for { for {
select { select {
case <-work.ctx.Done(): case <-work.ctx.Done():
work.ep.c.logf("XXX: handshakeServerEndpoint, ctx Done")
return return
case msgEvent := <-work.rxDiscoMsgCh: case msgEvent := <-work.rxDiscoMsgCh:
switch msg := msgEvent.msg.(type) { switch msg := msgEvent.msg.(type) {
case *disco.BindUDPRelayEndpointChallenge: case *disco.BindUDPRelayEndpointChallenge:
work.ep.c.logf("XXX: handshakeServerEndpoint, got challenge")
if handshakeState >= disco.BindUDPRelayHandshakeStateAnswerSent { if handshakeState >= disco.BindUDPRelayHandshakeStateAnswerSent {
continue continue
} }
@ -669,6 +702,7 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
if handshakeState < disco.BindUDPRelayHandshakeStateAnswerSent { if handshakeState < disco.BindUDPRelayHandshakeStateAnswerSent {
continue continue
} }
work.ep.c.logf("XXX: handshakeServerEndpoint got ping")
// An inbound ping from the remote peer indicates we completed a // An inbound ping from the remote peer indicates we completed a
// handshake with the relay server (our answer msg was // handshake with the relay server (our answer msg was
// received). Chances are our ping was dropped before the remote // received). Chances are our ping was dropped before the remote
@ -681,6 +715,7 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
// in handleRxHandshakeDiscoMsgRunLoop(). // in handleRxHandshakeDiscoMsgRunLoop().
txPing(msgEvent.from, nil) txPing(msgEvent.from, nil)
case *disco.Pong: case *disco.Pong:
work.ep.c.logf("XXX: handshakeServerEndpoint got pong")
at, ok := sentPingAt[msg.TxID] at, ok := sentPingAt[msg.TxID]
if !ok { if !ok {
continue continue
@ -689,12 +724,12 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
// round-trip latency and return. // round-trip latency and return.
done.pongReceivedFrom = msgEvent.from done.pongReceivedFrom = msgEvent.from
done.latency = time.Since(at) done.latency = time.Since(at)
work.ep.c.logf("XXX: handshakeServerEndpoint functional endpoint: %v", done)
return return
default: default:
// unexpected message type, silently discard // unexpected message type, silently discard
continue continue
} }
return
case <-timer.C: case <-timer.C:
// The handshake timed out. // The handshake timed out.
return return
@ -782,6 +817,7 @@ func (r *relayManager) allocateSingleServer(ctx context.Context, wg *sync.WaitGr
relayManagerInputEvent(r, ctx, &r.newServerEndpointCh, newRelayServerEndpointEvent{ relayManagerInputEvent(r, ctx, &r.newServerEndpointCh, newRelayServerEndpointEvent{
ep: ep, ep: ep,
se: se, se: se,
server: server,
}) })
return return
} }