ipn/ipnlocal,wgengine{/magicsock}: replace SetNetworkMap with eventbus (#16299)

Same with UpdateNetmapDelta.

Updates tailscale/corp#27502
Updates #15160

Signed-off-by: Jordan Whited <jordan@tailscale.com>
This commit is contained in:
Jordan Whited 2025-06-18 10:31:00 -07:00 committed by GitHub
parent 45a4b69ce0
commit fcab50b276
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 194 additions and 129 deletions

View File

@ -1946,10 +1946,6 @@ var _ controlclient.NetmapDeltaUpdater = (*LocalBackend)(nil)
// UpdateNetmapDelta implements controlclient.NetmapDeltaUpdater.
func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
if !b.MagicConn().UpdateNetmapDelta(muts) {
return false
}
var notify *ipn.Notify // non-nil if we need to send a Notify
defer func() {
if notify != nil {

View File

@ -5,6 +5,7 @@ package ipnlocal
import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
@ -23,6 +24,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
memro "go4.org/mem"
"go4.org/netipx"
"golang.org/x/net/dns/dnsmessage"
"tailscale.com/appc"
@ -77,6 +79,12 @@ func inRemove(ip netip.Addr) bool {
return false
}
func makeNodeKeyFromID(nodeID tailcfg.NodeID) key.NodePublic {
raw := make([]byte, 32)
binary.BigEndian.PutUint64(raw[24:], uint64(nodeID))
return key.NodePublicFromRaw32(memro.B(raw))
}
func TestShrinkDefaultRoute(t *testing.T) {
tests := []struct {
route string
@ -794,6 +802,7 @@ func TestStatusPeerCapabilities(t *testing.T) {
(&tailcfg.Node{
ID: 1,
StableID: "foo",
Key: makeNodeKeyFromID(1),
IsWireGuardOnly: true,
Hostinfo: (&tailcfg.Hostinfo{}).View(),
Capabilities: []tailcfg.NodeCapability{tailcfg.CapabilitySSH},
@ -804,6 +813,7 @@ func TestStatusPeerCapabilities(t *testing.T) {
(&tailcfg.Node{
ID: 2,
StableID: "bar",
Key: makeNodeKeyFromID(2),
Hostinfo: (&tailcfg.Hostinfo{}).View(),
Capabilities: []tailcfg.NodeCapability{tailcfg.CapabilityAdmin},
CapMap: (tailcfg.NodeCapMap)(map[tailcfg.NodeCapability][]tailcfg.RawMessage{
@ -830,12 +840,14 @@ func TestStatusPeerCapabilities(t *testing.T) {
(&tailcfg.Node{
ID: 1,
StableID: "foo",
Key: makeNodeKeyFromID(1),
IsWireGuardOnly: true,
Hostinfo: (&tailcfg.Hostinfo{}).View(),
}).View(),
(&tailcfg.Node{
ID: 2,
StableID: "bar",
Key: makeNodeKeyFromID(2),
Hostinfo: (&tailcfg.Hostinfo{}).View(),
}).View(),
},
@ -927,7 +939,11 @@ func TestUpdateNetmapDelta(t *testing.T) {
nm := &netmap.NetworkMap{}
for i := range 5 {
nm.Peers = append(nm.Peers, (&tailcfg.Node{ID: (tailcfg.NodeID(i) + 1)}).View())
id := tailcfg.NodeID(i + 1)
nm.Peers = append(nm.Peers, (&tailcfg.Node{
ID: id,
Key: makeNodeKeyFromID(id),
}).View())
}
b.currentNode().SetNetMap(nm)
@ -963,18 +979,22 @@ func TestUpdateNetmapDelta(t *testing.T) {
wants := []*tailcfg.Node{
{
ID: 1,
Key: makeNodeKeyFromID(1),
HomeDERP: 1,
},
{
ID: 2,
Key: makeNodeKeyFromID(2),
Online: ptr.To(true),
},
{
ID: 3,
Key: makeNodeKeyFromID(3),
Online: ptr.To(false),
},
{
ID: 4,
Key: makeNodeKeyFromID(4),
LastSeen: ptr.To(someTime),
},
}
@ -998,12 +1018,14 @@ func TestWhoIs(t *testing.T) {
SelfNode: (&tailcfg.Node{
ID: 1,
User: 10,
Key: makeNodeKeyFromID(1),
Addresses: []netip.Prefix{netip.MustParsePrefix("100.101.102.103/32")},
}).View(),
Peers: []tailcfg.NodeView{
(&tailcfg.Node{
ID: 2,
User: 20,
Key: makeNodeKeyFromID(2),
Addresses: []netip.Prefix{netip.MustParsePrefix("100.200.200.200/32")},
}).View(),
},
@ -1593,6 +1615,7 @@ func dnsResponse(domain, address string) []byte {
}
func TestSetExitNodeIDPolicy(t *testing.T) {
zeroValHostinfoView := new(tailcfg.Hostinfo).View()
pfx := netip.MustParsePrefix
tests := []struct {
name string
@ -1669,14 +1692,18 @@ func TestSetExitNodeIDPolicy(t *testing.T) {
}).View(),
Peers: []tailcfg.NodeView{
(&tailcfg.Node{
ID: 201,
Name: "a.tailnet",
Key: makeNodeKeyFromID(201),
Addresses: []netip.Prefix{
pfx("100.0.0.201/32"),
pfx("100::201/128"),
},
}).View(),
(&tailcfg.Node{
ID: 202,
Name: "b.tailnet",
Key: makeNodeKeyFromID(202),
Addresses: []netip.Prefix{
pfx("100::202/128"),
},
@ -1702,18 +1729,24 @@ func TestSetExitNodeIDPolicy(t *testing.T) {
}).View(),
Peers: []tailcfg.NodeView{
(&tailcfg.Node{
ID: 123,
Name: "a.tailnet",
StableID: tailcfg.StableNodeID("123"),
Key: makeNodeKeyFromID(123),
Addresses: []netip.Prefix{
pfx("127.0.0.1/32"),
pfx("100::201/128"),
},
Hostinfo: zeroValHostinfoView,
}).View(),
(&tailcfg.Node{
ID: 202,
Name: "b.tailnet",
Key: makeNodeKeyFromID(202),
Addresses: []netip.Prefix{
pfx("100::202/128"),
},
Hostinfo: zeroValHostinfoView,
}).View(),
},
},
@ -1734,18 +1767,24 @@ func TestSetExitNodeIDPolicy(t *testing.T) {
}).View(),
Peers: []tailcfg.NodeView{
(&tailcfg.Node{
ID: 123,
Name: "a.tailnet",
StableID: tailcfg.StableNodeID("123"),
Key: makeNodeKeyFromID(123),
Addresses: []netip.Prefix{
pfx("127.0.0.1/32"),
pfx("100::201/128"),
},
Hostinfo: zeroValHostinfoView,
}).View(),
(&tailcfg.Node{
ID: 202,
Name: "b.tailnet",
Key: makeNodeKeyFromID(202),
Addresses: []netip.Prefix{
pfx("100::202/128"),
},
Hostinfo: zeroValHostinfoView,
}).View(),
},
},
@ -1768,18 +1807,24 @@ func TestSetExitNodeIDPolicy(t *testing.T) {
}).View(),
Peers: []tailcfg.NodeView{
(&tailcfg.Node{
ID: 123,
Name: "a.tailnet",
StableID: tailcfg.StableNodeID("123"),
Key: makeNodeKeyFromID(123),
Addresses: []netip.Prefix{
pfx("100.64.5.6/32"),
pfx("100::201/128"),
},
Hostinfo: zeroValHostinfoView,
}).View(),
(&tailcfg.Node{
ID: 202,
Name: "b.tailnet",
Key: makeNodeKeyFromID(202),
Addresses: []netip.Prefix{
pfx("100::202/128"),
},
Hostinfo: zeroValHostinfoView,
}).View(),
},
},
@ -1827,7 +1872,6 @@ func TestSetExitNodeIDPolicy(t *testing.T) {
b.currentNode().SetNetMap(test.nm)
b.pm = pm
b.lastSuggestedExitNode = test.lastSuggestedExitNode
prefs := b.pm.prefs.AsStruct()
if changed := applySysPolicy(prefs, test.lastSuggestedExitNode, false) || setExitNodeID(prefs, test.nm); changed != test.prefsChanged {
t.Errorf("wanted prefs changed %v, got prefs changed %v", test.prefsChanged, changed)
@ -3218,6 +3262,7 @@ type peerOptFunc func(*tailcfg.Node)
func makePeer(id tailcfg.NodeID, opts ...peerOptFunc) tailcfg.NodeView {
node := &tailcfg.Node{
ID: id,
Key: makeNodeKeyFromID(id),
StableID: tailcfg.StableNodeID(fmt.Sprintf("stable%d", id)),
Name: fmt.Sprintf("peer%d", id),
HomeDERP: int(id),

View File

@ -72,9 +72,10 @@ type nodeBackend struct {
filterAtomic atomic.Pointer[filter.Filter]
// initialized once and immutable
eventClient *eventbus.Client
filterUpdates *eventbus.Publisher[magicsock.FilterUpdate]
nodeUpdates *eventbus.Publisher[magicsock.NodeAddrsHostInfoUpdate]
eventClient *eventbus.Client
filterPub *eventbus.Publisher[magicsock.FilterUpdate]
nodeViewsPub *eventbus.Publisher[magicsock.NodeViewsUpdate]
nodeMutsPub *eventbus.Publisher[magicsock.NodeMutationsUpdate]
// TODO(nickkhyl): maybe use sync.RWMutex?
mu sync.Mutex // protects the following fields
@ -113,9 +114,10 @@ func newNodeBackend(ctx context.Context, bus *eventbus.Bus) *nodeBackend {
// Default filter blocks everything and logs nothing.
noneFilter := filter.NewAllowNone(logger.Discard, &netipx.IPSet{})
nb.filterAtomic.Store(noneFilter)
nb.filterUpdates = eventbus.Publish[magicsock.FilterUpdate](nb.eventClient)
nb.nodeUpdates = eventbus.Publish[magicsock.NodeAddrsHostInfoUpdate](nb.eventClient)
nb.filterUpdates.Publish(magicsock.FilterUpdate{Filter: nb.filterAtomic.Load()})
nb.filterPub = eventbus.Publish[magicsock.FilterUpdate](nb.eventClient)
nb.nodeViewsPub = eventbus.Publish[magicsock.NodeViewsUpdate](nb.eventClient)
nb.nodeMutsPub = eventbus.Publish[magicsock.NodeMutationsUpdate](nb.eventClient)
nb.filterPub.Publish(magicsock.FilterUpdate{Filter: nb.filterAtomic.Load()})
return nb
}
@ -379,6 +381,12 @@ func (nb *nodeBackend) SetNetMap(nm *netmap.NetworkMap) {
nb.netMap = nm
nb.updateNodeByAddrLocked()
nb.updatePeersLocked()
nv := magicsock.NodeViewsUpdate{}
if nm != nil {
nv.SelfNode = nm.SelfNode
nv.Peers = nm.Peers
}
nb.nodeViewsPub.Publish(nv)
}
func (nb *nodeBackend) updateNodeByAddrLocked() {
@ -429,16 +437,9 @@ func (nb *nodeBackend) updatePeersLocked() {
nb.peers[k] = tailcfg.NodeView{}
}
changed := magicsock.NodeAddrsHostInfoUpdate{
Complete: true,
}
// Second pass, add everything wanted.
for _, p := range nm.Peers {
mak.Set(&nb.peers, p.ID(), p)
mak.Set(&changed.NodesByID, p.ID(), magicsock.NodeAddrsHostInfo{
Addresses: p.Addresses(),
Hostinfo: p.Hostinfo(),
})
}
// Third pass, remove deleted things.
@ -447,7 +448,6 @@ func (nb *nodeBackend) updatePeersLocked() {
delete(nb.peers, k)
}
}
nb.nodeUpdates.Publish(changed)
}
func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
@ -462,8 +462,8 @@ func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo
// call (e.g. its endpoints + online status both change)
var mutableNodes map[tailcfg.NodeID]*tailcfg.Node
changed := magicsock.NodeAddrsHostInfoUpdate{
Complete: false,
update := magicsock.NodeMutationsUpdate{
Mutations: make([]netmap.NodeMutation, 0, len(muts)),
}
for _, m := range muts {
n, ok := mutableNodes[m.NodeIDBeingMutated()]
@ -475,18 +475,14 @@ func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo
}
n = nv.AsStruct()
mak.Set(&mutableNodes, nv.ID(), n)
update.Mutations = append(update.Mutations, m)
}
m.Apply(n)
}
for nid, n := range mutableNodes {
nv := n.View()
nb.peers[nid] = nv
mak.Set(&changed.NodesByID, nid, magicsock.NodeAddrsHostInfo{
Addresses: nv.Addresses(),
Hostinfo: nv.Hostinfo(),
})
nb.peers[nid] = n.View()
}
nb.nodeUpdates.Publish(changed)
nb.nodeMutsPub.Publish(update)
return true
}
@ -508,7 +504,7 @@ func (nb *nodeBackend) filter() *filter.Filter {
func (nb *nodeBackend) setFilter(f *filter.Filter) {
nb.filterAtomic.Store(f)
nb.filterUpdates.Publish(magicsock.FilterUpdate{Filter: f})
nb.filterPub.Publish(magicsock.FilterUpdate{Filter: f})
}
func (nb *nodeBackend) dnsConfigForNetmap(prefs ipn.PrefsView, selfExpired bool, logf logger.Logf, versionOS string) *dns.Config {

View File

@ -918,6 +918,7 @@ func newTestBackend(t *testing.T) *LocalBackend {
ID: 152,
ComputedName: "some-peer",
User: tailcfg.UserID(1),
Key: makeNodeKeyFromID(152),
Addresses: []netip.Prefix{
netip.MustParsePrefix("100.150.151.152/32"),
},
@ -927,6 +928,7 @@ func newTestBackend(t *testing.T) *LocalBackend {
ComputedName: "some-tagged-peer",
Tags: []string{"tag:server", "tag:test"},
User: tailcfg.UserID(1),
Key: makeNodeKeyFromID(153),
Addresses: []netip.Prefix{
netip.MustParsePrefix("100.150.151.153/32"),
},

View File

@ -160,6 +160,14 @@ type Conn struct {
connCtxCancel func() // closes connCtx
donec <-chan struct{} // connCtx.Done()'s to avoid context.cancelCtx.Done()'s mutex per call
// These [eventbus.Subscriber] fields are solely accessed by
// consumeEventbusTopics once initialized.
pmSub *eventbus.Subscriber[portmapper.Mapping]
filterSub *eventbus.Subscriber[FilterUpdate]
nodeViewsSub *eventbus.Subscriber[NodeViewsUpdate]
nodeMutsSub *eventbus.Subscriber[NodeMutationsUpdate]
subsDoneCh chan struct{} // closed when consumeEventbusTopics returns
// pconn4 and pconn6 are the underlying UDP sockets used to
// send/receive packets for wireguard and other magicsock
// protocols.
@ -341,9 +349,9 @@ type Conn struct {
netInfoLast *tailcfg.NetInfo
derpMap *tailcfg.DERPMap // nil (or zero regions/nodes) means DERP is disabled
peers views.Slice[tailcfg.NodeView] // from last SetNetworkMap update
lastFlags debugFlags // at time of last SetNetworkMap
firstAddrForTest netip.Addr // from last SetNetworkMap update; for tests only
peers views.Slice[tailcfg.NodeView] // from last onNodeViewsUpdate update
lastFlags debugFlags // at time of last onNodeViewsUpdate
firstAddrForTest netip.Addr // from last onNodeViewsUpdate update; for tests only
privateKey key.NodePrivate // WireGuard private key for this node
everHadKey bool // whether we ever had a non-zero private key
myDerp int // nearest DERP region ID; 0 means none/unknown
@ -411,10 +419,8 @@ func (c *Conn) dlogf(format string, a ...any) {
// Options contains options for Listen.
type Options struct {
// EventBus, if non-nil, is used for event publication and subscription by
// each Conn created from these Options.
//
// TODO(creachadair): As of 2025-03-19 this is optional, but is intended to
// become required non-nil.
// each Conn created from these Options. It must not be nil outside of
// tests.
EventBus *eventbus.Bus
// Logf provides a log function to use. It must not be nil.
@ -503,20 +509,22 @@ func (o *Options) derpActiveFunc() func() {
return o.DERPActiveFunc
}
// NodeAddrsHostInfoUpdate represents an update event of the addresses and
// [tailcfg.HostInfoView] for a node set. This event is published over an
// [eventbus.Bus]. [magicsock.Conn] is the sole subscriber as of 2025-06. If
// you are adding more subscribers consider moving this type out of magicsock.
type NodeAddrsHostInfoUpdate struct {
NodesByID map[tailcfg.NodeID]NodeAddrsHostInfo
Complete bool // true if NodesByID contains all known nodes, false if it may be a subset
// NodeViewsUpdate represents an update event of [tailcfg.NodeView] for all
// nodes. This event is published over an [eventbus.Bus]. It may be published
// with an invalid SelfNode, and/or zero/nil Peers. [magicsock.Conn] is the sole
// subscriber as of 2025-06. If you are adding more subscribers consider moving
// this type out of magicsock.
type NodeViewsUpdate struct {
SelfNode tailcfg.NodeView
Peers []tailcfg.NodeView
}
// NodeAddrsHostInfo represents the addresses and [tailcfg.HostinfoView] for a
// Tailscale node.
type NodeAddrsHostInfo struct {
Addresses views.Slice[netip.Prefix]
Hostinfo tailcfg.HostinfoView
// NodeMutationsUpdate represents an update event of one or more
// [netmap.NodeMutation]. This event is published over an [eventbus.Bus].
// [magicsock.Conn] is the sole subscriber as of 2025-06. If you are adding more
// subscribers consider moving this type out of magicsock.
type NodeMutationsUpdate struct {
Mutations []netmap.NodeMutation
}
// FilterUpdate represents an update event for a [*filter.Filter]. This event is
@ -560,16 +568,28 @@ func newConn(logf logger.Logf) *Conn {
return c
}
// consumeEventbusTopic consumes events from sub and passes them to
// handlerFn until sub.Done() is closed.
func consumeEventbusTopic[T any](sub *eventbus.Subscriber[T], handlerFn func(t T)) {
defer sub.Close()
// consumeEventbusTopics consumes events from all [Conn]-relevant
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
// always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the
// [portmapper.Mapping] subscriber is closed, which is interpreted to be the
// same as the [eventbus.Client] closing ([eventbus.Subscribers] are either
// all open or all closed).
func (c *Conn) consumeEventbusTopics() {
defer close(c.subsDoneCh)
for {
select {
case evt := <-sub.Events():
handlerFn(evt)
case <-sub.Done():
case <-c.pmSub.Done():
return
case <-c.pmSub.Events():
c.onPortMapChanged()
case filterUpdate := <-c.filterSub.Events():
c.onFilterUpdate(filterUpdate)
case nodeViews := <-c.nodeViewsSub.Events():
c.onNodeViewsUpdate(nodeViews)
case nodeMuts := <-c.nodeMutsSub.Events():
c.onNodeMutationsUpdate(nodeMuts)
}
}
}
@ -592,29 +612,17 @@ func NewConn(opts Options) (*Conn, error) {
c.testOnlyPacketListener = opts.TestOnlyPacketListener
c.noteRecvActivity = opts.NoteRecvActivity
// If an event bus is enabled, subscribe to portmapping changes; otherwise
// use the callback mechanism of portmapper.Client.
//
// TODO(creachadair): Remove the switch once the event bus is mandatory.
onPortMapChanged := c.onPortMapChanged
if c.eventBus != nil {
c.eventClient = c.eventBus.Client("magicsock.Conn")
pmSub := eventbus.Subscribe[portmapper.Mapping](c.eventClient)
go consumeEventbusTopic(pmSub, func(_ portmapper.Mapping) {
c.onPortMapChanged()
})
filterSub := eventbus.Subscribe[FilterUpdate](c.eventClient)
go consumeEventbusTopic(filterSub, func(t FilterUpdate) {
// TODO(jwhited): implement
})
nodeSub := eventbus.Subscribe[NodeAddrsHostInfoUpdate](c.eventClient)
go consumeEventbusTopic(nodeSub, func(t NodeAddrsHostInfoUpdate) {
// TODO(jwhited): implement
})
// Disable the explicit callback from the portmapper, the subscriber handles it.
onPortMapChanged = nil
// Subscribe calls must return before NewConn otherwise published
// events can be missed.
c.pmSub = eventbus.Subscribe[portmapper.Mapping](c.eventClient)
c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient)
c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient)
c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient)
c.subsDoneCh = make(chan struct{})
go c.consumeEventbusTopics()
}
// Don't log the same log messages possibly every few seconds in our
@ -630,7 +638,6 @@ func NewConn(opts Options) (*Conn, error) {
NetMon: opts.NetMon,
DebugKnobs: portMapOpts,
ControlKnobs: opts.ControlKnobs,
OnChange: onPortMapChanged,
})
c.portMapper.SetGatewayLookupFunc(opts.NetMon.GatewayAndSelfIP)
c.netMon = opts.NetMon
@ -2551,12 +2558,13 @@ func capVerIsRelayCapable(version tailcfg.CapabilityVersion) bool {
return false
}
// SetNetworkMap is called when the control client gets a new network
// map from the control server. It must always be non-nil.
//
// It should not use the DERPMap field of NetworkMap; that's
// conditionally sent to SetDERPMap instead.
func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
func (c *Conn) onFilterUpdate(f FilterUpdate) {
// TODO(jwhited): implement
}
// onNodeViewsUpdate is called when a [NodeViewsUpdate] is received over the
// [eventbus.Bus].
func (c *Conn) onNodeViewsUpdate(update NodeViewsUpdate) {
c.mu.Lock()
defer c.mu.Unlock()
@ -2565,15 +2573,15 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
}
priorPeers := c.peers
metricNumPeers.Set(int64(len(nm.Peers)))
metricNumPeers.Set(int64(len(update.Peers)))
// Update c.netMap regardless, before the following early return.
curPeers := views.SliceOf(nm.Peers)
curPeers := views.SliceOf(update.Peers)
c.peers = curPeers
flags := c.debugFlagsLocked()
if addrs := nm.GetAddresses(); addrs.Len() > 0 {
c.firstAddrForTest = addrs.At(0).Addr()
if update.SelfNode.Valid() && update.SelfNode.Addresses().Len() > 0 {
c.firstAddrForTest = update.SelfNode.Addresses().At(0).Addr()
} else {
c.firstAddrForTest = netip.Addr{}
}
@ -2588,16 +2596,16 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
c.lastFlags = flags
c.logf("[v1] magicsock: got updated network map; %d peers", len(nm.Peers))
c.logf("[v1] magicsock: got updated network map; %d peers", len(update.Peers))
entriesPerBuffer := debugRingBufferSize(len(nm.Peers))
entriesPerBuffer := debugRingBufferSize(len(update.Peers))
// Try a pass of just upserting nodes and creating missing
// endpoints. If the set of nodes is the same, this is an
// efficient alloc-free update. If the set of nodes is different,
// we'll fall through to the next pass, which allocates but can
// handle full set updates.
for _, n := range nm.Peers {
for _, n := range update.Peers {
if n.ID() == 0 {
devPanicf("node with zero ID")
continue
@ -2697,14 +2705,14 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
c.peerMap.upsertEndpoint(ep, key.DiscoPublic{})
}
// If the set of nodes changed since the last SetNetworkMap, the
// If the set of nodes changed since the last onNodeViewsUpdate, the
// upsert loop just above made c.peerMap contain the union of the
// old and new peers - which will be larger than the set from the
// current netmap. If that happens, go through the allocful
// deletion path to clean up moribund nodes.
if c.peerMap.nodeCount() != len(nm.Peers) {
if c.peerMap.nodeCount() != len(update.Peers) {
keep := set.Set[key.NodePublic]{}
for _, n := range nm.Peers {
for _, n := range update.Peers {
keep.Add(n.Key())
}
c.peerMap.forEachEndpoint(func(ep *endpoint) {
@ -2837,10 +2845,6 @@ func (c *connBind) Close() error {
return nil
}
c.closed = true
// Close the [eventbus.Client].
if c.eventClient != nil {
c.eventClient.Close()
}
// Unblock all outstanding receives.
c.pconn4.Close()
c.pconn6.Close()
@ -2850,9 +2854,6 @@ func (c *connBind) Close() error {
if c.closeDisco6 != nil {
c.closeDisco6.Close()
}
if c.eventClient != nil {
c.eventClient.Close()
}
// Send an empty read result to unblock receiveDERP,
// which will then check connBind.Closed.
// connBind.Closed takes c.mu, but c.derpRecvCh is buffered.
@ -2871,6 +2872,17 @@ func (c *connBind) isClosed() bool {
//
// Only the first close does anything. Any later closes return nil.
func (c *Conn) Close() error {
// Close the [eventbus.Client] and wait for Conn.consumeEventbusTopics to
// return. Do this before acquiring c.mu:
// 1. Conn.consumeEventbusTopics event handlers also acquire c.mu, they can
// deadlock with c.Close().
// 2. Conn.consumeEventbusTopics event handlers may not guard against
// undesirable post/in-progress Conn.Close() behaviors.
if c.eventClient != nil {
c.eventClient.Close()
<-c.subsDoneCh
}
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
@ -2901,7 +2913,6 @@ func (c *Conn) Close() error {
if c.closeDisco6 != nil {
c.closeDisco6.Close()
}
// Wait on goroutines updating right at the end, once everything is
// already closed. We want everything else in the Conn to be
// consistently in the closed state before we release mu to wait
@ -3233,12 +3244,13 @@ func simpleDur(d time.Duration) time.Duration {
return d.Round(time.Minute)
}
// UpdateNetmapDelta implements controlclient.NetmapDeltaUpdater.
func (c *Conn) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
// onNodeMutationsUpdate is called when a [NodeMutationsUpdate] is received over
// the [eventbus.Bus].
func (c *Conn) onNodeMutationsUpdate(update NodeMutationsUpdate) {
c.mu.Lock()
defer c.mu.Unlock()
for _, m := range muts {
for _, m := range update.Mutations {
nodeID := m.NodeIDBeingMutated()
ep, ok := c.peerMap.endpointForNodeID(nodeID)
if !ok {
@ -3257,7 +3269,6 @@ func (c *Conn) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) {
ep.mu.Unlock()
}
}
return true
}
// UpdateStatus implements the interface nede by ipnstate.StatusBuilder.

View File

@ -166,7 +166,7 @@ type magicStack struct {
}
// newMagicStack builds and initializes an idle magicsock and
// friends. You need to call conn.SetNetworkMap and dev.Reconfig
// friends. You need to call conn.onNodeViewsUpdate and dev.Reconfig
// before anything interesting happens.
func newMagicStack(t testing.TB, logf logger.Logf, l nettype.PacketListener, derpMap *tailcfg.DERPMap) *magicStack {
privateKey := key.NewNode()
@ -339,9 +339,13 @@ func meshStacks(logf logger.Logf, mutateNetmap func(idx int, nm *netmap.NetworkM
for i, m := range ms {
nm := buildNetmapLocked(i)
m.conn.SetNetworkMap(nm)
peerSet := make(set.Set[key.NodePublic], len(nm.Peers))
for _, peer := range nm.Peers {
nv := NodeViewsUpdate{
SelfNode: nm.SelfNode,
Peers: nm.Peers,
}
m.conn.onNodeViewsUpdate(nv)
peerSet := make(set.Set[key.NodePublic], len(nv.Peers))
for _, peer := range nv.Peers {
peerSet.Add(peer.Key())
}
m.conn.UpdatePeers(peerSet)
@ -1366,7 +1370,7 @@ func newTestConn(t testing.TB) *Conn {
return conn
}
// addTestEndpoint sets conn's network map to a single peer expected
// addTestEndpoint sets conn's node views to a single peer expected
// to receive packets from sendConn (or DERP), and returns that peer's
// nodekey and discokey.
func addTestEndpoint(tb testing.TB, conn *Conn, sendConn net.PacketConn) (key.NodePublic, key.DiscoPublic) {
@ -1375,7 +1379,7 @@ func addTestEndpoint(tb testing.TB, conn *Conn, sendConn net.PacketConn) (key.No
// codepath.
discoKey := key.DiscoPublicFromRaw32(mem.B([]byte{31: 1}))
nodeKey := key.NodePublicFromRaw32(mem.B([]byte{0: 'N', 1: 'K', 31: 0}))
conn.SetNetworkMap(&netmap.NetworkMap{
conn.onNodeViewsUpdate(NodeViewsUpdate{
Peers: nodeViews([]*tailcfg.Node{
{
ID: 1,
@ -1564,11 +1568,11 @@ func nodeViews(v []*tailcfg.Node) []tailcfg.NodeView {
return nv
}
// Test that a netmap update where node changes its node key but
// Test that a node views update where node changes its node key but
// doesn't change its disco key doesn't result in a broken state.
//
// https://github.com/tailscale/tailscale/issues/1391
func TestSetNetworkMapChangingNodeKey(t *testing.T) {
func TestOnNodeViewsUpdateChangingNodeKey(t *testing.T) {
conn := newTestConn(t)
t.Cleanup(func() { conn.Close() })
var buf tstest.MemLogger
@ -1580,7 +1584,7 @@ func TestSetNetworkMapChangingNodeKey(t *testing.T) {
nodeKey1 := key.NodePublicFromRaw32(mem.B([]byte{0: 'N', 1: 'K', 2: '1', 31: 0}))
nodeKey2 := key.NodePublicFromRaw32(mem.B([]byte{0: 'N', 1: 'K', 2: '2', 31: 0}))
conn.SetNetworkMap(&netmap.NetworkMap{
conn.onNodeViewsUpdate(NodeViewsUpdate{
Peers: nodeViews([]*tailcfg.Node{
{
ID: 1,
@ -1596,7 +1600,7 @@ func TestSetNetworkMapChangingNodeKey(t *testing.T) {
}
for range 3 {
conn.SetNetworkMap(&netmap.NetworkMap{
conn.onNodeViewsUpdate(NodeViewsUpdate{
Peers: nodeViews([]*tailcfg.Node{
{
ID: 2,
@ -1921,7 +1925,7 @@ func eps(s ...string) []netip.AddrPort {
return eps
}
func TestStressSetNetworkMap(t *testing.T) {
func TestStressOnNodeViewsUpdate(t *testing.T) {
t.Parallel()
conn := newTestConn(t)
@ -1969,15 +1973,15 @@ func TestStressSetNetworkMap(t *testing.T) {
allPeers[j].Key = randNodeKey()
}
}
// Clone existing peers into a new netmap.
// Clone existing peers.
peers := make([]*tailcfg.Node, 0, len(allPeers))
for peerIdx, p := range allPeers {
if present[peerIdx] {
peers = append(peers, p.Clone())
}
}
// Set the netmap.
conn.SetNetworkMap(&netmap.NetworkMap{
// Set the node views.
conn.onNodeViewsUpdate(NodeViewsUpdate{
Peers: nodeViews(peers),
})
// Check invariants.
@ -2102,10 +2106,10 @@ func TestRebindingUDPConn(t *testing.T) {
}
// https://github.com/tailscale/tailscale/issues/6680: don't ignore
// SetNetworkMap calls when there are no peers. (A too aggressive fast path was
// onNodeViewsUpdate calls when there are no peers. (A too aggressive fast path was
// previously bailing out early, thinking there were no changes since all zero
// peers didn't change, but the netmap has non-peer info in it too we shouldn't discard)
func TestSetNetworkMapWithNoPeers(t *testing.T) {
// peers didn't change, but the node views has non-peer info in it too we shouldn't discard)
func TestOnNodeViewsUpdateWithNoPeers(t *testing.T) {
var c Conn
knobs := &controlknobs.Knobs{}
c.logf = logger.Discard
@ -2114,9 +2118,9 @@ func TestSetNetworkMapWithNoPeers(t *testing.T) {
for i := 1; i <= 3; i++ {
v := !debugEnableSilentDisco()
envknob.Setenv("TS_DEBUG_ENABLE_SILENT_DISCO", fmt.Sprint(v))
nm := &netmap.NetworkMap{}
c.SetNetworkMap(nm)
t.Logf("ptr %d: %p", i, nm)
nv := NodeViewsUpdate{}
c.onNodeViewsUpdate(nv)
t.Logf("ptr %d: %p", i, nv)
if c.lastFlags.heartbeatDisabled != v {
t.Fatalf("call %d: didn't store netmap", i)
}
@ -2213,7 +2217,11 @@ func TestIsWireGuardOnlyPeer(t *testing.T) {
},
}),
}
m.conn.SetNetworkMap(nm)
nv := NodeViewsUpdate{
SelfNode: nm.SelfNode,
Peers: nm.Peers,
}
m.conn.onNodeViewsUpdate(nv)
cfg, err := nmcfg.WGCfg(nm, t.Logf, netmap.AllowSubnetRoutes, "")
if err != nil {
@ -2275,7 +2283,11 @@ func TestIsWireGuardOnlyPeerWithMasquerade(t *testing.T) {
},
}),
}
m.conn.SetNetworkMap(nm)
nv := NodeViewsUpdate{
SelfNode: nm.SelfNode,
Peers: nm.Peers,
}
m.conn.onNodeViewsUpdate(nv)
cfg, err := nmcfg.WGCfg(nm, t.Logf, netmap.AllowSubnetRoutes, "")
if err != nil {
@ -2312,7 +2324,11 @@ func TestIsWireGuardOnlyPeerWithMasquerade(t *testing.T) {
// configures WG.
func applyNetworkMap(t *testing.T, m *magicStack, nm *netmap.NetworkMap) {
t.Helper()
m.conn.SetNetworkMap(nm)
nv := NodeViewsUpdate{
SelfNode: nm.SelfNode,
Peers: nm.Peers,
}
m.conn.onNodeViewsUpdate(nv)
// Make sure we can't use v6 to avoid test failures.
m.conn.noV6.Store(true)

View File

@ -1300,7 +1300,6 @@ func (e *userspaceEngine) linkChange(delta *netmon.ChangeDelta) {
}
func (e *userspaceEngine) SetNetworkMap(nm *netmap.NetworkMap) {
e.magicConn.SetNetworkMap(nm)
e.mu.Lock()
e.netMap = nm
e.mu.Unlock()