From d3bb34c628b01953c1f064d75d01c0a41e4d41ab Mon Sep 17 00:00:00 2001 From: Jordan Whited Date: Fri, 20 Jun 2025 15:00:28 -0700 Subject: [PATCH] wgengine/magicsock: generate relay server set from tailnet policy (#16331) Updates tailscale/corp#27502 Signed-off-by: Jordan Whited --- wgengine/magicsock/magicsock.go | 173 +++++++++++++++++--- wgengine/magicsock/magicsock_test.go | 202 +++++++++++++++++++++++- wgengine/magicsock/relaymanager.go | 29 ++++ wgengine/magicsock/relaymanager_test.go | 6 + 4 files changed, 386 insertions(+), 24 deletions(-) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index bfc7afba9..0679a4ebd 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -14,6 +14,7 @@ import ( "expvar" "fmt" "io" + "math" "net" "net/netip" "reflect" @@ -348,17 +349,19 @@ type Conn struct { // magicsock could do with any complexity reduction it can get. netInfoLast *tailcfg.NetInfo - derpMap *tailcfg.DERPMap // nil (or zero regions/nodes) means DERP is disabled - peers views.Slice[tailcfg.NodeView] // from last onNodeViewsUpdate update - lastFlags debugFlags // at time of last onNodeViewsUpdate - firstAddrForTest netip.Addr // from last onNodeViewsUpdate update; for tests only - privateKey key.NodePrivate // WireGuard private key for this node - everHadKey bool // whether we ever had a non-zero private key - myDerp int // nearest DERP region ID; 0 means none/unknown - homeless bool // if true, don't try to find & stay conneted to a DERP home (myDerp will stay 0) - derpStarted chan struct{} // closed on first connection to DERP; for tests & cleaner Close - activeDerp map[int]activeDerp // DERP regionID -> connection to a node in that region - prevDerp map[int]*syncs.WaitGroupChan + derpMap *tailcfg.DERPMap // nil (or zero regions/nodes) means DERP is disabled + self tailcfg.NodeView // from last onNodeViewsUpdate + peers views.Slice[tailcfg.NodeView] // from last onNodeViewsUpdate, sorted by Node.ID; Note: [netmap.NodeMutation]'s rx'd in onNodeMutationsUpdate are never applied + filt *filter.Filter // from last onFilterUpdate + relayClientEnabled bool // whether we can allocate UDP relay endpoints on UDP relay servers + lastFlags debugFlags // at time of last onNodeViewsUpdate + privateKey key.NodePrivate // WireGuard private key for this node + everHadKey bool // whether we ever had a non-zero private key + myDerp int // nearest DERP region ID; 0 means none/unknown + homeless bool // if true, don't try to find & stay conneted to a DERP home (myDerp will stay 0) + derpStarted chan struct{} // closed on first connection to DERP; for tests & cleaner Close + activeDerp map[int]activeDerp // DERP regionID -> connection to a node in that region + prevDerp map[int]*syncs.WaitGroupChan // derpRoute contains optional alternate routes to use as an // optimization instead of contacting a peer via their home @@ -516,7 +519,7 @@ func (o *Options) derpActiveFunc() func() { // this type out of magicsock. 
type NodeViewsUpdate struct { SelfNode tailcfg.NodeView - Peers []tailcfg.NodeView + Peers []tailcfg.NodeView // sorted by Node.ID } // NodeMutationsUpdate represents an update event of one or more @@ -2555,38 +2558,160 @@ func (c *Conn) SetProbeUDPLifetime(v bool) { func capVerIsRelayCapable(version tailcfg.CapabilityVersion) bool { // TODO(jwhited): implement once capVer is bumped - return false + return version == math.MinInt32 } +func capVerIsRelayServerCapable(version tailcfg.CapabilityVersion) bool { + // TODO(jwhited): implement once capVer is bumped + return version == math.MinInt32 +} + +// onFilterUpdate is called when a [FilterUpdate] is received over the +// [eventbus.Bus]. func (c *Conn) onFilterUpdate(f FilterUpdate) { - // TODO(jwhited): implement + c.mu.Lock() + c.filt = f.Filter + self := c.self + peers := c.peers + relayClientEnabled := c.relayClientEnabled + c.mu.Unlock() // release c.mu before potentially calling c.updateRelayServersSet which is O(m * n) + + if !relayClientEnabled { + // Early return if we cannot operate as a relay client. + return + } + + // The filter has changed, and we are operating as a relay server client. + // Re-evaluate it in order to produce an updated relay server set. + c.updateRelayServersSet(f.Filter, self, peers) +} + +// updateRelayServersSet iterates all peers, evaluating filt for each one in +// order to determine which peers are relay server candidates. filt, self, and +// peers are passed as args (vs c.mu-guarded fields) to enable callers to +// release c.mu before calling as this is O(m * n) (we iterate all cap rules 'm' +// in filt for every peer 'n'). +// TODO: Optimize this so that it's not O(m * n). This might involve: +// 1. Changes to [filter.Filter], e.g. adding a CapsWithValues() to check for +// a given capability instead of building and returning a map of all of +// them. +// 2. Moving this work upstream into [nodeBackend] or similar, and publishing +// the computed result over the eventbus instead. +func (c *Conn) updateRelayServersSet(filt *filter.Filter, self tailcfg.NodeView, peers views.Slice[tailcfg.NodeView]) { + relayServers := make(set.Set[netip.AddrPort]) + for _, peer := range peers.All() { + peerAPI := peerAPIIfCandidateRelayServer(filt, self, peer) + if peerAPI.IsValid() { + relayServers.Add(peerAPI) + } + } + c.relayManager.handleRelayServersSet(relayServers) +} + +// peerAPIIfCandidateRelayServer returns the peer API address of peer if it +// is considered to be a candidate relay server upon evaluation against filt and +// self, otherwise it returns a zero value. 
+func peerAPIIfCandidateRelayServer(filt *filter.Filter, self, peer tailcfg.NodeView) netip.AddrPort { + if filt == nil || + !self.Valid() || + !peer.Valid() || + !capVerIsRelayServerCapable(peer.Cap()) || + !peer.Hostinfo().Valid() { + return netip.AddrPort{} + } + for _, peerPrefix := range peer.Addresses().All() { + if !peerPrefix.IsSingleIP() { + continue + } + peerAddr := peerPrefix.Addr() + for _, selfPrefix := range self.Addresses().All() { + if !selfPrefix.IsSingleIP() { + continue + } + selfAddr := selfPrefix.Addr() + if selfAddr.BitLen() == peerAddr.BitLen() { // same address family + if filt.CapsWithValues(peerAddr, selfAddr).HasCapability(tailcfg.PeerCapabilityRelayTarget) { + for _, s := range peer.Hostinfo().Services().All() { + if peerAddr.Is4() && s.Proto == tailcfg.PeerAPI4 || + peerAddr.Is6() && s.Proto == tailcfg.PeerAPI6 { + return netip.AddrPortFrom(peerAddr, s.Port) + } + } + return netip.AddrPort{} // no peerAPI + } else { + // [nodeBackend.peerCapsLocked] only returns/considers the + // [tailcfg.PeerCapMap] between the passed src and the + // _first_ host (/32 or /128) address for self. We are + // consistent with that behavior here. If self and peer + // host addresses are of the same address family they either + // have the capability or not. We do not check against + // additional host addresses of the same address family. + return netip.AddrPort{} + } + } + } + } + return netip.AddrPort{} } // onNodeViewsUpdate is called when a [NodeViewsUpdate] is received over the // [eventbus.Bus]. func (c *Conn) onNodeViewsUpdate(update NodeViewsUpdate) { + peersChanged := c.updateNodes(update) + + relayClientEnabled := update.SelfNode.Valid() && + update.SelfNode.HasCap(tailcfg.NodeAttrRelayClient) && + envknob.UseWIPCode() + + c.mu.Lock() + relayClientChanged := c.relayClientEnabled != relayClientEnabled + c.relayClientEnabled = relayClientEnabled + filt := c.filt + self := c.self + peers := c.peers + c.mu.Unlock() // release c.mu before potentially calling c.updateRelayServersSet which is O(m * n) + + if peersChanged || relayClientChanged { + if !relayClientEnabled { + c.relayManager.handleRelayServersSet(nil) + } else { + c.updateRelayServersSet(filt, self, peers) + } + } +} + +// updateNodes updates [Conn] to reflect the [tailcfg.NodeView]'s contained +// in update. It returns true if update.Peers was unequal to c.peers, otherwise +// false. +func (c *Conn) updateNodes(update NodeViewsUpdate) (peersChanged bool) { c.mu.Lock() defer c.mu.Unlock() if c.closed { - return + return false } priorPeers := c.peers metricNumPeers.Set(int64(len(update.Peers))) - // Update c.netMap regardless, before the following early return. + // Update c.self & c.peers regardless, before the following early return. + c.self = update.SelfNode curPeers := views.SliceOf(update.Peers) c.peers = curPeers + // [debugFlags] are mutable in [Conn.SetSilentDisco] & + // [Conn.SetProbeUDPLifetime]. These setters are passed [controlknobs.Knobs] + // values by [ipnlocal.LocalBackend] around netmap reception. + // [controlknobs.Knobs] are simply self [tailcfg.NodeCapability]'s. They are + // useful as a global view of notable feature toggles, but the magicsock + // setters are completely unnecessary as we have the same values right here + // (update.SelfNode.Capabilities) at a time they are considered most + // up-to-date. + // TODO: mutate [debugFlags] here instead of in various [Conn] setters. 
flags := c.debugFlagsLocked() - if update.SelfNode.Valid() && update.SelfNode.Addresses().Len() > 0 { - c.firstAddrForTest = update.SelfNode.Addresses().At(0).Addr() - } else { - c.firstAddrForTest = netip.Addr{} - } - if nodesEqual(priorPeers, curPeers) && c.lastFlags == flags { + peersChanged = !nodesEqual(priorPeers, curPeers) + if !peersChanged && c.lastFlags == flags { // The rest of this function is all adjusting state for peers that have // changed. But if the set of peers is equal and the debug flags (for // silent disco and probe UDP lifetime) haven't changed, there is no @@ -2728,6 +2853,8 @@ func (c *Conn) onNodeViewsUpdate(update NodeViewsUpdate) { delete(c.discoInfo, dk) } } + + return peersChanged } func devPanicf(format string, a ...any) { @@ -3245,7 +3372,7 @@ func simpleDur(d time.Duration) time.Duration { } // onNodeMutationsUpdate is called when a [NodeMutationsUpdate] is received over -// the [eventbus.Bus]. +// the [eventbus.Bus]. Note: It does not apply these mutations to c.peers. func (c *Conn) onNodeMutationsUpdate(update NodeMutationsUpdate) { c.mu.Lock() defer c.mu.Unlock() diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index 7fa062fa8..8aa9a09d2 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -19,6 +19,7 @@ import ( "net/http/httptest" "net/netip" "os" + "reflect" "runtime" "strconv" "strings" @@ -71,6 +72,7 @@ import ( "tailscale.com/util/slicesx" "tailscale.com/util/usermetric" "tailscale.com/wgengine/filter" + "tailscale.com/wgengine/filter/filtertype" "tailscale.com/wgengine/wgcfg" "tailscale.com/wgengine/wgcfg/nmcfg" "tailscale.com/wgengine/wglog" @@ -275,7 +277,10 @@ func (s *magicStack) Status() *ipnstate.Status { func (s *magicStack) IP() netip.Addr { for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) { s.conn.mu.Lock() - addr := s.conn.firstAddrForTest + var addr netip.Addr + if s.conn.self.Valid() && s.conn.self.Addresses().Len() > 0 { + addr = s.conn.self.Addresses().At(0).Addr() + } s.conn.mu.Unlock() if addr.IsValid() { return addr @@ -3378,3 +3383,198 @@ func Test_virtualNetworkID(t *testing.T) { }) } } + +func Test_peerAPIIfCandidateRelayServer(t *testing.T) { + selfOnlyIPv4 := &tailcfg.Node{ + Cap: math.MinInt32, + Addresses: []netip.Prefix{ + netip.MustParsePrefix("1.1.1.1/32"), + }, + } + selfOnlyIPv6 := selfOnlyIPv4.Clone() + selfOnlyIPv6.Addresses[0] = netip.MustParsePrefix("::1/128") + + peerHostinfo := &tailcfg.Hostinfo{ + Services: []tailcfg.Service{ + { + Proto: tailcfg.PeerAPI4, + Port: 4, + }, + { + Proto: tailcfg.PeerAPI6, + Port: 6, + }, + }, + } + peerOnlyIPv4 := &tailcfg.Node{ + Cap: math.MinInt32, + CapMap: map[tailcfg.NodeCapability][]tailcfg.RawMessage{ + tailcfg.NodeAttrRelayServer: nil, + }, + Addresses: []netip.Prefix{ + netip.MustParsePrefix("2.2.2.2/32"), + }, + Hostinfo: peerHostinfo.View(), + } + + peerOnlyIPv6 := peerOnlyIPv4.Clone() + peerOnlyIPv6.Addresses[0] = netip.MustParsePrefix("::2/128") + + peerOnlyIPv4ZeroCapVer := peerOnlyIPv4.Clone() + peerOnlyIPv4ZeroCapVer.Cap = 0 + + peerOnlyIPv4NilHostinfo := peerOnlyIPv4.Clone() + peerOnlyIPv4NilHostinfo.Hostinfo = tailcfg.HostinfoView{} + + tests := []struct { + name string + filt *filter.Filter + self tailcfg.NodeView + peer tailcfg.NodeView + want netip.AddrPort + }{ + { + name: "match v4", + filt: filter.New([]filtertype.Match{ + { + Srcs: []netip.Prefix{netip.MustParsePrefix("2.2.2.2/32")}, + Caps: 
[]filtertype.CapMatch{ + { + Dst: netip.MustParsePrefix("1.1.1.1/32"), + Cap: tailcfg.PeerCapabilityRelayTarget, + }, + }, + }, + }, nil, nil, nil, nil, nil), + self: selfOnlyIPv4.View(), + peer: peerOnlyIPv4.View(), + want: netip.MustParseAddrPort("2.2.2.2:4"), + }, + { + name: "match v6", + filt: filter.New([]filtertype.Match{ + { + Srcs: []netip.Prefix{netip.MustParsePrefix("::2/128")}, + Caps: []filtertype.CapMatch{ + { + Dst: netip.MustParsePrefix("::1/128"), + Cap: tailcfg.PeerCapabilityRelayTarget, + }, + }, + }, + }, nil, nil, nil, nil, nil), + self: selfOnlyIPv6.View(), + peer: peerOnlyIPv6.View(), + want: netip.MustParseAddrPort("[::2]:6"), + }, + { + name: "no match dst", + filt: filter.New([]filtertype.Match{ + { + Srcs: []netip.Prefix{netip.MustParsePrefix("::2/128")}, + Caps: []filtertype.CapMatch{ + { + Dst: netip.MustParsePrefix("::3/128"), + Cap: tailcfg.PeerCapabilityRelayTarget, + }, + }, + }, + }, nil, nil, nil, nil, nil), + self: selfOnlyIPv6.View(), + peer: peerOnlyIPv6.View(), + }, + { + name: "no match peer cap", + filt: filter.New([]filtertype.Match{ + { + Srcs: []netip.Prefix{netip.MustParsePrefix("::2/128")}, + Caps: []filtertype.CapMatch{ + { + Dst: netip.MustParsePrefix("::1/128"), + Cap: tailcfg.PeerCapabilityIngress, + }, + }, + }, + }, nil, nil, nil, nil, nil), + self: selfOnlyIPv6.View(), + peer: peerOnlyIPv6.View(), + }, + { + name: "cap ver not relay capable", + filt: filter.New([]filtertype.Match{ + { + Srcs: []netip.Prefix{netip.MustParsePrefix("2.2.2.2/32")}, + Caps: []filtertype.CapMatch{ + { + Dst: netip.MustParsePrefix("1.1.1.1/32"), + Cap: tailcfg.PeerCapabilityRelayTarget, + }, + }, + }, + }, nil, nil, nil, nil, nil), + self: peerOnlyIPv4.View(), + peer: peerOnlyIPv4ZeroCapVer.View(), + }, + { + name: "nil filt", + filt: nil, + self: selfOnlyIPv4.View(), + peer: peerOnlyIPv4.View(), + }, + { + name: "nil self", + filt: filter.New([]filtertype.Match{ + { + Srcs: []netip.Prefix{netip.MustParsePrefix("2.2.2.2/32")}, + Caps: []filtertype.CapMatch{ + { + Dst: netip.MustParsePrefix("1.1.1.1/32"), + Cap: tailcfg.PeerCapabilityRelayTarget, + }, + }, + }, + }, nil, nil, nil, nil, nil), + self: tailcfg.NodeView{}, + peer: peerOnlyIPv4.View(), + }, + { + name: "nil peer", + filt: filter.New([]filtertype.Match{ + { + Srcs: []netip.Prefix{netip.MustParsePrefix("2.2.2.2/32")}, + Caps: []filtertype.CapMatch{ + { + Dst: netip.MustParsePrefix("1.1.1.1/32"), + Cap: tailcfg.PeerCapabilityRelayTarget, + }, + }, + }, + }, nil, nil, nil, nil, nil), + self: selfOnlyIPv4.View(), + peer: tailcfg.NodeView{}, + }, + { + name: "nil peer hostinfo", + filt: filter.New([]filtertype.Match{ + { + Srcs: []netip.Prefix{netip.MustParsePrefix("2.2.2.2/32")}, + Caps: []filtertype.CapMatch{ + { + Dst: netip.MustParsePrefix("1.1.1.1/32"), + Cap: tailcfg.PeerCapabilityRelayTarget, + }, + }, + }, + }, nil, nil, nil, nil, nil), + self: selfOnlyIPv4.View(), + peer: peerOnlyIPv4NilHostinfo.View(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := peerAPIIfCandidateRelayServer(tt.filt, tt.self, tt.peer); !reflect.DeepEqual(got, tt.want) { + t.Errorf("peerAPIIfCandidateRelayServer() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/wgengine/magicsock/relaymanager.go b/wgengine/magicsock/relaymanager.go index 2b636dc57..3c8ceb2de 100644 --- a/wgengine/magicsock/relaymanager.go +++ b/wgengine/magicsock/relaymanager.go @@ -51,6 +51,7 @@ type relayManager struct { cancelWorkCh chan *endpoint newServerEndpointCh chan newRelayServerEndpointEvent 
rxHandshakeDiscoMsgCh chan relayHandshakeDiscoMsgEvent + serversCh chan set.Set[netip.AddrPort] discoInfoMu sync.Mutex // guards the following field discoInfoByServerDisco map[key.DiscoPublic]*relayHandshakeDiscoInfo @@ -174,10 +175,32 @@ func (r *relayManager) runLoop() { if !r.hasActiveWorkRunLoop() { return } + case serversUpdate := <-r.serversCh: + r.handleServersUpdateRunLoop(serversUpdate) + if !r.hasActiveWorkRunLoop() { + return + } } } } +func (r *relayManager) handleServersUpdateRunLoop(update set.Set[netip.AddrPort]) { + for k, v := range r.serversByAddrPort { + if !update.Contains(k) { + delete(r.serversByAddrPort, k) + delete(r.serversByDisco, v) + } + } + for _, v := range update.Slice() { + _, ok := r.serversByAddrPort[v] + if ok { + // don't zero known disco keys + continue + } + r.serversByAddrPort[v] = key.DiscoPublic{} + } +} + type relayHandshakeDiscoMsgEvent struct { conn *Conn // for access to [Conn] if there is no associated [relayHandshakeWork] msg disco.Message @@ -215,6 +238,7 @@ func (r *relayManager) init() { r.cancelWorkCh = make(chan *endpoint) r.newServerEndpointCh = make(chan newRelayServerEndpointEvent) r.rxHandshakeDiscoMsgCh = make(chan relayHandshakeDiscoMsgEvent) + r.serversCh = make(chan set.Set[netip.AddrPort]) r.runLoopStoppedCh = make(chan struct{}, 1) r.runLoopStoppedCh <- struct{}{} }) @@ -299,6 +323,11 @@ func (r *relayManager) handleGeneveEncapDiscoMsgNotBestAddr(dm disco.Message, di relayManagerInputEvent(r, nil, &r.rxHandshakeDiscoMsgCh, relayHandshakeDiscoMsgEvent{msg: dm, disco: di.discoKey, from: src.ap, vni: src.vni.get(), at: time.Now()}) } +// handleRelayServersSet handles an update of the complete relay server set. +func (r *relayManager) handleRelayServersSet(servers set.Set[netip.AddrPort]) { + relayManagerInputEvent(r, nil, &r.serversCh, servers) +} + // relayManagerInputEvent initializes [relayManager] if necessary, starts // relayManager.runLoop() if it is not running, and writes 'event' on 'eventCh'. // diff --git a/wgengine/magicsock/relaymanager_test.go b/wgengine/magicsock/relaymanager_test.go index be0582669..6055c2d72 100644 --- a/wgengine/magicsock/relaymanager_test.go +++ b/wgengine/magicsock/relaymanager_test.go @@ -4,10 +4,12 @@ package magicsock import ( + "net/netip" "testing" "tailscale.com/disco" "tailscale.com/types/key" + "tailscale.com/util/set" ) func TestRelayManagerInitAndIdle(t *testing.T) { @@ -26,4 +28,8 @@ func TestRelayManagerInitAndIdle(t *testing.T) { rm = relayManager{} rm.handleGeneveEncapDiscoMsgNotBestAddr(&disco.BindUDPRelayEndpointChallenge{}, &discoInfo{}, epAddr{}) <-rm.runLoopStoppedCh + + rm = relayManager{} + rm.handleRelayServersSet(make(set.Set[netip.AddrPort])) + <-rm.runLoopStoppedCh }
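
The reconciliation done by handleServersUpdateRunLoop above follows a common
pattern: prune servers that dropped out of the policy-derived set, add
newcomers with a zero disco key, and never overwrite a disco key that has
already been learned ("don't zero known disco keys"). Below is a minimal
standalone sketch of that pattern, not part of the change itself: it uses only
the standard library, a map[netip.AddrPort]string stands in for
serversByAddrPort (with "" in place of an unlearned key.DiscoPublic), the
matching serversByDisco bookkeeping is omitted, and reconcileServers is an
illustrative name.

    package main

    import (
        "fmt"
        "net/netip"
    )

    // reconcileServers applies a new complete set of candidate relay servers
    // (update) to the currently known servers (known). Entries that left the
    // set are dropped, new entries start with an empty ("unlearned") key, and
    // keys already learned for surviving servers are left untouched.
    func reconcileServers(known map[netip.AddrPort]string, update map[netip.AddrPort]bool) {
        for ap := range known {
            if !update[ap] {
                delete(known, ap) // server is no longer a candidate
            }
        }
        for ap := range update {
            if _, ok := known[ap]; ok {
                continue // don't zero a learned key
            }
            known[ap] = ""
        }
    }

    func main() {
        a := netip.MustParseAddrPort("100.64.0.1:4")
        b := netip.MustParseAddrPort("100.64.0.2:4")
        c := netip.MustParseAddrPort("100.64.0.3:4")
        known := map[netip.AddrPort]string{
            a: "learned-disco-key", // already handshaken with this server
            b: "",                  // known, key not yet learned
        }
        // New set derived from the tailnet policy: a stays, b is gone, c is new.
        reconcileServers(known, map[netip.AddrPort]bool{a: true, c: true})
        fmt.Println(known) // a keeps its key, b is removed, c is added with ""
    }

Preserving learned keys for servers that survive a netmap or filter change
presumably avoids having to re-learn a server's disco identity every time the
relay server set is re-evaluated.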