wgengine/magicsock: generate relay server set from tailnet policy (#16331)

Updates tailscale/corp#27502

Signed-off-by: Jordan Whited <jordan@tailscale.com>
commit d3bb34c628 (parent 12e92b1b08)
Jordan Whited, 2025-06-20 15:00:28 -07:00, committed by GitHub
4 changed files with 386 additions and 24 deletions

wgengine/magicsock/magicsock.go

@@ -14,6 +14,7 @@ import (
"expvar"
"fmt"
"io"
"math"
"net"
"net/netip"
"reflect"
@@ -349,9 +350,11 @@ type Conn struct {
netInfoLast *tailcfg.NetInfo
derpMap *tailcfg.DERPMap // nil (or zero regions/nodes) means DERP is disabled
peers views.Slice[tailcfg.NodeView] // from last onNodeViewsUpdate update
self tailcfg.NodeView // from last onNodeViewsUpdate
peers views.Slice[tailcfg.NodeView] // from last onNodeViewsUpdate, sorted by Node.ID; Note: [netmap.NodeMutation]s received in onNodeMutationsUpdate are never applied
filt *filter.Filter // from last onFilterUpdate
relayClientEnabled bool // whether we can allocate UDP relay endpoints on UDP relay servers
lastFlags debugFlags // at time of last onNodeViewsUpdate
firstAddrForTest netip.Addr // from last onNodeViewsUpdate update; for tests only
privateKey key.NodePrivate // WireGuard private key for this node
everHadKey bool // whether we ever had a non-zero private key
myDerp int // nearest DERP region ID; 0 means none/unknown
@@ -516,7 +519,7 @@ func (o *Options) derpActiveFunc() func() {
// this type out of magicsock.
type NodeViewsUpdate struct {
SelfNode tailcfg.NodeView
Peers []tailcfg.NodeView
Peers []tailcfg.NodeView // sorted by Node.ID
}
// NodeMutationsUpdate represents an update event of one or more
@@ -2555,38 +2558,160 @@ func (c *Conn) SetProbeUDPLifetime(v bool) {
func capVerIsRelayCapable(version tailcfg.CapabilityVersion) bool {
// TODO(jwhited): implement once capVer is bumped
return false
return version == math.MinInt32
}
func capVerIsRelayServerCapable(version tailcfg.CapabilityVersion) bool {
// TODO(jwhited): implement once capVer is bumped
return version == math.MinInt32
}
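
Both checks intentionally compare against math.MinInt32, a capability version no real client advertises, so they return false for every peer until the capability version is bumped. As a rough sketch only (the constant name and value below are invented placeholders, not part of this commit or the tailcfg package), the eventual gate would presumably compare against the first relay-capable version:

// Hypothetical sketch; capVerFirstRelayCapable does not exist and its value
// is a placeholder.
const capVerFirstRelayCapable tailcfg.CapabilityVersion = 9999

func capVerIsRelayCapableSketch(version tailcfg.CapabilityVersion) bool {
	return version >= capVerFirstRelayCapable
}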
// onFilterUpdate is called when a [FilterUpdate] is received over the
// [eventbus.Bus].
func (c *Conn) onFilterUpdate(f FilterUpdate) {
// TODO(jwhited): implement
c.mu.Lock()
c.filt = f.Filter
self := c.self
peers := c.peers
relayClientEnabled := c.relayClientEnabled
c.mu.Unlock() // release c.mu before potentially calling c.updateRelayServersSet which is O(m * n)
if !relayClientEnabled {
// Early return if we cannot operate as a relay client.
return
}
// The filter has changed, and we are operating as a relay server client.
// Re-evaluate it in order to produce an updated relay server set.
c.updateRelayServersSet(f.Filter, self, peers)
}
// updateRelayServersSet iterates all peers, evaluating filt for each one in
// order to determine which peers are relay server candidates. filt, self, and
// peers are passed as args (vs c.mu-guarded fields) to enable callers to
// release c.mu before calling as this is O(m * n) (we iterate all cap rules 'm'
// in filt for every peer 'n').
// TODO: Optimize this so that it's not O(m * n). This might involve:
// 1. Changes to [filter.Filter], e.g. adding a CapsWithValues() to check for
// a given capability instead of building and returning a map of all of
// them.
// 2. Moving this work upstream into [nodeBackend] or similar, and publishing
// the computed result over the eventbus instead.
func (c *Conn) updateRelayServersSet(filt *filter.Filter, self tailcfg.NodeView, peers views.Slice[tailcfg.NodeView]) {
relayServers := make(set.Set[netip.AddrPort])
for _, peer := range peers.All() {
peerAPI := peerAPIIfCandidateRelayServer(filt, self, peer)
if peerAPI.IsValid() {
relayServers.Add(peerAPI)
}
}
c.relayManager.handleRelayServersSet(relayServers)
}
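
One possible shape for option (1) in the TODO above, sketched under the assumption that filter.Filter gained a single-capability query. The helper name below is invented and does not exist in the tree; today the same check goes through CapsWithValues, which builds the full cap map for each address pair. The sketch assumes the imports already present in this file.

// srcHasCapForDst is a hypothetical helper: a dedicated filter method could
// short-circuit on the first matching cap rule instead of allocating and
// returning a map of all capabilities for every peer/self address pair.
func srcHasCapForDst(filt *filter.Filter, src, dst netip.Addr, wantCap tailcfg.PeerCapability) bool {
	if filt == nil {
		return false
	}
	// Spelled with today's API; a real optimization would live inside
	// filter.Filter rather than wrapping it here.
	return filt.CapsWithValues(src, dst).HasCapability(wantCap)
}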
// peerAPIIfCandidateRelayServer returns the peer API address of peer if it
// is considered to be a candidate relay server upon evaluation against filt and
// self, otherwise it returns a zero value.
func peerAPIIfCandidateRelayServer(filt *filter.Filter, self, peer tailcfg.NodeView) netip.AddrPort {
if filt == nil ||
!self.Valid() ||
!peer.Valid() ||
!capVerIsRelayServerCapable(peer.Cap()) ||
!peer.Hostinfo().Valid() {
return netip.AddrPort{}
}
for _, peerPrefix := range peer.Addresses().All() {
if !peerPrefix.IsSingleIP() {
continue
}
peerAddr := peerPrefix.Addr()
for _, selfPrefix := range self.Addresses().All() {
if !selfPrefix.IsSingleIP() {
continue
}
selfAddr := selfPrefix.Addr()
if selfAddr.BitLen() == peerAddr.BitLen() { // same address family
if filt.CapsWithValues(peerAddr, selfAddr).HasCapability(tailcfg.PeerCapabilityRelayTarget) {
for _, s := range peer.Hostinfo().Services().All() {
if peerAddr.Is4() && s.Proto == tailcfg.PeerAPI4 ||
peerAddr.Is6() && s.Proto == tailcfg.PeerAPI6 {
return netip.AddrPortFrom(peerAddr, s.Port)
}
}
return netip.AddrPort{} // no peerAPI
} else {
// [nodeBackend.peerCapsLocked] only returns/considers the
// [tailcfg.PeerCapMap] between the passed src and the
// _first_ host (/32 or /128) address for self. We are
// consistent with that behavior here. If self and peer
// host addresses are of the same address family they either
// have the capability or not. We do not check against
// additional host addresses of the same address family.
return netip.AddrPort{}
}
}
}
}
return netip.AddrPort{}
}
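
Read in isolation, the final step above maps the chosen peer address to the peerAPI service of the matching address family. A small illustrative helper follows; the name is invented, and it assumes the imports already present in this file.

// peerAPIPortFor returns addr joined with the peer's PeerAPI4 or PeerAPI6
// service port, whichever matches addr's address family, and reports whether
// such a service was found.
func peerAPIPortFor(addr netip.Addr, services []tailcfg.Service) (netip.AddrPort, bool) {
	for _, s := range services {
		if (addr.Is4() && s.Proto == tailcfg.PeerAPI4) ||
			(addr.Is6() && s.Proto == tailcfg.PeerAPI6) {
			return netip.AddrPortFrom(addr, s.Port), true
		}
	}
	return netip.AddrPort{}, false
}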
// onNodeViewsUpdate is called when a [NodeViewsUpdate] is received over the
// [eventbus.Bus].
func (c *Conn) onNodeViewsUpdate(update NodeViewsUpdate) {
peersChanged := c.updateNodes(update)
relayClientEnabled := update.SelfNode.Valid() &&
update.SelfNode.HasCap(tailcfg.NodeAttrRelayClient) &&
envknob.UseWIPCode()
c.mu.Lock()
relayClientChanged := c.relayClientEnabled != relayClientEnabled
c.relayClientEnabled = relayClientEnabled
filt := c.filt
self := c.self
peers := c.peers
c.mu.Unlock() // release c.mu before potentially calling c.updateRelayServersSet which is O(m * n)
if peersChanged || relayClientChanged {
if !relayClientEnabled {
c.relayManager.handleRelayServersSet(nil)
} else {
c.updateRelayServersSet(filt, self, peers)
}
}
}
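
The relay server set is only touched when the peer set or the relay-client toggle actually changed: disabling the client clears the set, and any other relevant change triggers a recompute. A compact restatement of that branch as a sketch (the helper name is invented):

// relayServerSetAction restates the decision above: whether to clear the
// relay server set, recompute it from filt/self/peers, or leave it alone.
func relayServerSetAction(peersChanged, relayClientChanged, relayClientEnabled bool) (clearSet, recompute bool) {
	if !peersChanged && !relayClientChanged {
		return false, false // nothing relevant changed
	}
	if !relayClientEnabled {
		return true, false // no longer operating as a relay client
	}
	return false, true // re-evaluate the filter against self and peers
}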
// updateNodes updates [Conn] to reflect the [tailcfg.NodeView]s contained
// in update. It returns true if update.Peers was unequal to c.peers, otherwise
// false.
func (c *Conn) updateNodes(update NodeViewsUpdate) (peersChanged bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return
return false
}
priorPeers := c.peers
metricNumPeers.Set(int64(len(update.Peers)))
// Update c.netMap regardless, before the following early return.
// Update c.self & c.peers regardless, before the following early return.
c.self = update.SelfNode
curPeers := views.SliceOf(update.Peers)
c.peers = curPeers
// [debugFlags] are mutable in [Conn.SetSilentDisco] &
// [Conn.SetProbeUDPLifetime]. These setters are passed [controlknobs.Knobs]
// values by [ipnlocal.LocalBackend] around netmap reception.
// [controlknobs.Knobs] are simply self [tailcfg.NodeCapability]s. They are
// useful as a global view of notable feature toggles, but the magicsock
// setters are completely unnecessary as we have the same values right here
// (update.SelfNode.Capabilities) at a time they are considered most
// up-to-date.
// TODO: mutate [debugFlags] here instead of in various [Conn] setters.
flags := c.debugFlagsLocked()
if update.SelfNode.Valid() && update.SelfNode.Addresses().Len() > 0 {
c.firstAddrForTest = update.SelfNode.Addresses().At(0).Addr()
} else {
c.firstAddrForTest = netip.Addr{}
}
if nodesEqual(priorPeers, curPeers) && c.lastFlags == flags {
peersChanged = !nodesEqual(priorPeers, curPeers)
if !peersChanged && c.lastFlags == flags {
// The rest of this function is all adjusting state for peers that have
// changed. But if the set of peers is equal and the debug flags (for
// silent disco and probe UDP lifetime) haven't changed, there is no
@@ -2728,6 +2853,8 @@ func (c *Conn) onNodeViewsUpdate(update NodeViewsUpdate) {
delete(c.discoInfo, dk)
}
}
return peersChanged
}
func devPanicf(format string, a ...any) {
@@ -3245,7 +3372,7 @@ func simpleDur(d time.Duration) time.Duration {
}
// onNodeMutationsUpdate is called when a [NodeMutationsUpdate] is received over
// the [eventbus.Bus].
// the [eventbus.Bus]. Note: It does not apply these mutations to c.peers.
func (c *Conn) onNodeMutationsUpdate(update NodeMutationsUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

wgengine/magicsock/magicsock_test.go

@@ -19,6 +19,7 @@ import (
"net/http/httptest"
"net/netip"
"os"
"reflect"
"runtime"
"strconv"
"strings"
@@ -71,6 +72,7 @@ import (
"tailscale.com/util/slicesx"
"tailscale.com/util/usermetric"
"tailscale.com/wgengine/filter"
"tailscale.com/wgengine/filter/filtertype"
"tailscale.com/wgengine/wgcfg"
"tailscale.com/wgengine/wgcfg/nmcfg"
"tailscale.com/wgengine/wglog"
@@ -275,7 +277,10 @@ func (s *magicStack) Status() *ipnstate.Status {
func (s *magicStack) IP() netip.Addr {
for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
s.conn.mu.Lock()
addr := s.conn.firstAddrForTest
var addr netip.Addr
if s.conn.self.Valid() && s.conn.self.Addresses().Len() > 0 {
addr = s.conn.self.Addresses().At(0).Addr()
}
s.conn.mu.Unlock()
if addr.IsValid() {
return addr
@@ -3378,3 +3383,198 @@ func Test_virtualNetworkID(t *testing.T) {
})
}
}
func Test_peerAPIIfCandidateRelayServer(t *testing.T) {
selfOnlyIPv4 := &tailcfg.Node{
Cap: math.MinInt32,
Addresses: []netip.Prefix{
netip.MustParsePrefix("1.1.1.1/32"),
},
}
selfOnlyIPv6 := selfOnlyIPv4.Clone()
selfOnlyIPv6.Addresses[0] = netip.MustParsePrefix("::1/128")
peerHostinfo := &tailcfg.Hostinfo{
Services: []tailcfg.Service{
{
Proto: tailcfg.PeerAPI4,
Port: 4,
},
{
Proto: tailcfg.PeerAPI6,
Port: 6,
},
},
}
peerOnlyIPv4 := &tailcfg.Node{
Cap: math.MinInt32,
CapMap: map[tailcfg.NodeCapability][]tailcfg.RawMessage{
tailcfg.NodeAttrRelayServer: nil,
},
Addresses: []netip.Prefix{
netip.MustParsePrefix("2.2.2.2/32"),
},
Hostinfo: peerHostinfo.View(),
}
peerOnlyIPv6 := peerOnlyIPv4.Clone()
peerOnlyIPv6.Addresses[0] = netip.MustParsePrefix("::2/128")
peerOnlyIPv4ZeroCapVer := peerOnlyIPv4.Clone()
peerOnlyIPv4ZeroCapVer.Cap = 0
peerOnlyIPv4NilHostinfo := peerOnlyIPv4.Clone()
peerOnlyIPv4NilHostinfo.Hostinfo = tailcfg.HostinfoView{}
tests := []struct {
name string
filt *filter.Filter
self tailcfg.NodeView
peer tailcfg.NodeView
want netip.AddrPort
}{
{
name: "match v4",
filt: filter.New([]filtertype.Match{
{
Srcs: []netip.Prefix{netip.MustParsePrefix("2.2.2.2/32")},
Caps: []filtertype.CapMatch{
{
Dst: netip.MustParsePrefix("1.1.1.1/32"),
Cap: tailcfg.PeerCapabilityRelayTarget,
},
},
},
}, nil, nil, nil, nil, nil),
self: selfOnlyIPv4.View(),
peer: peerOnlyIPv4.View(),
want: netip.MustParseAddrPort("2.2.2.2:4"),
},
{
name: "match v6",
filt: filter.New([]filtertype.Match{
{
Srcs: []netip.Prefix{netip.MustParsePrefix("::2/128")},
Caps: []filtertype.CapMatch{
{
Dst: netip.MustParsePrefix("::1/128"),
Cap: tailcfg.PeerCapabilityRelayTarget,
},
},
},
}, nil, nil, nil, nil, nil),
self: selfOnlyIPv6.View(),
peer: peerOnlyIPv6.View(),
want: netip.MustParseAddrPort("[::2]:6"),
},
{
name: "no match dst",
filt: filter.New([]filtertype.Match{
{
Srcs: []netip.Prefix{netip.MustParsePrefix("::2/128")},
Caps: []filtertype.CapMatch{
{
Dst: netip.MustParsePrefix("::3/128"),
Cap: tailcfg.PeerCapabilityRelayTarget,
},
},
},
}, nil, nil, nil, nil, nil),
self: selfOnlyIPv6.View(),
peer: peerOnlyIPv6.View(),
},
{
name: "no match peer cap",
filt: filter.New([]filtertype.Match{
{
Srcs: []netip.Prefix{netip.MustParsePrefix("::2/128")},
Caps: []filtertype.CapMatch{
{
Dst: netip.MustParsePrefix("::1/128"),
Cap: tailcfg.PeerCapabilityIngress,
},
},
},
}, nil, nil, nil, nil, nil),
self: selfOnlyIPv6.View(),
peer: peerOnlyIPv6.View(),
},
{
name: "cap ver not relay capable",
filt: filter.New([]filtertype.Match{
{
Srcs: []netip.Prefix{netip.MustParsePrefix("2.2.2.2/32")},
Caps: []filtertype.CapMatch{
{
Dst: netip.MustParsePrefix("1.1.1.1/32"),
Cap: tailcfg.PeerCapabilityRelayTarget,
},
},
},
}, nil, nil, nil, nil, nil),
self: peerOnlyIPv4.View(),
peer: peerOnlyIPv4ZeroCapVer.View(),
},
{
name: "nil filt",
filt: nil,
self: selfOnlyIPv4.View(),
peer: peerOnlyIPv4.View(),
},
{
name: "nil self",
filt: filter.New([]filtertype.Match{
{
Srcs: []netip.Prefix{netip.MustParsePrefix("2.2.2.2/32")},
Caps: []filtertype.CapMatch{
{
Dst: netip.MustParsePrefix("1.1.1.1/32"),
Cap: tailcfg.PeerCapabilityRelayTarget,
},
},
},
}, nil, nil, nil, nil, nil),
self: tailcfg.NodeView{},
peer: peerOnlyIPv4.View(),
},
{
name: "nil peer",
filt: filter.New([]filtertype.Match{
{
Srcs: []netip.Prefix{netip.MustParsePrefix("2.2.2.2/32")},
Caps: []filtertype.CapMatch{
{
Dst: netip.MustParsePrefix("1.1.1.1/32"),
Cap: tailcfg.PeerCapabilityRelayTarget,
},
},
},
}, nil, nil, nil, nil, nil),
self: selfOnlyIPv4.View(),
peer: tailcfg.NodeView{},
},
{
name: "nil peer hostinfo",
filt: filter.New([]filtertype.Match{
{
Srcs: []netip.Prefix{netip.MustParsePrefix("2.2.2.2/32")},
Caps: []filtertype.CapMatch{
{
Dst: netip.MustParsePrefix("1.1.1.1/32"),
Cap: tailcfg.PeerCapabilityRelayTarget,
},
},
},
}, nil, nil, nil, nil, nil),
self: selfOnlyIPv4.View(),
peer: peerOnlyIPv4NilHostinfo.View(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := peerAPIIfCandidateRelayServer(tt.filt, tt.self, tt.peer); !reflect.DeepEqual(got, tt.want) {
t.Errorf("peerAPIIfCandidateRelayServer() = %v, want %v", got, tt.want)
}
})
}
}

wgengine/magicsock/relaymanager.go

@@ -51,6 +51,7 @@ type relayManager struct {
cancelWorkCh chan *endpoint
newServerEndpointCh chan newRelayServerEndpointEvent
rxHandshakeDiscoMsgCh chan relayHandshakeDiscoMsgEvent
serversCh chan set.Set[netip.AddrPort]
discoInfoMu sync.Mutex // guards the following field
discoInfoByServerDisco map[key.DiscoPublic]*relayHandshakeDiscoInfo
@@ -174,8 +175,30 @@ func (r *relayManager) runLoop() {
if !r.hasActiveWorkRunLoop() {
return
}
case serversUpdate := <-r.serversCh:
r.handleServersUpdateRunLoop(serversUpdate)
if !r.hasActiveWorkRunLoop() {
return
}
}
}
}
func (r *relayManager) handleServersUpdateRunLoop(update set.Set[netip.AddrPort]) {
for k, v := range r.serversByAddrPort {
if !update.Contains(k) {
delete(r.serversByAddrPort, k)
delete(r.serversByDisco, v)
}
}
for _, v := range update.Slice() {
_, ok := r.serversByAddrPort[v]
if ok {
// don't zero known disco keys
continue
}
r.serversByAddrPort[v] = key.DiscoPublic{}
}
}
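
The loop above drops servers that disappeared from the update, preserves the disco key already learned for servers that remain, and registers new servers with a zero disco key. A self-contained sketch of that reconciliation on a bare map follows, using illustrative addresses and an invented function name.

package main

import (
	"fmt"
	"net/netip"

	"tailscale.com/types/key"
	"tailscale.com/util/set"
)

// reconcileServers mirrors handleServersUpdateRunLoop's behavior: entries
// absent from update are dropped, surviving entries keep their learned disco
// key, and new entries start with a zero key.
func reconcileServers(known map[netip.AddrPort]key.DiscoPublic, update set.Set[netip.AddrPort]) {
	for ap := range known {
		if !update.Contains(ap) {
			delete(known, ap)
		}
	}
	for _, ap := range update.Slice() {
		if _, ok := known[ap]; !ok {
			known[ap] = key.DiscoPublic{}
		}
	}
}

func main() {
	a := netip.MustParseAddrPort("100.64.0.1:4545")
	b := netip.MustParseAddrPort("100.64.0.2:4545")
	known := map[netip.AddrPort]key.DiscoPublic{
		a: key.NewDisco().Public(), // disco key already learned for a
	}
	update := make(set.Set[netip.AddrPort])
	update.Add(a)
	update.Add(b)
	reconcileServers(known, update)
	fmt.Println(len(known), known[a].IsZero(), known[b].IsZero()) // 2 false true
}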
type relayHandshakeDiscoMsgEvent struct {
@@ -215,6 +238,7 @@ func (r *relayManager) init() {
r.cancelWorkCh = make(chan *endpoint)
r.newServerEndpointCh = make(chan newRelayServerEndpointEvent)
r.rxHandshakeDiscoMsgCh = make(chan relayHandshakeDiscoMsgEvent)
r.serversCh = make(chan set.Set[netip.AddrPort])
r.runLoopStoppedCh = make(chan struct{}, 1)
r.runLoopStoppedCh <- struct{}{}
})
@@ -299,6 +323,11 @@ func (r *relayManager) handleGeneveEncapDiscoMsgNotBestAddr(dm disco.Message, di
relayManagerInputEvent(r, nil, &r.rxHandshakeDiscoMsgCh, relayHandshakeDiscoMsgEvent{msg: dm, disco: di.discoKey, from: src.ap, vni: src.vni.get(), at: time.Now()})
}
// handleRelayServersSet handles an update of the complete relay server set.
func (r *relayManager) handleRelayServersSet(servers set.Set[netip.AddrPort]) {
relayManagerInputEvent(r, nil, &r.serversCh, servers)
}
// relayManagerInputEvent initializes [relayManager] if necessary, starts
// relayManager.runLoop() if it is not running, and writes 'event' on 'eventCh'.
//

wgengine/magicsock/relaymanager_test.go

@@ -4,10 +4,12 @@
package magicsock
import (
"net/netip"
"testing"
"tailscale.com/disco"
"tailscale.com/types/key"
"tailscale.com/util/set"
)
func TestRelayManagerInitAndIdle(t *testing.T) {
@@ -26,4 +28,8 @@ func TestRelayManagerInitAndIdle(t *testing.T) {
rm = relayManager{}
rm.handleGeneveEncapDiscoMsgNotBestAddr(&disco.BindUDPRelayEndpointChallenge{}, &discoInfo{}, epAddr{})
<-rm.runLoopStoppedCh
rm = relayManager{}
rm.handleRelayServersSet(make(set.Set[netip.AddrPort]))
<-rm.runLoopStoppedCh
}