diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 7154b942c..aa172f93d 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -203,6 +203,8 @@ type LocalBackend struct { keyLogf logger.Logf // for printing list of peers on change statsLogf logger.Logf // for printing peers stats on change sys *tsd.System + eventbus *eventbus.Bus + eventClient *eventbus.Client health *health.Tracker // always non-nil metrics metrics e wgengine.Engine // non-nil; TODO(bradfitz): remove; use sys @@ -428,6 +430,8 @@ type LocalBackend struct { // // See tailscale/corp#29969. overrideExitNodePolicy bool + + magicSockConfChangeSub *eventbus.Subscriber[magicsock.ConfigurationChanged] } // HealthTracker returns the health tracker for the backend. @@ -533,7 +537,12 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running needsCaptiveDetection: make(chan bool), } - nb := newNodeBackend(ctx, b.sys.Bus.Get()) + if bus, ok := sys.Bus.GetOK(); ok { + b.eventbus = bus + b.eventClient = bus.Client("ipnlocal.LocalBackend") + b.magicSockConfChangeSub = eventbus.Subscribe[magicsock.ConfigurationChanged](b.eventClient) + } + nb := newNodeBackend(ctx, b.eventbus) b.currentNodeAtomic.Store(nb) nb.ready() @@ -606,6 +615,21 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo return b, nil } +func (b *LocalBackend) consumeEventbusTopics() { + for { + select { + case <-b.ctx.Done(): + b.magicSockConfChangeSub.Close() + return + case <-b.magicSockConfChangeSub.Events(): + if b.ctx.Err() != nil { + return + } + go b.authReconfig() + } + } +} + func (b *LocalBackend) Clock() tstime.Clock { return b.clock } func (b *LocalBackend) Sys() *tsd.System { return b.sys } @@ -621,10 +645,10 @@ func (b *LocalBackend) currentNode() *nodeBackend { // Auto-init [nodeBackend] in tests for LocalBackend created without the // NewLocalBackend() constructor. Same reasoning for checking b.sys. var bus *eventbus.Bus - if b.sys == nil { + if b.eventbus == nil { bus = eventbus.New() } else { - bus = b.sys.Bus.Get() + bus = b.eventbus } v := newNodeBackend(cmp.Or(b.ctx, context.Background()), bus) if b.currentNodeAtomic.CompareAndSwap(nil, v) { @@ -1765,9 +1789,6 @@ func (b *LocalBackend) SetControlClientStatus(c controlclient.Client, st control b.setAuthURL(st.URL) } b.stateMachine() - // This is currently (2020-07-28) necessary; conditionally disabling it is fragile! - // This is where netmap information gets propagated to router and magicsock. - b.authReconfig() } type preferencePolicyInfo struct { @@ -2283,6 +2304,7 @@ func (b *LocalBackend) getNewControlClientFuncLocked() clientGen { // initOnce is called on the first call to [LocalBackend.Start]. func (b *LocalBackend) initOnce() { + go b.consumeEventbusTopics() b.extHost.Init() } @@ -4443,7 +4465,6 @@ func (b *LocalBackend) changeDisablesExitNodeLocked(prefs ipn.PrefsView, change // but wasn't empty before, then the change disables // exit node usage. return tmpPrefs.ExitNodeID == "" - } // adjustEditPrefsLocked applies additional changes to mp if necessary, @@ -5108,11 +5129,6 @@ func (b *LocalBackend) readvertiseAppConnectorRoutes() { // updates are not currently blocked, based on the cached netmap and // user prefs. func (b *LocalBackend) authReconfig() { - // Wait for magicsock to process pending [eventbus] events, - // such as netmap updates. This should be completed before - // wireguard-go is reconfigured. See tailscale/tailscale#16369. - b.MagicConn().Synchronize() - b.mu.Lock() blocked := b.blocked prefs := b.pm.CurrentPrefs() @@ -7358,7 +7374,7 @@ func (b *LocalBackend) resetForProfileChangeLockedOnEntry(unlock unlockOnce) err // down, so no need to do any work. return nil } - newNode := newNodeBackend(b.ctx, b.sys.Bus.Get()) + newNode := newNodeBackend(b.ctx, b.eventbus) if oldNode := b.currentNodeAtomic.Swap(newNode); oldNode != nil { oldNode.shutdown(errNodeContextChanged) } @@ -7697,10 +7713,8 @@ var ( // allowedAutoRoute determines if the route being added via AdvertiseRoute (the app connector featuge) should be allowed. func allowedAutoRoute(ipp netip.Prefix) bool { // Note: blocking the addrs for globals, not solely the prefixes. - for _, addr := range disallowedAddrs { - if ipp.Addr() == addr { - return false - } + if slices.Contains(disallowedAddrs, ipp.Addr()) { + return false } for _, pfx := range disallowedRanges { if pfx.Overlaps(ipp) { @@ -8134,7 +8148,6 @@ func isAllowedAutoExitNodeID(exitNodeID tailcfg.StableNodeID) bool { } if nodes, _ := syspolicy.GetStringArray(syspolicy.AllowedSuggestedExitNodes, nil); nodes != nil { return slices.Contains(nodes, string(exitNodeID)) - } return true // no policy configured; allow all exit nodes } @@ -8278,9 +8291,7 @@ func (b *LocalBackend) vipServicesFromPrefsLocked(prefs ipn.PrefsView) []*tailcf return servicesList } -var ( - metricCurrentWatchIPNBus = clientmetric.NewGauge("localbackend_current_watch_ipn_bus") -) +var metricCurrentWatchIPNBus = clientmetric.NewGauge("localbackend_current_watch_ipn_bus") func (b *LocalBackend) stateEncrypted() opt.Bool { switch runtime.GOOS { diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index fb7f5edcb..90720374a 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -14,6 +14,7 @@ import ( "expvar" "fmt" "io" + "maps" "net" "net/netip" "reflect" @@ -179,11 +180,11 @@ type Conn struct { filterSub *eventbus.Subscriber[FilterUpdate] nodeViewsSub *eventbus.Subscriber[NodeViewsUpdate] nodeMutsSub *eventbus.Subscriber[NodeMutationsUpdate] - syncSub *eventbus.Subscriber[syncPoint] - syncPub *eventbus.Publisher[syncPoint] allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq] allocRelayEndpointSub *eventbus.Subscriber[UDPRelayAllocResp] + configChangedPub *eventbus.Publisher[ConfigurationChanged] subsDoneCh chan struct{} // closed when consumeEventbusTopics returns + netInfoPub *eventbus.Publisher[tailcfg.NetInfo] // pconn4 and pconn6 are the underlying UDP sockets used to // send/receive packets for wireguard and other magicsock @@ -423,6 +424,8 @@ type Conn struct { // metrics contains the metrics for the magicsock instance. metrics *metrics + + hasReconfigured chan any } // SetDebugLoggingEnabled controls whether spammy debug logging is enabled. @@ -562,20 +565,7 @@ type FilterUpdate struct { *filter.Filter } -// syncPoint is an event published over an [eventbus.Bus] by [Conn.Synchronize]. -// It serves as a synchronization point, allowing to wait until magicsock -// has processed all pending events. -type syncPoint chan struct{} - -// Wait blocks until [syncPoint.Signal] is called. -func (s syncPoint) Wait() { - <-s -} - -// Signal signals the sync point, unblocking the [syncPoint.Wait] call. -func (s syncPoint) Signal() { - close(s) -} +type ConfigurationChanged struct{} // UDPRelayAllocReq represents a [*disco.AllocateUDPRelayEndpointRequest] // reception event. This is signaled over an [eventbus.Bus] from @@ -612,15 +602,16 @@ type UDPRelayAllocResp struct { func newConn(logf logger.Logf) *Conn { discoPrivate := key.NewDisco() c := &Conn{ - logf: logf, - derpRecvCh: make(chan derpReadResult, 1), // must be buffered, see issue 3736 - derpStarted: make(chan struct{}), - peerLastDerp: make(map[key.NodePublic]int), - peerMap: newPeerMap(), - discoInfo: make(map[key.DiscoPublic]*discoInfo), - discoPrivate: discoPrivate, - discoPublic: discoPrivate.Public(), - cloudInfo: newCloudInfo(logf), + logf: logf, + derpRecvCh: make(chan derpReadResult, 1), // must be buffered, see issue 3736 + derpStarted: make(chan struct{}), + peerLastDerp: make(map[key.NodePublic]int), + peerMap: newPeerMap(), + discoInfo: make(map[key.DiscoPublic]*discoInfo), + discoPrivate: discoPrivate, + discoPublic: discoPrivate.Public(), + cloudInfo: newCloudInfo(logf), + hasReconfigured: make(chan any, 25), } c.discoShort = c.discoPublic.ShortString() c.bind = &connBind{Conn: c, closed: true} @@ -658,15 +649,22 @@ func (c *Conn) consumeEventbusTopics() { c.onPortMapChanged() case filterUpdate := <-c.filterSub.Events(): c.onFilterUpdate(filterUpdate) + c.hasReconfigured <- new(any) case nodeViews := <-c.nodeViewsSub.Events(): c.onNodeViewsUpdate(nodeViews) + c.hasReconfigured <- new(any) case nodeMuts := <-c.nodeMutsSub.Events(): c.onNodeMutationsUpdate(nodeMuts) - case syncPoint := <-c.syncSub.Events(): - c.dlogf("magicsock: received sync point after reconfig") - syncPoint.Signal() case allocResp := <-c.allocRelayEndpointSub.Events(): c.onUDPRelayAllocResp(allocResp) + c.hasReconfigured <- new(any) + case <-c.hasReconfigured: + c.dlogf("magicsock: configuration has changed") + // Drain channel as we only want to reconfigure once + for len(c.hasReconfigured) > 0 { + <-c.hasReconfigured + } + c.configChangedPub.Publish(ConfigurationChanged{}) } } } @@ -699,18 +697,6 @@ func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) { go c.sendDiscoMessage(epAddr{ap: derpAddr}, ep.publicKey, disco.key, allocResp.Message, discoVerboseLog) } -// Synchronize waits for all [eventbus] events published -// prior to this call to be processed by the receiver. -func (c *Conn) Synchronize() { - if c.syncPub == nil { - // Eventbus is not used; no need to synchronize (in certain tests). - return - } - sp := syncPoint(make(chan struct{})) - c.syncPub.Publish(sp) - sp.Wait() -} - // NewConn creates a magic Conn listening on opts.Port. // As the set of possible endpoints for a Conn changes, the // callback opts.EndpointsFunc is called. @@ -738,10 +724,10 @@ func NewConn(opts Options) (*Conn, error) { c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient) c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient) c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient) - c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient) - c.syncPub = eventbus.Publish[syncPoint](c.eventClient) c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient) c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient) + c.netInfoPub = eventbus.Publish[tailcfg.NetInfo](c.eventClient) + c.configChangedPub = eventbus.Publish[ConfigurationChanged](c.eventClient) c.subsDoneCh = make(chan struct{}) go c.consumeEventbusTopics() } @@ -1121,12 +1107,21 @@ func (c *Conn) callNetInfoCallback(ni *tailcfg.NetInfo) { func (c *Conn) callNetInfoCallbackLocked(ni *tailcfg.NetInfo) { c.netInfoLast = ni + c.publishNetInfo(ni) if c.netInfoFunc != nil { c.dlogf("[v1] magicsock: netInfo update: %+v", ni) go c.netInfoFunc(ni) } } +func (c *Conn) publishNetInfo(ni *tailcfg.NetInfo) { + if c.netInfoPub != nil { + newNetInfo := *ni + newNetInfo.DERPLatency = maps.Clone(ni.DERPLatency) + c.netInfoPub.Publish(newNetInfo) + } +} + // addValidDiscoPathForTest makes addr a validated disco address for // discoKey. It's used in tests to enable receiving of packets from // addr without having to spin up the entire active discovery @@ -4097,9 +4092,11 @@ type lazyEndpoint struct { src epAddr } -var _ conn.InitiationAwareEndpoint = (*lazyEndpoint)(nil) -var _ conn.PeerAwareEndpoint = (*lazyEndpoint)(nil) -var _ conn.Endpoint = (*lazyEndpoint)(nil) +var ( + _ conn.InitiationAwareEndpoint = (*lazyEndpoint)(nil) + _ conn.PeerAwareEndpoint = (*lazyEndpoint)(nil) + _ conn.Endpoint = (*lazyEndpoint)(nil) +) // InitiationMessagePublicKey implements [conn.InitiationAwareEndpoint]. // wireguard-go calls us here if we passed it a [*lazyEndpoint] for an