control/controlclient, types/netmap: start plumbing delta netmap updates

Currently only the top four most popular changes: endpoints, DERP
home, online, and LastSeen.

Updates #1909

Change-Id: I03152da176b2b95232b56acabfb55dcdfaa16b79
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick
2023-09-01 19:28:00 -07:00
committed by Brad Fitzpatrick
parent c0ade132e6
commit 3af051ea27
14 changed files with 715 additions and 13 deletions

View File

@@ -432,6 +432,8 @@ type mapRoutineState struct {
bo *backoff.Backoff
}
var _ NetmapDeltaUpdater = mapRoutineState{}
func (mrs mapRoutineState) UpdateFullNetmap(nm *netmap.NetworkMap) {
c := mrs.c
@@ -453,6 +455,28 @@ func (mrs mapRoutineState) UpdateFullNetmap(nm *netmap.NetworkMap) {
mrs.bo.BackOff(ctx, nil)
}
func (mrs mapRoutineState) UpdateNetmapDelta(muts []netmap.NodeMutation) bool {
c := mrs.c
c.mu.Lock()
goodState := c.loggedIn && c.inMapPoll
ndu, canDelta := c.observer.(NetmapDeltaUpdater)
c.mu.Unlock()
if !goodState || !canDelta {
return false
}
ctx, cancel := context.WithTimeout(c.mapCtx, 2*time.Second)
defer cancel()
var ok bool
err := c.observerQueue.RunSync(ctx, func() {
ok = ndu.UpdateNetmapDelta(muts)
})
return err == nil && ok
}
// mapRoutine is responsible for keeping a read-only streaming connection to the
// control server, and keeping the netmap up to date.
func (c *Auto) mapRoutine() {
@@ -752,6 +776,27 @@ func (q *execQueue) Add(f func()) {
}
}
// RunSync waits for the queue to be drained and then synchronously runs f.
// It returns an error if the queue is closed before f is run or ctx expires.
func (q *execQueue) RunSync(ctx context.Context, f func()) error {
for {
if err := q.wait(ctx); err != nil {
return err
}
q.mu.Lock()
if q.inFlight {
q.mu.Unlock()
continue
}
defer q.mu.Unlock()
if q.closed {
return errors.New("closed")
}
f()
return nil
}
}
func (q *execQueue) run(f func()) {
f()

View File

@@ -194,6 +194,19 @@ type NetmapUpdater interface {
// the diff themselves between the previous full & next full network maps.
}
// NetmapDeltaUpdater is an optional interface that can be implemented by
// NetmapUpdater implementations to receive delta updates from the controlclient
// rather than just full updates.
type NetmapDeltaUpdater interface {
// UpdateNetmapDelta is called with discrete changes to the network map.
//
// The ok result is whether the implementation was able to apply the
// mutations. It might return false if its internal state doesn't
// support applying them or a NetmapUpdater it's wrapping doesn't
// implement the NetmapDeltaUpdater optional method.
UpdateNetmapDelta([]netmap.NodeMutation) (ok bool)
}
// NewDirect returns a new Direct client.
func NewDirect(opts Options) (*Direct, error) {
if opts.ServerURL == "" {
@@ -1301,6 +1314,7 @@ func (ms *mapSession) setControlKnobsFromNodeAttrs(selfNodeAttrs []string) {
disableDRPO bool
disableUPnP bool
randomizeClientPort bool
disableDeltaUpdates bool
oneCGNAT opt.Bool
forceBackgroundSTUN bool
)
@@ -1320,6 +1334,8 @@ func (ms *mapSession) setControlKnobsFromNodeAttrs(selfNodeAttrs []string) {
oneCGNAT.Set(false)
case tailcfg.NodeAttrDebugForceBackgroundSTUN:
forceBackgroundSTUN = true
case tailcfg.NodeAttrDisableDeltaUpdates:
disableDeltaUpdates = true
}
}
k.KeepFullWGConfig.Store(keepFullWG)
@@ -1328,6 +1344,7 @@ func (ms *mapSession) setControlKnobsFromNodeAttrs(selfNodeAttrs []string) {
k.RandomizeClientPort.Store(randomizeClientPort)
k.OneCGNAT.Store(oneCGNAT)
k.ForceBackgroundSTUN.Store(forceBackgroundSTUN)
k.DisableDeltaUpdates.Store(disableDeltaUpdates)
}
// ipForwardingBroken reports whether the system's IP forwarding is disabled

View File

@@ -14,6 +14,7 @@ import (
"sort"
"strconv"
"sync"
"time"
"tailscale.com/control/controlknobs"
"tailscale.com/envknob"
@@ -39,7 +40,7 @@ import (
// one MapRequest).
type mapSession struct {
// Immutable fields.
nu NetmapUpdater // called on changes (in addition to the optional hooks below)
netmapUpdater NetmapUpdater // called on changes (in addition to the optional hooks below)
controlKnobs *controlknobs.Knobs // or nil
privateNodeKey key.NodePrivate
publicNodeKey key.NodePublic
@@ -98,7 +99,7 @@ type mapSession struct {
// It must have its Close method called to release resources.
func newMapSession(privateNodeKey key.NodePrivate, nu NetmapUpdater, controlKnobs *controlknobs.Knobs) *mapSession {
ms := &mapSession{
nu: nu,
netmapUpdater: nu,
controlKnobs: controlKnobs,
privateNodeKey: privateNodeKey,
publicNodeKey: privateNodeKey.Public(),
@@ -197,8 +198,16 @@ func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *t
ms.updateStateFromResponse(resp)
nm := ms.netmap()
if ms.tryHandleIncrementally(resp) {
ms.onConciseNetMapSummary(ms.lastNetmapSummary) // every 5s log
return nil
}
// We have to rebuild the whole netmap (lots of garbage & work downstream of
// our UpdateFullNetmap call). This is the part we tried to avoid but
// some field mutations (especially rare ones) aren't yet handled.
nm := ms.netmap()
ms.lastNetmapSummary = nm.VeryConcise()
ms.onConciseNetMapSummary(ms.lastNetmapSummary)
@@ -207,10 +216,25 @@ func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *t
ms.onSelfNodeChanged(nm)
}
ms.nu.UpdateFullNetmap(nm)
ms.netmapUpdater.UpdateFullNetmap(nm)
return nil
}
func (ms *mapSession) tryHandleIncrementally(res *tailcfg.MapResponse) bool {
if ms.controlKnobs != nil && ms.controlKnobs.DisableDeltaUpdates.Load() {
return false
}
nud, ok := ms.netmapUpdater.(NetmapDeltaUpdater)
if !ok {
return false
}
mutations, ok := netmap.MutationsFromMapResponse(res, time.Now())
if ok && len(mutations) > 0 {
return nud.UpdateNetmapDelta(mutations)
}
return ok
}
// updateStats are some stats from updateStateFromResponse, primarily for
// testing. It's meant to be cheap enough to always compute, though. It doesn't
// allocate.

View File

@@ -38,6 +38,11 @@ type Knobs struct {
// ForceBackgroundSTUN forces netcheck STUN queries to keep
// running in magicsock, even when idle.
ForceBackgroundSTUN atomic.Bool
// DisableDeltaUpdates is whether the node should not process
// incremental (delta) netmap updates and should treat all netmap
// changes as "full" ones as tailscaled did in 1.48.x and earlier.
DisableDeltaUpdates atomic.Bool
}
// AsDebugJSON returns k as something that can be marshalled with json.Marshal
@@ -53,5 +58,6 @@ func (k *Knobs) AsDebugJSON() map[string]any {
"RandomizeClientPort": k.RandomizeClientPort.Load(),
"OneCGNAT": k.OneCGNAT.Load(),
"ForceBackgroundSTUN": k.ForceBackgroundSTUN.Load(),
"DisableDeltaUpdates": k.DisableDeltaUpdates.Load(),
}
}

View File

@@ -0,0 +1,21 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package controlknobs
import (
"reflect"
"testing"
)
func TestAsDebugJSON(t *testing.T) {
var nilPtr *Knobs
if got := nilPtr.AsDebugJSON(); got != nil {
t.Errorf("AsDebugJSON(nil) = %v; want nil", got)
}
k := new(Knobs)
got := k.AsDebugJSON()
if want := reflect.TypeOf(Knobs{}).NumField(); len(got) != want {
t.Errorf("AsDebugJSON map has %d fields; want %v", len(got), want)
}
}